ESPHome 2025.9.0-dev
Loading...
Searching...
No Matches
mqtt_backend_esp32.cpp
Go to the documentation of this file.
2
3#ifdef USE_MQTT
4#ifdef USE_ESP32
5
6#include <string>
8#include "esphome/core/log.h"
10
11namespace esphome {
12namespace mqtt {
13
14static const char *const TAG = "mqtt.idf";
15
17 mqtt_cfg_.broker.address.hostname = this->host_.c_str();
18 mqtt_cfg_.broker.address.port = this->port_;
19 mqtt_cfg_.session.keepalive = this->keep_alive_;
20 mqtt_cfg_.session.disable_clean_session = !this->clean_session_;
21
22 if (!this->username_.empty()) {
23 mqtt_cfg_.credentials.username = this->username_.c_str();
24 if (!this->password_.empty()) {
25 mqtt_cfg_.credentials.authentication.password = this->password_.c_str();
26 }
27 }
28
29 if (!this->lwt_topic_.empty()) {
30 mqtt_cfg_.session.last_will.topic = this->lwt_topic_.c_str();
31 this->mqtt_cfg_.session.last_will.qos = this->lwt_qos_;
32 this->mqtt_cfg_.session.last_will.retain = this->lwt_retain_;
33
34 if (!this->lwt_message_.empty()) {
35 mqtt_cfg_.session.last_will.msg = this->lwt_message_.c_str();
36 mqtt_cfg_.session.last_will.msg_len = this->lwt_message_.size();
37 }
38 }
39
40 if (!this->client_id_.empty()) {
41 mqtt_cfg_.credentials.client_id = this->client_id_.c_str();
42 }
44 mqtt_cfg_.broker.verification.certificate = ca_certificate_.value().c_str();
45 mqtt_cfg_.broker.verification.skip_cert_common_name_check = skip_cert_cn_check_;
46 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_SSL;
47
48 if (this->cl_certificate_.has_value() && this->cl_key_.has_value()) {
49 mqtt_cfg_.credentials.authentication.certificate = this->cl_certificate_.value().c_str();
50 mqtt_cfg_.credentials.authentication.key = this->cl_key_.value().c_str();
51 }
52 } else {
53 mqtt_cfg_.broker.address.transport = MQTT_TRANSPORT_OVER_TCP;
54 }
55
56 auto *mqtt_client = esp_mqtt_client_init(&mqtt_cfg_);
57 if (mqtt_client) {
58 handler_.reset(mqtt_client);
59 is_initalized_ = true;
60 esp_mqtt_client_register_event(mqtt_client, MQTT_EVENT_ANY, mqtt_event_handler, this);
61#if defined(USE_MQTT_IDF_ENQUEUE)
62 // Create the task only after MQTT client is initialized successfully
63 // Use larger stack size when TLS is enabled
64 size_t stack_size = this->ca_certificate_.has_value() ? TASK_STACK_SIZE_TLS : TASK_STACK_SIZE;
65 xTaskCreate(esphome_mqtt_task, "esphome_mqtt", stack_size, (void *) this, TASK_PRIORITY, &this->task_handle_);
66 if (this->task_handle_ == nullptr) {
67 ESP_LOGE(TAG, "Failed to create MQTT task");
68 // Clean up MQTT client since we can't start the async task
69 handler_.reset();
70 is_initalized_ = false;
71 return false;
72 }
73 // Set the task handle so the queue can notify it
74 this->mqtt_queue_.set_task_to_notify(this->task_handle_);
75#endif
76 return true;
77 } else {
78 ESP_LOGE(TAG, "Failed to init client");
79 return false;
80 }
81}
82
84 // process new events
85 // handle only 1 message per loop iteration
86 if (!mqtt_events_.empty()) {
87 auto &event = mqtt_events_.front();
89 mqtt_events_.pop();
90 }
91
92#if defined(USE_MQTT_IDF_ENQUEUE)
93 // Periodically log dropped messages to avoid blocking during spikes.
94 // During high load, many messages can be dropped in quick succession.
95 // Logging each drop immediately would flood the logs and potentially
96 // cause more drops if MQTT logging is enabled (cascade effect).
97 // Instead, we accumulate the count and log a summary periodically.
98 // IMPORTANT: Don't move this to the scheduler - if drops are due to memory
99 // pressure, the scheduler's heap allocations would make things worse.
101 // Handle rollover: (now - last_time) works correctly with unsigned arithmetic
102 // even when now < last_time due to rollover
103 if ((now - this->last_dropped_log_time_) >= DROP_LOG_INTERVAL_MS) {
104 uint16_t dropped = this->mqtt_queue_.get_and_reset_dropped_count();
105 if (dropped > 0) {
106 ESP_LOGW(TAG, "Dropped %u messages (%us)", dropped, DROP_LOG_INTERVAL_MS / 1000);
107 }
108 this->last_dropped_log_time_ = now;
109 }
110#endif
111}
112
114 ESP_LOGV(TAG, "Event dispatched from event loop event_id=%d", event.event_id);
115 switch (event.event_id) {
116 case MQTT_EVENT_BEFORE_CONNECT:
117 ESP_LOGV(TAG, "MQTT_EVENT_BEFORE_CONNECT");
118 break;
119
120 case MQTT_EVENT_CONNECTED:
121 ESP_LOGV(TAG, "MQTT_EVENT_CONNECTED");
122 this->is_connected_ = true;
123#if defined(USE_MQTT_IDF_ENQUEUE)
124 this->last_dropped_log_time_ = 0;
125 xTaskNotifyGive(this->task_handle_);
126#endif
127 this->on_connect_.call(event.session_present);
128 break;
129 case MQTT_EVENT_DISCONNECTED:
130 ESP_LOGV(TAG, "MQTT_EVENT_DISCONNECTED");
131 // TODO is there a way to get the disconnect reason?
132 this->is_connected_ = false;
133#if defined(USE_MQTT_IDF_ENQUEUE)
134 this->last_dropped_log_time_ = 0;
135 xTaskNotifyGive(this->task_handle_);
136#endif
138 break;
139
140 case MQTT_EVENT_SUBSCRIBED:
141 ESP_LOGV(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event.msg_id);
142 // hardcode QoS to 0. QoS is not used in this context but required to mirror the AsyncMqtt interface
143 this->on_subscribe_.call((int) event.msg_id, 0);
144 break;
145 case MQTT_EVENT_UNSUBSCRIBED:
146 ESP_LOGV(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event.msg_id);
147 this->on_unsubscribe_.call((int) event.msg_id);
148 break;
149 case MQTT_EVENT_PUBLISHED:
150 ESP_LOGV(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event.msg_id);
151 this->on_publish_.call((int) event.msg_id);
152 break;
153 case MQTT_EVENT_DATA: {
154 static std::string topic;
155 if (!event.topic.empty()) {
156 // When a single message arrives as multiple chunks, the topic will be empty
157 // on any but the first message, leading to event.topic being an empty string.
158 // To ensure handlers get the correct topic, cache the last seen topic to
159 // simulate always receiving the topic from underlying library
160 topic = event.topic;
161 }
162 ESP_LOGV(TAG, "MQTT_EVENT_DATA %s", topic.c_str());
163 this->on_message_.call(topic.c_str(), event.data.data(), event.data.size(), event.current_data_offset,
164 event.total_data_len);
165 } break;
166 case MQTT_EVENT_ERROR:
167 ESP_LOGE(TAG, "MQTT_EVENT_ERROR");
168 if (event.error_handle.error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
169 ESP_LOGE(TAG, "Last error code reported from esp-tls: 0x%x", event.error_handle.esp_tls_last_esp_err);
170 ESP_LOGE(TAG, "Last tls stack error number: 0x%x", event.error_handle.esp_tls_stack_err);
171 ESP_LOGE(TAG, "Last captured errno : %d (%s)", event.error_handle.esp_transport_sock_errno,
172 strerror(event.error_handle.esp_transport_sock_errno));
173 } else if (event.error_handle.error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) {
174 ESP_LOGE(TAG, "Connection refused error: 0x%x", event.error_handle.connect_return_code);
175 } else {
176 ESP_LOGE(TAG, "Unknown error type: 0x%x", event.error_handle.error_type);
177 }
178 break;
179 default:
180 ESP_LOGV(TAG, "Other event id:%d", event.event_id);
181 break;
182 }
183}
184
186void MQTTBackendESP32::mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id,
187 void *event_data) {
188 MQTTBackendESP32 *instance = static_cast<MQTTBackendESP32 *>(handler_args);
189 // queue event to decouple processing
190 if (instance) {
191 auto event = *static_cast<esp_mqtt_event_t *>(event_data);
192 instance->mqtt_events_.emplace(event);
193 }
194}
195
196#if defined(USE_MQTT_IDF_ENQUEUE)
198 MQTTBackendESP32 *this_mqtt = (MQTTBackendESP32 *) params;
199
200 while (true) {
201 // Wait for notification indefinitely
202 ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
203
204 // Process all queued items
205 struct QueueElement *elem;
206 while ((elem = this_mqtt->mqtt_queue_.pop()) != nullptr) {
207 if (this_mqtt->is_connected_) {
208 switch (elem->type) {
210 esp_mqtt_client_subscribe(this_mqtt->handler_.get(), elem->topic, elem->qos);
211 break;
212
214 esp_mqtt_client_unsubscribe(this_mqtt->handler_.get(), elem->topic);
215 break;
216
218 esp_mqtt_client_publish(this_mqtt->handler_.get(), elem->topic, elem->payload, elem->payload_len, elem->qos,
219 elem->retain);
220 break;
221
222 default:
223 ESP_LOGE(TAG, "Invalid operation type from MQTT queue");
224 break;
225 }
226 }
227 this_mqtt->mqtt_event_pool_.release(elem);
228 }
229 }
230
231 // Clean up any remaining items in the queue
232 struct QueueElement *elem;
233 while ((elem = this_mqtt->mqtt_queue_.pop()) != nullptr) {
234 this_mqtt->mqtt_event_pool_.release(elem);
235 }
236
237 // Note: EventPool destructor will clean up the pool itself
238 // Task will delete itself
239 vTaskDelete(nullptr);
240}
241
242bool MQTTBackendESP32::enqueue_(MqttQueueTypeT type, const char *topic, int qos, bool retain, const char *payload,
243 size_t len) {
244 auto *elem = this->mqtt_event_pool_.allocate();
245
246 if (!elem) {
247 // Queue is full - increment counter but don't log immediately.
248 // Logging here can cause a cascade effect: if MQTT logging is enabled,
249 // each dropped message would generate a log message, which could itself
250 // be sent via MQTT, causing more drops and more logs in a feedback loop
251 // that eventually triggers a watchdog reset. Instead, we log periodically
252 // in loop() to prevent blocking the event loop during spikes.
253 this->mqtt_queue_.increment_dropped_count();
254 return false;
255 }
256
257 elem->type = type;
258 elem->qos = qos;
259 elem->retain = retain;
260
261 // Use the helper to allocate and copy data
262 if (!elem->set_data(topic, payload, len)) {
263 // Allocation failed, return elem to pool
264 this->mqtt_event_pool_.release(elem);
265 // Increment counter without logging to avoid cascade effect during memory pressure
266 this->mqtt_queue_.increment_dropped_count();
267 return false;
268 }
269
270 // Push to queue - always succeeds since we allocated from the pool
271 this->mqtt_queue_.push(elem);
272 return true;
273}
274#endif // USE_MQTT_IDF_ENQUEUE
275
276} // namespace mqtt
277} // namespace esphome
278#endif // USE_ESP32
279#endif
uint32_t IRAM_ATTR HOT get_loop_component_start_time() const
Get the cached time in milliseconds from when the current component started its loop execution.
CallbackManager< on_connect_callback_t > on_connect_
CallbackManager< on_disconnect_callback_t > on_disconnect_
CallbackManager< on_message_callback_t > on_message_
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
static - Dispatch event to instance method
optional< std::string > ca_certificate_
CallbackManager< on_subscribe_callback_t > on_subscribe_
static void esphome_mqtt_task(void *params)
bool enqueue_(MqttQueueTypeT type, const char *topic, int qos=0, bool retain=false, const char *payload=NULL, size_t len=0)
EventPool< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_event_pool_
CallbackManager< on_unsubscribe_callback_t > on_unsubscribe_
esp_mqtt_client_config_t mqtt_cfg_
CallbackManager< on_publish_user_callback_t > on_publish_
static constexpr uint32_t DROP_LOG_INTERVAL_MS
NotifyingLockFreeQueue< struct QueueElement, MQTT_QUEUE_LENGTH > mqtt_queue_
void mqtt_event_handler_(const Event &event)
optional< std::string > cl_certificate_
bool has_value() const
Definition optional.h:92
value_type const & value() const
Definition optional.h:94
uint8_t type
Providing packet encoding functions for exchanging data with a remote host.
Definition a01nyub.cpp:7
std::string size_t len
Definition helpers.h:279
Application App
Global storage of Application pointer - only one Application can exist.
uint8_t event_id
Definition tt21100.cpp:3