ESPHome 2026.6.0-dev
Loading...
Searching...
No Matches
audio_pipeline.cpp
Go to the documentation of this file.
1#include "audio_pipeline.h"
2
3#ifdef USE_ESP32
4
6#include "esphome/core/hal.h"
8#include "esphome/core/log.h"
9
10namespace esphome::speaker {
11
// Roughly how much audio (in milliseconds) to buffer from the source before the decoder un-pauses its output.
// The decode task converts this to bytes, caps it at 3/4 of the buffer, and scales it by an estimated codec
// compression factor before comparing it against the raw-file ring buffer fill level.
static constexpr uint32_t INITIAL_BUFFER_MS = 1000;

// Stack sizes handed to the reader/decoder task create() calls in start_tasks_().
static constexpr uint32_t READ_TASK_STACK_SIZE = 5 * 1024;
#ifdef USE_AUDIO_OPUS_SUPPORT
// The Opus decoder needs a deeper stack than the other codecs
static constexpr uint32_t DECODE_TASK_STACK_SIZE = 5 * 1024;
#else
static constexpr uint32_t DECODE_TASK_STACK_SIZE = 3 * 1024;
#endif

// Number of InfoErrorEvent slots in the queue the tasks use to report to process_state()
static constexpr uint32_t INFO_ERROR_QUEUE_COUNT = 5;

static const char *const TAG = "speaker_media_player.pipeline";
25
27 // MESSAGE_* bits are only set by their respective tasks
28
29 // Stops all activity in the pipeline elements; cleared by process_state() and set by stop() or by each task
31
32 // Read audio from an HTTP source; cleared by reader task and set by start_url
34 // Read audio from an audio file from the flash; cleared by reader task and set by start_file
36
37 // Audio file type is read after checking it is supported; cleared by decoder task
39 // Reader is done (either through a failure or just end of the stream); cleared by reader task
41 // Error reading the file; cleared by process_state()
43
44 // Decoder is done (either through a failure or the end of the stream); cleared by decoder task
46 // Error decoding the file; set by the decoder task and cleared by process_state()
48};
49
50AudioPipeline::AudioPipeline(speaker::Speaker *speaker, size_t buffer_size, bool task_stack_in_psram,
51 std::string base_name, UBaseType_t priority)
52 : base_name_(std::move(base_name)),
53 priority_(priority),
54 task_stack_in_psram_(task_stack_in_psram),
55 speaker_(speaker),
56 buffer_size_(buffer_size) {
58 this->transfer_buffer_size_ = std::min(buffer_size_ / 4, DEFAULT_TRANSFER_BUFFER_SIZE);
59}
60
61void AudioPipeline::start_url(const std::string &uri) {
62 if (this->is_playing_) {
63 xEventGroupSetBits(this->event_group_, PIPELINE_COMMAND_STOP);
64 }
65 this->current_uri_ = uri;
66 this->pending_url_ = true;
67}
68
70 if (this->is_playing_) {
71 xEventGroupSetBits(this->event_group_, PIPELINE_COMMAND_STOP);
72 }
73 this->current_audio_file_ = audio_file;
74 this->pending_file_ = true;
75}
76
78 xEventGroupSetBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
79
80 return ESP_OK;
81}
82void AudioPipeline::set_pause_state(bool pause_state) {
83 this->speaker_->set_pause_state(pause_state);
84
85 this->pause_state_ = pause_state;
86}
87
89 if (this->read_task_.is_created()) {
90 vTaskSuspend(this->read_task_.get_handle());
91 }
92 if (this->decode_task_.is_created()) {
93 vTaskSuspend(this->decode_task_.get_handle());
94 }
95}
96
98 if (this->read_task_.is_created()) {
99 vTaskResume(this->read_task_.get_handle());
100 }
101 if (this->decode_task_.is_created()) {
102 vTaskResume(this->decode_task_.get_handle());
103 }
104}
105
107 /*
108 * Log items from info error queue
109 */
110 InfoErrorEvent event;
111 if (this->info_error_queue_ != nullptr) {
112 while (xQueueReceive(this->info_error_queue_, &event, 0)) {
113 switch (event.source) {
115 if (event.err.has_value()) {
116 ESP_LOGE(TAG, "Media reader encountered an error: %s", esp_err_to_name(event.err.value()));
117 } else if (event.file_type.has_value()) {
118 ESP_LOGD(TAG, "Reading %s file type", audio_file_type_to_string(event.file_type.value()));
119 }
120
121 break;
123 if (event.err.has_value()) {
124 ESP_LOGE(TAG, "Decoder encountered an error: %s", esp_err_to_name(event.err.value()));
125 }
126
127 if (event.audio_stream_info.has_value()) {
128 ESP_LOGD(TAG, "Decoded audio has %d channels, %" PRId32 " Hz sample rate, and %d bits per sample",
129 event.audio_stream_info.value().get_channels(), event.audio_stream_info.value().get_sample_rate(),
130 event.audio_stream_info.value().get_bits_per_sample());
131 }
132
133 if (event.decoding_err.has_value()) {
134 switch (event.decoding_err.value()) {
136 ESP_LOGE(TAG, "Failed to parse the file's header.");
137 break;
139 ESP_LOGE(TAG, "Incompatible bits per sample. Only 16 bits per sample is supported");
140 break;
142 ESP_LOGE(TAG, "Incompatible number of channels. Only 1 or 2 channel audio is supported.");
143 break;
144 }
145 }
146 break;
147 }
148 }
149 }
150
151 /*
152 * Determine the current state based on the event group bits and tasks' status
153 */
154
155 EventBits_t event_bits = xEventGroupGetBits(this->event_group_);
156
157 if (this->pending_url_ || this->pending_file_) {
158 // Init command pending
159 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
160 // Only start if there is no pending stop command
161 if (!this->read_task_.is_created() || !this->decode_task_.is_created()) {
162 // At least one task isn't running
163 this->start_tasks_();
164 }
165
166 if (this->pending_url_) {
168 this->playback_ms_ = 0;
169 this->pending_url_ = false;
170 } else if (this->pending_file_) {
172 this->playback_ms_ = 0;
173 this->pending_file_ = false;
174 }
175
176 this->is_playing_ = true;
178 }
179 }
180
181 if ((event_bits & EventGroupBits::READER_MESSAGE_ERROR)) {
182 xEventGroupClearBits(this->event_group_, EventGroupBits::READER_MESSAGE_ERROR);
184 }
185
186 if ((event_bits & EventGroupBits::DECODER_MESSAGE_ERROR)) {
187 xEventGroupClearBits(this->event_group_, EventGroupBits::DECODER_MESSAGE_ERROR);
189 }
190
191 if ((event_bits & EventGroupBits::READER_MESSAGE_FINISHED) &&
194 // Tasks are finished and there's no media in between the reader and decoder
195
196 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
197 // Stop command is fully processed, so clear the command bit
198 xEventGroupClearBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
199 this->hard_stop_ = true;
200 }
201
202 if (!this->is_playing_) {
203 // The tasks have been stopped for two ``process_state`` calls in a row, so delete the tasks
204 if (this->read_task_.is_created() || this->decode_task_.is_created()) {
205 this->read_task_.deallocate();
206 this->decode_task_.deallocate();
207 if (this->hard_stop_) {
208 // Stop command was sent, so immediately end the playback
209 this->speaker_->stop();
210 this->hard_stop_ = false;
211 } else {
212 // Decoded all the audio, so let the speaker finish playing before stopping
213 this->speaker_->finish();
214 }
215 }
216 }
217 this->is_playing_ = false;
218 if (!this->speaker_->is_running()) {
220 } else {
221 this->is_finishing_ = true;
222 }
223 }
224
225 if (this->pause_state_) {
227 }
228
229 if (this->is_finishing_) {
230 if (!this->speaker_->is_running()) {
231 this->is_finishing_ = false;
232 } else {
234 }
235 }
236
237 if (!this->read_task_.is_created() && !this->decode_task_.is_created()) {
238 // No tasks are running, so the pipeline is stopped.
239 xEventGroupClearBits(this->event_group_, EventGroupBits::PIPELINE_COMMAND_STOP);
241 }
242
243 this->is_playing_ = true;
245}
246
248 if (this->event_group_ == nullptr)
249 this->event_group_ = xEventGroupCreate();
250
251 if (this->event_group_ == nullptr) {
252 return ESP_ERR_NO_MEM;
253 }
254
255 if (this->info_error_queue_ == nullptr)
256 this->info_error_queue_ = xQueueCreate(INFO_ERROR_QUEUE_COUNT, sizeof(InfoErrorEvent));
257
258 if (this->info_error_queue_ == nullptr)
259 return ESP_ERR_NO_MEM;
260
261 return ESP_OK;
262}
263
265 if (!this->read_task_.is_created()) {
266 // Reader task uses the AudioReader class which uses esp_http_client. This crashes on IDF 5.4 if the task stack is
267 // in PSRAM. As a workaround, always allocate the read task in internal memory.
268 if (!this->read_task_.create(read_task, (this->base_name_ + "_read").c_str(), READ_TASK_STACK_SIZE, (void *) this,
269 this->priority_, false)) {
270 return ESP_ERR_NO_MEM;
271 }
272 }
273
274 if (!this->decode_task_.is_created()) {
275 if (!this->decode_task_.create(decode_task, (this->base_name_ + "_decode").c_str(), DECODE_TASK_STACK_SIZE,
276 (void *) this, this->priority_, this->task_stack_in_psram_)) {
277 return ESP_ERR_NO_MEM;
278 }
279 }
280
281 return ESP_OK;
282}
283
284void AudioPipeline::read_task(void *params) {
285 AudioPipeline *this_pipeline = (AudioPipeline *) params;
286
287 while (true) {
288 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_FINISHED);
289
290 // Wait until the pipeline notifies us the source of the media file
291 EventBits_t event_bits = xEventGroupWaitBits(
292 this_pipeline->event_group_,
294 pdFALSE, // Clear the bit on exit
295 pdFALSE, // Wait for all the bits,
296 portMAX_DELAY); // Block indefinitely until bit is set
297
298 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
299 xEventGroupClearBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_FINISHED |
302 InfoErrorEvent event;
304 esp_err_t err = ESP_OK;
305
306 std::unique_ptr<audio::AudioReader> reader =
307 make_unique<audio::AudioReader>(this_pipeline->transfer_buffer_size_);
308
310 err = reader->start(this_pipeline->current_audio_file_, this_pipeline->current_audio_file_type_);
311 } else {
312 err = reader->start(this_pipeline->current_uri_, this_pipeline->current_audio_file_type_);
313 }
314
315 if (err == ESP_OK) {
316 size_t file_ring_buffer_size = this_pipeline->buffer_size_;
317
318 std::shared_ptr<ring_buffer::RingBuffer> temp_ring_buffer;
319
320 if (!this_pipeline->raw_file_ring_buffer_.use_count()) {
321 temp_ring_buffer = ring_buffer::RingBuffer::create(file_ring_buffer_size);
322 this_pipeline->raw_file_ring_buffer_ = temp_ring_buffer;
323 }
324
325 if (!this_pipeline->raw_file_ring_buffer_.use_count()) {
326 err = ESP_ERR_NO_MEM;
327 } else {
328 reader->add_sink(this_pipeline->raw_file_ring_buffer_);
329 }
330 }
331
332 if (err != ESP_OK) {
333 // Send specific error message
334 event.err = err;
335 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
336
337 // Setting up the reader failed, stop the pipeline
338 xEventGroupSetBits(this_pipeline->event_group_,
340 } else {
341 // Send the file type to the pipeline
342 event.file_type = this_pipeline->current_audio_file_type_;
343 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
344 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::READER_MESSAGE_LOADED_MEDIA_TYPE);
345 }
346
347 while (true) {
348 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
349
350 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
351 break;
352 }
353
354 audio::AudioReaderState reader_state = reader->read();
355
356 if (reader_state == audio::AudioReaderState::FINISHED) {
357 break;
358 } else if (reader_state == audio::AudioReaderState::FAILED) {
359 xEventGroupSetBits(this_pipeline->event_group_,
361 break;
362 }
363 }
364 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
366 (this_pipeline->raw_file_ring_buffer_.use_count() == 1)) {
367 // Decoder task hasn't started yet, so delay a bit before releasing ownership of the ring buffer
368 delay(10);
369 }
370 }
371 }
372}
373
374void AudioPipeline::decode_task(void *params) {
375 AudioPipeline *this_pipeline = (AudioPipeline *) params;
376
377 while (true) {
378 xEventGroupSetBits(this_pipeline->event_group_, EventGroupBits::DECODER_MESSAGE_FINISHED);
379
380 // Wait until the reader notifies us that the media type is available
381 EventBits_t event_bits =
382 xEventGroupWaitBits(this_pipeline->event_group_,
384 pdFALSE, // Clear the bit on exit
385 pdFALSE, // Wait for all the bits,
386 portMAX_DELAY); // Block indefinitely until bit is set
387
388 xEventGroupClearBits(this_pipeline->event_group_,
390
391 if (!(event_bits & EventGroupBits::PIPELINE_COMMAND_STOP)) {
392 InfoErrorEvent event;
394
395 std::unique_ptr<audio::AudioDecoder> decoder =
396 make_unique<audio::AudioDecoder>(this_pipeline->transfer_buffer_size_, this_pipeline->transfer_buffer_size_);
397
398 esp_err_t err = decoder->start(this_pipeline->current_audio_file_type_);
399 decoder->add_source(this_pipeline->raw_file_ring_buffer_);
400
401 if (err != ESP_OK) {
402 // Send specific error message
403 event.err = err;
404 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
405
406 // Setting up the decoder failed, stop the pipeline
407 xEventGroupSetBits(this_pipeline->event_group_,
409 }
410
411 bool has_stream_info = false;
412 bool started_playback = false;
413
414 size_t initial_bytes_to_buffer = 0;
415
416 while (true) {
417 event_bits = xEventGroupGetBits(this_pipeline->event_group_);
418
419 if (event_bits & EventGroupBits::PIPELINE_COMMAND_STOP) {
420 break;
421 }
422
423 // Update pause state
424 if (!started_playback) {
425 if (!(event_bits & EventGroupBits::READER_MESSAGE_FINISHED)) {
426 decoder->set_pause_output_state(true);
427 } else {
428 started_playback = true;
429 }
430 } else {
431 decoder->set_pause_output_state(this_pipeline->pause_state_);
432 }
433
434 // Stop gracefully if the reader has finished
435 audio::AudioDecoderState decoder_state = decoder->decode(event_bits & EventGroupBits::READER_MESSAGE_FINISHED);
436
437 if ((decoder_state == audio::AudioDecoderState::DECODING) ||
438 (decoder_state == audio::AudioDecoderState::FINISHED)) {
439 this_pipeline->playback_ms_ = decoder->get_playback_ms();
440 }
441
442 if (decoder_state == audio::AudioDecoderState::FINISHED) {
443 break;
444 } else if (decoder_state == audio::AudioDecoderState::FAILED) {
445 if (!has_stream_info) {
446 event.decoding_err = DecodingError::FAILED_HEADER;
447 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
448 }
449 xEventGroupSetBits(this_pipeline->event_group_,
451 break;
452 }
453
454 if (!has_stream_info && decoder->get_audio_stream_info().has_value()) {
455 has_stream_info = true;
456
457 this_pipeline->current_audio_stream_info_ = decoder->get_audio_stream_info().value();
458
459 // Send the stream information to the pipeline
460 event.audio_stream_info = this_pipeline->current_audio_stream_info_;
461
462 if (this_pipeline->current_audio_stream_info_.get_bits_per_sample() != 16) {
463 // Error state, incompatible bits per sample
465 xEventGroupSetBits(this_pipeline->event_group_,
467 } else if ((this_pipeline->current_audio_stream_info_.get_channels() > 2)) {
468 // Error state, incompatible number of channels
469 event.decoding_err = DecodingError::INCOMPATIBLE_CHANNELS;
470 xEventGroupSetBits(this_pipeline->event_group_,
472 } else {
473 // Send audio directly to the speaker
474 this_pipeline->speaker_->set_audio_stream_info(this_pipeline->current_audio_stream_info_);
475 decoder->add_sink(this_pipeline->speaker_);
476 }
477
478 initial_bytes_to_buffer = std::min(this_pipeline->current_audio_stream_info_.ms_to_bytes(INITIAL_BUFFER_MS),
479 this_pipeline->buffer_size_ * 3 / 4);
480
481 switch (this_pipeline->current_audio_file_type_) {
482#ifdef USE_AUDIO_MP3_SUPPORT
484 initial_bytes_to_buffer /= 8; // Estimate the MP3 compression factor is 8
485 break;
486#endif
487#ifdef USE_AUDIO_FLAC_SUPPORT
489 initial_bytes_to_buffer /= 2; // Estimate the FLAC compression factor is 2
490 break;
491#endif
492#ifdef USE_AUDIO_OPUS_SUPPORT
494 initial_bytes_to_buffer /= 8; // Estimate the Opus compression factor is 8
495 break;
496#endif
497 default:
498 break;
499 }
500 xQueueSend(this_pipeline->info_error_queue_, &event, portMAX_DELAY);
501 }
502
503 if (!started_playback && has_stream_info) {
504 // Verify enough data is available before starting playback
505 std::shared_ptr<ring_buffer::RingBuffer> temp_ring_buffer = this_pipeline->raw_file_ring_buffer_.lock();
506 if (temp_ring_buffer != nullptr && temp_ring_buffer->available() >= initial_bytes_to_buffer) {
507 started_playback = true;
508 }
509 }
510 }
511 }
512 }
513}
514
515} // namespace esphome::speaker
516
517#endif
bool create(TaskFunction_t fn, const char *name, uint32_t stack_size, void *param, UBaseType_t priority, bool use_psram)
Allocate stack and create task.
bool is_created() const
Check if the task has been created and not yet destroyed.
Definition static_task.h:18
TaskHandle_t get_handle() const
Get the FreeRTOS task handle.
Definition static_task.h:21
void deallocate()
Delete the task (if running) and free the stack buffer.
size_t ms_to_bytes(uint32_t ms) const
Converts duration to bytes.
Definition audio.h:73
uint8_t get_bits_per_sample() const
Definition audio.h:28
uint8_t get_channels() const
Definition audio.h:29
static std::unique_ptr< RingBuffer > create(size_t len, MemoryPreference preference=MemoryPreference::EXTERNAL_FIRST)
static void read_task(void *params)
void suspend_tasks()
Suspends any running tasks.
void set_pause_state(bool pause_state)
void start_url(const std::string &uri)
Starts an audio pipeline given a media url.
esp_err_t allocate_communications_()
Allocates the event group and info error queue.
esp_err_t start_tasks_()
Common start code for the pipeline, regardless if the source is a file or url.
audio::AudioStreamInfo current_audio_stream_info_
void start_file(audio::AudioFile *audio_file)
Starts an audio pipeline given a AudioFile pointer.
esp_err_t stop()
Stops the pipeline.
static void decode_task(void *params)
AudioPipeline(speaker::Speaker *speaker, size_t buffer_size, bool task_stack_in_psram, std::string base_name, UBaseType_t priority)
void resume_tasks()
Resumes any running tasks.
audio::AudioFile * current_audio_file_
audio::AudioFileType current_audio_file_type_
std::weak_ptr< ring_buffer::RingBuffer > raw_file_ring_buffer_
AudioPipelineState process_state()
Processes the state of the audio pipeline based on the info_error_queue_ and event_group_.
bool is_running() const
Definition speaker.h:65
virtual void set_pause_state(bool pause_state)
Definition speaker.h:60
void set_audio_stream_info(const audio::AudioStreamInfo &audio_stream_info)
Definition speaker.h:98
virtual void finish()
Definition speaker.h:57
virtual void stop()=0
uint8_t priority
void HOT delay(uint32_t ms)
Definition hal.cpp:85
static void uint32_t
optional< DecodingError > decoding_err
optional< audio::AudioFileType > file_type
optional< audio::AudioStreamInfo > audio_stream_info