Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
StateFlow.hxx
Go to the documentation of this file.
1
35#ifndef _EXECUTOR_STATEFLOW_HXX_
36#define _EXECUTOR_STATEFLOW_HXX_
37
38#include <unistd.h>
39#include <type_traits>
40#include <functional>
41#include <sys/stat.h>
42
43#include "executor/Service.hxx"
44#include "executor/Timer.hxx"
45#include "utils/Buffer.hxx"
46#include "utils/Queue.hxx"
48
/// Turns a member function name into an argument that can be supplied to
/// functions expecting a state (a StateFlowBase::Callback).
///
/// Must be expanded inside a non-static member function: the owning class is
/// derived from decltype(*this), and the resulting pointer-to-member is
/// converted to the base-class callback type with a C-style cast.
///
/// @param _fn unqualified name of a member function of the current class.
#define STATE(_fn)                                                             \
    (StateFlowBase::Callback)(                                                 \
        &std::remove_reference<decltype(*this)>::type::_fn)
64
68#define STATE_FLOW_STATE(_state) Action _state()
69
/** Begins the definition of a class derived from StateFlow.
 *
 * Expands to the opening of `class _name`, a public constructor taking the
 * owning Service, a private declaration of the `entry` state, a private
 * (undefined) default constructor and DISALLOW_COPY_AND_ASSIGN. The class body
 * is left open; close it with STATE_FLOW_END().
 *
 * The constructor forwards to the same StateFlow<_message, _priorities>
 * specialization that is named as the base class; a mem-initializer naming
 * any other specialization would not refer to a base of _name.
 *
 * @param _name class name of the StateFlow derived object
 * @param _message first template argument handed to StateFlow
 * @param _priorities second template argument handed to StateFlow
 */
