ESPHome 2025.9.0-dev
Loading...
Searching...
No Matches
audio_pipeline.cpp
Go to the documentation of this file.
1#include "audio_pipeline.h"
2
3#ifdef USE_ESP_IDF
4
6#include "esphome/core/hal.h"
8#include "esphome/core/log.h"
9
10namespace esphome {
11namespace speaker {
12
13static const uint32_t INITIAL_BUFFER_MS = 1000; // Start playback after buffering this duration of the file
14
// Stack sizes for the reader and decoder tasks. Each value is handed both to the stack allocator and to
// xTaskCreateStatic when the corresponding task is started.
15static const uint32_t READ_TASK_STACK_SIZE = 5 * 1024;
16static const uint32_t DECODE_TASK_STACK_SIZE = 3 * 1024;
17
// Number of InfoErrorEvent items the info/error queue is created with.
18static const uint32_t INFO_ERROR_QUEUE_COUNT = 5;
19
// Tag passed to the ESP_LOGx calls in this file.
20static const char *const TAG = "speaker_media_player.pipeline";
21
// Bit flags stored in event_group_. process_state() and the reader/decoder tasks set, clear and poll these bits to
// signal each other.
22enum EventGroupBits : uint32_t {
23 // MESSAGE_* bits are only set by their respective tasks
24
25 // Stops all activity in the pipeline elements; cleared by process_state() and set by stop() or by each task
27
28 // Read audio from an HTTP source; cleared by reader task and set by start_url
30 // Read audio from an audio file from the flash; cleared by reader task and set by start_file
32
33 // Audio file type is read after checking it is supported; cleared by decoder task
35 // Reader is done (either through a failure or just end of the stream); cleared by reader task
37 // Error reading the file; cleared by process_state()
39
40 // Decoder is done (either through a failure or the end of the stream); cleared by decoder task
42 // Error decoding the file; set by decoder task and cleared by process_state()
44};
45
// Stores the pipeline configuration and derives the transfer buffer size.
//
// speaker             - speaker that receives the pause/stop/finish calls and is added as the decoder's sink.
// buffer_size         - size used when the reader task creates the ring buffer between reader and decoder.
// task_stack_in_psram - selects which allocator branch is used for the task stacks
//                       (presumably PSRAM vs. internal RAM - TODO confirm against the allocator declarations).
// base_name           - prefix of the task names; "_read" and "_decode" are appended when the tasks are created.
// priority            - stored in priority_ (presumably the FreeRTOS priority of both tasks - TODO confirm).
46AudioPipeline::AudioPipeline(speaker::Speaker *speaker, size_t buffer_size, bool task_stack_in_psram,
47 std::string base_name, UBaseType_t priority)
48 : base_name_(std::move(base_name)),
49 priority_(priority),
50 task_stack_in_psram_(task_stack_in_psram),
51 speaker_(speaker),
52 buffer_size_(buffer_size) {
// The transfer buffer is a quarter of the ring buffer size, capped at DEFAULT_TRANSFER_BUFFER_SIZE.
54 this->transfer_buffer_size_ = std::min(buffer_size_ / 4, DEFAULT_TRANSFER_BUFFER_SIZE);
55}
56
// Queues playback of the media at `uri`.
// Nothing is started here: the URI is recorded and pending_url_ is raised so that process_state() can start the
// source once no stop command is pending.
57void AudioPipeline::start_url(const std::string &uri) {
58 if (this->is_playing_) {
// Something is already playing, so ask the running tasks to stop first.
59 xEventGroupSetBits(this->event_group_, PIPELINE_COMMAND_STOP);
60 }
61 this->current_uri_ = uri;
62 this->pending_url_ = true;
63}
64
// Queues playback of an audio file: if something is playing, raise the stop command first, then record the file
// pointer and raise pending_file_ so that process_state() can start it once no stop command is pending.
66 if (this->is_playing_) {
67 xEventGroupSetBits(this->event_group_, PIPELINE_COMMAND_STOP);
68 }
69 this->current_audio_file_ = audio_file;
70 this->pending_file_ = true;
71}
72
// Only raises the stop command bit; the tasks poll this bit in their loops and process_state() clears it once the
// stop has been handled. Always reports ESP_OK.
74 xEventGroupSetBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
75
76 return ESP_OK;
77}
// Forwards the pause state to the speaker and caches it in pause_state_.
// The cached value is read by decode_task (to pause the decoder's output) and by process_state().
78void AudioPipeline::set_pause_state(bool pause_state) {
79 this->speaker_->set_pause_state(pause_state);
80
81 this->pause_state_ = pause_state;
82}
83
// Suspends whichever of the reader/decoder tasks currently exists; a null handle means the task was never created
// or has been deleted, so it is skipped.
85 if (this->read_task_handle_ != nullptr) {
86 vTaskSuspend(this->read_task_handle_);
87 }
88 if (this->decode_task_handle_ != nullptr) {
89 vTaskSuspend(this->decode_task_handle_);
90 }
91}
92
// Resumes whichever of the reader/decoder tasks currently exists; null handles are skipped.
94 if (this->read_task_handle_ != nullptr) {
95 vTaskResume(this->read_task_handle_);
96 }
97 if (this->decode_task_handle_ != nullptr) {
98 vTaskResume(this->decode_task_handle_);
99 }
100}
101
// Works in two phases:
//  1. drain info_error_queue_ and log everything the reader/decoder tasks reported;
//  2. derive the pipeline state from a snapshot of the event group bits, the pending_* flags, the task handles and
//     the speaker's running state, starting or deleting the tasks as needed.
103 /*
104 * Log items from info error queue
105 */
106 InfoErrorEvent event;
107 if (this->info_error_queue_ != nullptr) {
// A timeout of 0 ticks means the loop only consumes what is already queued and never blocks.
108 while (xQueueReceive(this->info_error_queue_, &event, 0)) {
109 switch (event.source) {
111 if (event.err.has_value()) {
112 ESP_LOGE(TAG, "Media reader encountered an error: %s", esp_err_to_name(event.err.value()));
113 } else if (event.file_type.has_value()) {
114 ESP_LOGD(TAG, "Reading %s file type", audio_file_type_to_string(event.file_type.value()));
115 }
116
117 break;
119 if (event.err.has_value()) {
120 ESP_LOGE(TAG, "Decoder encountered an error: %s", esp_err_to_name(event.err.value()));
121 }
122
123 if (event.audio_stream_info.has_value()) {
124 ESP_LOGD(TAG, "Decoded audio has %d channels, %" PRId32 " Hz sample rate, and %d bits per sample",
125 event.audio_stream_info.value().get_channels(), event.audio_stream_info.value().get_sample_rate(),
126 event.audio_stream_info.value().get_bits_per_sample());
127 }
128
129 if (event.decoding_err.has_value()) {
130 switch (event.decoding_err.value()) {
132 ESP_LOGE(TAG, "Failed to parse the file's header.");
133 break;
135 ESP_LOGE(TAG, "Incompatible bits per sample. Only 16 bits per sample is supported");
136 break;
138 ESP_LOGE(TAG, "Incompatible number of channels. Only 1 or 2 channel audio is supported.");
139 break;
140 }
141 }
142 break;
143 }
144 }
145 }
146
147 /*
148 * Determine the current state based on the event group bits and tasks' status
149 */
150
// All checks below use this snapshot; bits set or cleared afterwards are only seen on the next call.
151 EventBits_t event_bits = xEventGroupGetBits(this->event_group_);
152
153 if (this->pending_url_ || this->pending_file_) {
154 // Init command pending
155 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
156 // Only start if there is no pending stop command
157 if ((this->read_task_handle_ == nullptr) || (this->decode_task_handle_ == nullptr)) {
158 // At least one task isn't running
// NOTE(review): the esp_err_t returned by start_tasks_() is not checked here.
159 this->start_tasks_();
160 }
161
// Reset the playback position and consume the pending flag for whichever source was requested.
162 if (this->pending_url_) {
164 this->playback_ms_ = 0;
165 this->pending_url_ = false;
166 } else if (this->pending_file_) {
168 this->playback_ms_ = 0;
169 this->pending_file_ = false;
170 }
171
172 this->is_playing_ = true;
174 }
175 }
176
// Error bits are acknowledged by clearing them.
177 if ((event_bits & EventGroupBits::READER_MESSAGE_ERROR)) {
178 xEventGroupClearBits(this->event_group_, EventGroupBits::READER_MESSAGE_ERROR);
180 }
181
182 if ((event_bits & EventGroupBits::DECODER_MESSAGE_ERROR)) {
183 xEventGroupClearBits(this->event_group_, EventGroupBits::DECODER_MESSAGE_ERROR);
185 }
186
187 if ((event_bits & EventGroupBits::READER_MESSAGE_FINISHED) &&
190 // Tasks are finished and there's no media in between the reader and decoder
191
192 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
193 // Stop command is fully processed, so clear the command bit
194 xEventGroupClearBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
// Remember that this was an explicit stop so the speaker is stopped instead of being allowed to drain.
195 this->hard_stop_ = true;
196 }
197
198 if (!this->is_playing_) {
199 // The tasks have been stopped for two ``process_state`` calls in a row, so delete the tasks
200 if ((this->read_task_handle_ != nullptr) || (this->decode_task_handle_ != nullptr)) {
201 this->delete_tasks_();
202 if (this->hard_stop_) {
203 // Stop command was sent, so immediately end the playback
204 this->speaker_->stop();
205 this->hard_stop_ = false;
206 } else {
207 // Decoded all the audio, so let the speaker finish playing before stopping
208 this->speaker_->finish();
209 }
210 }
211 }
// Cleared here so that the next call, if the tasks are still finished, takes the deletion branch above.
212 this->is_playing_ = false;
213 if (!this->speaker_->is_running()) {
215 } else {
// The speaker is still running, so keep tracking it until it stops.
216 this->is_finishing_ = true;
217 }
218 }
219
220 if (this->pause_state_) {
222 }
223
224 if (this->is_finishing_) {
225 if (!this->speaker_->is_running()) {
226 this->is_finishing_ = false;
227 } else {
229 }
230 }
231
232 if ((this->read_task_handle_ == nullptr) && (this->decode_task_handle_ == nullptr)) {
233 // No tasks are running, so the pipeline is stopped.
234 xEventGroupClearBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
236 }
237
// None of the earlier branches applied, so the pipeline is treated as playing.
238 this->is_playing_ = true;
240}
241
// Creates the event group and the info/error queue if they do not exist yet. Objects that already exist are kept,
// so calling this again is harmless. Returns ESP_ERR_NO_MEM if either object cannot be created, ESP_OK otherwise.
243 if (this->event_group_ == nullptr)
244 this->event_group_ = xEventGroupCreate();
245
246 if (this->event_group_ == nullptr) {
247 return ESP_ERR_NO_MEM;
248 }
249
250 if (this->info_error_queue_ == nullptr)
251 this->info_error_queue_ = xQueueCreate(INFO_ERROR_QUEUE_COUNT, sizeof(InfoErrorEvent));
252
253 if (this->info_error_queue_ == nullptr)
254 return ESP_ERR_NO_MEM;
255
256 return ESP_OK;
257}
258
// Creates the reader and decoder tasks that do not exist yet. For each task: allocate its stack buffer if needed,
// then create the task statically on that buffer.
// Returns ESP_ERR_NO_MEM when a stack buffer cannot be allocated, ESP_ERR_INVALID_STATE when xTaskCreateStatic
// yields a null handle, and ESP_OK when both tasks exist.
260 if (this->read_task_handle_ == nullptr) {
261 if (this->read_task_stack_buffer_ == nullptr) {
262 // Reader task uses the AudioReader class which uses esp_http_client. This crashes on IDF 5.4 if the task stack is
263 // in PSRAM. As a workaround, always allocate the read task in internal memory.
265 this->read_task_stack_buffer_ = stack_allocator.allocate(READ_TASK_STACK_SIZE);
266 }
267
268 if (this->read_task_stack_buffer_ == nullptr) {
269 return ESP_ERR_NO_MEM;
270 }
271
// This inner check repeats the enclosing condition; the handle cannot have changed in between.
272 if (this->read_task_handle_ == nullptr) {
273 this->read_task_handle_ =
274 xTaskCreateStatic(read_task, (this->base_name_ + "_read").c_str(), READ_TASK_STACK_SIZE, (void *) this,
276 }
277
278 if (this->read_task_handle_ == nullptr) {
279 return ESP_ERR_INVALID_STATE;
280 }
281 }
282
283 if (this->decode_task_handle_ == nullptr) {
284 if (this->decode_task_stack_buffer_ == nullptr) {
// Unlike the reader stack, the decoder stack allocation branches on task_stack_in_psram_.
285 if (this->task_stack_in_psram_) {
287 this->decode_task_stack_buffer_ = stack_allocator.allocate(DECODE_TASK_STACK_SIZE);
288 } else {
290 this->decode_task_stack_buffer_ = stack_allocator.allocate(DECODE_TASK_STACK_SIZE);
291 }
292 }
293
294 if (this->decode_task_stack_buffer_ == nullptr) {
295 return ESP_ERR_NO_MEM;
296 }
297
// This inner check repeats the enclosing condition, as for the reader task above.
298 if (this->decode_task_handle_ == nullptr) {
299 this->decode_task_handle_ =
300 xTaskCreateStatic(decode_task, (this->base_name_ + "_decode").c_str(), DECODE_TASK_STACK_SIZE, (void *) this,
302 }
303
304 if (this->decode_task_handle_ == nullptr) {
305 return ESP_ERR_INVALID_STATE;
306 }
307 }
308
309 return ESP_OK;
310}
311
// Deletes the reader and decoder tasks that exist and releases their stack buffers.
// NOTE(review): in both halves the task handle is reset to nullptr only inside the "stack buffer != nullptr"
// branch, so a deleted task with a null stack buffer would keep a stale handle - confirm this cannot happen.
313 if (this->read_task_handle_ != nullptr) {
314 vTaskDelete(this->read_task_handle_);
315
316 if (this->read_task_stack_buffer_ != nullptr) {
// NOTE(review): the start-up code comments that the reader stack is always allocated in internal memory, yet the
// release path branches on task_stack_in_psram_ - confirm that deallocate() is insensitive to the allocator flavour.
317 if (this->task_stack_in_psram_) {
319 stack_allocator.deallocate(this->read_task_stack_buffer_, READ_TASK_STACK_SIZE);
320 } else {
322 stack_allocator.deallocate(this->read_task_stack_buffer_, READ_TASK_STACK_SIZE);
323 }
324
325 this->read_task_stack_buffer_ = nullptr;
326 this->read_task_handle_ = nullptr;
327 }
328 }
329
330 if (this->decode_task_handle_ != nullptr) {
331 vTaskDelete(this->decode_task_handle_);
332
333 if (this->decode_task_stack_buffer_ != nullptr) {
334 if (this->task_stack_in_psram_) {
336 stack_allocator.deallocate(this->decode_task_stack_buffer_, DECODE_TASK_STACK_SIZE);
337 } else {
339 stack_allocator.deallocate(this->decode_task_stack_buffer_, DECODE_TASK_STACK_SIZE);
340 }
341
342 this->decode_task_stack_buffer_ = nullptr;
343 this->decode_task_handle_ = nullptr;
344 }
345 }
346}
347
// FreeRTOS task entry point for the reader. `params` is the owning AudioPipeline.
// The task loops forever: it marks itself finished, blocks until the event group wakes it, and, unless a stop
// command is pending, creates an AudioReader, attaches the raw-file ring buffer as its sink and reads until the
// source finishes, fails, or a stop command arrives. Errors and the detected file type are reported through
// info_error_queue_.
348void AudioPipeline::read_task(void *params) {
349 AudioPipeline *this_pipeline = (AudioPipeline *) params;
350
351 while (true) {
// Advertise that the reader is idle before blocking.
352 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_FINISHED);
353
354 // Wait until the pipeline notifies us the source of the media file
355 EventBits_t event_bits = xEventGroupWaitBits(
356 this_pipeline->event_group_,
358 pdFALSE, // xClearOnExit = pdFALSE: the waited-for bits are NOT cleared when the call returns
359 pdFALSE, // xWaitForAllBits = pdFALSE: return as soon as ANY of the waited-for bits is set
360 portMAX_DELAY); // Block indefinitely until bit is set
361
362 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
363 xEventGroupClearBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_FINISHED |
366 InfoErrorEvent event;
368 esp_err_t err = ESP_OK;
369
370 std::unique_ptr<audio::AudioReader> reader =
371 make_unique<audio::AudioReader>(this_pipeline->transfer_buffer_size_);
372
// Start from the file or from the URL; both overloads also fill in current_audio_file_type_.
374 err = reader->start(this_pipeline->current_audio_file_, this_pipeline->current_audio_file_type_);
375 } else {
376 err = reader->start(this_pipeline->current_uri_, this_pipeline->current_audio_file_type_);
377 }
378
379 if (err == ESP_OK) {
380 size_t file_ring_buffer_size = this_pipeline->buffer_size_;
381
// The pipeline member is a weak_ptr; this local shared_ptr is what keeps a newly created ring buffer alive
// for the rest of this scope.
382 std::shared_ptr<RingBuffer> temp_ring_buffer;
383
384 if (!this_pipeline->raw_file_ring_buffer_.use_count()) {
385 temp_ring_buffer = RingBuffer::create(file_ring_buffer_size);
386 this_pipeline->raw_file_ring_buffer_ = temp_ring_buffer;
387 }
388
// Still no owner after the attempt above means the ring buffer could not be created.
389 if (!this_pipeline->raw_file_ring_buffer_.use_count()) {
390 err = ESP_ERR_NO_MEM;
391 } else {
392 reader->add_sink(this_pipeline->raw_file_ring_buffer_);
393 }
394 }
395
396 if (err != ESP_OK) {
397 // Send specific error message
398 event.err = err;
399 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
400
401 // Setting up the reader failed, stop the pipeline
402 xEventGroupSetBits(this_pipeline->event_group_,
404 } else {
405 // Send the file type to the pipeline
406 event.file_type = this_pipeline->current_audio_file_type_;
407 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
408 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_LOADED_MEDIA_TYPE);
409 }
410
// Read loop: runs until a stop command is seen, the source is exhausted, or the reader fails.
411 while (true) {
412 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
413
414 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
415 break;
416 }
417
418 audio::AudioReaderState reader_state = reader->read();
419
420 if (reader_state == audio::AudioReaderState::FINISHED) {
421 break;
422 } else if (reader_state == audio::AudioReaderState::FAILED) {
423 xEventGroupSetBits(this_pipeline->event_group_,
425 break;
426 }
427 }
428 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
// use_count() == 1 means this task is still the only owner of the ring buffer.
430 (this_pipeline->raw_file_ring_buffer_.use_count() == 1)) {
431 // Decoder task hasn't started yet, so delay a bit before releasing ownership of the ring buffer
432 delay(10);
433 }
434 }
435 }
436}
437
// FreeRTOS task entry point for the decoder. `params` is the owning AudioPipeline.
// The task loops forever: it marks itself finished, blocks until the event group wakes it, and, unless a stop
// command is pending, creates an AudioDecoder fed from the raw-file ring buffer. Once the stream info is known and
// acceptable (16 bits per sample, at most 2 channels) the speaker is attached as the sink. Output stays paused until
// enough data has been buffered or the reader has finished.
438void AudioPipeline::decode_task(void *params) {
439 AudioPipeline *this_pipeline = (AudioPipeline *) params;
440
441 while (true) {
// Advertise that the decoder is idle before blocking.
442 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::DECODER_MESSAGE_FINISHED);
443
444 // Wait until the reader notifies us that the media type is available
445 EventBits_t event_bits =
446 xEventGroupWaitBits(this_pipeline->event_group_,
448 pdFALSE, // xClearOnExit = pdFALSE: the waited-for bits are NOT cleared when the call returns
449 pdFALSE, // xWaitForAllBits = pdFALSE: return as soon as ANY of the waited-for bits is set
450 portMAX_DELAY); // Block indefinitely until bit is set
451
452 xEventGroupClearBits(this_pipeline->event_group_,
454
455 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
456 InfoErrorEvent event;
458
// The same transfer buffer size is passed for both size arguments of the decoder.
459 std::unique_ptr<audio::AudioDecoder> decoder =
460 make_unique<audio::AudioDecoder>(this_pipeline->transfer_buffer_size_, this_pipeline->transfer_buffer_size_);
461
462 esp_err_t err = decoder->start(this_pipeline->current_audio_file_type_);
// The source is attached even if start() failed; the failure is reported just below.
463 decoder->add_source(this_pipeline->raw_file_ring_buffer_);
464
465 if (err != ESP_OK) {
466 // Send specific error message
467 event.err = err;
468 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
469
470 // Setting up the decoder failed, stop the pipeline
471 xEventGroupSetBits(this_pipeline->event_group_,
473 }
474
475 bool has_stream_info = false;
476 bool started_playback = false;
477
478 size_t initial_bytes_to_buffer = 0;
479
480 while (true) {
481 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
482
483 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
484 break;
485 }
486
487 // Update pause state
// Before playback has started, output stays paused while the reader is still running. If the reader has
// already finished, playback starts regardless of how much was buffered.
488 if (!started_playback) {
489 if (!(event_bits & EventGroupBits::READER_MESSAGE_FINISHED)) {
490 decoder->set_pause_output_state(true);
491 } else {
492 started_playback = true;
493 }
494 } else {
495 decoder->set_pause_output_state(this_pipeline->pause_state_);
496 }
497
498 // Stop gracefully if the reader has finished
499 audio::AudioDecoderState decoder_state = decoder->decode(event_bits & EventGroupBits::READER_MESSAGE_FINISHED);
500
// Publish the playback position only for states in which decoding made progress or completed.
501 if ((decoder_state == audio::AudioDecoderState::DECODING) ||
502 (decoder_state == audio::AudioDecoderState::FINISHED)) {
503 this_pipeline->playback_ms_ = decoder->get_playback_ms();
504 }
505
506 if (decoder_state == audio::AudioDecoderState::FINISHED) {
507 break;
508 } else if (decoder_state == audio::AudioDecoderState::FAILED) {
// A failure before any stream info was obtained is reported as a header parsing failure.
509 if (!has_stream_info) {
510 event.decoding_err = DecodingError::FAILED_HEADER;
511 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
512 }
513 xEventGroupSetBits(this_pipeline->event_group_,
515 break;
516 }
517
// Runs once, on the first iteration in which the decoder can report the stream format.
518 if (!has_stream_info && decoder->get_audio_stream_info().has_value()) {
519 has_stream_info = true;
520
521 this_pipeline->current_audio_stream_info_ = decoder->get_audio_stream_info().value();
522
523 // Send the stream information to the pipeline
524 event.audio_stream_info = this_pipeline->current_audio_stream_info_;
525
526 if (this_pipeline->current_audio_stream_info_.get_bits_per_sample() != 16) {
527 // Error state, incompatible bits per sample
529 xEventGroupSetBits(this_pipeline->event_group_,
531 } else if ((this_pipeline->current_audio_stream_info_.get_channels() > 2)) {
532 // Error state, incompatible number of channels
533 event.decoding_err = DecodingError::INCOMPATIBLE_CHANNELS;
534 xEventGroupSetBits(this_pipeline->event_group_,
536 } else {
537 // Send audio directly to the speaker
538 this_pipeline->speaker_->set_audio_stream_info(this_pipeline->current_audio_stream_info_);
539 decoder->add_sink(this_pipeline->speaker_);
540 }
541
// Threshold of raw file bytes to buffer before playback: INITIAL_BUFFER_MS worth of decoded audio, capped at
// three quarters of the ring buffer, then scaled down below by an estimated compression factor.
542 initial_bytes_to_buffer = std::min(this_pipeline->current_audio_stream_info_.ms_to_bytes(INITIAL_BUFFER_MS),
543 this_pipeline->buffer_size_ * 3 / 4);
544
545 switch (this_pipeline->current_audio_file_type_) {
546#ifdef USE_AUDIO_MP3_SUPPORT
548 initial_bytes_to_buffer /= 8; // Estimate the MP3 compression factor is 8
549 break;
550#endif
551#ifdef USE_AUDIO_FLAC_SUPPORT
553 initial_bytes_to_buffer /= 2; // Estimate the FLAC compression factor is 2
554 break;
555#endif
556 default:
557 break;
558 }
559 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
560 }
561
562 if (!started_playback && has_stream_info) {
563 // Verify enough data is available before starting playback
// NOTE(review): lock() returns an empty shared_ptr if the ring buffer has already been released, and the
// result is dereferenced without a null check - confirm the reader always outlives this point.
564 std::shared_ptr<RingBuffer> temp_ring_buffer = this_pipeline->raw_file_ring_buffer_.lock();
565 if (temp_ring_buffer->available() >= initial_bytes_to_buffer) {
566 started_playback = true;
567 }
568 }
569 }
570 }
571 }
572}
573
574} // namespace speaker
575} // namespace esphome
576
577#endif
An STL allocator that uses SPI or internal RAM.
Definition helpers.h:818
void deallocate(T *p, size_t n)
Definition helpers.h:876
T * allocate(size_t n)
Definition helpers.h:838
static std::unique_ptr< RingBuffer > create(size_t len)
size_t ms_to_bytes(uint32_t ms) const
Converts duration to bytes.
Definition audio.h:73
uint8_t get_bits_per_sample() const
Definition audio.h:28
uint8_t get_channels() const
Definition audio.h:29
bool has_value() const
Definition optional.h:92
value_type const & value() const
Definition optional.h:94
static void read_task(void *params)
void suspend_tasks()
Suspends any running tasks.
void set_pause_state(bool pause_state)
void delete_tasks_()
Resets the task related pointers and deallocates their stacks.
std::weak_ptr< RingBuffer > raw_file_ring_buffer_
void start_url(const std::string &uri)
Starts an audio pipeline given a media url.
esp_err_t allocate_communications_()
Allocates the event group and info error queue.
esp_err_t start_tasks_()
Common start code for the pipeline, regardless if the source is a file or url.
audio::AudioStreamInfo current_audio_stream_info_
void start_file(audio::AudioFile *audio_file)
Starts an audio pipeline given a AudioFile pointer.
esp_err_t stop()
Stops the pipeline.
static void decode_task(void *params)
AudioPipeline(speaker::Speaker *speaker, size_t buffer_size, bool task_stack_in_psram, std::string base_name, UBaseType_t priority)
void resume_tasks()
Resumes any running tasks.
audio::AudioFile * current_audio_file_
audio::AudioFileType current_audio_file_type_
AudioPipelineState process_state()
Processes the state of the audio pipeline based on the info_error_queue_ and event_group_.
bool is_running() const
Definition speaker.h:66
virtual void set_pause_state(bool pause_state)
Definition speaker.h:61
void set_audio_stream_info(const audio::AudioStreamInfo &audio_stream_info)
Definition speaker.h:99
virtual void finish()
Definition speaker.h:58
virtual void stop()=0
uint8_t priority
Providing packet encoding functions for exchanging data with a remote host.
Definition a01nyub.cpp:7
void IRAM_ATTR HOT delay(uint32_t ms)
Definition core.cpp:29
optional< DecodingError > decoding_err
optional< audio::AudioFileType > file_type
optional< audio::AudioStreamInfo > audio_stream_info