Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
Executor.hxx
Go to the documentation of this file.
1
36#ifndef _EXECUTOR_EXECUTOR_HXX_
37#define _EXECUTOR_EXECUTOR_HXX_
38
39#include <functional>
40#include <atomic>
41
45#include "executor/Timer.hxx"
46#include "utils/Queue.hxx"
47#include "utils/SimpleQueue.hxx"
49#include "utils/logging.h"
50#include "utils/macros.h"
51#include "os/OSSelectWakeup.hxx"
52
53#ifdef ESP_NONOS
54extern "C" {
55#include <ets_sys.h>
56}
57#endif
58
59class ActiveTimers;
60
/// This class implements an execution of tasks pulled off an input queue.
///
/// NOTE(review): this listing omits several member declarations that the
/// symbol index at the end of the page attributes to this class (constructor,
/// destructor, active_timers() at 144, assert_current() at 160, current() at
/// 168, selectHelper_ at 179, current_ at 217, activeTimers_ at 220,
/// selectRead_/selectWrite_/selectExcept_ at 223-227, selectNFds_ at 229 and
/// selectables_ at 231). Restore them from the original header before
/// compiling.
63class ExecutorBase : protected OSThread, protected Executable, public LinkedObject<ExecutorBase>
64{
65public:
69
73
/// Lookup an executor by its name.
/// NOTE(review): the meaning of @p wait is not visible here; presumably it
/// makes the call block until a matching executor exists -- confirm in
/// Executor.cxx.
79 static ExecutorBase *by_name(const char *name, bool wait);
80
/// Send a message to this Executor's queue.
/// @param action the executable to enqueue
/// @param priority requested priority; defaults to UINT_MAX
85 virtual void add(Executable *action, unsigned priority = UINT_MAX) = 0;
86
/// Synchronously runs a closure on this executor.
/// @param fn the closure to run
89 void sync_run(std::function<void()> fn);
90
91#if OPENMRN_FEATURE_RTOS_FROM_ISR
/// Variant of add() declared only when OPENMRN_FEATURE_RTOS_FROM_ISR is set;
/// by its name, intended to be called from interrupt context.
/// @param action the executable to enqueue
/// @param priority requested priority; defaults to UINT_MAX
97 virtual void add_from_isr(Executable *action,
98 unsigned priority = UINT_MAX) = 0;
99#endif // OPENMRN_FEATURE_RTOS_FROM_ISR
100
/// Adds a file descriptor to be watched to the select loop.
109 void select(Selectable* job);
110
/// NOTE(review): no description survives in this listing; presumably reports
/// whether @p job is currently registered via select() -- confirm in
/// Executor.cxx.
117 bool is_selected(Selectable* job);
118
/// Removes a job from the select loop.
128 void unselect(Selectable* job);
129
/// Performs one loop of the execution on the calling thread.
133 bool loop_once();
134
/// Performs a few loops of the executor on the calling thread.
141 long long loop_some() ICACHE_FLASH_ATTR;
142
145
/// Terminates the executor thread.
148 void shutdown();
149
/// Implemented by subclasses; Executor<NUM_PRIO> returns whether its queue is
/// empty.
152 virtual bool empty() = 0;
153
/// @return the handle reported by OSThread::get_handle() for this executor.
155 os_thread_t thread_handle() { return OSThread::get_handle(); }
156
/// @return this object viewed as its (protected) OSThread base.
157 OSThread& thread() { return *this; }
158
161
/// Implemented by subclasses; Executor<NUM_PRIO> returns sequence_.
164 virtual uint32_t sequence() = 0;
165
169
170protected:
/// Thread entry point.
174 void *entry() override;
175
/// Entry point (Executable interface). Intentionally a no-op here.
176 void run() override {}
177
180
181private:
/// Retrieve an item from the front of the queue.
/// @param priority output argument; Executor<NUM_PRIO> stores the index of
/// the queue band the item came from.
186 virtual Executable *next(unsigned *priority) = 0;
187
/// Executes a select call, and schedules any necessary executables based on
/// the return.
193 void wait_with_select(long long next_timer_nsec);
194
/// Helper function: maps a Selectable::SelectType to the matching fd_set
/// member.
/// NOTE(review): the signature line (201) is missing from this listing; the
/// symbol index gives it as
/// `fd_set *get_select_set(Selectable::SelectType type)`.
202 {
203 switch (type)
204 {
205 case Selectable::READ: return &selectRead_;
206 case Selectable::WRITE: return &selectWrite_;
207 case Selectable::EXCEPT: return &selectExcept_;
208 }
// Any other value is a programming error. FATAL is documented as the
// loglevel that kills the current process, so the return below presumably
// only exists to satisfy the compiler.
209 LOG(FATAL, "Unexpected select type %d", type);
210 return nullptr;
211 }
212
/// name of this Executor
214 const char *name_;
215
218
221
232
/// Set to 1 when the executor thread has exited and it is safe to delete
/// *this.
235 std::atomic_uint_least8_t done_;
/// 1 if the executor is already running
237 std::atomic_uint_least8_t started_;
/// How many executables we schedule blindly before calling a select() in
/// order to find more data to read (5-bit field).
240 unsigned selectPrescaler_ : 5;
241
242protected:
/// Sequence number (25-bit field), exposed through sequence().
244 volatile unsigned sequence_ : 25;
245
/// Grants Service access to the non-public members of the executor.
247 friend class Service;
248
250};
251
/// NOTE(review): the opening line of this definition (256) is missing from
/// the listing. The symbol index describes the type defined there as "an
/// empty struct"; it is the tag accepted by the Executor(const NO_THREAD&)
/// constructor.
257public:
/// Default constructor; does nothing.
258 NO_THREAD() {}
259};
260
/// Implementation of the ExecutorBase with a specific number of priority
/// bands.
/// @tparam NUM_PRIO number of priority bands in the internal queue.
///
/// NOTE(review): the symbol index at the end of the page also lists a default
/// constructor, the ~Executor() declaration and the member
/// `QListProtected<NUM_PRIO> queue_` (line 376, "Internal queue of
/// executables waiting to be scheduled"); their lines are missing from this
/// listing.
264template <unsigned NUM_PRIO>
265class Executor : public ExecutorBase
266{
267public:
268
/// Constructor. Immediately starts the executor thread via start_thread().
/// @param name thread name handed to start_thread()
/// @param priority thread priority handed to start_thread()
/// @param stack_size thread stack size handed to start_thread()
274 Executor(const char *name, int priority, size_t stack_size)
275 {
276 start_thread(name, priority, stack_size);
277 }
278
/// Constructor that does not create a thread for running the executor.
/// @param unused tag argument; its value is ignored.
284 explicit Executor(const NO_THREAD& unused) {}
285
/// Creates a new thread for running this executor by forwarding to
/// OSThread::start().
293 void start_thread(const char *name, int priority, size_t stack_size)
294 {
295 OSThread::start(name, priority, stack_size);
296 }
297
301
/// Send a message to this Executor's queue.
/// @param msg the executable to enqueue
/// @param priority requested band; values >= NUM_PRIO are clamped to the
/// last band (NUM_PRIO - 1), which is also what the UINT_MAX default maps to.
306 void add(Executable *msg, unsigned priority = UINT_MAX) OVERRIDE
307 {
308 queue_.insert(
309 msg, priority >= NUM_PRIO ? NUM_PRIO - 1 : priority);
310#ifdef ESP_NONOS
311 extern void wakeup_executor(ExecutorBase* executor);
312 wakeup_executor(this);
313#else
// NOTE(review): listing line 314 is missing here, so this branch looks
// empty. The symbol index references OSSelectWakeup::wakeup() ("Wakes up the
// select in the locked thread"); presumably that call on selectHelper_
// belongs on this line -- confirm against the original header.
315#endif
316 }
317
318#if OPENMRN_FEATURE_RTOS_FROM_ISR
/// Variant of add() compiled only when OPENMRN_FEATURE_RTOS_FROM_ISR is set.
/// Applies the same priority clamping as add(), then wakes the executor
/// through selectHelper_.wakeup_from_isr().
324 void add_from_isr(Executable *msg, unsigned priority = UINT_MAX) override
325 {
326#ifdef ESP_PLATFORM
327 // On the ESP32 we need to call insert instead of insert_locked to
328 // ensure that all code paths lock the queue for consistency since
329 // this code path is not guaranteed to be protected by a critical
330 // section.
331 queue_.insert(
332 msg, priority >= NUM_PRIO ? NUM_PRIO - 1 : priority);
333#else
// NOTE(review): insert_locked() presumably expects the caller to already
// hold the queue's lock / critical section -- confirm in utils/Queue.hxx.
334 queue_.insert_locked(
335 msg, priority >= NUM_PRIO ? NUM_PRIO - 1 : priority);
336#endif // ESP_PLATFORM
337 selectHelper_.wakeup_from_isr();
338 }
339#endif // OPENMRN_FEATURE_RTOS_FROM_ISR
340
/// If the executor was created with NO_THREAD, then this function needs to
/// be called to run the executor. It hands the calling thread over through
/// OSThread::inherit() ("Inherits the current thread").
344 void thread_body() {
345 inherit();
346 }
347
/// NOTE(review): the signature line (350) is missing from this listing; the
/// symbol index gives it as `bool empty() OVERRIDE`.
/// @return the result of queue_.empty().
351 {
352 return queue_.empty();
353 }
354
/// @return the current value of the inherited sequence_ counter.
355 uint32_t sequence() OVERRIDE { return sequence_; }
356
357private:
/// Retrieve an item from the front of the queue.
/// @param priority output argument; receives the index reported by the queue
/// for the returned item.
/// @return the dequeued item cast to Executable*.
362 Executable *next(unsigned *priority) OVERRIDE
363 {
364 auto result = queue_.next();
365 *priority = result.index;
366 return static_cast<Executable*>(result.item);
367 }
368
372
374
377};
378
/// This class can be given an executor, and will notify itself when that
/// executor is out of work.
///
/// NOTE(review): the listing is missing the constructor signature (386,
/// `ExecutorGuard(ExecutorBase *e)` per the symbol index), line 397 inside
/// timeout(), and the member `ExecutorBase *executor_` (406, "Parent").
382class ExecutorGuard : private ::Timer, public SyncNotifiable
383{
384public:
/// Constructor. Registers the timer with the executor's active timers,
/// remembers the executor and starts the timer.
/// @param e the executor to watch
387 : ::Timer(e->active_timers())
388 , executor_(e)
389 {
390 // We wait on the front of the timer queue by expiring immediately.
391 start();
392 }
393
/// Implementation of the guard functionality. Called on the executor.
/// @return NONE (do not restart the timer) once the executor reports empty,
/// otherwise RESTART (restart the timer with the existing period).
395 long long timeout() override {
396 if (executor_->empty()) {
// NOTE(review): listing line 397 is missing here; given the class purpose
// and the SyncNotifiable base, presumably it is the notify() call --
// confirm against the original header.
398 return NONE;
399 } else {
400 return RESTART; // wait more on the front of the timer queue
401 }
402 }
403
404private:
407};
408
/// Destructor. Calls shutdown(), which terminates the executor thread.
/// NOTE(review): the signature line (411) is missing from this listing; the
/// symbol index places `~Executor()` there, so it is presumably
/// `Executor<NUM_PRIO>::~Executor()` -- confirm against the original header.
409template <unsigned NUM_PRIO>
412{
413 shutdown();
414}
415
416#endif /* _EXECUTOR_EXECUTOR_HXX_ */
Class that manages the list of active timers.
Definition Timer.hxx:49
An object that can be scheduled on an executor to run.
This class implements an execution of tasks pulled off an input queue.
Definition Executor.hxx:64
Executable *volatile current_
Currently executing closure.
Definition Executor.hxx:217
void sync_run(std::function< void()> fn)
Synchronously runs a closure on this executor.
Definition Executor.cxx:151
void shutdown()
Terminates the executor thread.
Definition Executor.cxx:437
Executable * current()
Helper function for debugging and tracing.
Definition Executor.hxx:168
~ExecutorBase()
Destructor.
Definition Executor.cxx:455
virtual bool empty()=0
long long loop_some() ICACHE_FLASH_ATTR
Performs a few loops of the executor on the calling thread.
Definition Executor.cxx:187
void * entry() override
Thread entry point.
Definition Executor.cxx:293
unsigned selectPrescaler_
How many executables we schedule blindly before calling a select() in order to find more data to read...
Definition Executor.hxx:240
fd_set selectRead_
fd to select for read.
Definition Executor.hxx:223
fd_set selectExcept_
fd to select for except.
Definition Executor.hxx:227
virtual Executable * next(unsigned *priority)=0
Retrieve an item from the front of the queue.
void select(Selectable *job)
Adds a file descriptor to be watched to the select loop.
Definition Executor.cxx:332
bool is_selected(Selectable *job)
Definition Executor.cxx:352
virtual uint32_t sequence()=0
volatile unsigned sequence_
Sequence number.
Definition Executor.hxx:244
const char * name_
name of this Executor
Definition Executor.hxx:214
ActiveTimers * active_timers()
Definition Executor.hxx:144
void run() override
Entry point.
Definition Executor.hxx:176
ExecutorBase()
Constructor.
Definition Executor.cxx:73
static ExecutorBase * by_name(const char *name, bool wait)
Lookup an executor by its name.
Definition Executor.cxx:90
virtual void add(Executable *action, unsigned priority=UINT_MAX)=0
Send a message to this Executor's queue.
void wait_with_select(long long next_timer_nsec)
Executes a select call, and schedules any necessary executables based on the return.
Definition Executor.cxx:384
OSSelectWakeup selectHelper_
Helper object for interruptible select calls.
Definition Executor.hxx:179
fd_set selectWrite_
fd to select for write.
Definition Executor.hxx:225
bool loop_once()
Performs one loop of the execution on the calling thread.
Definition Executor.cxx:165
void assert_current()
Die if we are not on the current executor.
Definition Executor.hxx:160
os_thread_t thread_handle()
Definition Executor.hxx:155
std::atomic_uint_least8_t started_
1 if the executor is already running
Definition Executor.hxx:237
void unselect(Selectable *job)
Removes a job from the select loop.
Definition Executor.cxx:359
TypedQueue< Selectable > selectables_
Head of the linked list for the select calls.
Definition Executor.hxx:231
ActiveTimers activeTimers_
List of active timers.
Definition Executor.hxx:220
fd_set * get_select_set(Selectable::SelectType type)
Helper function.
Definition Executor.hxx:201
std::atomic_uint_least8_t done_
Set to 1 when the executor thread has exited and it is safe to delete *this.
Definition Executor.hxx:235
int selectNFds_
maximum fd to select for + 1
Definition Executor.hxx:229
This class can be given an executor, and will notify itself when that executor is out of work.
Definition Executor.hxx:383
ExecutorBase * executor_
Parent.
Definition Executor.hxx:406
ExecutorGuard(ExecutorBase *e)
Constructor.
Definition Executor.hxx:386
long long timeout() override
Implementation of the guard functionality. Called on the executor.
Definition Executor.hxx:395
Implementation of the ExecutorBase with a specific number of priority bands.
Definition Executor.hxx:266
Executor()
Default Constructor.
bool empty() OVERRIDE
Definition Executor.hxx:350
Executable * next(unsigned *priority) OVERRIDE
Retrieve an item from the front of the queue.
Definition Executor.hxx:362
~Executor()
Destructor.
Definition Executor.hxx:411
void add(Executable *msg, unsigned priority=UINT_MAX) OVERRIDE
Send a message to this Executor's queue.
Definition Executor.hxx:306
Executor(const NO_THREAD &unused)
Constructor that does not create a thread for running the executor.
Definition Executor.hxx:284
QListProtected< NUM_PRIO > queue_
Internal queue of executables waiting to be scheduled.
Definition Executor.hxx:376
void start_thread(const char *name, int priority, size_t stack_size)
Creates a new thread for running this executor.
Definition Executor.hxx:293
uint32_t sequence() OVERRIDE
Definition Executor.hxx:355
void thread_body()
If the executor was created with NO_THREAD, then this function needs to be called to run the executor...
Definition Executor.hxx:344
Executor(const char *name, int priority, size_t stack_size)
Constructor.
Definition Executor.hxx:274
Using this class as a base class will cause the given class to have all its instances linked up in a ...
This is an empty struct.
Definition Executor.hxx:256
Helper class that allows a select to be asynchronously woken up.
void wakeup()
Wakes up the select in the locked thread.
This class provides a threading API.
Definition OS.hxx:46
os_thread_t get_handle()
Definition OS.hxx:138
void inherit()
Inherits the current thread.
Definition OS.hxx:96
void start(const char *name, int priority, size_t stack_size)
Starts the thread.
Definition OS.hxx:78
A list of queues.
Definition Queue.hxx:466
QMember * next
pointer to the next member in the queue
Definition QMember.hxx:65
Handler structure that ExecutorBase knows about each entry to the select call.
SelectType
Which operation this file should be selected upon.
Collection of related state machines that pend on incoming messages.
A Notifiable for synchronously waiting for a notification.
void notify() override
Implementation of notification receive.
A timer that can schedule itself to run on an executor at specified times in the future.
Definition Timer.hxx:134
@ RESTART
Restart the timer with existing period.
Definition Timer.hxx:162
@ NONE
Do not restart the timer.
Definition Timer.hxx:161
void start(long long period=-1)
Starts a timer.
Definition Timer.hxx:185
A simple, fast, type-safe single-linked queue class with non-virtual methods.
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int FATAL
Loglevel that kills the current process.
Definition logging.h:51
#define OVERRIDE
Function attribute for virtual functions declaring that this function is overriding a function that s...
Definition macros.h:180
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DISALLOW_COPY_AND_ASSIGN(TypeName)
Removes default copy-constructor and assignment added by C++.
Definition macros.h:171
#define ICACHE_FLASH_ATTR
Declares (on the ESP8266) that the current function is not executed too often and should be placed in...
Definition macros.h:222
OS_INLINE os_thread_t os_thread_self(void)
Return a handle to the calling thread.
Definition os.h:370