14static const char *
const TAG =
"mqtt.idf";
46 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_SSL;
53 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_TCP;
56 auto *mqtt_client = esp_mqtt_client_init(&
mqtt_cfg_);
61#if defined(USE_MQTT_IDF_ENQUEUE)
67 ESP_LOGE(TAG,
"Failed to create MQTT task");
78 ESP_LOGE(TAG,
"Failed to init client");
92#if defined(USE_MQTT_IDF_ENQUEUE)
104 uint16_t dropped = this->
mqtt_queue_.get_and_reset_dropped_count();
114 ESP_LOGV(TAG,
"Event dispatched from event loop event_id=%d", event.event_id);
115 switch (event.event_id) {
116 case MQTT_EVENT_BEFORE_CONNECT:
117 ESP_LOGV(TAG,
"MQTT_EVENT_BEFORE_CONNECT");
120 case MQTT_EVENT_CONNECTED:
121 ESP_LOGV(TAG,
"MQTT_EVENT_CONNECTED");
123#if defined(USE_MQTT_IDF_ENQUEUE)
129 case MQTT_EVENT_DISCONNECTED:
130 ESP_LOGV(TAG,
"MQTT_EVENT_DISCONNECTED");
133#if defined(USE_MQTT_IDF_ENQUEUE)
140 case MQTT_EVENT_SUBSCRIBED:
141 ESP_LOGV(TAG,
"MQTT_EVENT_SUBSCRIBED, msg_id=%d", event.msg_id);
145 case MQTT_EVENT_UNSUBSCRIBED:
146 ESP_LOGV(TAG,
"MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event.msg_id);
149 case MQTT_EVENT_PUBLISHED:
150 ESP_LOGV(TAG,
"MQTT_EVENT_PUBLISHED, msg_id=%d", event.msg_id);
153 case MQTT_EVENT_DATA: {
154 static std::string topic;
155 if (!event.topic.empty()) {
162 ESP_LOGV(TAG,
"MQTT_EVENT_DATA %s", topic.c_str());
163 this->
on_message_.call(topic.c_str(), event.data.data(), event.data.size(), event.current_data_offset,
164 event.total_data_len);
166 case MQTT_EVENT_ERROR:
167 ESP_LOGE(TAG,
"MQTT_EVENT_ERROR");
168 if (event.error_handle.error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
169 ESP_LOGE(TAG,
"Last error code reported from esp-tls: 0x%x", event.error_handle.esp_tls_last_esp_err);
170 ESP_LOGE(TAG,
"Last tls stack error number: 0x%x", event.error_handle.esp_tls_stack_err);
171 ESP_LOGE(TAG,
"Last captured errno : %d (%s)", event.error_handle.esp_transport_sock_errno,
172 strerror(event.error_handle.esp_transport_sock_errno));
173 }
else if (event.error_handle.error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) {
174 ESP_LOGE(TAG,
"Connection refused error: 0x%x", event.error_handle.connect_return_code);
176 ESP_LOGE(TAG,
"Unknown error type: 0x%x", event.error_handle.error_type);
180 ESP_LOGV(TAG,
"Other event id:%d", event.event_id);
191 auto event = *
static_cast<esp_mqtt_event_t *
>(event_data);
196#if defined(USE_MQTT_IDF_ENQUEUE)
202 ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
206 while ((elem = this_mqtt->
mqtt_queue_.pop()) !=
nullptr) {
208 switch (elem->
type) {
210 esp_mqtt_client_subscribe(this_mqtt->
handler_.get(), elem->
topic, elem->
qos);
214 esp_mqtt_client_unsubscribe(this_mqtt->
handler_.get(), elem->
topic);
223 ESP_LOGE(TAG,
"Invalid operation type from MQTT queue");
233 while ((elem = this_mqtt->
mqtt_queue_.pop()) !=
nullptr) {
239 vTaskDelete(
nullptr);
259 elem->retain = retain;
262 if (!elem->set_data(topic, payload,
len)) {
uint32_t IRAM_ATTR HOT get_loop_component_start_time() const
Get the cached time in milliseconds from when the current component started its loop execution.
CallbackManager< on_connect_callback_t > on_connect_
CallbackManager< on_disconnect_callback_t > on_disconnect_
static const size_t TASK_STACK_SIZE_TLS
std::queue< Event > mqtt_events_
CallbackManager< on_message_callback_t > on_message_
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
static - Dispatch event to instance method
optional< std::string > ca_certificate_
static const size_t TASK_STACK_SIZE
CallbackManager< on_subscribe_callback_t > on_subscribe_
static void esphome_mqtt_task(void *params)
bool enqueue_(MqttQueueTypeT type, const char *topic, int qos=0, bool retain=false, const char *payload=NULL, size_t len=0)
EventPool< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_event_pool_
CallbackManager< on_unsubscribe_callback_t > on_unsubscribe_
static const ssize_t TASK_PRIORITY
esp_mqtt_client_config_t mqtt_cfg_
TaskHandle_t task_handle_
CallbackManager< on_publish_user_callback_t > on_publish_
static constexpr uint32_t DROP_LOG_INTERVAL_MS
NotifyingLockFreeQueue< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_queue_
uint32_t last_dropped_log_time_
optional< std::string > cl_key_
void mqtt_event_handler_(const Event &event)
optional< std::string > cl_certificate_
value_type const & value() const
@ MQTT_QUEUE_TYPE_SUBSCRIBE
@ MQTT_QUEUE_TYPE_UNSUBSCRIBE
@ MQTT_QUEUE_TYPE_PUBLISH
Providing packet encoding functions for exchanging data with a remote host.
Application App
Global storage of Application pointer - only one Application can exist.