#define STATE_FLOW_START(_name, _message, _priorities) \
    class _name : public StateFlow<_message, _priorities> \
    { \
    public: \
        _name(Service *service) \
            : StateFlow<_message, _priorities>(service) \
        { \
        } \
 \
    private: \
        STATE_FLOW_STATE(entry); \
 \
        _name(); \
 \
        DISALLOW_COPY_AND_ASSIGN(_name)
89
/** Begins the definition of a StateFlow-derived class that owns a timer.
 *
 * In addition to what a plain state flow class gets, the generated class has:
 *  - a `timer` member and a `timerMsg` pointer (initially null),
 *  - timeout(): sends the pending timerMsg (if any) via me()->send() and then
 *    clears it,
 *  - trigger(): forwards to timer.trigger(),
 *  - timeout_and_call(): marks the message as in-process, remembers it in
 *    timerMsg, starts the timer and returns Action(c),
 *  - early(): forwards to timer.early().
 * The class body is left open; close it with STATE_FLOW_END().
 *
 * timeout() uses a plain `if`; a conditional expression with an empty third
 * operand is not valid C++ and would fail as soon as the macro is expanded.
 *
 * NOTE(review): this macro instantiates StateFlow with a single template
 * argument, while this file forward-declares StateFlow with two template
 * parameters. The macro's parameter list is left unchanged; confirm whether
 * the macro is still in use.
 *
 * @param _name class name of the StateFlow derived object
 * @param _priorities template argument handed to StateFlow
 */
#define STATE_FLOW_START_WITH_TIMER(_name, _priorities) \
    class _name : public StateFlow<_priorities> \
    { \
    public: \
        _name(Service *service) \
            : StateFlow<_priorities>(service) \
            , timer(TIMEOUT_FROM(service, state_flow_timeout), service, this) \
            , timerMsg(nullptr) \
        { \
        } \
 \
        ~_name() \
        { \
        } \
 \
        void timeout() \
        { \
            if (timerMsg) \
            { \
                me()->send(timerMsg); \
            } \
            timerMsg = nullptr; \
        } \
 \
        void trigger() \
        { \
            timer.trigger(); \
        } \
 \
    private: \
        STATE_FLOW_STATE(entry); \
 \
        Action timeout_and_call(Callback c, Message *msg, long long period) \
        { \
            msg->id(msg->id() | Message::IN_PROCESS_MSK); \
            timerMsg = msg; \
            timer.start(period); \
            return Action(c); \
        } \
        Timer timer; \
        Message *timerMsg; \
 \
        bool early() \
        { \
            return timer.early(); \
        } \
 \
        _name(); \
 \
        DISALLOW_COPY_AND_ASSIGN(_name)
142
145#define STATE_FLOW_END() }
146
147template <class T> class FlowInterface;
148
149class StateFlowTimer;
150
171{
172public:
176 void run() override;
177
180 void notify() override;
181
182#if OPENMRN_FEATURE_RTOS_FROM_ISR
185 virtual void notify_from_isr() OVERRIDE;
186#endif // OPENMRN_FEATURE_RTOS_FROM_ISR
187
192 {
193 return service_;
194 }
195
196protected:
206
210 {
211 }
212
213 /* forward prototype */
214 class Action;
215
219
222 class Action
223 {
224 public:
230
234 {
235 return nextState_;
236 }
237
238 private:
241 };
242
248 {
249 state_ = c;
250 }
251
254 {
255 return state_ == c;
256 }
257
260 {
261 return is_state(STATE(terminated));
262 }
263
268 {
271 }
272
273 /*========== ACTION COMMANDS ===============*/
274 /* StateFlow implementations will have to use one of the following commands
275 * to return from a state handler to indicate what action to take. */
276
281 {
282 return Action(state_);
283 }
284
291 {
292 return STATE(terminated);
293 }
294
299 {
301 delete this;
302 // Ensures that Run() does not touch the class member variables
303 // anymore.
304 return wait();
305 }
306
311 return wait();
312 }
313
319 {
320 return Action(c);
321 }
322
328 {
329 return Action(nullptr);
330 }
331
337 {
338 state_ = c;
339 return wait();
340 }
341
350 template <class T>
352 Pool *pool = nullptr)
353 {
354 allocationResult_ = nullptr;
355 Pool *p = pool;
356 if (!p)
357 {
358 HASSERT(target_flow != nullptr);
359 p = target_flow->pool();
360 }
361 LOG(VERBOSE, "allocate from pool %p, main pool %p", p, mainBufferPool);
362 p->alloc_async<T>(this);
363 return wait_and_call(c);
364 }
365
373 {
374 allocationResult_ = nullptr;
375 queue->next_async(this);
376 return wait_and_call(c);
377 }
378
385 template <class T>
387 {
388 Buffer<T> *result = static_cast<Buffer<T> *>(allocationResult_);
389 return result;
390 }
391
398 template <class T>
400 {
401 T *result = static_cast<T*>(allocationResult_);
402 return result;
403 }
404
410 template <class T>
411 void cast_allocation_result(T** member)
412 {
413 *member = static_cast<T*>(allocationResult_);
414 }
415
420 template <class T>
421 inline Buffer<T> *
423
431 {
432 state_ = c;
433 notify();
434 return wait();
435 }
436
443 {
444 notify();
445 return wait();
446 }
447
462 class StateFlowTimer : public ::Timer
463 {
464 public:
467 : Timer(parent->service()->executor()->active_timers())
468 , parent_(parent)
469 {
470 }
471
472 long long timeout() override
473 {
474 parent_->notify();
475 return NONE;
476 }
477
478 protected:
481 };
482
493 Action sleep_and_call(::Timer *timer, long long timeout_nsec, Callback c)
494 {
495 timer->start(timeout_nsec);
496 return wait_and_call(c);
497 }
498
520 template <class T, typename... Args>
522 FlowInterface<Buffer<T>> *target_flow, Callback c, Args &&... args)
523 {
524 Buffer<T> *b;
526 b->data()->reset(std::forward<Args>(args)...);
527 b->data()->done.reset(this);
528 allocationResult_ = b->ref();
529 target_flow->send(b);
530 return wait_and_call(c);
531 }
532
533public:
547 template <class T, typename... Args>
549 FlowInterface<Buffer<T>> *target_flow, Args &&... args)
550 {
551 Buffer<T> *b;
553 b->data()->reset(std::forward<Args>(args)...);
554 b->data()->done.reset(EmptyNotifiable::DefaultInstance());
555 target_flow->send(b);
556 }
557
558protected:
559 struct StateFlowSelectHelper;
560 struct StateFlowTimedSelectHelper;
561
/// Blocks until size bytes are read and then invokes the next state.
///
/// Prepares helper for a READ on fd with this flow as the wakeup target and
/// stores it in allocationResult_.
///
/// @param helper select helper that carries the state of this read.
/// @param fd file descriptor to read from.
/// @param buf destination buffer; must have room for size bytes.
/// @param size number of bytes to read. It is stored in remaining_, which
/// this file declares as a 28-bit field, so larger values would not fit.
/// @param c state to transition to after the read is complete.
/// @param priority priority handed to helper->reset().
570 Action read_repeated(StateFlowSelectHelper* helper, int fd, void* buf, size_t size, Callback c, unsigned priority = Selectable::MAX_PRIO) {
571 helper->reset(Selectable::READ, fd, priority);
572 helper->set_wakeup(this);
573 helper->rbuf_ = static_cast<uint8_t*>(buf);
574 helper->remaining_ = size;
// Keep reading until all of remaining_ is consumed.
575 helper->readFully_ = 1;
576 helper->readNonblocking_ = 0;
577 helper->readWithTimeout_ = 0;
578 helper->nextState_ = c;
579 helper->hasError_ = 0;
580 allocationResult_ = helper;
// NOTE(review): the embedded listing numbers jump from 580 to 582, so the
// statement that returns the Action is not visible in this view.
582 }
583
/// Attempts to read at most size bytes, and blocks the caller until at least
/// one byte is read.
///
/// Prepares helper for a READ on fd with this flow as the wakeup target and
/// stores it in allocationResult_.
///
/// @param helper select helper that carries the state of this read.
/// @param fd file descriptor to read from.
/// @param buf destination buffer; must have room for size bytes.
/// @param size upper bound on the bytes to read. It is stored in remaining_,
/// which this file declares as a 28-bit field, so larger values would not
/// fit.
/// @param c state to transition to after the read is complete.
/// @param priority priority handed to helper->reset().
594 Action read_single(StateFlowSelectHelper *helper, int fd, void *buf,
595 size_t size, Callback c, unsigned priority = Selectable::MAX_PRIO)
596 {
597 helper->reset(Selectable::READ, fd, priority);
598 helper->set_wakeup(this);
599 helper->rbuf_ = static_cast<uint8_t*>(buf);
600 helper->remaining_ = size;
// Neither "read everything" nor "try once": a partial read is enough.
601 helper->readFully_ = 0;
602 helper->readNonblocking_ = 0;
603 helper->readWithTimeout_ = 0;
604 helper->nextState_ = c;
605 helper->hasError_ = 0;
606 allocationResult_ = helper;
// NOTE(review): the embedded listing numbers jump from 606 to 608, so the
// statement that returns the Action is not visible in this view.
608 }
609
/// Attempts to read at most size bytes in non-blocking mode (try once), and
/// then invokes the next state.
///
/// Prepares helper for a READ on fd with this flow as the wakeup target and
/// stores it in allocationResult_.
///
/// @param helper select helper that carries the state of this read.
/// @param fd file descriptor to read from.
/// @param buf destination buffer; must have room for size bytes.
/// @param size upper bound on the bytes to read. It is stored in remaining_,
/// which this file declares as a 28-bit field, so larger values would not
/// fit.
/// @param c state to transition to after the read attempt.
/// @param priority priority handed to helper->reset().
619 Action read_nonblocking(StateFlowSelectHelper* helper, int fd, void* buf, size_t size, Callback c, unsigned priority = Selectable::MAX_PRIO) {
620 helper->reset(Selectable::READ, fd, priority);
621 helper->set_wakeup(this);
622 helper->rbuf_ = static_cast<uint8_t*>(buf);
623 helper->remaining_ = size;
624 helper->readFully_ = 0;
// Non-blocking read: try once.
625 helper->readNonblocking_ = 1;
626 helper->readWithTimeout_ = 0;
627 helper->nextState_ = c;
628 helper->hasError_ = 0;
629 allocationResult_ = helper;
// NOTE(review): the embedded listing numbers jump from 629 to 631, so the
// statement that returns the Action is not visible in this view.
631 }
632
647 long long timeout_nsec, int fd, void *buf, size_t size, Callback c,
648 unsigned priority = Selectable::MAX_PRIO)
649 {
650 helper->reset(Selectable::READ, fd, priority);
651 helper->set_timed_wakeup();
652 helper->rbuf_ = static_cast<uint8_t*>(buf);
653 helper->remaining_ = size;
654 helper->expiry_ = OSTime::get_monotonic() + timeout_nsec;
655 helper->readFully_ = 1;
656 helper->readNonblocking_ = 0;
657 helper->readWithTimeout_ = 1;
658 helper->timer_.set_triggered(); // Needed for the first iteration
659 helper->nextState_ = c;
660 helper->hasError_ = 0;
661 allocationResult_ = static_cast<StateFlowSelectHelper *>(helper);
663 }
664
668 {
671 if (h->readWithTimeout_)
672 {
673 if (service()->executor()->is_selected(h))
674 {
675 service()->executor()->unselect(h);
676 }
677 }
678 if (!h->remaining_)
679 {
680 h->rbuf_ = nullptr;
681 return call_immediately(h->nextState_);
682 }
683 int count = ::read(h->fd(), h->rbuf_, h->remaining_);
684 if (count > 0)
685 {
686 h->remaining_ -= count;
687 h->rbuf_ += count;
688 if (h->remaining_ && h->readFully_ && !h->readNonblocking_)
689 {
690 return again();
691 }
692 else
693 {
694 h->rbuf_ = nullptr;
695 return call_immediately(h->nextState_);
696 }
697 }
698 if (count < 0 &&
699 (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
700 {
701 if (h->readNonblocking_)
702 {
703 h->rbuf_ = nullptr;
704 return call_immediately(h->nextState_);
705 }
706 else
707 {
708 // Blocked.
709 if (h->readWithTimeout_)
710 {
711 auto *hh = static_cast<StateFlowTimedSelectHelper *>(h);
712 if (!hh->timer_.is_triggered())
713 {
714 // We actually got a timeout notification.
715 h->rbuf_ = nullptr;
716 return call_immediately(h->nextState_);
717 }
718 hh->timer_.start_absolute(hh->expiry_);
719 }
720 service()->executor()->select(h);
721 return wait();
722 }
723 }
724 // Now: we are at an unknown error or EOF.
725 h->rbuf_ = nullptr;
726 h->hasError_ = 1;
727 return call_immediately(h->nextState_);
728 }
729
730
731#if OPENMRN_FEATURE_BSD_SOCKETS
738 Action listen_and_call(StateFlowSelectHelper *helper, int fd, Callback c)
739 {
740#if OPENMRN_HAVE_SOCKET_FSTAT
741 // verify that the fd is a socket
742 struct stat stat;
743 fstat(fd, &stat);
744 HASSERT(S_ISSOCK(stat.st_mode));
745#endif // OPENMRN_HAVE_SOCKET_FSTAT
746
747 helper->reset(Selectable::READ, fd, Selectable::MAX_PRIO);
748 helper->set_wakeup(this);
749
750 service()->executor()->select(helper);
751 return wait_and_call(c);
752 }
753
759 Action connect_and_call(StateFlowSelectHelper *helper, int fd, Callback c)
760 {
761#if OPENMRN_HAVE_SOCKET_FSTAT
762 // verify that the fd is a socket
763 struct stat stat;
764 fstat(fd, &stat);
765 HASSERT(S_ISSOCK(stat.st_mode));
766#endif // OPENMRN_HAVE_SOCKET_FSTAT
767
768 helper->reset(Selectable::WRITE, fd, Selectable::MAX_PRIO);
769 helper->set_wakeup(this);
770
771 service()->executor()->select(helper);
772 return wait_and_call(c);
773 }
774#endif // OPENMRN_FEATURE_BSD_SOCKETS
775
791 const void *buf, size_t size, Callback c,
792 unsigned priority = Selectable::MAX_PRIO)
793 {
794 helper->reset(Selectable::WRITE, fd, priority);
795 helper->set_wakeup(this);
796 helper->wbuf_ = static_cast<const uint8_t*>(buf);
797 helper->remaining_ = size;
798 helper->readFully_ = 1;
799 helper->readWithTimeout_ = 0;
800 helper->nextState_ = c;
801 helper->hasError_ = 0;
802 allocationResult_ = helper;
804 }
805
809 {
812 if (!h->remaining_)
813 {
814 return call_immediately(h->nextState_);
815 }
816 int count = ::write(h->fd(), h->wbuf_, h->remaining_);
817 if (count > 0)
818 {
819 h->remaining_ -= count;
820 h->wbuf_ += count;
821 return again();
822 }
823 if (count <= 0 &&
824 (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
825 {
826 // Blocked.
827 service()->executor()->select(h);
828 return wait();
829 }
830 h->hasError_ = 1;
831#ifdef STATEFLOW_DEBUG_WRITE_ERRORS
832 static volatile int scount;
833 static volatile int serrno;
834 scount = count;
835 serrno = errno;
836 LOG(FATAL, "failed to write count=%d errno=%d", scount, serrno);
837 DIE("failed write");
838#endif
839 // Now: we are at an unknown error or EOF.
840 return call_immediately(h->nextState_);
841 }
842
861 {
868
869 union
870 {
871 const uint8_t *wbuf_;
872 uint8_t *rbuf_;
873 };
874
879 unsigned readFully_ : 1;
881 unsigned readNonblocking_ : 1;
884 unsigned readWithTimeout_ : 1;
886 unsigned hasError_ : 1;
888 unsigned remaining_ : 28;
889 };
890
896 private Executable
897 {
904
908 set_wakeup((Executable*)this);
909 }
910
919
923 long long expiry_;
924
925 private:
926 // Executable interface. Called by select.
927 void run() override {
929 }
930 };
931
932private:
935
942
945 void alloc_result(QMember *b) override
946 {
948 notify();
949 }
950
953
956
960
962};
963
964template <class T, class S> class StateFlow;
965
968class StateFlowWithQueue : public StateFlowBase, protected Atomic, public LinkedObject<StateFlowWithQueue>
969{
970public:
972
974 void notify() override;
975
976#if OPENMRN_FEATURE_RTOS_FROM_ISR
978 void notify_from_isr() OVERRIDE;
979#endif // OPENMRN_FEATURE_RTOS_FROM_ISR
980
983 {
984 AtomicHolder h(this);
985 if (!queue_empty()) return false;
986 return isWaiting_;
987 }
988
989protected:
993
999 virtual Action entry() = 0;
1000
1006 virtual QMember *queue_next(unsigned *priority) = 0;
1007
1010 virtual bool queue_empty() = 0;
1011
1015 virtual void release() = 0;
1016
1021 {
1022 return call_immediately(STATE(wait_for_message));
1023 }
1024
1030 {
1031 release();
1032 return exit();
1033 }
1034
1037 {
1038 return currentMessage_;
1039 }
1040
1045 {
1046 BufferBase *m = message();
1047 currentMessage_ = nullptr;
1048 return m;
1049 }
1050
1060
1062 unsigned priority()
1063 {
1064 return currentPriority_;
1065 }
1066
1068 void set_priority(unsigned priority)
1069 {
1071 }
1072
1078 {
1079 reset_flow(c);
1080 notify();
1081 isWaiting_ = 0;
1082 }
1083
1084private:
1085 STATE_FLOW_STATE(wait_for_message);
1086
1089 unsigned queueSize_;
1090
1093
1095 unsigned currentPriority_ : 31;
1096
1099 unsigned isWaiting_ : 1;
1100
1101 template <class Q> friend class UntypedStateFlow;
1102 template <class M, class B> friend class TypedStateFlow;
1103 friend class GlobalEventFlow;
1104
1106 static const unsigned MAX_PRIORITY_ = 0x7FFFFFFFU;
1107};
1108
1109template <class MessageType> class FlowInterface;
1110
1116template <class MessageType> class FlowInterface
1117{
1118public:
1120 typedef MessageType message_type;
1121
1122 virtual ~FlowInterface() {}
1123
1128 virtual Pool *pool()
1129 {
1130 return mainBufferPool;
1131 }
1132
1139 virtual void send(MessageType *message, unsigned priority = UINT_MAX) = 0;
1140
1144 virtual MessageType *type_helper()
1145 {
1146 return nullptr;
1147 }
1148
1151 MessageType *alloc()
1152 {
1153 MessageType *ret;
1154 pool()->alloc(&ret);
1155 return ret;
1156 }
1157
1163 {
1164 typedef typename MessageType::value_type T;
1165 Pool* p = pool();
1166 p->alloc_async<T>(target);
1167 }
1168
1175 static MessageType *cast_alloc(QMember *entry)
1176 {
1177 MessageType *result;
1178 Pool::alloc_async_init(static_cast<BufferBase *>(entry), &result);
1179 return result;
1180 }
1181
1182 class GenericHandler;
1183};
1184
1185template <class MessageType>
1187 : public FlowInterface<MessageType>
1188{
1189public:
1191 typedef std::function<void(message_type *)> HandlerFn;
1197 : handler_(handler)
1198 {
1199 }
1200
1204 template<class T>
1205 GenericHandler(T* ptr, void (T::*fn)(message_type*))
1206 : handler_(std::bind(fn, ptr, std::placeholders::_1)) {}
1207
1209 void send(MessageType *message, unsigned priority) OVERRIDE
1210 {
1211 handler_(message);
1212 }
1213
1214private:
1217};
1218
1219template <class T>
1220Buffer<T> *
1222{
1223 return target_flow->cast_alloc(allocationResult_);
1224}
1225
1226
1233template<class QueueType>
1235public:
1239
1241 {
1242 }
1243
1244protected:
/// Sends a message to the state flow for processing.
///
/// Under the lock held by AtomicHolder, inserts msg into queue_ at the given
/// priority and refreshes queueSize_. If the flow was waiting, the waiting
/// flag is cleared and the flow is notified.
///
/// @param msg message to enqueue.
/// @param priority queue priority for msg; defaults to UINT_MAX.
1251 void send(BufferBase *msg, unsigned priority = UINT_MAX)
1252 {
1253 AtomicHolder h(this);
1254 queue_.insert_locked(msg, priority);
// queueSize_ mirrors the queue length for debugging purposes.
1255 queueSize_ = queue_.size();
1256 if (isWaiting_)
1257 {
1258 isWaiting_ = 0;
// NOTE(review): the embedded listing numbers jump from 1258 to 1260, so one
// source line of this branch is not visible in this view.
1260 this->notify();
1261 }
1262 }
1263
1266
1273 {
1274 typename QueueType::Result r = queue_.next_locked();
1275 if (r.item)
1276 {
1277 *priority = r.index;
1278 }
1279 return r.item;
1280 }
1281
1285 AtomicHolder h(this);
1286 return queue_.empty();
1287 }
1288
1289private:
1291 QueueType queue_;
1292};
1293
1296template <class MessageType, class Base>
1297class TypedStateFlow : public Base, public FlowInterface<MessageType>
1298{
1299public:
1302 typedef typename Base::Action Action;
1303
1307 TypedStateFlow(Service *service) : Base(service) {}
1308
1312 {
1313 }
1314
1321 void send(MessageType *msg, unsigned priority = UINT_MAX) OVERRIDE
1322 {
1323 Base::send(msg, priority);
1324 }
1325
1330 virtual Action entry() override = 0;
1331
1332protected:
1335 {
1336 if (message())
1337 {
1338 message()->unref();
1339 }
1340 this->currentMessage_ = nullptr;
1341 }
1342
1347 {
1348 message()->data()->done.notify();
1349 release();
1350 }
1351
1353 MessageType *message()
1354 {
1355 return static_cast<MessageType *>(Base::message());
1356 }
1357
1361 MessageType *transfer_message()
1362 {
1363 return static_cast<MessageType *>(
1364 Base::transfer_message());
1365 }
1366};
1367
1368
1373template<class MessageType, class QueueType>
1374class StateFlow : public TypedStateFlow<MessageType, UntypedStateFlow<QueueType> > {
1375public:
1379 : TypedStateFlow<MessageType, UntypedStateFlow<QueueType>>(service)
1380 {
1381 }
1382};
1383
1384#endif /* _EXECUTOR_STATEFLOW_HXX_ */
DynamicPool * mainBufferPool
main buffer pool instance
Definition Buffer.cxx:37
int bind(int socket, const struct sockaddr *address, socklen_t address_len)
Bind a name to a socket.
Definition Socket.cxx:159
#define STATE_FLOW_STATE(_state)
Declare a state callback in a StateFlow.
Definition StateFlow.hxx:68
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
See OSMutexLock in os/OS.hxx.
Definition Atomic.hxx:153
Lightweight locking class for protecting small critical sections.
Definition Atomic.hxx:130
Abstract base class for all Buffers.
Definition Buffer.hxx:85
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
Buffer< T > * ref()
Add another reference to the buffer.
Definition Buffer.hxx:203
T * data()
get a pointer to the start of the data.
Definition Buffer.hxx:215
static Notifiable * DefaultInstance()
An object that can be scheduled on an executor to run.
void select(Selectable *job)
Adds a file descriptor to be watched to the select loop.
Definition Executor.cxx:332
bool is_selected(Selectable *job)
Definition Executor.cxx:352
void unselect(Selectable *job)
Removes a job from the select loop.
Definition Executor.cxx:359
std::function< void(message_type *)> HandlerFn
Interface of the callback function.
void send(MessageType *message, unsigned priority) OVERRIDE
Overridden method for sending the message.
HandlerFn handler_
The configured handler callback.
GenericHandler(T *ptr, void(T::*fn)(message_type *))
Constructor to be called with an object member function.
GenericHandler(HandlerFn handler)
Constructor.
Abstract class for message recipients.
virtual void send(MessageType *message, unsigned priority=UINT_MAX)=0
Entry point to the flow.
virtual Pool * pool()
static MessageType * cast_alloc(QMember *entry)
Down casts and initializes an asynchronous allocation result to the appropriate flow's buffer type.
MessageType message_type
Stores the message template type for external reference.
virtual MessageType * type_helper()
This function is never used in the code, but GDB can use it to infer the correct message types.
MessageType * alloc()
Synchronously allocates a message buffer from the pool of this flow.
void alloc_async(Executable *target)
Asynchronously allocates a message buffer from the pool of this flow.
Using this class as a base class will cause the given class to have all its instances linked up in a ...
static long long get_monotonic()
Get the monotonic time since the system started.
Definition OS.hxx:560
Pool of previously allocated, but currently unused, items.
Definition Buffer.hxx:278
static void alloc_async_init(BufferBase *base, Buffer< BufferType > **result)
Cast the result of an asynchronous allocation and perform a placement new on it.
Definition Buffer.hxx:331
void alloc(Buffer< BufferType > **result, Executable *flow=NULL)
Get a free item out of the pool.
Definition Buffer.hxx:292
void alloc_async(Executable *flow)
Get a free item out of the pool.
Definition Buffer.hxx:319
Asynchronous specialization of Q.
Definition Queue.hxx:254
void next_async(Executable *flow)
Get an item from the front of the queue.
Definition Queue.hxx:305
Essentially a "next" pointer container.
Definition QMember.hxx:42
Handler structure that ExecutorBase knows about each entry to the select call.
@ MAX_PRIO
Largest priority we accept (otherwise we clip).
Executable * parent()
void reset(SelectType type, int fd, unsigned priority)
Re-initialize a Selectable preparing to add it to select().
void set_wakeup(Executable *e)
Can be used to override the executable to wake up.
Collection of related state machines that pend on incoming messages.
ExecutorBase * executor()
Return type for a state flow callback.
Action(Callback s)
Constructor.
Callback nextState_
next state in state flow
Callback next_state()
Get the next state for the StateFlowAction.
Use this timer class to deliver the timeout notification to a stateflow.
long long timeout() override
Clients of timer should override this function.
StateFlowTimer(StateFlowBase *parent)
Constructor.
StateFlowBase * parent_
The timer will deliver notifications to this flow.
Base class for state machines.
QMember * allocationResult_
The result of the next allocation that comes in.
Action yield_and_call(Callback c)
Place the current flow to the back of the executor, and transition to a new state after we get the CP...
Action read_repeated(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Blocks until size bytes are read and then invokes the next state.
Action read_single(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Attempts to read at most size bytes, and blocks the caller until at least one byte is read.
Action read_repeated_with_timeout(StateFlowTimedSelectHelper *helper, long long timeout_nsec, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Blocks until size bytes are read, or a timeout expires.
Service * service()
Return a pointer to the service I am bound to.
bool is_terminated()
void run() override
Callback from the executor.
Definition StateFlow.cxx:58
void notify() override
Wakeup call arrived.
Definition StateFlow.cxx:97
Service * service_
Service this StateFlow belongs to.
Action allocate_and_call(FlowInterface< Buffer< T > > *target_flow, Callback c, Pool *pool=nullptr)
Allocates a buffer from a pool and proceed to the next state when allocation is successful.
StateFlowBase(Service *service)
Constructor.
StateFlowBase()
Default constructor.
Action internal_try_write()
Implementation state that gets repeatedly called upon every wakeup and tries to make progress on writ...
Action yield()
Place the current flow to the back of the executor, and re-try the current state after we get the CPU...
Action delete_this()
Terminates the flow and deletes *this.
Action allocate_and_call(Callback c, QAsync *queue)
Allocates an entry from an asynchronous queue, and transitions to a state once the allocation is comp...
Buffer< T > * full_allocation_result(FlowInterface< Buffer< T > > *target_flow)
Takes the result of the asynchronous allocation without resetting the object.
Action exit()
Terminate current StateFlow activity.
Action read_nonblocking(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Attempts to read at most size bytes, and then invokes the next state, even if only zero bytes are ava...
Callback state_
current active state in the flow
void start_flow(Callback c)
Resets the flow to the specified state and starts it.
Buffer< T > * get_allocation_result(FlowInterface< Buffer< T > > *target_flow)
Takes the result of the asynchronous allocation.
Action wait()
Wait for an asynchronous call.
Action again()
Call the current state again via call_immediately.
void reset_flow(Callback c)
Resets the flow to the specified state.
Action invoke_subflow_and_wait(FlowInterface< Buffer< T > > *target_flow, Callback c, Args &&... args)
Calls a helper flow to perform some actions.
Action call_immediately(Callback c)
Immediately call the next state upon return.
Action set_terminated()
Sets the flow to terminated state.
static void invoke_subflow_and_ignore_result(FlowInterface< Buffer< T > > *target_flow, Args &&... args)
Calls a helper flow to perform some actions.
Action wait_and_call(Callback c)
Wait for resource to become available before proceeding to next state.
Action(StateFlowBase::* Callback)()
State Flow callback prototype.
T * full_allocation_result(TypedQAsync< T > *queue)
Takes the result of the asynchronous allocation without resetting the object.
~StateFlowBase()
Destructor.
bool is_state(Callback c)
Action terminated()
Terminates the current StateFlow activity.
void cast_allocation_result(T **member)
Takes the result of the asynchronous allocation without resetting the object.
Action write_repeated(StateFlowSelectHelper *helper, int fd, const void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Writes some data into a file descriptor, repeating the operation as necessary until all bytes are wri...
Action internal_try_read()
Implementation state that gets repeatedly called upon every wakeup and tries to make progress on read...
void alloc_result(QMember *b) override
Callback from a Pool in case of an asynchronous allocation.
Action sleep_and_call(::Timer *timer, long long timeout_nsec, Callback c)
Suspends execution of this control flow for a specified time.
A state flow that has an incoming message queue, pends on that queue, and runs a flow for every messa...
virtual QMember * queue_next(unsigned *priority)=0
Takes the front entry in the queue.
unsigned isWaiting_
True if we are in the pending state, waiting for an entry to show up in the queue.
void notify() override
Wakeup call arrived. Schedules *this on the executor.
Action exit()
Terminates the processing of this flow.
Action release_and_exit()
Terminates the processing of the current message.
BufferBase * message()
BufferBase * currentMessage_
Message we are currently processing.
void reset_message(BufferBase *message, unsigned priority)
Sets the current message being processed.
BufferBase * transfer_message()
Releases ownership of the current message.
static const unsigned MAX_PRIORITY_
Largest acceptable priority value for a stateflow.
void start_flow_at_init(Callback c)
Call this from the constructor of the child class to do some work before the main queue processing lo...
unsigned currentPriority_
Priority of the current message we are processing.
virtual Action entry()=0
Entry into the StateFlow activity.
void set_priority(unsigned priority)
Overrides the current priority.
virtual bool queue_empty()=0
virtual void release()=0
Releases the current message buffer back to the pool it came from.
unsigned queueSize_
For debugging: how many entries are currently waiting in the queue of this stateflow.
State flow with a given typed input queue.
StateFlow(Service *service)
Constructor.
A timer that can schedule itself to run on an executor at specified times in the future.
Definition Timer.hxx:134
@ NONE
Do not restart the timer.
Definition Timer.hxx:161
void set_triggered()
Sets the timer as if it was woken up by a trigger(), even if it was never started.
Definition Timer.hxx:280
void start(long long period=-1)
Starts a timer.
Definition Timer.hxx:185
void ensure_triggered()
Triggers the timer if it is not expired yet.
Definition Timer.hxx:249
Strongly typed queue class with asynchronous access.
Definition Queue.hxx:406
Helper class in the StateFlow hierarchy.
void release() OVERRIDE
Unrefs the current buffer.
Base::Action Action
Allows using Action without having StateFlowBase:: prefix in front of it.
TypedStateFlow(Service *service)
Constructor.
virtual ~TypedStateFlow()
Destructor.
virtual Action entry() override=0
Entry into the StateFlow activity.
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
MessageType * transfer_message()
Releases ownership of the current message.
MessageType * message()
void return_buffer()
For state flows that are operated using invoke_subflow_and_wait this is a way to hand back the buffer...
State flow base class with queue but generic message type.
UntypedStateFlow(Service *service)
Constructor.
void send(BufferBase *msg, unsigned priority=UINT_MAX)
Sends a message to the state flow for processing.
QueueType queue_
Implementation of the queue.
QMember * queue_next(unsigned *priority) OVERRIDE
Takes the front entry in the queue.
bool queue_empty() OVERRIDE
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
Definition logging.h:59
static const int FATAL
Loglevel that kills the current process.
Definition logging.h:51
#define OVERRIDE
Function attribute for virtual functions declaring that this function is overriding a function that s...
Definition macros.h:180
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DIE(MSG)
Unconditionally terminates the current process with a message.
Definition macros.h:143
#define DISALLOW_COPY_AND_ASSIGN(TypeName)
Removes default copy-constructor and assignment added by C++.
Definition macros.h:171
Use this class to read from an fd using select() in a state flow.
unsigned readWithTimeout_
1 if there is also a timer involved; in this case *this must be a StateFlowTimedSelectHelper.
unsigned remaining_
Number of bytes still outstanding to read.
unsigned hasError_
1 if there was an error reading or writing.
unsigned readFully_
1 if we need to read until all remaining_ is consumed.
StateFlowSelectHelper(StateFlowBase *parent)
unsigned readNonblocking_
1 if we need a non-blocking read, in other words, try once
Callback nextState_
State to transition to after the read is complete.
Use this class to read from an fd with select and timeout.
StateFlowTimedSelectHelper(StateFlowBase *parent)
long long expiry_
End of the wakeup timeout.
StateFlowTimer timer_
This timer is used to wake up the StateFlow.
void set_timed_wakeup()
Called from the stateflow's internal state to instruct to set the wakeup target for the timer.
void run() override
Entry point.