34#ifndef _UTILS_HUBDEVICESELECT_HXX_
35#define _UTILS_HUBDEVICESELECT_HXX_
41#ifdef OPENMRN_FEATURE_EXECUTOR_SELECT
64 b->
data()->resize(64);
73 b->
data()->resize(64 - remaining);
156int hubdevice_incoming_packet_limit();
169 typename HFlow::port_type *dst,
typename HFlow::port_type *skip_member)
180 void set_limit_input(
bool should_throttle)
183 int limit = hubdevice_incoming_packet_limit();
227 (
void *)
b_->data()->data(),
b_->data()->size(),
233 (
void *)
b_->data()->data(),
b_->data()->size(),
281 typename HFlow::port_type *
dst_;
297template <
class HFlow,
class ReadFlow = HubDeviceSelectReadFlow<HFlow>>
304 HFlow *
hub,
const char *path,
Notifiable *on_error =
nullptr)
306 hub->service()->
executor(), ::open(path, O_RDWR | O_NONBLOCK))
338 unsigned long par = 1;
339 ioctlsocket(
fd_, FIONBIO, &par);
341 ::fcntl(
fd, F_SETFL, O_RDWR | O_NONBLOCK);
358 bool completed =
false;
385 LOG(
VERBOSE,
"HubDeviceSelect::unregister write port %p %p",
458 this->
message()->data()->data(),
int fcntl(int fd, int cmd,...)
Manipulate a file descriptor.
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
See OSMutexLock in os/OS.hxx.
Lightweight locking class for protecting small critical sections.
void notify() override
Implementation of the barrier semantics.
BarrierNotifiable * reset(Notifiable *done)
Resets the barrier. Returns &*this. Asserts that is_done().
BarrierNotifiable * new_child()
Call this for each child task.
Base class for all QMember types that hold data in an expandable format.
T * data()
get a pointer to the start of the data.
A notifiable class that calls a particular function object once when it is invoked,...
static Notifiable * DefaultInstance()
void sync_run(std::function< void()> fn)
Synchronously runs a closure on this executor.
virtual void add(Executable *action, unsigned priority=UINT_MAX)=0
Send a message to this Executor's queue.
void unselect(Selectable *job)
Removes a job from the select loop.
int fd_
The device file descriptor.
Shared base class for thread-based and select-based hub devices.
virtual void report_read_error()=0
Callback from the readflow when it encounters an error.
BarrierNotifiable barrier_
This notifiable will be called (if not NULL) upon read or write error.
MessageType * alloc()
Synchronously allocates a message buffer from the pool of this flow.
Data type wrapper for sending data through a Hub.
State flow implementing select-aware fd reads.
FdHubPortService * device()
buffer_type * b_
Buffer that we are currently filling.
HFlow::port_type * skipMember_
What should be the source port designation.
bool shouldThrottle_
True if we are throttling via a limited pool.
void shutdown()
Unregisters the current flow from the hub.
HubDeviceSelectReadFlow(FdHubPortService *device, typename HFlow::port_type *dst, typename HFlow::port_type *skip_member)
Constructor.
void notify_barrier()
Calls into the parent flow's barrier notify, but makes sure to only do this once in the lifetime of *...
std::unique_ptr< LimitedPool > inputPool_
Throttling input helper.
HFlow::buffer_type buffer_type
Buffer type.
bool barrierOwned_
true iff pending parent->barrier_.notify()
HFlow::port_type * dst_
Where do we forward the messages we created.
Action read_done()
Called when the stateflow read call(s) are completed.
StateFlowSelectHelper selectHelper_
Helper object for read/write FD asynchronously.
Action allocate_buffer()
Allocates a new buffer for incoming data.
Action try_read()
Attempts to read into the current buffer from the target fd.
State flow implementing select-aware fd writes.
StateFlowBase::StateFlowSelectHelper selectHelper_
Helper class for asynchronous writes.
WriteFlow(HubDeviceSelect *dev)
Constructor.
StateFlowBase::Action entry() OVERRIDE
Entry into the StateFlow activity.
HubDeviceSelect * device()
StateFlowBase::Action write_done()
State flow call.
void shutdown()
Unregisters this object from the flows.
HubPort that connects a select-aware device to a strongly typed Hub.
HubDeviceSelect(HFlow *hub, int fd, Notifiable *on_error=nullptr)
Creates a select-aware hub port for the opened device specified by ‘fd’.
HubDeviceSelect(HFlow *hub, const char *path, Notifiable *on_error=nullptr)
Creates a select-aware hub port for the device specified by ‘path’.
ReadFlow readFlow_
StateFlow for reading data from the fd. Woken when data arrives.
void report_write_error() override
The assumption here is that the write flow still has entries in its queue that need to be removed.
HFlow::port_type * write_port()
void unregister_write_port()
Removes the current write port from the registry of the source hub.
StateFlow< typename HFlow::buffer_type, QList< 1 > > WriteFlowBase
Base stateflow for the WriteFlow.
bool isRegistered_
True when the write flow is registered in the hub.
WriteFlow writeFlow_
StateFlow for writing data to the fd.
virtual ~HubDeviceSelect()
If the barrier has not been called yet, will notify it inline.
HFlow * hub_
Hub whose data we are trying to send.
void report_read_error() override
Callback from the ReadFlow when the read call has seen an error.
Implementation of a Pool interface that takes memory from mainBufferPool (configurable) but limits th...
An object that can schedule itself on an executor to run.
ExecutorBase * executor()
Return type for a state flow callback.
Base class for state machines.
Action read_repeated(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Blocks until size bytes are read and then invokes the next state.
Action read_single(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Attempts to read at most size bytes, and blocks the caller until at least one byte is read.
Service * service()
Return a pointer to the service I am bound to.
Action allocate_and_call(FlowInterface< Buffer< T > > *target_flow, Callback c, Pool *pool=nullptr)
Allocates a buffer from a pool and proceeds to the next state when allocation is successful.
Action exit()
Terminate current StateFlow activity.
void start_flow(Callback c)
Resets the flow to the specified state and starts it.
Buffer< T > * get_allocation_result(FlowInterface< Buffer< T > > *target_flow)
Takes the result of the asynchronous allocation.
Action call_immediately(Callback c)
Immediately call the next state upon return.
Action set_terminated()
Sets the flow to terminated state.
Action write_repeated(StateFlowSelectHelper *helper, int fd, const void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Writes some data into a file descriptor, repeating the operation as necessary until all bytes are wri...
void notify() override
Wakeup call arrived. Schedules *this on the executor.
Action release_and_exit()
Terminates the processing of the current message.
State flow with a given typed input queue.
Container for an arbitrary structure to pass through a Hub.
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
#define LOG(level, message...)
Conditionally write a message to the logging output.
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
#define OVERRIDE
Function attribute for virtual functions declaring that this function is overriding a function that s...
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
static bool limit_input()
static void resize_target(buffer_type *b)
CAN buffers do not need to be resized.
Buffer< CanHubData > buffer_type
Helper type for declaring the payload buffer type.
static bool needs_read_fully()
static void check_target_size(buffer_type *b, int remaining)
A CAN buffer is only okay if the entire buffer was read in one go.
static void resize_target(buffer_type *b)
struct buffers do not need to be resized.
static bool limit_input()
static bool needs_read_fully()
Buffer< HubContainer< StructContainer< T > > > buffer_type
Helper type for declaring the payload buffer type.
static void check_target_size(buffer_type *b, int remaining)
A struct buffer is only okay if the entire buffer was read in one go.
static void resize_target(HubFlow::buffer_type *b)
Preps a buffer for receiving data.
static void check_target_size(HubFlow::buffer_type *b, int remaining)
Clears out all potential empty space left after a buffer has been partially filled.
static bool needs_read_fully()
static bool limit_input()
Generic template for the buffer traits.
Use this class to read from an fd using select() in a state flow.
unsigned remaining_
Number of bytes still outstanding to read.
unsigned hasError_
1 if there was an error reading or writing.