Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
Executor.cxx
Go to the documentation of this file.
1
36#define _DEFAULT_SOURCE
37
38#include "executor/Executor.hxx"
39
40#include "openmrn_features.h"
41#include <unistd.h>
42
43#ifdef __WINNT__
44#include <winsock2.h>
45#else
46#include <sys/select.h>
47#endif
48
49#ifdef __EMSCRIPTEN__
50#include <emscripten.h>
51#endif
52
53#ifdef ESP_NONOS
54extern "C" {
55#include <ets_sys.h>
56#include <osapi.h>
57#include <user_interface.h>
58}
59#endif
60
61#include "executor/Service.hxx"
62#include "nmranet_config.h"
63
64void __attribute__((weak,noinline)) Executable::test_deletion() {}
65
66Executable::~Executable() {
67 test_deletion();
68}
69
70
74 : name_(NULL)
75 , activeTimers_(this)
76 , done_(0)
77 , started_(0)
78 , selectPrescaler_(0)
79{
80 FD_ZERO(&selectRead_);
81 FD_ZERO(&selectWrite_);
82 FD_ZERO(&selectExcept_);
83 selectNFds_ = 0;
84}
85
90ExecutorBase *ExecutorBase::by_name(const char *name, bool wait)
91{
95 for (; /* forever */;)
96 {
97 {
98 AtomicHolder hld(head_mu());
100 while (current)
101 {
102 if (!strcmp(name, current->name_))
103 {
104 return current;
105 }
106 current = current->link_next();
107 }
108 }
109 if (wait)
110 {
111 sleep(1);
112 }
113 else
114 {
115 return NULL;
116 }
117 }
118}
119
128{
129public:
133 SyncExecutable(ExecutorBase *e, std::function<void()>&& fn)
134 : fn_(std::move(fn))
135 {
136 e->add(this);
138 }
139
141 {
142 fn_();
143 n_.notify();
144 }
146 std::function<void()> fn_;
149};
150
151void ExecutorBase::sync_run(std::function<void()> fn)
152{
154 {
155 // run inline.
156 fn();
157 }
158 else
159 {
160 // run externally and block.
161 SyncExecutable(this, std::move(fn));
162 }
163}
164
166{
167 ScopedSetThreadHandle h(this);
168 unsigned priority;
170 Executable* msg = next(&priority);
171 if (!msg)
172 {
173 return false;
174 }
175 if (msg == this)
176 {
177 // exit closure
178 done_ = 1;
179 return false;
180 }
181 current_ = msg;
182 msg->run();
183 current_ = nullptr;
184 return true;
185}
186
188 ScopedSetThreadHandle h(this);
189 for (int i = 12; i > 0; --i) {
190 Executable *msg = nullptr;
191 unsigned priority = UINT_MAX;
192 long long wait_length = activeTimers_.get_next_timeout();
193 if (empty()) {
194 return wait_length;
195 }
196 msg = next(&priority);
197 if (msg == this)
198 {
199 // exit closure
200 done_ = 1;
201 return INT64_MAX;
202 }
203 if (msg != NULL)
204 {
205 current_ = msg;
206 msg->run();
207 current_ = nullptr;
208 }
209 }
210 // Still stuff pending to run.
211 return 0;
212}
213
214#if defined(__EMSCRIPTEN__)
215
216void executor_loop_some(void* arg)
217{
218 ExecutorBase* b = static_cast<ExecutorBase*>(arg);
219 while (b->loop_once());
220}
221
223{
224 started_ = 1;
225 sequence_ = 0;
226 ExecutorBase* b = this;
228 emscripten_set_main_loop_arg(&executor_loop_some, b, 100, true);
229 return nullptr;
230}
231
232#elif defined(ESP_NONOS)
233
234#define EXECUTOR_TASK_PRIO USER_TASK_PRIO_0
235
236static os_event_t appl_task_queue[1];
237static os_timer_t appl_task_timer;
238static bool timer_pending = false;
239
240extern "C" {
241void ets_timer_setfn(os_timer_t *ptimer, os_timer_func_t *pfunction, void *);
242void ets_timer_arm_new(os_timer_t *, int, int, int);
243void ets_timer_disarm(os_timer_t *ptimer);
244} // extern C
245
246static void timer_fun(void* arg) {
247 timer_pending = false;
248 system_os_post(EXECUTOR_TASK_PRIO, 0, (uint32_t)arg);
249}
250
251extern void wakeup_executor(ExecutorBase* executor);
252
253void wakeup_executor(ExecutorBase* arg) {
254 system_os_post(EXECUTOR_TASK_PRIO, 0, (uint32_t)arg);
255}
256
257static void appl_task(os_event_t *e)
258{
259 ExecutorBase* eb = (ExecutorBase*)e->par;
260 long long sleep_time = eb->loop_some();
261 if (sleep_time == 0) {
262 system_os_post(EXECUTOR_TASK_PRIO, 0, e->par);
263 } else {
264 if (true || timer_pending) {
265 os_timer_disarm(&appl_task_timer);
266 }
267 os_timer_arm(&appl_task_timer, sleep_time / 1000000, false);
268 timer_pending = true;
269 }
270}
271
273{
274 started_ = 1;
275 os_timer_setfn(&appl_task_timer, &timer_fun, this);
276 system_os_task(appl_task, EXECUTOR_TASK_PRIO, appl_task_queue, 1);
277 system_os_post(EXECUTOR_TASK_PRIO, 0, (uint32_t)this);
278 return nullptr;
279}
280
281#elif OPENMRN_FEATURE_SINGLE_THREADED
282
284{
285 DIE("Arduino code should not start the executor.");
286 return nullptr;
287}
288
289#else
294{
295 started_ = 1;
296 sequence_ = 0;
298 /* wait for messages to process */
299 for (; /* forever */;)
300 {
301 Executable *msg = nullptr;
302 unsigned priority = UINT_MAX;
303 if (!selectPrescaler_ || ((msg = next(&priority)) == nullptr))
304 {
305 long long wait_length = activeTimers_.get_next_timeout();
306 wait_with_select(wait_length);
307 selectPrescaler_ = config_executor_select_prescaler();
308 msg = next(&priority);
309 }
310 else
311 {
313 }
314 if (msg == this)
315 {
316 // exit closure
317 done_ = 1;
318 return NULL;
319 }
320 if (msg != NULL)
321 {
322 ++sequence_;
323 current_ = msg;
324 msg->run();
325 current_ = nullptr;
326 }
327 }
328
329 return NULL;
330}
331
333{
334 fd_set *s = get_select_set(job->type());
335 int fd = job->fd_;
336 if (FD_ISSET(fd, s))
337 {
338 LOG(FATAL,
339 "Multiple Selectables are waiting for the same fd %d type %u", fd,
340 job->selectType_);
341 }
342 FD_SET(fd, s);
343 if (fd >= selectNFds_)
344 {
345 selectNFds_ = fd + 1;
346 }
347 HASSERT(!job->next);
348 // Inserts the job into the select queue.
350}
351
353{
354 fd_set *s = get_select_set(job->type());
355 int fd = job->fd_;
356 return FD_ISSET(fd, s);
357}
358
360{
361 fd_set *s = get_select_set(job->type());
362 int fd = job->fd_;
363 if (!FD_ISSET(fd, s))
364 {
365 LOG(FATAL, "Tried to remove a non-active selectable: fd %d type %u", fd,
366 job->selectType_);
367 }
368 FD_CLR((unsigned)fd, s);
369 auto it = selectables_.begin();
370 unsigned max_fd = 0;
371 while (it != selectables_.end())
372 {
373 if (&*it == job)
374 {
376 continue;
377 }
378 max_fd = std::max(max_fd, it->fd_ + 1U);
379 ++it;
380 }
381 selectNFds_ = max_fd;
382}
383
384void ExecutorBase::wait_with_select(long long wait_length)
385{
386 fd_set fd_r(selectRead_);
387 fd_set fd_w(selectWrite_);
388 fd_set fd_x(selectExcept_);
389 // We will check the queue for any prior wakeups after this call. If we
390 // already processed the executables, the wakeup is not necessary. Without
391 // this clear, there would always be two select() iterations happening when
392 // we are done with work and can go to sleep.
394 if (!empty())
395 {
396 wait_length = 0;
397 }
398 long long max_sleep = MSEC_TO_NSEC(config_executor_max_sleep_msec());
399 if (wait_length > max_sleep)
400 {
401 wait_length = max_sleep;
402 }
403 int ret = selectHelper_.select(selectNFds_, &fd_r, &fd_w, &fd_x, wait_length);
404 if (ret <= 0) {
405 return; // nothing to do
406 }
407 unsigned max_fd = 0;
408 for (auto it = selectables_.begin(); it != selectables_.end();) {
409 fd_set* s = nullptr;
410 fd_set* os = get_select_set(it->type());
411 switch(it->type()) {
412 case Selectable::READ: s = &fd_r; break;
413 case Selectable::WRITE: s = &fd_w; break;
414 case Selectable::EXCEPT: s = &fd_x; break;
415 }
416 if (FD_ISSET(it->fd_, s)) {
417 add(it->wakeup_, it->priority_);
418 FD_CLR(it->fd_, os);
420 continue;
421 }
422 max_fd = std::max(max_fd, it->fd_ + 1U);
423 ++it;
424 }
425 selectNFds_ = max_fd;
426}
427
428#endif
429
430#if defined(ARDUINO)
431// declare the function rather than include Arduino.h
432extern "C"
433{
434void delay(unsigned long);
435}
436#endif // ARDUINO
438{
439 if (!started_) return;
440 add(this);
441#if defined(__EMSCRIPTEN__)
442 emscripten_cancel_main_loop();
443 return;
444#endif
445 while (!done_)
446 {
447#if defined(ARDUINO)
448 delay(1);
449#else
450 usleep(100);
451#endif
452 }
453}
454
456{
457 if (!done_)
458 {
459 shutdown();
460 }
461}
long long get_next_timeout()
Tell when the first timer will expire.
Definition Timer.cxx:84
See OSMutexLock in os/OS.hxx.
Definition Atomic.hxx:153
An object that can be scheduled on an executor to run.
virtual void run()=0
Entry point.
This class implements an execution of tasks pulled off an input queue.
Definition Executor.hxx:64
Executable *volatile current_
Currently executing closure.
Definition Executor.hxx:217
void sync_run(std::function< void()> fn)
Synchronously runs a closure on this executor.
Definition Executor.cxx:151
void shutdown()
Terminates the executor thread.
Definition Executor.cxx:437
Executable * current()
Helper function for debugging and tracing.
Definition Executor.hxx:168
~ExecutorBase()
Destructor.
Definition Executor.cxx:455
virtual bool empty()=0
long long loop_some() ICACHE_FLASH_ATTR
Performs a few loops of the executor on the calling thread.
Definition Executor.cxx:187
void * entry() override
Thread entry point.
Definition Executor.cxx:293
unsigned selectPrescaler_
How many executables we schedule blindly before calling a select() in order to find more data to read...
Definition Executor.hxx:240
fd_set selectRead_
fd to select for read.
Definition Executor.hxx:223
fd_set selectExcept_
fd to select for except.
Definition Executor.hxx:227
void select(Selectable *job)
Adds a file descriptor to be watched to the select loop.
Definition Executor.cxx:332
bool is_selected(Selectable *job)
Definition Executor.cxx:352
volatile unsigned sequence_
Sequence number.
Definition Executor.hxx:244
ExecutorBase()
Constructor.
Definition Executor.cxx:73
static ExecutorBase * by_name(const char *name, bool wait)
Lookup an executor by its name.
Definition Executor.cxx:90
virtual void add(Executable *action, unsigned priority=UINT_MAX)=0
Send a message to this Executor's queue.
void wait_with_select(long long next_timer_nsec)
Executes a select call, and schedules any necessary executables based on the return.
Definition Executor.cxx:384
OSSelectWakeup selectHelper_
Helper object for interruptible select calls.
Definition Executor.hxx:179
fd_set selectWrite_
fd to select for write.
Definition Executor.hxx:225
bool loop_once()
Performs one loop of the execution on the calling thread.
Definition Executor.cxx:165
std::atomic_uint_least8_t started_
1 if the executor is already running
Definition Executor.hxx:237
void unselect(Selectable *job)
Removes a job from the select loop.
Definition Executor.cxx:359
TypedQueue< Selectable > selectables_
Head of the linked list for the select calls.
Definition Executor.hxx:231
ActiveTimers activeTimers_
List of active timers.
Definition Executor.hxx:220
fd_set * get_select_set(Selectable::SelectType type)
Helper function.
Definition Executor.hxx:201
std::atomic_uint_least8_t done_
Set to 1 when the executor thread has exited and it is safe to delete *this.
Definition Executor.hxx:235
int selectNFds_
maximum fd to select for + 1
Definition Executor.hxx:229
static Atomic * head_mu()
Locks the list for modification (at any entry!).
static ExecutorBase * head_
Beginning of the list.
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, long long deadline_nsec)
Portable call to a select that can be woken up asynchronously from a different thread or an ISR conte...
os_thread_t main_thread()
void clear_wakeup()
Called from the main thread after being woken up.
void lock_to_thread()
Prepares the current thread for asynchronous wakeups.
void unlock_from_thread()
Resets the thread handle to none.
Definition OS.hxx:152
QMember * next
pointer to the next member in the queue
Definition QMember.hxx:65
Handler structure that ExecutorBase knows about each entry to the select call.
SelectType type()
unsigned selectType_
What to watch the file for. See SelectType.
unsigned fd_
File descriptor to watch.
end_iterator end()
void erase(const iterator &position)
Removes the entry pointed to by the iterator.
An Executable that runs a callback on the executor and returns once the run is complete.
Definition Executor.cxx:128
void run() OVERRIDE
Entry point.
Definition Executor.cxx:140
SyncNotifiable n_
Blocks the calling thread until the callback is done running.
Definition Executor.cxx:148
std::function< void()> fn_
Callback to run.
Definition Executor.cxx:146
SyncExecutable(ExecutorBase *e, std::function< void()> &&fn)
Definition Executor.cxx:133
A Notifiable for synchronously waiting for a notification.
void notify() override
Implementation of notification receive.
void wait_for_notification()
Blocks the current thread until the notification is delivered.
iterator begin()
void push_front(T *entry)
Inserts an entry to the front of the queue.
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int FATAL
Loglevel that kills the current process.
Definition logging.h:51
#define OVERRIDE
Function attribute for virtual functions declaring that this funciton is overriding a funciton that s...
Definition macros.h:180
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DIE(MSG)
Unconditionally terminates the current process with a message.
Definition macros.h:143
#define ICACHE_FLASH_ATTR
Declares (on the ESP8266) that the current function is not executed too often and should be placed in...
Definition macros.h:222
#define MSEC_TO_NSEC(_msec)
Convert a millisecond value to a nanosecond value.
Definition os.h:268
OS_INLINE os_thread_t os_thread_self(void)
Return a handle to the calling thread.
Definition os.h:370
Helper class for using lock_to_thread.
Definition OS.hxx:159