ESPHome 2026.3.0-dev
Loading...
Searching...
No Matches
audio_pipeline.cpp
Go to the documentation of this file.
1#include "audio_pipeline.h"
2
3#ifdef USE_ESP32
4
6#include "esphome/core/hal.h"
8#include "esphome/core/log.h"
9
10namespace esphome {
11namespace speaker {
12
// Playback does not start until roughly this many milliseconds of the file have been buffered.
static constexpr uint32_t INITIAL_BUFFER_MS = 1000;

// Stack sizes used when allocating and creating the reader and decoder tasks.
static constexpr uint32_t READ_TASK_STACK_SIZE = 5 * 1024;
#ifdef USE_AUDIO_OPUS_SUPPORT
// The Opus decoder needs a deeper stack than the other codecs.
static constexpr uint32_t DECODE_TASK_STACK_SIZE = 5 * 1024;
#else
static constexpr uint32_t DECODE_TASK_STACK_SIZE = 3 * 1024;
#endif

// Number of InfoErrorEvent slots in the queue the tasks use to report to process_state().
static constexpr uint32_t INFO_ERROR_QUEUE_COUNT = 5;

static const char *const TAG = "speaker_media_player.pipeline";
26
// Bits of event_group_, used to pass commands and status between the public API / process_state()
// and the reader and decoder tasks.
27enum EventGroupBits : uint32_t {
28 // MESSAGE_* bits are only set by their respective tasks
29
30 // Stops all activity in the pipeline elements; cleared by process_state() and set by stop(), by
// start_url()/start_file() when something is already playing, or by each task
32
33 // Read audio from an HTTP source; cleared by reader task and set once a start_url() request is processed
// (start_url() itself only stores the URI and sets pending_url_)
35 // Read audio from an audio file from the flash; cleared by reader task and set once a start_file() request is
// processed (start_file() itself only stores the file pointer and sets pending_file_)
37
38 // Audio file type is read after checking it is supported; set by reader task and cleared by decoder task
40 // Reader is done (either through a failure or just end of the stream); cleared by reader task
42 // Error reading the file; cleared by process_state()
44
45 // Decoder is done (either through a failure or the end of the stream); cleared by decoder task
47 // Error decoding the file; set by decoder task and cleared by process_state()
49};
50
/// Constructs the pipeline. The reader/decoder tasks are started on demand by process_state() via start_tasks_().
///
/// @param speaker Speaker that the decoder task writes decoded audio to.
/// @param buffer_size Size in bytes of the ring buffer that holds the raw (still encoded) file data; it also
///        bounds the initial pre-buffering amount and the transfer buffer size.
/// @param task_stack_in_psram Whether the decoder task's stack is allocated in PSRAM (the reader task's stack is
///        always allocated in internal memory).
/// @param base_name Prefix for the task names ("<base_name>_read" and "<base_name>_decode").
/// @param priority Presumably the FreeRTOS priority used for both tasks -- TODO confirm.
51AudioPipeline::AudioPipeline(speaker::Speaker *speaker, size_t buffer_size, bool task_stack_in_psram,
52 std::string base_name, UBaseType_t priority)
53 : base_name_(std::move(base_name)),
54 priority_(priority),
55 task_stack_in_psram_(task_stack_in_psram),
56 speaker_(speaker),
57 buffer_size_(buffer_size) {
// The reader's and decoder's transfer buffers are a quarter of the ring buffer, capped at
// DEFAULT_TRANSFER_BUFFER_SIZE.
59 this->transfer_buffer_size_ = std::min(buffer_size_ / 4, DEFAULT_TRANSFER_BUFFER_SIZE);
60}
61
62void AudioPipeline::start_url(const std::string &uri) {
63 if (this->is_playing_) {
64 xEventGroupSetBits(this->event_group_, PIPELINE_COMMAND_STOP);
65 }
66 this->current_uri_ = uri;
67 this->pending_url_ = true;
68}
69
71 if (this->is_playing_) {
72 xEventGroupSetBits(this->event_group_, PIPELINE_COMMAND_STOP);
73 }
74 this->current_audio_file_ = audio_file;
75 this->pending_file_ = true;
76}
77
79 xEventGroupSetBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
80
81 return ESP_OK;
82}
83void AudioPipeline::set_pause_state(bool pause_state) {
84 this->speaker_->set_pause_state(pause_state);
85
86 this->pause_state_ = pause_state;
87}
88
90 if (this->read_task_handle_ != nullptr) {
91 vTaskSuspend(this->read_task_handle_);
92 }
93 if (this->decode_task_handle_ != nullptr) {
94 vTaskSuspend(this->decode_task_handle_);
95 }
96}
97
99 if (this->read_task_handle_ != nullptr) {
100 vTaskResume(this->read_task_handle_);
101 }
102 if (this->decode_task_handle_ != nullptr) {
103 vTaskResume(this->decode_task_handle_);
104 }
105}
106
108 /*
109 * Log items from info error queue
110 */
111 InfoErrorEvent event;
112 if (this->info_error_queue_ != nullptr) {
113 while (xQueueReceive(this->info_error_queue_, &event, 0)) {
114 switch (event.source) {
116 if (event.err.has_value()) {
117 ESP_LOGE(TAG, "Media reader encountered an error: %s", esp_err_to_name(event.err.value()));
118 } else if (event.file_type.has_value()) {
119 ESP_LOGD(TAG, "Reading %s file type", audio_file_type_to_string(event.file_type.value()));
120 }
121
122 break;
124 if (event.err.has_value()) {
125 ESP_LOGE(TAG, "Decoder encountered an error: %s", esp_err_to_name(event.err.value()));
126 }
127
128 if (event.audio_stream_info.has_value()) {
129 ESP_LOGD(TAG, "Decoded audio has %d channels, %" PRId32 " Hz sample rate, and %d bits per sample",
130 event.audio_stream_info.value().get_channels(), event.audio_stream_info.value().get_sample_rate(),
131 event.audio_stream_info.value().get_bits_per_sample());
132 }
133
134 if (event.decoding_err.has_value()) {
135 switch (event.decoding_err.value()) {
137 ESP_LOGE(TAG, "Failed to parse the file's header.");
138 break;
140 ESP_LOGE(TAG, "Incompatible bits per sample. Only 16 bits per sample is supported");
141 break;
143 ESP_LOGE(TAG, "Incompatible number of channels. Only 1 or 2 channel audio is supported.");
144 break;
145 }
146 }
147 break;
148 }
149 }
150 }
151
152 /*
153 * Determine the current state based on the event group bits and tasks' status
154 */
155
156 EventBits_t event_bits = xEventGroupGetBits(this->event_group_);
157
158 if (this->pending_url_ || this->pending_file_) {
159 // Init command pending
160 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
161 // Only start if there is no pending stop command
162 if ((this->read_task_handle_ == nullptr) || (this->decode_task_handle_ == nullptr)) {
163 // At least one task isn't running
164 this->start_tasks_();
165 }
166
167 if (this->pending_url_) {
169 this->playback_ms_ = 0;
170 this->pending_url_ = false;
171 } else if (this->pending_file_) {
173 this->playback_ms_ = 0;
174 this->pending_file_ = false;
175 }
176
177 this->is_playing_ = true;
179 }
180 }
181
182 if ((event_bits & EventGroupBits::READER_MESSAGE_ERROR)) {
183 xEventGroupClearBits(this->event_group_, EventGroupBits::READER_MESSAGE_ERROR);
185 }
186
187 if ((event_bits & EventGroupBits::DECODER_MESSAGE_ERROR)) {
188 xEventGroupClearBits(this->event_group_, EventGroupBits::DECODER_MESSAGE_ERROR);
190 }
191
192 if ((event_bits & EventGroupBits::READER_MESSAGE_FINISHED) &&
195 // Tasks are finished and there's no media in between the reader and decoder
196
197 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
198 // Stop command is fully processed, so clear the command bit
199 xEventGroupClearBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
200 this->hard_stop_ = true;
201 }
202
203 if (!this->is_playing_) {
204 // The tasks have been stopped for two ``process_state`` calls in a row, so delete the tasks
205 if ((this->read_task_handle_ != nullptr) || (this->decode_task_handle_ != nullptr)) {
206 this->delete_tasks_();
207 if (this->hard_stop_) {
208 // Stop command was sent, so immediately end the playback
209 this->speaker_->stop();
210 this->hard_stop_ = false;
211 } else {
212 // Decoded all the audio, so let the speaker finish playing before stopping
213 this->speaker_->finish();
214 }
215 }
216 }
217 this->is_playing_ = false;
218 if (!this->speaker_->is_running()) {
220 } else {
221 this->is_finishing_ = true;
222 }
223 }
224
225 if (this->pause_state_) {
227 }
228
229 if (this->is_finishing_) {
230 if (!this->speaker_->is_running()) {
231 this->is_finishing_ = false;
232 } else {
234 }
235 }
236
237 if ((this->read_task_handle_ == nullptr) && (this->decode_task_handle_ == nullptr)) {
238 // No tasks are running, so the pipeline is stopped.
239 xEventGroupClearBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
241 }
242
243 this->is_playing_ = true;
245}
246
248 if (this->event_group_ == nullptr)
249 this->event_group_ = xEventGroupCreate();
250
251 if (this->event_group_ == nullptr) {
252 return ESP_ERR_NO_MEM;
253 }
254
255 if (this->info_error_queue_ == nullptr)
256 this->info_error_queue_ = xQueueCreate(INFO_ERROR_QUEUE_COUNT, sizeof(InfoErrorEvent));
257
258 if (this->info_error_queue_ == nullptr)
259 return ESP_ERR_NO_MEM;
260
261 return ESP_OK;
262}
263
265 if (this->read_task_handle_ == nullptr) {
266 if (this->read_task_stack_buffer_ == nullptr) {
267 // Reader task uses the AudioReader class which uses esp_http_client. This crashes on IDF 5.4 if the task stack is
268 // in PSRAM. As a workaround, always allocate the read task in internal memory.
270 this->read_task_stack_buffer_ = stack_allocator.allocate(READ_TASK_STACK_SIZE);
271 }
272
273 if (this->read_task_stack_buffer_ == nullptr) {
274 return ESP_ERR_NO_MEM;
275 }
276
277 if (this->read_task_handle_ == nullptr) {
278 this->read_task_handle_ =
279 xTaskCreateStatic(read_task, (this->base_name_ + "_read").c_str(), READ_TASK_STACK_SIZE, (void *) this,
281 }
282
283 if (this->read_task_handle_ == nullptr) {
284 return ESP_ERR_INVALID_STATE;
285 }
286 }
287
288 if (this->decode_task_handle_ == nullptr) {
289 if (this->decode_task_stack_buffer_ == nullptr) {
290 if (this->task_stack_in_psram_) {
292 this->decode_task_stack_buffer_ = stack_allocator.allocate(DECODE_TASK_STACK_SIZE);
293 } else {
295 this->decode_task_stack_buffer_ = stack_allocator.allocate(DECODE_TASK_STACK_SIZE);
296 }
297 }
298
299 if (this->decode_task_stack_buffer_ == nullptr) {
300 return ESP_ERR_NO_MEM;
301 }
302
303 if (this->decode_task_handle_ == nullptr) {
304 this->decode_task_handle_ =
305 xTaskCreateStatic(decode_task, (this->base_name_ + "_decode").c_str(), DECODE_TASK_STACK_SIZE, (void *) this,
307 }
308
309 if (this->decode_task_handle_ == nullptr) {
310 return ESP_ERR_INVALID_STATE;
311 }
312 }
313
314 return ESP_OK;
315}
316
318 if (this->read_task_handle_ != nullptr) {
319 vTaskDelete(this->read_task_handle_);
320
321 if (this->read_task_stack_buffer_ != nullptr) {
322 if (this->task_stack_in_psram_) {
324 stack_allocator.deallocate(this->read_task_stack_buffer_, READ_TASK_STACK_SIZE);
325 } else {
327 stack_allocator.deallocate(this->read_task_stack_buffer_, READ_TASK_STACK_SIZE);
328 }
329
330 this->read_task_stack_buffer_ = nullptr;
331 this->read_task_handle_ = nullptr;
332 }
333 }
334
335 if (this->decode_task_handle_ != nullptr) {
336 vTaskDelete(this->decode_task_handle_);
337
338 if (this->decode_task_stack_buffer_ != nullptr) {
339 if (this->task_stack_in_psram_) {
341 stack_allocator.deallocate(this->decode_task_stack_buffer_, DECODE_TASK_STACK_SIZE);
342 } else {
344 stack_allocator.deallocate(this->decode_task_stack_buffer_, DECODE_TASK_STACK_SIZE);
345 }
346
347 this->decode_task_stack_buffer_ = nullptr;
348 this->decode_task_handle_ = nullptr;
349 }
350 }
351}
352
// Reader task entry point; start_tasks_() hands it to xTaskCreateStatic() with the pipeline as `params`.
// Each pass of the outer loop handles one media source:
//   - report READER_MESSAGE_FINISHED while idle;
//   - wait on event_group_ until the pipeline signals a source;
//   - open the source with audio::AudioReader and feed the raw bytes into raw_file_ring_buffer_;
//   - report errors and the file type through info_error_queue_.
// The loop never returns; the task is removed externally with vTaskDelete() in delete_tasks_().
353void AudioPipeline::read_task(void *params) {
354 AudioPipeline *this_pipeline = (AudioPipeline *) params;
355
356 while (true) {
357 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_FINISHED);
358
359 // Wait until the pipeline notifies us the source of the media file
// NOTE(review): assuming the wait mask includes PIPELINE_COMMAND_STOP, the bits are not cleared on exit.
// In that case, while the stop bit stays set this wait returns immediately and the outer loop spins
// until process_state() clears the stop bit -- confirm this is acceptable.
360 EventBits_t event_bits = xEventGroupWaitBits(
361 this_pipeline->event_group_,
363 pdFALSE, // Don't clear the bits on exit; the needed bits are cleared explicitly below
364 pdFALSE, // Return when ANY of the bits is set (not all of them)
365 portMAX_DELAY); // Block indefinitely until one of the bits is set
366
367 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
368 xEventGroupClearBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_FINISHED |
371 InfoErrorEvent event;
373 esp_err_t err = ESP_OK;
374
375 std::unique_ptr<audio::AudioReader> reader =
376 make_unique<audio::AudioReader>(this_pipeline->transfer_buffer_size_);
377
379 err = reader->start(this_pipeline->current_audio_file_, this_pipeline->current_audio_file_type_);
380 } else {
381 err = reader->start(this_pipeline->current_uri_, this_pipeline->current_audio_file_type_);
382 }
383
384 if (err == ESP_OK) {
385 size_t file_ring_buffer_size = this_pipeline->buffer_size_;
386
387 std::shared_ptr<RingBuffer> temp_ring_buffer;
388
// Only allocate a new ring buffer when nothing currently owns one (use_count() of the weak_ptr counts the
// live shared owners). temp_ring_buffer is destroyed at the end of this if-block, so afterwards the buffer
// is presumably kept alive by the reader through add_sink() below -- TODO confirm.
389 if (!this_pipeline->raw_file_ring_buffer_.use_count()) {
390 temp_ring_buffer = RingBuffer::create(file_ring_buffer_size);
391 this_pipeline->raw_file_ring_buffer_ = temp_ring_buffer;
392 }
393
394 if (!this_pipeline->raw_file_ring_buffer_.use_count()) {
395 err = ESP_ERR_NO_MEM;
396 } else {
397 reader->add_sink(this_pipeline->raw_file_ring_buffer_);
398 }
399 }
400
401 if (err != ESP_OK) {
402 // Send specific error message
403 event.err = err;
404 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
405
406 // Setting up the reader failed, stop the pipeline
407 xEventGroupSetBits(this_pipeline->event_group_,
409 } else {
410 // Send the file type to the pipeline
411 event.file_type = this_pipeline->current_audio_file_type_;
412 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
413 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_LOADED_MEDIA_TYPE);
414 }
415
// Pump data from the source into the ring buffer until the stream finishes, reading fails, or a stop is
// requested.
416 while (true) {
417 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
418
419 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
420 break;
421 }
422
423 audio::AudioReaderState reader_state = reader->read();
424
425 if (reader_state == audio::AudioReaderState::FINISHED) {
426 break;
427 } else if (reader_state == audio::AudioReaderState::FAILED) {
428 xEventGroupSetBits(this_pipeline->event_group_,
430 break;
431 }
432 }
// Before `reader` goes out of scope: if the ring buffer still has a single owner (presumably the reader, i.e.
// the decoder has not picked it up yet), wait briefly so the buffered data is not released before it is read.
433 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
435 (this_pipeline->raw_file_ring_buffer_.use_count() == 1)) {
436 // Decoder task hasn't started yet, so delay a bit before releasing ownership of the ring buffer
437 delay(10);
438 }
439 }
440 }
441}
442
// Decoder task entry point; start_tasks_() hands it to xTaskCreateStatic() with the pipeline as `params`.
// Each pass of the outer loop decodes one stream:
//   - report DECODER_MESSAGE_FINISHED while idle;
//   - wait for the reader's media-type notification;
//   - decode from raw_file_ring_buffer_ into speaker_.
// Stream info and errors go through info_error_queue_ for process_state() to log. The loop never returns;
// the task is removed externally with vTaskDelete() in delete_tasks_().
443void AudioPipeline::decode_task(void *params) {
444 AudioPipeline *this_pipeline = (AudioPipeline *) params;
445
446 while (true) {
447 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::DECODER_MESSAGE_FINISHED);
448
449 // Wait until the reader notifies us that the media type is available
// NOTE(review): assuming the wait mask includes PIPELINE_COMMAND_STOP, this wait returns immediately while
// the stop bit stays set. The outer loop then spins until process_state() clears it -- confirm this is acceptable.
450 EventBits_t event_bits =
451 xEventGroupWaitBits(this_pipeline->event_group_,
453 pdFALSE, // Don't clear the bits on exit; they are cleared explicitly right after the wait
454 pdFALSE, // Return when ANY of the bits is set (not all of them)
455 portMAX_DELAY); // Block indefinitely until one of the bits is set
456
457 xEventGroupClearBits(this_pipeline->event_group_,
459
460 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
461 InfoErrorEvent event;
463
464 std::unique_ptr<audio::AudioDecoder> decoder =
465 make_unique<audio::AudioDecoder>(this_pipeline->transfer_buffer_size_, this_pipeline->transfer_buffer_size_);
466
// The source is attached even if start() failed; a failure is reported and a stop requested just below.
467 esp_err_t err = decoder->start(this_pipeline->current_audio_file_type_);
468 decoder->add_source(this_pipeline->raw_file_ring_buffer_);
469
470 if (err != ESP_OK) {
471 // Send specific error message
472 event.err = err;
473 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
474
475 // Setting up the decoder failed, stop the pipeline
476 xEventGroupSetBits(this_pipeline->event_group_,
478 }
479
480 bool has_stream_info = false;
481 bool started_playback = false;
482
483 size_t initial_bytes_to_buffer = 0;
484
485 while (true) {
486 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
487
488 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
489 break;
490 }
491
492 // Update pause state: before playback starts, keep the output paused unless the reader has already finished;
// afterwards, follow the user-requested pause_state_
493 if (!started_playback) {
494 if (!(event_bits & EventGroupBits::READER_MESSAGE_FINISHED)) {
495 decoder->set_pause_output_state(true);
496 } else {
497 started_playback = true;
498 }
499 } else {
500 decoder->set_pause_output_state(this_pipeline->pause_state_);
501 }
502
503 // Stop gracefully if the reader has finished
504 audio::AudioDecoderState decoder_state = decoder->decode(event_bits & EventGroupBits::READER_MESSAGE_FINISHED);
505
506 if ((decoder_state == audio::AudioDecoderState::DECODING) ||
507 (decoder_state == audio::AudioDecoderState::FINISHED)) {
508 this_pipeline->playback_ms_ = decoder->get_playback_ms();
509 }
510
511 if (decoder_state == audio::AudioDecoderState::FINISHED) {
512 break;
513 } else if (decoder_state == audio::AudioDecoderState::FAILED) {
514 if (!has_stream_info) {
515 event.decoding_err = DecodingError::FAILED_HEADER;
516 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
517 }
518 xEventGroupSetBits(this_pipeline->event_group_,
520 break;
521 }
522
// The first time stream info becomes available: validate the format and, if it is supported, route the
// decoded audio to the speaker.
523 if (!has_stream_info && decoder->get_audio_stream_info().has_value()) {
524 has_stream_info = true;
525
526 this_pipeline->current_audio_stream_info_ = decoder->get_audio_stream_info().value();
527
528 // Send the stream information to the pipeline
529 event.audio_stream_info = this_pipeline->current_audio_stream_info_;
530
531 if (this_pipeline->current_audio_stream_info_.get_bits_per_sample() != 16) {
532 // Error state, incompatible bits per sample
534 xEventGroupSetBits(this_pipeline->event_group_,
536 } else if ((this_pipeline->current_audio_stream_info_.get_channels() > 2)) {
537 // Error state, incompatible number of channels
538 event.decoding_err = DecodingError::INCOMPATIBLE_CHANNELS;
539 xEventGroupSetBits(this_pipeline->event_group_,
541 } else {
542 // Send audio directly to the speaker
543 this_pipeline->speaker_->set_audio_stream_info(this_pipeline->current_audio_stream_info_);
544 decoder->add_sink(this_pipeline->speaker_);
545 }
546
// initial_bytes_to_buffer is later compared against the raw (still encoded) bytes in raw_file_ring_buffer_.
// So the decoded-audio estimate is capped at 3/4 of that buffer and then scaled down by a rough per-codec
// compression factor.
547 initial_bytes_to_buffer = std::min(this_pipeline->current_audio_stream_info_.ms_to_bytes(INITIAL_BUFFER_MS),
548 this_pipeline->buffer_size_ * 3 / 4);
549
550 switch (this_pipeline->current_audio_file_type_) {
551#ifdef USE_AUDIO_MP3_SUPPORT
553 initial_bytes_to_buffer /= 8; // Estimate the MP3 compression factor is 8
554 break;
555#endif
556#ifdef USE_AUDIO_FLAC_SUPPORT
558 initial_bytes_to_buffer /= 2; // Estimate the FLAC compression factor is 2
559 break;
560#endif
561#ifdef USE_AUDIO_OPUS_SUPPORT
563 initial_bytes_to_buffer /= 8; // Estimate the Opus compression factor is 8
564 break;
565#endif
566 default:
567 break;
568 }
569 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
570 }
571
572 if (!started_playback && has_stream_info) {
573 // Verify enough data is available before starting playback
// NOTE(review): weak_ptr::lock() returns an empty shared_ptr if the ring buffer has already been released.
// temp_ring_buffer is dereferenced without a null check -- confirm the buffer is guaranteed to be alive
// here (e.g. kept alive by add_source() above).
574 std::shared_ptr<RingBuffer> temp_ring_buffer = this_pipeline->raw_file_ring_buffer_.lock();
575 if (temp_ring_buffer->available() >= initial_bytes_to_buffer) {
576 started_playback = true;
577 }
578 }
579 }
580 }
581 }
582}
583
584} // namespace speaker
585} // namespace esphome
586
587#endif
An STL allocator that uses SPI or internal RAM.
Definition helpers.h:1794
void deallocate(T *p, size_t n)
Definition helpers.h:1849
T * allocate(size_t n)
Definition helpers.h:1811
static std::unique_ptr< RingBuffer > create(size_t len)
size_t ms_to_bytes(uint32_t ms) const
Converts duration to bytes.
Definition audio.h:73
uint8_t get_bits_per_sample() const
Definition audio.h:28
uint8_t get_channels() const
Definition audio.h:29
bool has_value() const
Definition optional.h:92
value_type const & value() const
Definition optional.h:94
static void read_task(void *params)
void suspend_tasks()
Suspends any running tasks.
void set_pause_state(bool pause_state)
void delete_tasks_()
Resets the task related pointers and deallocates their stacks.
std::weak_ptr< RingBuffer > raw_file_ring_buffer_
void start_url(const std::string &uri)
Starts an audio pipeline given a media url.
esp_err_t allocate_communications_()
Allocates the event group and info error queue.
esp_err_t start_tasks_()
Common start code for the pipeline, regardless of whether the source is a file or a URL.
audio::AudioStreamInfo current_audio_stream_info_
void start_file(audio::AudioFile *audio_file)
Starts an audio pipeline given an AudioFile pointer.
esp_err_t stop()
Stops the pipeline.
static void decode_task(void *params)
AudioPipeline(speaker::Speaker *speaker, size_t buffer_size, bool task_stack_in_psram, std::string base_name, UBaseType_t priority)
void resume_tasks()
Resumes any running tasks.
audio::AudioFile * current_audio_file_
audio::AudioFileType current_audio_file_type_
AudioPipelineState process_state()
Processes the state of the audio pipeline based on the info_error_queue_ and event_group_.
bool is_running() const
Definition speaker.h:66
virtual void set_pause_state(bool pause_state)
Definition speaker.h:61
void set_audio_stream_info(const audio::AudioStreamInfo &audio_stream_info)
Definition speaker.h:99
virtual void finish()
Definition speaker.h:58
virtual void stop()=0
uint8_t priority
Providing packet encoding functions for exchanging data with a remote host.
Definition a01nyub.cpp:7
void HOT delay(uint32_t ms)
Definition core.cpp:27
optional< DecodingError > decoding_err
optional< audio::AudioFileType > file_type
optional< audio::AudioStreamInfo > audio_stream_info