15static const char *
const TAG =
"scheduler";
24static constexpr size_t MAX_POOL_SIZE = 5;
29static constexpr uint32_t MAX_LOGICALLY_DELETED_ITEMS = 5;
31static constexpr uint32_t HALF_MAX_UINT32 = std::numeric_limits<uint32_t>::max() / 2;
33static constexpr uint32_t MAX_INTERVAL_DELAY = 5000;
38#ifdef ESPHOME_DEBUG_SCHEDULER
40static void validate_static_string(
const char *name) {
46 uintptr_t addr =
reinterpret_cast<uintptr_t
>(name);
50 uintptr_t stack_addr =
reinterpret_cast<uintptr_t
>(&stack_var);
54 if (addr > (stack_addr - 0x2000) && addr < (stack_addr + 0x2000)) {
56 "WARNING: Scheduler name '%s' at %p appears to be on the stack - this is unsafe!\n"
57 " Stack reference at %p",
58 name, name, &stack_var);
63 static const char *static_str =
"test";
64 uintptr_t static_addr =
reinterpret_cast<uintptr_t
>(static_str);
67 if (addr > static_addr + 0x100000 || (static_addr > 0x100000 && addr < static_addr - 0x100000)) {
68 ESP_LOGW(TAG,
"WARNING: Scheduler name '%s' at %p might be on heap (static ref at %p)", name, name, static_str);
79void HOT Scheduler::set_timer_common_(Component *
component, SchedulerItem::Type
type,
bool is_static_string,
80 const void *name_ptr, uint32_t
delay, std::function<
void()> func,
bool is_retry,
83 const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr);
85 if (
delay == SCHEDULER_DONT_RUN) {
88 LockGuard guard{this->lock_};
95 const uint64_t now = this->millis_64_(
millis());
98 LockGuard guard{this->lock_};
101 std::unique_ptr<SchedulerItem> item;
102 if (!this->scheduler_item_pool_.empty()) {
104 item = std::move(this->scheduler_item_pool_.back());
105 this->scheduler_item_pool_.pop_back();
106#ifdef ESPHOME_DEBUG_SCHEDULER
107 ESP_LOGD(TAG,
"Reused item from pool (pool size now: %zu)", this->scheduler_item_pool_.size());
111 item = make_unique<SchedulerItem>();
112#ifdef ESPHOME_DEBUG_SCHEDULER
113 ESP_LOGD(TAG,
"Allocated new item (pool empty)");
117 item->set_name(name_cstr, !is_static_string);
119 item->callback = std::move(func);
121 this->set_item_removed_(item.get(),
false);
122 item->is_retry = is_retry;
124#ifndef ESPHOME_THREAD_SINGLE
127 if (
delay == 0 &&
type == SchedulerItem::TIMEOUT) {
132 this->defer_queue_.push_back(std::move(item));
138 if (
type == SchedulerItem::INTERVAL) {
139 item->interval =
delay;
143 item->set_next_execution(now + offset);
144 ESP_LOGV(TAG,
"Scheduler interval for %s is %" PRIu32
"ms, offset %" PRIu32
"ms", name_cstr ? name_cstr :
"",
delay,
148 item->set_next_execution(now +
delay);
151#ifdef ESPHOME_DEBUG_SCHEDULER
152 this->debug_log_timer_(item.get(), is_static_string, name_cstr,
type,
delay, now);
156 if (is_retry && name_cstr !=
nullptr &&
type == SchedulerItem::TIMEOUT &&
157 (has_cancelled_timeout_in_container_locked_(this->items_,
component, name_cstr,
true) ||
158 has_cancelled_timeout_in_container_locked_(this->to_add_,
component, name_cstr,
true))) {
160#ifdef ESPHOME_DEBUG_SCHEDULER
161 ESP_LOGD(TAG,
"Skipping retry '%s' - found cancelled item", name_cstr);
173 this->to_add_.push_back(std::move(item));
176void HOT Scheduler::set_timeout(Component *
component,
const char *name, uint32_t timeout, std::function<
void()> func) {
177 this->set_timer_common_(
component, SchedulerItem::TIMEOUT,
true, name, timeout, std::move(func));
180void HOT Scheduler::set_timeout(Component *
component,
const std::string &name, uint32_t timeout,
181 std::function<
void()> func) {
182 this->set_timer_common_(
component, SchedulerItem::TIMEOUT,
false, &name, timeout, std::move(func));
184bool HOT Scheduler::cancel_timeout(Component *
component,
const std::string &name) {
185 return this->cancel_item_(
component,
false, &name, SchedulerItem::TIMEOUT);
187bool HOT Scheduler::cancel_timeout(Component *
component,
const char *name) {
188 return this->cancel_item_(
component,
true, name, SchedulerItem::TIMEOUT);
190void HOT Scheduler::set_interval(Component *
component,
const std::string &name, uint32_t interval,
191 std::function<
void()> func) {
192 this->set_timer_common_(
component, SchedulerItem::INTERVAL,
false, &name, interval, std::move(func));
195void HOT Scheduler::set_interval(Component *
component,
const char *name, uint32_t interval,
196 std::function<
void()> func) {
197 this->set_timer_common_(
component, SchedulerItem::INTERVAL,
true, name, interval, std::move(func));
199bool HOT Scheduler::cancel_interval(Component *
component,
const std::string &name) {
200 return this->cancel_item_(
component,
false, &name, SchedulerItem::INTERVAL);
202bool HOT Scheduler::cancel_interval(Component *
component,
const char *name) {
203 return this->cancel_item_(
component,
true, name, SchedulerItem::INTERVAL);
210 Scheduler *scheduler;
213 float backoff_increase_factor;
214 uint8_t retry_countdown;
215 bool name_is_dynamic;
218 if (this->name_is_dynamic && this->name) {
225 RetryResult const retry_result = args->func(--args->retry_countdown);
231 args->scheduler->set_timer_common_(
232 args->component, Scheduler::SchedulerItem::TIMEOUT,
true, args->name, args->current_interval,
233 [args]() { retry_handler(args); },
true);
235 args->current_interval *= args->backoff_increase_factor;
238void HOT Scheduler::set_retry_common_(Component *
component,
bool is_static_string,
const void *name_ptr,
239 uint32_t initial_wait_time, uint8_t max_attempts,
240 std::function<
RetryResult(uint8_t)> func,
float backoff_increase_factor) {
241 const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr);
243 if (name_cstr !=
nullptr)
244 this->cancel_retry(
component, name_cstr);
246 if (initial_wait_time == SCHEDULER_DONT_RUN)
249 ESP_LOGVV(TAG,
"set_retry(name='%s', initial_wait_time=%" PRIu32
", max_attempts=%u, backoff_factor=%0.1f)",
250 name_cstr ? name_cstr :
"", initial_wait_time, max_attempts, backoff_increase_factor);
252 if (backoff_increase_factor < 0.0001) {
253 ESP_LOGE(TAG,
"backoff_factor %0.1f too small, using 1.0: %s", backoff_increase_factor, name_cstr ? name_cstr :
"");
254 backoff_increase_factor = 1;
257 auto args = std::make_shared<RetryArgs>();
258 args->func = std::move(func);
260 args->scheduler =
this;
261 args->current_interval = initial_wait_time;
262 args->backoff_increase_factor = backoff_increase_factor;
263 args->retry_countdown = max_attempts;
266 if (name_cstr ==
nullptr || name_cstr[0] ==
'\0') {
269 args->name_is_dynamic =
false;
270 }
else if (is_static_string) {
272 args->name = name_cstr;
273 args->name_is_dynamic =
false;
276 size_t len = strlen(name_cstr);
277 char *copy =
new char[
len + 1];
278 memcpy(copy, name_cstr,
len + 1);
280 args->name_is_dynamic =
true;
286 this->set_timer_common_(
287 component, SchedulerItem::TIMEOUT,
true, args->name, 0, [args]() { retry_handler(args); },
291void HOT Scheduler::set_retry(Component *
component,
const std::string &name, uint32_t initial_wait_time,
292 uint8_t max_attempts, std::function<
RetryResult(uint8_t)> func,
293 float backoff_increase_factor) {
294 this->set_retry_common_(
component,
false, &name, initial_wait_time, max_attempts, std::move(func),
295 backoff_increase_factor);
298void HOT Scheduler::set_retry(Component *
component,
const char *name, uint32_t initial_wait_time, uint8_t max_attempts,
299 std::function<
RetryResult(uint8_t)> func,
float backoff_increase_factor) {
300 this->set_retry_common_(
component,
true, name, initial_wait_time, max_attempts, std::move(func),
301 backoff_increase_factor);
303bool HOT Scheduler::cancel_retry(Component *
component,
const std::string &name) {
304 return this->cancel_retry(
component, name.c_str());
307bool HOT Scheduler::cancel_retry(Component *
component,
const char *name) {
309 LockGuard guard{this->lock_};
310 return this->cancel_item_locked_(
component, name, SchedulerItem::TIMEOUT,
true);
313optional<uint32_t> HOT Scheduler::next_schedule_in(uint32_t now) {
319 if (this->cleanup_() == 0)
322 auto &item = this->items_[0];
324 const auto now_64 = this->millis_64_(now);
325 const uint64_t next_exec = item->get_next_execution();
326 if (next_exec < now_64)
328 return next_exec - now_64;
331void Scheduler::full_cleanup_removed_items_() {
337 LockGuard guard{this->lock_};
339 std::vector<std::unique_ptr<SchedulerItem>> valid_items;
342 for (
auto &item : this->items_) {
343 if (!is_item_removed_(item.get())) {
344 valid_items.push_back(std::move(item));
347 this->recycle_item_main_loop_(std::move(item));
352 this->items_ = std::move(valid_items);
354 std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
355 this->to_remove_ = 0;
358void HOT Scheduler::call(uint32_t now) {
359#ifndef ESPHOME_THREAD_SINGLE
360 this->process_defer_queue_(now);
364 const auto now_64 = this->millis_64_(now);
365 this->process_to_add();
368 bool has_added_items =
false;
370#ifdef ESPHOME_DEBUG_SCHEDULER
371 static uint64_t last_print = 0;
373 if (now_64 - last_print > 2000) {
375 std::vector<std::unique_ptr<SchedulerItem>> old_items;
376#ifdef ESPHOME_THREAD_MULTI_ATOMICS
377 const auto last_dbg = this->last_millis_.load(std::memory_order_relaxed);
378 const auto major_dbg = this->millis_major_.load(std::memory_order_relaxed);
379 ESP_LOGD(TAG,
"Items: count=%zu, pool=%zu, now=%" PRIu64
" (%" PRIu16
", %" PRIu32
")", this->items_.size(),
380 this->scheduler_item_pool_.size(), now_64, major_dbg, last_dbg);
382 ESP_LOGD(TAG,
"Items: count=%zu, pool=%zu, now=%" PRIu64
" (%" PRIu16
", %" PRIu32
")", this->items_.size(),
383 this->scheduler_item_pool_.size(), now_64, this->millis_major_, this->last_millis_);
387 while (!this->items_.empty()) {
388 std::unique_ptr<SchedulerItem> item;
390 LockGuard guard{this->lock_};
391 item = this->pop_raw_locked_();
394 const char *name = item->get_name();
395 bool is_cancelled = is_item_removed_(item.get());
396 ESP_LOGD(TAG,
" %s '%s/%s' interval=%" PRIu32
" next_execution in %" PRIu64
"ms at %" PRIu64
"%s",
397 item->get_type_str(), LOG_STR_ARG(item->get_source()), name ? name :
"(null)", item->interval,
398 item->get_next_execution() - now_64, item->get_next_execution(), is_cancelled ?
" [CANCELLED]" :
"");
400 old_items.push_back(std::move(item));
405 LockGuard guard{this->lock_};
406 this->items_ = std::move(old_items);
408 std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
419 if (this->to_remove_ >= MAX_LOGICALLY_DELETED_ITEMS) {
420 this->full_cleanup_removed_items_();
422 while (!this->items_.empty()) {
424 auto &item = this->items_[0];
425 if (item->get_next_execution() > now_64) {
430 if (item->component !=
nullptr && item->component->is_failed()) {
431 LockGuard guard{this->lock_};
432 this->recycle_item_main_loop_(this->pop_raw_locked_());
440#ifdef ESPHOME_THREAD_MULTI_NO_ATOMICS
443 LockGuard guard{this->lock_};
444 if (is_item_removed_(item.get())) {
445 this->recycle_item_main_loop_(this->pop_raw_locked_());
452 if (is_item_removed_(item.get())) {
453 LockGuard guard{this->lock_};
454 this->recycle_item_main_loop_(this->pop_raw_locked_());
460#ifdef ESPHOME_DEBUG_SCHEDULER
461 const char *item_name = item->get_name();
462 ESP_LOGV(TAG,
"Running %s '%s/%s' with interval=%" PRIu32
" next_execution=%" PRIu64
" (now=%" PRIu64
")",
463 item->get_type_str(), LOG_STR_ARG(item->get_source()), item_name ? item_name :
"(null)", item->interval,
464 item->get_next_execution(), now_64);
470 now = this->execute_item_(item.get(), now);
472 LockGuard guard{this->lock_};
476 auto executed_item = this->pop_raw_locked_();
478 if (executed_item->remove) {
481 this->recycle_item_main_loop_(std::move(executed_item));
485 if (executed_item->type == SchedulerItem::INTERVAL) {
486 executed_item->set_next_execution(now_64 + executed_item->interval);
489 this->to_add_.push_back(std::move(executed_item));
492 this->recycle_item_main_loop_(std::move(executed_item));
495 has_added_items |= !this->to_add_.empty();
498 if (has_added_items) {
499 this->process_to_add();
502void HOT Scheduler::process_to_add() {
503 LockGuard guard{this->lock_};
504 for (
auto &it : this->to_add_) {
505 if (is_item_removed_(it.get())) {
507 this->recycle_item_main_loop_(std::move(it));
511 this->items_.push_back(std::move(it));
512 std::push_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
514 this->to_add_.clear();
516size_t HOT Scheduler::cleanup_() {
524 if (this->to_remove_ == 0)
525 return this->items_.size();
535 LockGuard guard{this->lock_};
536 while (!this->items_.empty()) {
537 auto &item = this->items_[0];
541 this->recycle_item_main_loop_(this->pop_raw_locked_());
543 return this->items_.size();
545std::unique_ptr<Scheduler::SchedulerItem> HOT Scheduler::pop_raw_locked_() {
546 std::pop_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
549 auto item = std::move(this->items_.back());
551 this->items_.pop_back();
556uint32_t HOT Scheduler::execute_item_(SchedulerItem *item, uint32_t now) {
558 WarnIfComponentBlockingGuard guard{item->component, now};
560 return guard.finish();
564bool HOT Scheduler::cancel_item_(Component *
component,
bool is_static_string,
const void *name_ptr,
565 SchedulerItem::Type
type) {
567 const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr);
570 LockGuard guard{this->lock_};
571 return this->cancel_item_locked_(
component, name_cstr,
type);
575bool HOT Scheduler::cancel_item_locked_(Component *
component,
const char *name_cstr, SchedulerItem::Type
type,
578 if (name_cstr ==
nullptr) {
582 size_t total_cancelled = 0;
585#ifndef ESPHOME_THREAD_SINGLE
587 if (
type == SchedulerItem::TIMEOUT) {
589 this->mark_matching_items_removed_locked_(this->defer_queue_,
component, name_cstr,
type, match_retry);
598 if (!this->items_.empty()) {
599 size_t heap_cancelled =
600 this->mark_matching_items_removed_locked_(this->items_,
component, name_cstr,
type, match_retry);
601 total_cancelled += heap_cancelled;
602 this->to_remove_ += heap_cancelled;
606 total_cancelled += this->mark_matching_items_removed_locked_(this->to_add_,
component, name_cstr,
type, match_retry);
608 return total_cancelled > 0;
611uint64_t Scheduler::millis_64_(uint32_t now) {
625#ifdef ESPHOME_THREAD_SINGLE
631 uint16_t major = this->millis_major_;
635 if (now < last && (last - now) > HALF_MAX_UINT32) {
636 this->millis_major_++;
638 this->last_millis_ = now;
639#ifdef ESPHOME_DEBUG_SCHEDULER
640 ESP_LOGD(TAG,
"Detected true 32-bit rollover at %" PRIu32
"ms (was %" PRIu32
")", now, last);
642 }
else if (now > last) {
644 this->last_millis_ = now;
648 return now + (
static_cast<uint64_t
>(major) << 32);
650#elif defined(ESPHOME_THREAD_MULTI_NO_ATOMICS)
658 uint16_t major = this->millis_major_;
663 static const uint32_t ROLLOVER_WINDOW = 10000;
666 bool near_rollover = (last > (std::numeric_limits<uint32_t>::max() - ROLLOVER_WINDOW)) || (now < ROLLOVER_WINDOW);
668 if (near_rollover || (now < last && (last - now) > HALF_MAX_UINT32)) {
670 LockGuard guard{this->lock_};
672 last = this->last_millis_;
674 if (now < last && (last - now) > HALF_MAX_UINT32) {
676 this->millis_major_++;
678#ifdef ESPHOME_DEBUG_SCHEDULER
679 ESP_LOGD(TAG,
"Detected true 32-bit rollover at %" PRIu32
"ms (was %" PRIu32
")", now, last);
683 this->last_millis_ = now;
684 }
else if (now > last) {
691 this->last_millis_ = now;
697 return now + (
static_cast<uint64_t
>(major) << 32);
699#elif defined(ESPHOME_THREAD_MULTI_ATOMICS)
710 uint16_t major = this->millis_major_.load(std::memory_order_acquire);
718 uint32_t last = this->last_millis_.load(std::memory_order_acquire);
722 if (now < last && (last - now) > HALF_MAX_UINT32) {
724 LockGuard guard{this->lock_};
726 last = this->last_millis_.load(std::memory_order_relaxed);
728 if (now < last && (last - now) > HALF_MAX_UINT32) {
730 this->millis_major_.fetch_add(1, std::memory_order_relaxed);
732#ifdef ESPHOME_DEBUG_SCHEDULER
733 ESP_LOGD(TAG,
"Detected true 32-bit rollover at %" PRIu32
"ms (was %" PRIu32
")", now, last);
741 this->last_millis_.store(now, std::memory_order_release);
745 while (now > last && (now - last) < HALF_MAX_UINT32) {
746 if (this->last_millis_.compare_exchange_weak(last, now,
747 std::memory_order_release,
748 std::memory_order_relaxed)) {
755 uint16_t major_end = this->millis_major_.load(std::memory_order_relaxed);
756 if (major_end == major)
757 return now + (
static_cast<uint64_t
>(major) << 32);
760 __builtin_unreachable();
764 "No platform threading model defined. One of ESPHOME_THREAD_SINGLE, ESPHOME_THREAD_MULTI_NO_ATOMICS, or ESPHOME_THREAD_MULTI_ATOMICS must be defined."
768bool HOT Scheduler::SchedulerItem::cmp(
const std::unique_ptr<SchedulerItem> &a,
769 const std::unique_ptr<SchedulerItem> &b) {
772 return (a->next_execution_high_ == b->next_execution_high_) ? (a->next_execution_low_ > b->next_execution_low_)
773 : (a->next_execution_high_ > b->next_execution_high_);
780void Scheduler::recycle_item_main_loop_(std::unique_ptr<SchedulerItem> item) {
784 if (this->scheduler_item_pool_.size() < MAX_POOL_SIZE) {
786 item->callback =
nullptr;
788 item->clear_dynamic_name();
789 this->scheduler_item_pool_.push_back(std::move(item));
790#ifdef ESPHOME_DEBUG_SCHEDULER
791 ESP_LOGD(TAG,
"Recycled item to pool (pool size now: %zu)", this->scheduler_item_pool_.size());
794#ifdef ESPHOME_DEBUG_SCHEDULER
795 ESP_LOGD(TAG,
"Pool full (size: %zu), deleting item", this->scheduler_item_pool_.size());
801#ifdef ESPHOME_DEBUG_SCHEDULER
802void Scheduler::debug_log_timer_(
const SchedulerItem *item,
bool is_static_string,
const char *name_cstr,
803 SchedulerItem::Type
type, uint32_t
delay, uint64_t now) {
805 if (is_static_string && name_cstr !=
nullptr) {
806 validate_static_string(name_cstr);
810 const char *type_str = (
type == SchedulerItem::TIMEOUT) ?
"timeout" :
"interval";
811 if (
type == SchedulerItem::TIMEOUT) {
812 ESP_LOGD(TAG,
"set_%s(name='%s/%s', %s=%" PRIu32
")", type_str, LOG_STR_ARG(item->get_source()),
813 name_cstr ? name_cstr :
"(null)", type_str,
delay);
815 ESP_LOGD(TAG,
"set_%s(name='%s/%s', %s=%" PRIu32
", offset=%" PRIu32
")", type_str, LOG_STR_ARG(item->get_source()),
816 name_cstr ? name_cstr :
"(null)", type_str,
delay,
817 static_cast<uint32_t>(item->get_next_execution() - now));
void set_current_component(Component *component)
const Component * component
Providing packet encoding functions for exchanging data with a remote host.
float random_float()
Return a random float between 0 and 1.
void retry_handler(const std::shared_ptr< RetryArgs > &args)
void IRAM_ATTR HOT delay(uint32_t ms)
uint32_t IRAM_ATTR HOT millis()
Application App
Global storage of Application pointer - only one Application can exist.