ESPHome 2026.3.0-dev
Loading...
Searching...
No Matches
mqtt_backend_esp32.cpp
Go to the documentation of this file.
2
3#ifdef USE_MQTT
4#ifdef USE_ESP32
5
6#include <string>
8#include "esphome/core/log.h"
10
11namespace esphome::mqtt {
12
13static const char *const TAG = "mqtt.idf";
14
// NOTE(review): the signature line of this function (listing line 15) is missing from this
// extract. The body below fills mqtt_cfg_ from the configured members, creates the ESP-IDF
// MQTT client from it, and returns true on success or false on failure.
//
// mqtt_cfg_ receives raw c_str() pointers into the member strings (host_, username_, ...),
// so it does not own that text. NOTE(review): presumably those members must stay unchanged
// for as long as the client may read the config - confirm against the esp-mqtt documentation.
16 mqtt_cfg_.broker.address.hostname = this->host_.c_str();
17 mqtt_cfg_.broker.address.port = this->port_;
18 mqtt_cfg_.session.keepalive = this->keep_alive_;
19 mqtt_cfg_.session.disable_clean_session = !this->clean_session_;
20
// Credentials are optional; the password is only applied when a username is set.
21 if (!this->username_.empty()) {
22 mqtt_cfg_.credentials.username = this->username_.c_str();
23 if (!this->password_.empty()) {
24 mqtt_cfg_.credentials.authentication.password = this->password_.c_str();
25 }
26 }
27
// Last-will settings are applied only when a will topic is configured; the will message
// (and its length) is optional on top of that.
28 if (!this->lwt_topic_.empty()) {
29 mqtt_cfg_.session.last_will.topic = this->lwt_topic_.c_str();
30 this->mqtt_cfg_.session.last_will.qos = this->lwt_qos_;
31 this->mqtt_cfg_.session.last_will.retain = this->lwt_retain_;
32
33 if (!this->lwt_message_.empty()) {
34 mqtt_cfg_.session.last_will.msg = this->lwt_message_.c_str();
35 mqtt_cfg_.session.last_will.msg_len = this->lwt_message_.size();
36 }
37 }
38
39 if (!this->client_id_.empty()) {
40 mqtt_cfg_.credentials.client_id = this->client_id_.c_str();
41 }
// NOTE(review): listing line 42 is missing here. Given the matching `} else {` below and the
// use of ca_certificate_.value(), it presumably opens a branch taken when a CA certificate
// is configured - confirm against the real file.
43 mqtt_cfg_.broker.verification.certificate = ca_certificate_.value().c_str();
44 mqtt_cfg_.broker.verification.skip_cert_common_name_check = skip_cert_cn_check_;
45 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_SSL;
46
// Client certificate authentication is only configured when both certificate and key are set.
47 if (this->cl_certificate_.has_value() && this->cl_key_.has_value()) {
48 mqtt_cfg_.credentials.authentication.certificate = this->cl_certificate_.value().c_str();
49 mqtt_cfg_.credentials.authentication.key = this->cl_key_.value().c_str();
50 }
51 } else {
52 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_TCP;
53 }
54
// Create the client; on success hand ownership to handler_ and register the static
// mqtt_event_handler for all MQTT events, passing `this` as the handler argument.
55 auto *mqtt_client = esp_mqtt_client_init(&mqtt_cfg_);
56 if (mqtt_client) {
57 handler_.reset(mqtt_client);
58 is_initalized_ = true;
59 esp_mqtt_client_register_event(mqtt_client, MQTT_EVENT_ANY, mqtt_event_handler, this);
60#if defined(USE_MQTT_IDF_ENQUEUE)
61 // Create the task only after MQTT client is initialized successfully
62 // Use larger stack size when TLS is enabled
63 size_t stack_size = this->ca_certificate_.has_value() ? TASK_STACK_SIZE_TLS : TASK_STACK_SIZE;
64 xTaskCreate(esphome_mqtt_task, "esphome_mqtt", stack_size, (void *) this, TASK_PRIORITY, &this->task_handle_);
// Task creation is detected through the handle staying null rather than through the
// xTaskCreate return value.
65 if (this->task_handle_ == nullptr) {
66 ESP_LOGE(TAG, "Failed to create MQTT task");
67 // Clean up MQTT client since we can't start the async task
68 handler_.reset();
69 is_initalized_ = false;
70 return false;
71 }
72 // Set the task handle so the queue can notify it
73 this->mqtt_queue_.set_task_to_notify(this->task_handle_);
74#endif
75 return true;
76 } else {
77 ESP_LOGE(TAG, "Failed to init client");
78 return false;
79 }
80}
81
// NOTE(review): the signature line of this function (listing line 82) is missing from this
// extract. The body drains at most one queued MQTT event per call and, when
// USE_MQTT_IDF_ENQUEUE is defined, periodically reports how many queued messages were dropped.
83 // process new events
84 // handle only 1 message per loop iteration
85 if (!mqtt_events_.empty()) {
86 auto &event = mqtt_events_.front();
// NOTE(review): listing line 87 is missing here - presumably the call that hands `event`
// to the per-event dispatcher before it is popped; confirm against the real file.
88 mqtt_events_.pop();
89 }
90
91#if defined(USE_MQTT_IDF_ENQUEUE)
92 // Periodically log dropped messages to avoid blocking during spikes.
93 // During high load, many messages can be dropped in quick succession.
94 // Logging each drop immediately would flood the logs and potentially
95 // cause more drops if MQTT logging is enabled (cascade effect).
96 // Instead, we accumulate the count and log a summary periodically.
97 // IMPORTANT: Don't move this to the scheduler - if drops are due to memory
98 // pressure, the scheduler's heap allocations would make things worse.
// NOTE(review): listing line 99 is missing here - it presumably declares `now` (the current
// time in milliseconds) used below; confirm against the real file.
100 // Handle rollover: (now - last_time) works correctly with unsigned arithmetic
101 // even when now < last_time due to rollover
102 if ((now - this->last_dropped_log_time_) >= DROP_LOG_INTERVAL_MS) {
// Reading the counter also resets it, so each summary covers only the elapsed interval.
103 uint16_t dropped = this->mqtt_queue_.get_and_reset_dropped_count();
104 if (dropped > 0) {
105 ESP_LOGW(TAG, "Dropped %u messages (%us)", dropped, DROP_LOG_INTERVAL_MS / 1000);
106 }
107 this->last_dropped_log_time_ = now;
108 }
109#endif
110}
111
// NOTE(review): the signature line of this function (listing line 112) is missing from this
// extract. The body handles a single `event`: it switches on event.event_id, updates
// is_connected_ for connect/disconnect, and invokes the matching callback manager.
113 ESP_LOGV(TAG, "Event dispatched from event loop event_id=%d", event.event_id);
114 switch (event.event_id) {
115 case MQTT_EVENT_BEFORE_CONNECT:
116 ESP_LOGV(TAG, "MQTT_EVENT_BEFORE_CONNECT");
117 break;
118
119 case MQTT_EVENT_CONNECTED:
120 ESP_LOGV(TAG, "MQTT_EVENT_CONNECTED");
121 this->is_connected_ = true;
122#if defined(USE_MQTT_IDF_ENQUEUE)
// Reset the drop-log timestamp and notify the task referenced by task_handle_.
123 this->last_dropped_log_time_ = 0;
124 xTaskNotifyGive(this->task_handle_);
125#endif
126 this->on_connect_.call(event.session_present);
127 break;
128 case MQTT_EVENT_DISCONNECTED:
129 ESP_LOGV(TAG, "MQTT_EVENT_DISCONNECTED");
130 // TODO is there a way to get the disconnect reason?
131 this->is_connected_ = false;
132#if defined(USE_MQTT_IDF_ENQUEUE)
// Same bookkeeping as on connect: reset the drop-log timestamp and notify the task.
133 this->last_dropped_log_time_ = 0;
134 xTaskNotifyGive(this->task_handle_);
135#endif
// NOTE(review): listing line 136 is missing here - presumably the disconnect callback
// invocation; confirm against the real file.
137 break;
138
139 case MQTT_EVENT_SUBSCRIBED:
140 ESP_LOGV(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event.msg_id);
141 // hardcode QoS to 0. QoS is not used in this context but required to mirror the AsyncMqtt interface
142 this->on_subscribe_.call((int) event.msg_id, 0);
143 break;
144 case MQTT_EVENT_UNSUBSCRIBED:
145 ESP_LOGV(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event.msg_id);
146 this->on_unsubscribe_.call((int) event.msg_id);
147 break;
148 case MQTT_EVENT_PUBLISHED:
149 ESP_LOGV(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event.msg_id);
150 this->on_publish_.call((int) event.msg_id);
151 break;
152 case MQTT_EVENT_DATA: {
// `topic` is function-static, so the last non-empty topic persists across calls.
153 static std::string topic;
154 if (!event.topic.empty()) {
155 // When a single message arrives as multiple chunks, the topic will be empty
156 // on any but the first message, leading to event.topic being an empty string.
157 // To ensure handlers get the correct topic, cache the last seen topic to
158 // simulate always receiving the topic from underlying library
159 topic = event.topic;
160 }
161 ESP_LOGV(TAG, "MQTT_EVENT_DATA %s", topic.c_str());
// The callback receives this chunk's bytes plus its offset and the total message length.
162 this->on_message_.call(topic.c_str(), event.data.data(), event.data.size(), event.current_data_offset,
163 event.total_data_len);
164 } break;
165 case MQTT_EVENT_ERROR:
166 ESP_LOGE(TAG, "MQTT_EVENT_ERROR");
// Errors are only logged here; the detail printed depends on error_handle.error_type.
167 if (event.error_handle.error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
168 ESP_LOGE(TAG, "Last esp-tls error: 0x%x, tls stack error: 0x%x, socket errno: %d (%s)",
169 event.error_handle.esp_tls_last_esp_err, event.error_handle.esp_tls_stack_err,
170 event.error_handle.esp_transport_sock_errno, strerror(event.error_handle.esp_transport_sock_errno));
171 } else if (event.error_handle.error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) {
172 ESP_LOGE(TAG, "Connection refused error: 0x%x", event.error_handle.connect_return_code);
173 } else {
174 ESP_LOGE(TAG, "Unknown error type: 0x%x", event.error_handle.error_type);
175 }
176 break;
177 default:
178 ESP_LOGV(TAG, "Other event id:%d", event.event_id);
179 break;
180 }
181}
182
/// Static event callback that queues an incoming MQTT event on the owning instance.
///
/// @param handler_args Pointer to the MQTTBackendESP32 instance; nothing is queued when it is null.
/// @param base Event base (unused in this body).
/// @param event_id Event id (unused in this body; the id is read later from the copied event).
/// @param event_data Pointer to the esp_mqtt_event_t describing the event.
184void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id,
185 void *event_data) {
186 MQTTBackendESP32 *instance = static_cast<MQTTBackendESP32 *>(handler_args);
187 // queue event to decouple processing
188 if (instance) {
// Take a by-value copy of the event structure and build the queued element from it.
189 auto event = *static_cast<esp_mqtt_event_t *>(event_data);
190 instance->mqtt_events_.emplace(event);
191
192 // Wake main loop immediately to process MQTT event instead of waiting for select() timeout
193#if defined(USE_SOCKET_SELECT_SUPPORT) && defined(USE_WAKE_LOOP_THREADSAFE)
// NOTE(review): listing line 194 is missing here - presumably the call that wakes the main
// loop, as the comment above describes; confirm against the real file.
195#endif
196 }
197}
198
199#if defined(USE_MQTT_IDF_ENQUEUE)
// NOTE(review): the signature line of this function (listing line 200) is missing from this
// extract; it presumably takes a `void *params` argument, which is cast to the backend
// instance below. The body never returns: it blocks on a task notification, then drains
// mqtt_queue_, sending each element to the MQTT client while the backend is connected.
201 MQTTBackendESP32 *this_mqtt = (MQTTBackendESP32 *) params;
202
203 while (true) {
204 // Wait for notification indefinitely
205 ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
206
207 // Process all queued items
208 struct QueueElement *elem;
209 while ((elem = this_mqtt->mqtt_queue_.pop()) != nullptr) {
// Elements popped while disconnected are not sent; they are only released back to the pool.
210 if (this_mqtt->is_connected_) {
211 switch (elem->type) {
// NOTE(review): the `case` label (listing line 212) is missing; this branch subscribes.
213 esp_mqtt_client_subscribe(this_mqtt->handler_.get(), elem->topic, elem->qos);
214 break;
215
// NOTE(review): the `case` label (listing line 216) is missing; this branch unsubscribes.
217 esp_mqtt_client_unsubscribe(this_mqtt->handler_.get(), elem->topic);
218 break;
219
// NOTE(review): the `case` label (listing line 220) is missing; this branch publishes.
221 esp_mqtt_client_publish(this_mqtt->handler_.get(), elem->topic, elem->payload, elem->payload_len, elem->qos,
222 elem->retain);
223 break;
224
225 default:
226 ESP_LOGE(TAG, "Invalid operation type from MQTT queue");
227 break;
228 }
229 }
// Every popped element goes back to the pool, whether or not it was sent.
230 this_mqtt->mqtt_event_pool_.release(elem);
231 }
232 }
233}
234
235bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos, bool retain, const char *payload,
236 size_t len) {
237 auto *elem = this->mqtt_event_pool_.allocate();
238
239 if (!elem) {
240 // Queue is full - increment counter but don't log immediately.
241 // Logging here can cause a cascade effect: if MQTT logging is enabled,
242 // each dropped message would generate a log message, which could itself
243 // be sent via MQTT, causing more drops and more logs in a feedback loop
244 // that eventually triggers a watchdog reset. Instead, we log periodically
245 // in loop() to prevent blocking the event loop during spikes.
246 this->mqtt_queue_.increment_dropped_count();
247 return false;
248 }
249
250 elem->type = type;
251 elem->qos = qos;
252 elem->retain = retain;
253
254 // Use the helper to allocate and copy data
255 if (!elem->set_data(topic, payload, len)) {
256 // Allocation failed, return elem to pool
257 this->mqtt_event_pool_.release(elem);
258 // Increment counter without logging to avoid cascade effect during memory pressure
259 this->mqtt_queue_.increment_dropped_count();
260 return false;
261 }
262
263 // Push to queue - always succeeds since we allocated from the pool
264 this->mqtt_queue_.push(elem);
265 return true;
266}
267#endif // USE_MQTT_IDF_ENQUEUE
268
269} // namespace esphome::mqtt
270#endif // USE_ESP32
271#endif
void wake_loop_threadsafe()
Wake the main event loop from another FreeRTOS task.
uint32_t IRAM_ATTR HOT get_loop_component_start_time() const
Get the cached time in milliseconds from when the current component started its loop execution.
CallbackManager< on_connect_callback_t > on_connect_
CallbackManager< on_disconnect_callback_t > on_disconnect_
CallbackManager< on_message_callback_t > on_message_
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
static - Dispatch event to instance method
optional< std::string > ca_certificate_
CallbackManager< on_subscribe_callback_t > on_subscribe_
static void esphome_mqtt_task(void *params)
bool enqueue_(MqttQueueTypeT type, const char *topic, int qos=0, bool retain=false, const char *payload=NULL, size_t len=0)
EventPool< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_event_pool_
static constexpr size_t TASK_STACK_SIZE_TLS
CallbackManager< on_unsubscribe_callback_t > on_unsubscribe_
esp_mqtt_client_config_t mqtt_cfg_
static constexpr size_t TASK_STACK_SIZE
CallbackManager< on_publish_user_callback_t > on_publish_
static constexpr uint32_t DROP_LOG_INTERVAL_MS
NotifyingLockFreeQueue< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_queue_
void mqtt_event_handler_(const Event &event)
optional< std::string > cl_certificate_
static constexpr ssize_t TASK_PRIORITY
bool has_value() const
Definition optional.h:92
value_type const & value() const
Definition optional.h:94
uint16_t type
std::string size_t len
Definition helpers.h:817
Application App
Global storage of Application pointer - only one Application can exist.
uint8_t event_id
Definition tt21100.cpp:3