35#ifndef _EXECUTOR_STATEFLOW_HXX_
36#define _EXECUTOR_STATEFLOW_HXX_
62 (StateFlowBase::Callback)( \
63 &std::remove_reference<decltype(*this)>::type::_fn)
68#define STATE_FLOW_STATE(_state) Action _state()
75#define STATE_FLOW_START(_name, _message, _priorities) \
76 class _name : public StateFlow<_message, _priorities> \
79 _name(Service *service) : StateFlow<_priorities, _priorities>(service) \
84 STATE_FLOW_STATE(entry); \
88 DISALLOW_COPY_AND_ASSIGN(_name)
94#define STATE_FLOW_START_WITH_TIMER(_name, _priorities) \
95 class _name : public StateFlow<_priorities> \
98 _name(Service *service) \
99 : StateFlow<_priorities>(service) \
100 , timer(TIMEOUT_FROM(service, state_flow_timeout), service, this) \
111 timerMsg ? me()->send(timerMsg) :; \
121 STATE_FLOW_STATE(entry); \
123 Action timeout_and_call(Callback c, Message *msg, long long period) \
125 msg->id(msg->id() | Message::IN_PROCESS_MSK); \
127 timer.start(period); \
136 return timer.early(); \
141 DISALLOW_COPY_AND_ASSIGN(_name)
145#define STATE_FLOW_END() }
182#if OPENMRN_FEATURE_RTOS_FROM_ISR
185 virtual void notify_from_isr()
OVERRIDE;
352 Pool *pool =
nullptr)
358 HASSERT(target_flow !=
nullptr);
359 p = target_flow->pool();
495 timer->
start(timeout_nsec);
520 template <
class T,
typename... Args>
526 b->
data()->reset(std::forward<Args>(args)...);
527 b->
data()->done.reset(
this);
529 target_flow->send(b);
547 template <
class T,
typename... Args>
553 b->
data()->reset(std::forward<Args>(args)...);
555 target_flow->send(b);
559 struct StateFlowSelectHelper;
560 struct StateFlowTimedSelectHelper;
571 helper->
reset(Selectable::READ, fd, priority);
573 helper->rbuf_ =
static_cast<uint8_t*
>(buf);
597 helper->
reset(Selectable::READ, fd, priority);
599 helper->rbuf_ =
static_cast<uint8_t*
>(buf);
620 helper->
reset(Selectable::READ, fd, priority);
622 helper->rbuf_ =
static_cast<uint8_t*
>(buf);
647 long long timeout_nsec,
int fd,
void *buf,
size_t size,
Callback c,
650 helper->
reset(Selectable::READ, fd, priority);
652 helper->rbuf_ =
static_cast<uint8_t*
>(buf);
699 (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
712 if (!hh->timer_.is_triggered())
718 hh->timer_.start_absolute(hh->expiry_);
731#if OPENMRN_FEATURE_BSD_SOCKETS
738 Action listen_and_call(StateFlowSelectHelper *helper,
int fd,
Callback c)
740#if OPENMRN_HAVE_SOCKET_FSTAT
744 HASSERT(S_ISSOCK(stat.st_mode));
748 helper->set_wakeup(
this);
759 Action connect_and_call(StateFlowSelectHelper *helper,
int fd,
Callback c)
761#if OPENMRN_HAVE_SOCKET_FSTAT
765 HASSERT(S_ISSOCK(stat.st_mode));
769 helper->set_wakeup(
this);
791 const void *buf,
size_t size,
Callback c,
794 helper->
reset(Selectable::WRITE, fd, priority);
796 helper->wbuf_ =
static_cast<const uint8_t*
>(buf);
824 (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
831#ifdef STATEFLOW_DEBUG_WRITE_ERRORS
832 static volatile int scount;
833 static volatile int serrno;
836 LOG(
FATAL,
"failed to write count=%d errno=%d", scount, serrno);
871 const uint8_t *wbuf_;
964template <
class T,
class S>
class StateFlow;
976#if OPENMRN_FEATURE_RTOS_FROM_ISR
1103 friend class GlobalEventFlow;
1139 virtual void send(MessageType *message,
unsigned priority = UINT_MAX) = 0;
1164 typedef typename MessageType::value_type T;
1177 MessageType *result;
1182 class GenericHandler;
1185template <
class MessageType>
1206 : handler_(std::
bind(fn, ptr, std::placeholders::_1)) {}
1233template<
class QueueType>
1274 typename QueueType::Result r =
queue_.next_locked();
1296template <
class MessageType,
class Base>
1323 Base::send(msg, priority);
1340 this->currentMessage_ =
nullptr;
1348 message()->data()->done.notify();
1355 return static_cast<MessageType *
>(Base::message());
1363 return static_cast<MessageType *
>(
1364 Base::transfer_message());
1373template<
class MessageType,
class QueueType>
DynamicPool * mainBufferPool
main buffer pool instance
int bind(int socket, const struct sockaddr *address, socklen_t address_len)
Bind a name to a socket.
#define STATE_FLOW_STATE(_state)
Declare a state callback in a StateFlow.
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
See OSMutexLock in os/OS.hxx.
Lightweight locking class for protecting small critical sections.
Abstract base class for all Buffers.
Base class for all QMember types that hold data in an expandable format.
Buffer< T > * ref()
Add another reference to the buffer.
T * data()
get a pointer to the start of the data.
static Notifiable * DefaultInstance()
An object that can be scheduled on an executor to run.
void select(Selectable *job)
Adds a file descriptor to be watched to the select loop.
bool is_selected(Selectable *job)
void unselect(Selectable *job)
Removes a job from the select loop.
std::function< void(message_type *)> HandlerFn
Interface of the callback function.
void send(MessageType *message, unsigned priority) OVERRIDE
Overridden method for sending the message.
HandlerFn handler_
The configured handler callback.
GenericHandler(T *ptr, void(T::*fn)(message_type *))
Constructor to be called with an object member function.
GenericHandler(HandlerFn handler)
Constructor.
Abstract class for message recipients.
virtual void send(MessageType *message, unsigned priority=UINT_MAX)=0
Entry point to the flow.
static MessageType * cast_alloc(QMember *entry)
Down casts and initializes an asynchronous allocation result to the appropriate flow's buffer type.
MessageType message_type
Stores the message template type for external reference.
virtual MessageType * type_helper()
This function is never used in the code, but GDB can use it to infer the correct message types.
MessageType * alloc()
Synchronously allocates a message buffer from the pool of this flow.
void alloc_async(Executable *target)
Asynchronously allocates a message buffer from the pool of this flow.
Using this class as a base class will cause the given class to have all its instances linked up in a ...
static long long get_monotonic()
Get the monotonic time since the system started.
Pool of previously allocated, but currently unused, items.
static void alloc_async_init(BufferBase *base, Buffer< BufferType > **result)
Cast the result of an asynchronous allocation and perform a placement new on it.
void alloc(Buffer< BufferType > **result, Executable *flow=NULL)
Get a free item out of the pool.
void alloc_async(Executable *flow)
Get a free item out of the pool.
Asynchronous specialization of Q.
void next_async(Executable *flow)
Get an item from the front of the queue.
Essentially a "next" pointer container.
Handler structure that ExecutorBase knows about each entry to the select call.
@ MAX_PRIO
Largest priority we accept (otherwise we clip).
void reset(SelectType type, int fd, unsigned priority)
Re-initialize a Selectable preparing to add it to select().
void set_wakeup(Executable *e)
Can be used to override the executable to wake up.
Collection of related state machines that pend on incoming messages.
ExecutorBase * executor()
Return type for a state flow callback.
Action(Callback s)
Constructor.
Callback nextState_
next state in state flow
Callback next_state()
Get the next state for the StateFlowAction.
Use this timer class to deliver the timeout notification to a stateflow.
long long timeout() override
Clients of timer should override this function.
StateFlowTimer(StateFlowBase *parent)
Constructor.
StateFlowBase * parent_
The timer will deliver notifications to this flow.
Base class for state machines.
QMember * allocationResult_
The result of the next allocation that comes in.
Action yield_and_call(Callback c)
Place the current flow to the back of the executor, and transition to a new state after we get the CP...
Action read_repeated(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Blocks until size bytes are read and then invokes the next state.
Action read_single(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Attempts to read at most size_t bytes, and blocks the caller until at least one byte is read.
Action read_repeated_with_timeout(StateFlowTimedSelectHelper *helper, long long timeout_nsec, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Blocks until size bytes are read, or a timeout expires.
Service * service()
Return a pointer to the service I am bound to.
void run() override
Callback from the executor.
void notify() override
Wakeup call arrived.
Service * service_
Service this StateFlow belongs to.
Action allocate_and_call(FlowInterface< Buffer< T > > *target_flow, Callback c, Pool *pool=nullptr)
Allocates a buffer from a pool and proceed to the next state when allocation is successful.
StateFlowBase(Service *service)
Constructor.
StateFlowBase()
Default constructor.
Action internal_try_write()
Implementation state that gets repeatedly called upon every wakeup and tries to make progress on writ...
Action yield()
Place the current flow to the back of the executor, and re-try the current state after we get the CPU...
Action delete_this()
Terminates the flow and deletes *this.
Action allocate_and_call(Callback c, QAsync *queue)
Allocates an entry from an asynchronous queue, and transitions to a state once the allocation is comp...
Buffer< T > * full_allocation_result(FlowInterface< Buffer< T > > *target_flow)
Takes the result of the asynchronous allocation without resetting the object.
Action exit()
Terminate current StateFlow activity.
Action read_nonblocking(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Attempts to read at most size bytes, and then invokes the next state, even if only zero bytes are ava...
Callback state_
current active state in the flow
void start_flow(Callback c)
Resets the flow to the specified state and starts it.
Buffer< T > * get_allocation_result(FlowInterface< Buffer< T > > *target_flow)
Takes the result of the asynchronous allocation.
Action wait()
Wait for an asynchronous call.
Action again()
Call the current state again via call_immediately.
void reset_flow(Callback c)
Resets the flow to the specified state.
Action invoke_subflow_and_wait(FlowInterface< Buffer< T > > *target_flow, Callback c, Args &&... args)
Calls a helper flow to perform some actions.
Action call_immediately(Callback c)
Immediately call the next state upon return.
Action set_terminated()
Sets the flow to terminated state.
static void invoke_subflow_and_ignore_result(FlowInterface< Buffer< T > > *target_flow, Args &&... args)
Calls a helper flow to perform some actions.
Action wait_and_call(Callback c)
Wait for resource to become available before proceeding to next state.
Action(StateFlowBase::* Callback)()
State Flow callback prototype.
T * full_allocation_result(TypedQAsync< T > *queue)
Takes the result of the asynchronous allocation without resetting the object.
~StateFlowBase()
Destructor.
bool is_state(Callback c)
Action terminated()
Terminates the current StateFlow activity.
void cast_allocation_result(T **member)
Takes the result of the asynchronous allocation without resetting the object.
Action write_repeated(StateFlowSelectHelper *helper, int fd, const void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Writes some data into a file descriptor, repeating the operation as necessary until all bytes are wri...
Action internal_try_read()
Implementation state that gets repeatedly called upon every wakeup and tries to make progress on read...
void alloc_result(QMember *b) override
Callback from a Pool in case of an asynchronous allocation.
Action sleep_and_call(::Timer *timer, long long timeout_nsec, Callback c)
Suspends execution of this control flow for a specified time.
A state flow that has an incoming message queue, pends on that queue, and runs a flow for every messa...
virtual QMember * queue_next(unsigned *priority)=0
Takes the front entry in the queue.
unsigned isWaiting_
True if we are in the pending state, waiting for an entry to show up in the queue.
void notify() override
Wakeup call arrived. Schedules *this on the executor.
Action exit()
Terminates the processing of this flow.
Action release_and_exit()
Terminates the processing of the current message.
BufferBase * currentMessage_
Message we are currently processing.
void reset_message(BufferBase *message, unsigned priority)
Sets the current message being processed.
BufferBase * transfer_message()
Releases ownership of the current message.
static const unsigned MAX_PRIORITY_
Largest acceptable priority value for a stateflow.
void start_flow_at_init(Callback c)
Call this from the constructor of the child class to do some work before the main queue processing lo...
unsigned currentPriority_
Priority of the current message we are processing.
virtual Action entry()=0
Entry into the StateFlow activity.
void set_priority(unsigned priority)
Overrides the current priority.
virtual bool queue_empty()=0
virtual void release()=0
Releases the current message buffer back to the pool it came from.
unsigned queueSize_
For debugging: how many entries are currently waiting in the queue of this stateflow.
State flow with a given typed input queue.
StateFlow(Service *service)
Constructor.
A timer that can schedule itself to run on an executor at specified times in the future.
@ NONE
Do not restart the timer.
void set_triggered()
Sets the timer as if it was woken up by a trigger(), even if it was never started.
void start(long long period=-1)
Starts a timer.
void ensure_triggered()
Triggers the timer if it is not expired yet.
Strongly typed queue class with asynchronous access.
Helper class in the StateFlow hierarchy.
void release() OVERRIDE
Unrefs the current buffer.
Base::Action Action
Allows using Action without having StateFlowBase:: prefix in front of it.
TypedStateFlow(Service *service)
Constructor.
virtual ~TypedStateFlow()
Destructor.
virtual Action entry() override=0
Entry into the StateFlow activity.
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
MessageType * transfer_message()
Releases ownership of the current message.
void return_buffer()
For state flows that are operated using invoke_subflow_and_wait this is a way to hand back the buffer...
State flow base class with queue but generic message type.
UntypedStateFlow(Service *service)
Constructor.
void send(BufferBase *msg, unsigned priority=UINT_MAX)
Sends a message to the state flow for processing.
QueueType queue_
Implementation of the queue.
QMember * queue_next(unsigned *priority) OVERRIDE
Takes the front entry in the queue.
bool queue_empty() OVERRIDE
#define LOG(level, message...)
Conditionally write a message to the logging output.
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
static const int FATAL
Loglevel that kills the current process.
#define OVERRIDE
Function attribute for virtual functions declaring that this function is overriding a function that s...
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
#define DIE(MSG)
Unconditionally terminates the current process with a message.
#define DISALLOW_COPY_AND_ASSIGN(TypeName)
Removes default copy-constructor and assignment added by C++.
Use this class to read from an fd using select() in a state flow.
unsigned readWithTimeout_
1 if there is also a timer involved; in this case *this must be a StateFlowTimedSelectHelper.
unsigned remaining_
Number of bytes still outstanding to read.
unsigned hasError_
1 if there was an error reading or writing.
unsigned readFully_
1 if we need to read until all remaining_ is consumed.
StateFlowSelectHelper(StateFlowBase *parent)
unsigned readNonblocking_
1 if we need a non-blocking read, in other words, try once
Callback nextState_
State to transition to after the read is complete.
Use this class to read from an fd with select and timeout.
StateFlowTimedSelectHelper(StateFlowBase *parent)
long long expiry_
End of the wakeup timeout.
StateFlowTimer timer_
This timer is used to wake up the StateFlow.
void set_timed_wakeup()
Called from the stateflow's internal state to instruct to set the wakeup target for the timer.
void run() override
Entry point.