ESPHome 2026.5.0-dev
Loading...
Searching...
No Matches
audio_pipeline.cpp
Go to the documentation of this file.
1#include "audio_pipeline.h"
2
3#ifdef USE_ESP32
4
6#include "esphome/core/hal.h"
8#include "esphome/core/log.h"
9
10namespace esphome {
11namespace speaker {
12
// Amount of audio (in milliseconds) to buffer from a file before playback is allowed to start.
static constexpr uint32_t INITIAL_BUFFER_MS = 1000;

// Stack sizes for the two pipeline tasks. The decoder gets a larger stack when Opus support is
// compiled in, because Opus decoding uses more stack than the other codecs.
static constexpr uint32_t READ_TASK_STACK_SIZE = 5 * 1024;
#ifdef USE_AUDIO_OPUS_SUPPORT
static constexpr uint32_t DECODE_TASK_STACK_SIZE = 5 * 1024;
#else
static constexpr uint32_t DECODE_TASK_STACK_SIZE = 3 * 1024;
#endif

// Number of InfoErrorEvent entries the tasks can queue for process_state() to log.
static constexpr uint32_t INFO_ERROR_QUEUE_COUNT = 5;

static constexpr const char *TAG = "speaker_media_player.pipeline";
26
28 // MESSAGE_* bits are only set by their respective tasks
29
30 // Stops all activity in the pipeline elements; cleared by process_state() and set by stop() or by each task
32
33 // Read audio from an HTTP source; cleared by reader task and set by start_url
35 // Read audio from an audio file from the flash; cleared by reader task and set by start_file
37
38 // Audio file type is read after checking it is supported; cleared by decoder task
40 // Reader is done (either through a failure or just end of the stream); cleared by reader task
42 // Error reading the file; cleared by process_state()
44
45 // Decoder is done (either through a failure or the end of the stream); cleared by decoder task
47 // Error decoding the file; cleared by process_state() and set by the decoder task
49};
50
// Builds a pipeline that plays decoded audio into `speaker`.
//   buffer_size         - size of the ring buffer the reader task fills with raw file data
//   task_stack_in_psram - whether the decoder task's stack may be placed in PSRAM (the reader task's
//                         stack is always internal memory, see start_tasks_())
//   base_name           - prefix of the task names ("<base_name>_read" / "<base_name>_decode")
//   priority            - priority used when creating both tasks
51AudioPipeline::AudioPipeline(speaker::Speaker *speaker, size_t buffer_size, bool task_stack_in_psram,
52 std::string base_name, UBaseType_t priority)
53 : base_name_(std::move(base_name)),
54 priority_(priority),
55 task_stack_in_psram_(task_stack_in_psram),
56 speaker_(speaker),
57 buffer_size_(buffer_size) {
// The reader's and decoder's transfer buffers get a quarter of the ring buffer size, but never more
// than DEFAULT_TRANSFER_BUFFER_SIZE.
59 this->transfer_buffer_size_ = std::min(buffer_size_ / 4, DEFAULT_TRANSFER_BUFFER_SIZE);
60}
61
62void AudioPipeline::start_url(const std::string &uri) {
63 if (this->is_playing_) {
64 xEventGroupSetBits(this->event_group_, PIPELINE_COMMAND_STOP);
65 }
66 this->current_uri_ = uri;
67 this->pending_url_ = true;
68}
69
71 if (this->is_playing_) {
72 xEventGroupSetBits(this->event_group_, PIPELINE_COMMAND_STOP);
73 }
74 this->current_audio_file_ = audio_file;
75 this->pending_file_ = true;
76}
77
79 xEventGroupSetBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
80
81 return ESP_OK;
82}
83void AudioPipeline::set_pause_state(bool pause_state) {
84 this->speaker_->set_pause_state(pause_state);
85
86 this->pause_state_ = pause_state;
87}
88
90 if (this->read_task_.is_created()) {
91 vTaskSuspend(this->read_task_.get_handle());
92 }
93 if (this->decode_task_.is_created()) {
94 vTaskSuspend(this->decode_task_.get_handle());
95 }
96}
97
99 if (this->read_task_.is_created()) {
100 vTaskResume(this->read_task_.get_handle());
101 }
102 if (this->decode_task_.is_created()) {
103 vTaskResume(this->decode_task_.get_handle());
104 }
105}
106
108 /*
109 * Log items from info error queue
110 */
111 InfoErrorEvent event;
112 if (this->info_error_queue_ != nullptr) {
113 while (xQueueReceive(this->info_error_queue_, &event, 0)) {
114 switch (event.source) {
116 if (event.err.has_value()) {
117 ESP_LOGE(TAG, "Media reader encountered an error: %s", esp_err_to_name(event.err.value()));
118 } else if (event.file_type.has_value()) {
119 ESP_LOGD(TAG, "Reading %s file type", audio_file_type_to_string(event.file_type.value()));
120 }
121
122 break;
124 if (event.err.has_value()) {
125 ESP_LOGE(TAG, "Decoder encountered an error: %s", esp_err_to_name(event.err.value()));
126 }
127
128 if (event.audio_stream_info.has_value()) {
129 ESP_LOGD(TAG, "Decoded audio has %d channels, %" PRId32 " Hz sample rate, and %d bits per sample",
130 event.audio_stream_info.value().get_channels(), event.audio_stream_info.value().get_sample_rate(),
131 event.audio_stream_info.value().get_bits_per_sample());
132 }
133
134 if (event.decoding_err.has_value()) {
135 switch (event.decoding_err.value()) {
137 ESP_LOGE(TAG, "Failed to parse the file's header.");
138 break;
140 ESP_LOGE(TAG, "Incompatible bits per sample. Only 16 bits per sample is supported");
141 break;
143 ESP_LOGE(TAG, "Incompatible number of channels. Only 1 or 2 channel audio is supported.");
144 break;
145 }
146 }
147 break;
148 }
149 }
150 }
151
152 /*
153 * Determine the current state based on the event group bits and tasks' status
154 */
155
156 EventBits_t event_bits = xEventGroupGetBits(this->event_group_);
157
158 if (this->pending_url_ || this->pending_file_) {
159 // Init command pending
160 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
161 // Only start if there is no pending stop command
162 if (!this->read_task_.is_created() || !this->decode_task_.is_created()) {
163 // At least one task isn't running
164 this->start_tasks_();
165 }
166
167 if (this->pending_url_) {
169 this->playback_ms_ = 0;
170 this->pending_url_ = false;
171 } else if (this->pending_file_) {
173 this->playback_ms_ = 0;
174 this->pending_file_ = false;
175 }
176
177 this->is_playing_ = true;
179 }
180 }
181
182 if ((event_bits & EventGroupBits::READER_MESSAGE_ERROR)) {
183 xEventGroupClearBits(this->event_group_, EventGroupBits::READER_MESSAGE_ERROR);
185 }
186
187 if ((event_bits & EventGroupBits::DECODER_MESSAGE_ERROR)) {
188 xEventGroupClearBits(this->event_group_, EventGroupBits::DECODER_MESSAGE_ERROR);
190 }
191
192 if ((event_bits & EventGroupBits::READER_MESSAGE_FINISHED) &&
195 // Tasks are finished and there's no media in between the reader and decoder
196
197 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
198 // Stop command is fully processed, so clear the command bit
199 xEventGroupClearBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
200 this->hard_stop_ = true;
201 }
202
203 if (!this->is_playing_) {
204 // The tasks have been stopped for two ``process_state`` calls in a row, so delete the tasks
205 if (this->read_task_.is_created() || this->decode_task_.is_created()) {
206 this->read_task_.deallocate();
207 this->decode_task_.deallocate();
208 if (this->hard_stop_) {
209 // Stop command was sent, so immediately end the playback
210 this->speaker_->stop();
211 this->hard_stop_ = false;
212 } else {
213 // Decoded all the audio, so let the speaker finish playing before stopping
214 this->speaker_->finish();
215 }
216 }
217 }
218 this->is_playing_ = false;
219 if (!this->speaker_->is_running()) {
221 } else {
222 this->is_finishing_ = true;
223 }
224 }
225
226 if (this->pause_state_) {
228 }
229
230 if (this->is_finishing_) {
231 if (!this->speaker_->is_running()) {
232 this->is_finishing_ = false;
233 } else {
235 }
236 }
237
238 if (!this->read_task_.is_created() && !this->decode_task_.is_created()) {
239 // No tasks are running, so the pipeline is stopped.
240 xEventGroupClearBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
242 }
243
244 this->is_playing_ = true;
246}
247
249 if (this->event_group_ == nullptr)
250 this->event_group_ = xEventGroupCreate();
251
252 if (this->event_group_ == nullptr) {
253 return ESP_ERR_NO_MEM;
254 }
255
256 if (this->info_error_queue_ == nullptr)
257 this->info_error_queue_ = xQueueCreate(INFO_ERROR_QUEUE_COUNT, sizeof(InfoErrorEvent));
258
259 if (this->info_error_queue_ == nullptr)
260 return ESP_ERR_NO_MEM;
261
262 return ESP_OK;
263}
264
266 if (!this->read_task_.is_created()) {
267 // Reader task uses the AudioReader class which uses esp_http_client. This crashes on IDF 5.4 if the task stack is
268 // in PSRAM. As a workaround, always allocate the read task in internal memory.
269 if (!this->read_task_.create(read_task, (this->base_name_ + "_read").c_str(), READ_TASK_STACK_SIZE, (void *) this,
270 this->priority_, false)) {
271 return ESP_ERR_NO_MEM;
272 }
273 }
274
275 if (!this->decode_task_.is_created()) {
276 if (!this->decode_task_.create(decode_task, (this->base_name_ + "_decode").c_str(), DECODE_TASK_STACK_SIZE,
277 (void *) this, this->priority_, this->task_stack_in_psram_)) {
278 return ESP_ERR_NO_MEM;
279 }
280 }
281
282 return ESP_OK;
283}
284
// Entry point of the reader task (passed to read_task_.create() in start_tasks_()); `params` is the
// owning AudioPipeline. The outer `while (true)` loop is the task's whole lifetime; process_state()
// tears the task down via read_task_.deallocate(). Each pass of the loop:
//   1. sets READER_MESSAGE_FINISHED to signal the reader is done, then blocks on the event group;
//   2. unless PIPELINE_COMMAND_STOP is set, creates an AudioReader for either current_audio_file_ or
//      current_uri_, creates the shared raw-file ring buffer if nobody owns one, and adds it as a sink;
//   3. reports the setup error, or the detected file type, through info_error_queue_;
//   4. reads until the source finishes, the reader fails, or a stop is requested.
285void AudioPipeline::read_task(void *params) {
286 AudioPipeline *this_pipeline = (AudioPipeline *) params;
287
288 while (true) {
289 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_FINISHED);
290
291 // Wait until the pipeline notifies us the source of the media file
292 EventBits_t event_bits = xEventGroupWaitBits(
293 this_pipeline->event_group_,
295 pdFALSE, // Do not clear the bits on exit (xClearOnExit)
296 pdFALSE, // Return when any one of the bits is set (xWaitForAllBits)
297 portMAX_DELAY); // Block indefinitely until one of the bits is set
298
299 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
300 xEventGroupClearBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_FINISHED |
303 InfoErrorEvent event;
305 esp_err_t err = ESP_OK;
306
307 std::unique_ptr<audio::AudioReader> reader =
308 make_unique<audio::AudioReader>(this_pipeline->transfer_buffer_size_);
309
311 err = reader->start(this_pipeline->current_audio_file_, this_pipeline->current_audio_file_type_);
312 } else {
313 err = reader->start(this_pipeline->current_uri_, this_pipeline->current_audio_file_type_);
314 }
315
316 if (err == ESP_OK) {
317 size_t file_ring_buffer_size = this_pipeline->buffer_size_;
318
// raw_file_ring_buffer_ is a weak_ptr; this local is only the temporary strong owner while the buffer
// is handed to the reader. NOTE(review): presumably add_sink() keeps its own strong reference,
// otherwise the buffer would be freed when this scope ends — confirm in AudioReader.
319 std::shared_ptr<RingBuffer> temp_ring_buffer;
320
321 if (!this_pipeline->raw_file_ring_buffer_.use_count()) {
322 temp_ring_buffer = RingBuffer::create(file_ring_buffer_size);
323 this_pipeline->raw_file_ring_buffer_ = temp_ring_buffer;
324 }
325
326 if (!this_pipeline->raw_file_ring_buffer_.use_count()) {
327 err = ESP_ERR_NO_MEM;
328 } else {
329 reader->add_sink(this_pipeline->raw_file_ring_buffer_);
330 }
331 }
332
333 if (err != ESP_OK) {
334 // Send specific error message
335 event.err = err;
336 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
337
338 // Setting up the reader failed, stop the pipeline
339 xEventGroupSetBits(this_pipeline->event_group_,
341 } else {
342 // Send the file type to the pipeline
343 event.file_type = this_pipeline->current_audio_file_type_;
344 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
345 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_LOADED_MEDIA_TYPE);
346 }
347
// Pump data from the source into the ring buffer until a stop is requested or the reader
// reports FINISHED or FAILED.
348 while (true) {
349 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
350
351 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
352 break;
353 }
354
355 audio::AudioReaderState reader_state = reader->read();
356
357 if (reader_state == audio::AudioReaderState::FINISHED) {
358 break;
359 } else if (reader_state == audio::AudioReaderState::FAILED) {
360 xEventGroupSetBits(this_pipeline->event_group_,
362 break;
363 }
364 }
365 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
367 (this_pipeline->raw_file_ring_buffer_.use_count() == 1)) {
368 // Decoder task hasn't started yet, so delay a bit before releasing ownership of the ring buffer
369 delay(10);
370 }
371 }
372 }
373}
374
// Entry point of the decoder task (passed to decode_task_.create() in start_tasks_()); `params` is the
// owning AudioPipeline. Each pass of the outer loop:
//   1. sets DECODER_MESSAGE_FINISHED, then blocks until the reader reports the media type;
//   2. unless PIPELINE_COMMAND_STOP is set, creates an AudioDecoder that reads from the raw-file ring
//      buffer filled by the reader task;
//   3. once the stream info is known, accepts only 16 bits per sample and at most 2 channels, and in
//      that case sends the decoded audio to the speaker;
//   4. decodes until the decoder finishes or fails, or a stop is requested, updating playback_ms_.
// Output stays paused until enough file data is buffered or the reader has finished.
375void AudioPipeline::decode_task(void *params) {
376 AudioPipeline *this_pipeline = (AudioPipeline *) params;
377
378 while (true) {
379 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::DECODER_MESSAGE_FINISHED);
380
381 // Wait until the reader notifies us that the media type is available
382 EventBits_t event_bits =
383 xEventGroupWaitBits(this_pipeline->event_group_,
385 pdFALSE, // Do not clear the bits on exit (xClearOnExit)
386 pdFALSE, // Return when any one of the bits is set (xWaitForAllBits)
387 portMAX_DELAY); // Block indefinitely until one of the bits is set
388
// The wait above leaves the bits set (xClearOnExit is pdFALSE), so clear them explicitly here.
389 xEventGroupClearBits(this_pipeline->event_group_,
391
392 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
393 InfoErrorEvent event;
395
396 std::unique_ptr<audio::AudioDecoder> decoder =
397 make_unique<audio::AudioDecoder>(this_pipeline->transfer_buffer_size_, this_pipeline->transfer_buffer_size_);
398
399 esp_err_t err = decoder->start(this_pipeline->current_audio_file_type_);
400 decoder->add_source(this_pipeline->raw_file_ring_buffer_);
401
402 if (err != ESP_OK) {
403 // Send specific error message
404 event.err = err;
405 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
406
407 // Setting up the decoder failed, stop the pipeline
408 xEventGroupSetBits(this_pipeline->event_group_,
410 }
411
412 bool has_stream_info = false;
413 bool started_playback = false;
414
415 size_t initial_bytes_to_buffer = 0;
416
417 while (true) {
418 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
419
420 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
421 break;
422 }
423
424 // Update pause state
// Before playback has started, keep the output paused unless the reader has already finished
// (which starts playback); afterwards, follow the pipeline's pause state.
425 if (!started_playback) {
426 if (!(event_bits & EventGroupBits::READER_MESSAGE_FINISHED)) {
427 decoder->set_pause_output_state(true);
428 } else {
429 started_playback = true;
430 }
431 } else {
432 decoder->set_pause_output_state(this_pipeline->pause_state_);
433 }
434
435 // Stop gracefully if the reader has finished
436 audio::AudioDecoderState decoder_state = decoder->decode(event_bits & EventGroupBits::READER_MESSAGE_FINISHED);
437
438 if ((decoder_state == audio::AudioDecoderState::DECODING) ||
439 (decoder_state == audio::AudioDecoderState::FINISHED)) {
440 this_pipeline->playback_ms_ = decoder->get_playback_ms();
441 }
442
443 if (decoder_state == audio::AudioDecoderState::FINISHED) {
444 break;
445 } else if (decoder_state == audio::AudioDecoderState::FAILED) {
// Failing before any stream info was decoded is reported as a header parse failure.
446 if (!has_stream_info) {
447 event.decoding_err = DecodingError::FAILED_HEADER;
448 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
449 }
450 xEventGroupSetBits(this_pipeline->event_group_,
452 break;
453 }
454
455 if (!has_stream_info && decoder->get_audio_stream_info().has_value()) {
456 has_stream_info = true;
457
458 this_pipeline->current_audio_stream_info_ = decoder->get_audio_stream_info().value();
459
460 // Send the stream information to the pipeline
461 event.audio_stream_info = this_pipeline->current_audio_stream_info_;
462
463 if (this_pipeline->current_audio_stream_info_.get_bits_per_sample() != 16) {
464 // Error state, incompatible bits per sample
466 xEventGroupSetBits(this_pipeline->event_group_,
468 } else if ((this_pipeline->current_audio_stream_info_.get_channels() > 2)) {
469 // Error state, incompatible number of channels
470 event.decoding_err = DecodingError::INCOMPATIBLE_CHANNELS;
471 xEventGroupSetBits(this_pipeline->event_group_,
473 } else {
474 // Send audio directly to the speaker
475 this_pipeline->speaker_->set_audio_stream_info(this_pipeline->current_audio_stream_info_);
476 decoder->add_sink(this_pipeline->speaker_);
477 }
478
// Amount of raw file data to buffer before starting playback: INITIAL_BUFFER_MS of decoded audio,
// capped at 3/4 of the ring buffer, then divided by an estimated compression factor because the
// ring buffer holds the still-encoded file. Codecs without a case here use no reduction.
479 initial_bytes_to_buffer = std::min(this_pipeline->current_audio_stream_info_.ms_to_bytes(INITIAL_BUFFER_MS),
480 this_pipeline->buffer_size_ * 3 / 4);
481
482 switch (this_pipeline->current_audio_file_type_) {
483#ifdef USE_AUDIO_MP3_SUPPORT
485 initial_bytes_to_buffer /= 8; // Estimate the MP3 compression factor is 8
486 break;
487#endif
488#ifdef USE_AUDIO_FLAC_SUPPORT
490 initial_bytes_to_buffer /= 2; // Estimate the FLAC compression factor is 2
491 break;
492#endif
493#ifdef USE_AUDIO_OPUS_SUPPORT
495 initial_bytes_to_buffer /= 8; // Estimate the Opus compression factor is 8
496 break;
497#endif
498 default:
499 break;
500 }
501 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
502 }
503
504 if (!started_playback && has_stream_info) {
505 // Verify enough data is available before starting playback
506 std::shared_ptr<RingBuffer> temp_ring_buffer = this_pipeline->raw_file_ring_buffer_.lock();
507 if (temp_ring_buffer != nullptr && temp_ring_buffer->available() >= initial_bytes_to_buffer) {
508 started_playback = true;
509 }
510 }
511 }
512 }
513 }
514}
515
516} // namespace speaker
517} // namespace esphome
518
519#endif
static std::unique_ptr< RingBuffer > create(size_t len)
bool create(TaskFunction_t fn, const char *name, uint32_t stack_size, void *param, UBaseType_t priority, bool use_psram)
Allocate stack and create task.
bool is_created() const
Check if the task has been created and not yet destroyed.
Definition static_task.h:18
TaskHandle_t get_handle() const
Get the FreeRTOS task handle.
Definition static_task.h:21
void deallocate()
Delete the task (if running) and free the stack buffer.
size_t ms_to_bytes(uint32_t ms) const
Converts duration to bytes.
Definition audio.h:73
uint8_t get_bits_per_sample() const
Definition audio.h:28
uint8_t get_channels() const
Definition audio.h:29
static void read_task(void *params)
void suspend_tasks()
Suspends any running tasks.
void set_pause_state(bool pause_state)
std::weak_ptr< RingBuffer > raw_file_ring_buffer_
void start_url(const std::string &uri)
Starts an audio pipeline given a media url.
esp_err_t allocate_communications_()
Allocates the event group and info error queue.
esp_err_t start_tasks_()
Common start code for the pipeline, regardless if the source is a file or url.
audio::AudioStreamInfo current_audio_stream_info_
void start_file(audio::AudioFile *audio_file)
Starts an audio pipeline given a AudioFile pointer.
esp_err_t stop()
Stops the pipeline.
static void decode_task(void *params)
AudioPipeline(speaker::Speaker *speaker, size_t buffer_size, bool task_stack_in_psram, std::string base_name, UBaseType_t priority)
void resume_tasks()
Resumes any running tasks.
audio::AudioFile * current_audio_file_
audio::AudioFileType current_audio_file_type_
AudioPipelineState process_state()
Processes the state of the audio pipeline based on the info_error_queue_ and event_group_.
bool is_running() const
Definition speaker.h:66
virtual void set_pause_state(bool pause_state)
Definition speaker.h:61
void set_audio_stream_info(const audio::AudioStreamInfo &audio_stream_info)
Definition speaker.h:99
virtual void finish()
Definition speaker.h:58
virtual void stop()=0
uint8_t priority
Providing packet encoding functions for exchanging data with a remote host.
Definition a01nyub.cpp:7
void HOT delay(uint32_t ms)
Definition core.cpp:28
static void uint32_t
optional< DecodingError > decoding_err
optional< audio::AudioFileType > file_type
optional< audio::AudioStreamInfo > audio_stream_info