Open Model Railroad Network (OpenMRN)
DirectHubPortSelect Class Reference

Connects a (bytes typed) hub to an FD.

Inheritance diagram for DirectHubPortSelect:
DirectHubPort< uint8_t[]> StateFlowBase HubSource

Classes

class  DirectHubReadFlow
 State flow that reads the FD and sends the read data to the direct hub.
 
struct  OutputDataEntry
 Holds the necessary information we need to keep in the queue about a single output entry.
 

Public Member Functions

 DirectHubPortSelect (DirectHubInterface< uint8_t[]> *hub, int fd, std::unique_ptr< MessageSegmenter > segmenter, Notifiable *on_error=nullptr)
 
void send (MessageAccessor< uint8_t[]> *msg) override
 Synchronous output routine called by the hub.
 
- Public Member Functions inherited from DirectHubPort< uint8_t[]>
virtual void send (MessageAccessor< uint8_t > *msg)=0
 Send some data out on this port.
 

Private Types

typedef Buffer< OutputDataEntry > BufferType
 Type of buffers we are enqueuing for output.
 
typedef Q QueueType
 Type of the queue used to keep the output buffer queue.
 
- Private Types inherited from StateFlowBase
typedef Action(StateFlowBase::* Callback) ()
 State Flow callback prototype.
 

Private Member Functions

void shutdown ()
 Called on the main executor when a read error wants to cancel the write flow.
 
Action read_queue ()
 
Action do_write ()
 
Action write_done ()
 
Action check_for_new_message ()
 
Action report_and_exit ()
 Terminates the flow, reporting to the barrier.
 
void report_write_error ()
 Called by the write flow when it sees an error.
 
void report_read_error ()
 Callback from the ReadFlow when the read call has seen an error.
 
void read_flow_exit ()
 Callback from the read flow that it has exited.
 
void write_flow_exit ()
 Marks the write flow as exited. May delete this.
 
void flow_exit (bool read)
 Marks a flow to be exited, and once both are exited, notifies done and deletes this.
 
Atomic * lock ()
 
- Private Member Functions inherited from StateFlowBase
void run () override
 Callback from the executor.
 
void notify () override
 Wakeup call arrived.
 
Service * service ()
 Return a pointer to the service I am bound to.
 
 StateFlowBase (Service *service)
 Constructor.
 
 ~StateFlowBase ()
 Destructor.
 
void reset_flow (Callback c)
 Resets the flow to the specified state.
 
bool is_state (Callback c)
 
bool is_terminated ()
 
void start_flow (Callback c)
 Resets the flow to the specified state and starts it.
 
Action again ()
 Call the current state again via call_immediately.
 
Action exit ()
 Terminate current StateFlow activity.
 
Action delete_this ()
 Terminates the flow and deletes *this.
 
Action set_terminated ()
 Sets the flow to terminated state.
 
Action call_immediately (Callback c)
 Immediately call the next state upon return.
 
Action wait ()
 Wait for an asynchronous call.
 
Action wait_and_call (Callback c)
 Wait for resource to become available before proceeding to next state.
 
template<class T >
Action allocate_and_call (FlowInterface< Buffer< T > > *target_flow, Callback c, Pool *pool=nullptr)
 Allocates a buffer from a pool and proceed to the next state when allocation is successful.
 
Action allocate_and_call (Callback c, QAsync *queue)
 Allocates an entry from an asynchronous queue, and transitions to a state once the allocation is complete.
 
template<class T >
Buffer< T > * full_allocation_result (FlowInterface< Buffer< T > > *target_flow)
 Takes the result of the asynchronous allocation without resetting the object.
 
template<class T >
T * full_allocation_result (TypedQAsync< T > *queue)
 Takes the result of the asynchronous allocation without resetting the object.
 
template<class T >
void cast_allocation_result (T **member)
 Takes the result of the asynchronous allocation without resetting the object.
 
template<class T >
Buffer< T > * get_allocation_result (FlowInterface< Buffer< T > > *target_flow)
 Takes the result of the asynchronous allocation.
 
Action yield_and_call (Callback c)
 Place the current flow to the back of the executor, and transition to a new state after we get the CPU again.
 
Action yield ()
 Place the current flow to the back of the executor, and re-try the current state after we get the CPU again.
 
Action sleep_and_call (::Timer *timer, long long timeout_nsec, Callback c)
 Suspends execution of this control flow for a specified time.
 
template<class T , typename... Args>
Action invoke_subflow_and_wait (FlowInterface< Buffer< T > > *target_flow, Callback c, Args &&... args)
 Calls a helper flow to perform some actions.
 
Action read_repeated (StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
 Blocks until size bytes are read and then invokes the next state.
 
Action read_single (StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
 Attempts to read at most size bytes, and blocks the caller until at least one byte is read.
 
Action read_nonblocking (StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
 Attempts to read at most size bytes, and then invokes the next state, even if only zero bytes are available right now.
 
Action read_repeated_with_timeout (StateFlowTimedSelectHelper *helper, long long timeout_nsec, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
 Blocks until size bytes are read, or a timeout expires.
 
Action internal_try_read ()
 Implementation state that gets repeatedly called upon every wakeup and tries to make progress on reading.
 
Action write_repeated (StateFlowSelectHelper *helper, int fd, const void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
 Writes some data into a file descriptor, repeating the operation as necessary until all bytes are written.
 
Action internal_try_write ()
 Implementation state that gets repeatedly called upon every wakeup and tries to make progress on writing.
 
- Private Member Functions inherited from Executable
void test_deletion ()
 
- Private Member Functions inherited from QMember
void init ()
 Initialize a QMember, in place of a public placement construction.
 
 QMember ()
 Constructor.
 
 ~QMember ()
 Destructor.
 

Private Attributes

DirectHubPortSelect::DirectHubReadFlow readFlow_
 
size_t totalWritten_ {0}
 Total number of bytes written to the port.
 
BufferPtr< OutputDataEntry > currentHead_
 The buffer that is taken out of the queue while flushing.
 
DataBuffer * nextToWrite_
 Data we are currently writing to a buffer.
 
unsigned nextToSkip_
 Skip_ parameter matching nextToWrite_.
 
unsigned nextToSize_
 Size_ parameter matching nextToWrite_.
 
StateFlowSelectHelper selectHelper_ {this}
 Helper object for performing asynchronous writes.
 
QueueType pendingQueue_
 Contains buffers of OutputDataEntries to write.
 
OutputDataEntry * pendingTail_ = nullptr
 Last tail pointer in the pendingQueue.
 
size_t totalPendingSize_ = 0
 Total number of bytes in the pendingQueue.
 
uint8_t notRunning_: 1
 1 if the state flow is paused, waiting for the notification.
 
uint8_t readFlowPending_
 1 if the read flow is still running.
 
uint8_t writeFlowPending_
 1 if the write flow is still running.
 
DirectHubInterface< uint8_t[]> * hub_
 Parent hub where output data is coming from.
 
int fd_
 File descriptor for input/output.
 
Notifiable * onError_ = nullptr
 This notifiable will be called before exiting.
 
- Private Attributes inherited from QMember
QMember * next
 pointer to the next member in the queue
 

Friends

class DirectHubReadFlow
 

Additional Inherited Members

- Static Private Member Functions inherited from StateFlowBase
template<class T , typename... Args>
static void invoke_subflow_and_ignore_result (FlowInterface< Buffer< T > > *target_flow, Args &&... args)
 Calls a helper flow to perform some actions.
 

Detailed Description

Connects a (bytes typed) hub to an FD.

This state flow is the write flow: it waits for messages coming from the hub and writes them into the fd. The object is self-owning, i.e., it deletes itself when the input goes dead or (eventually) when the port is shut down.

Definition at line 262 of file DirectHub.cxx.

Member Typedef Documentation

◆ BufferType

Type of buffers we are enqueuing for output.

Definition at line 836 of file DirectHub.cxx.

◆ QueueType

Type of the queue used to keep the output buffer queue.

Definition at line 838 of file DirectHub.cxx.

Constructor & Destructor Documentation

◆ DirectHubPortSelect()

DirectHubPortSelect::DirectHubPortSelect ( DirectHubInterface< uint8_t[]> *  hub,
int  fd,
std::unique_ptr< MessageSegmenter >  segmenter,
Notifiable *  on_error = nullptr 
)
inline

Definition at line 530 of file DirectHub.cxx.

◆ ~DirectHubPortSelect()

DirectHubPortSelect::~DirectHubPortSelect ( )
inline

Definition at line 558 of file DirectHub.cxx.

Member Function Documentation

◆ check_for_new_message()

Action DirectHubPortSelect::check_for_new_message ( )
inline private

Definition at line 696 of file DirectHub.cxx.

◆ do_write()

Action DirectHubPortSelect::do_write ( )
inline private

Definition at line 653 of file DirectHub.cxx.

◆ flow_exit()

void DirectHubPortSelect::flow_exit ( bool  read)
inline private

Marks a flow to be exited, and once both are exited, notifies done and deletes this.

Parameters
read  if true, marks the read flow done; if false, marks the write flow done.

Definition at line 791 of file DirectHub.cxx.
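
The two-flag bookkeeping described for flow_exit() can be illustrated with a minimal, self-contained sketch. This is not the code from DirectHub.cxx: the class name TwoFlowExitTracker and the completion callback are hypothetical, and the callback merely stands in for the "notify done and delete this" step. Only the flag names readFlowPending_ and writeFlowPending_ are borrowed from this page.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical stand-in for the exit bookkeeping; not OpenMRN code.
class TwoFlowExitTracker
{
public:
    // on_all_exited stands in for "notify done and delete this".
    explicit TwoFlowExitTracker(std::function<void()> on_all_exited)
        : onAllExited_(std::move(on_all_exited))
    {
    }

    // Marks one flow as exited. Returns true only for the call that
    // completes the shutdown, i.e. when both flows are done.
    bool flow_exit(bool read)
    {
        if (read)
        {
            readFlowPending_ = 0;
        }
        else
        {
            writeFlowPending_ = 0;
        }
        if (readFlowPending_ || writeFlowPending_)
        {
            return false; // the other flow is still running
        }
        if (!onAllExited_)
        {
            return false; // completion was already reported
        }
        // Take the callback out first so a repeated call cannot fire it twice.
        std::function<void()> cb = std::move(onAllExited_);
        onAllExited_ = nullptr;
        cb();
        return true;
    }

private:
    uint8_t readFlowPending_ {1};  // 1 while the read flow is still running
    uint8_t writeFlowPending_ {1}; // 1 while the write flow is still running
    std::function<void()> onAllExited_;
};
```

In this sketch the order in which the two flows exit does not matter; whichever call arrives second triggers the callback exactly once. The sketch is single-threaded and takes no lock, whereas the documented class notes that these callbacks run on the main executor.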

◆ lock()

Atomic * DirectHubPortSelect::lock ( )
inline private
Returns
lock usable for the write flow and the port altogether.

Definition at line 820 of file DirectHub.cxx.

◆ read_flow_exit()

void DirectHubPortSelect::read_flow_exit ( )
inline private

Callback from the read flow that it has exited.

This is triggered after the shutdown() call. May delete this.

Definition at line 774 of file DirectHub.cxx.

◆ read_queue()

Action DirectHubPortSelect::read_queue ( )
inline private

Definition at line 634 of file DirectHub.cxx.

◆ report_and_exit()

Action DirectHubPortSelect::report_and_exit ( )
inline private

Terminates the flow, reporting to the barrier.

Definition at line 718 of file DirectHub.cxx.

◆ report_read_error()

void DirectHubPortSelect::report_read_error ( )
inline private

Callback from the ReadFlow when the read call has seen an error.

The read flow is assumed to be exited. Takes the read entry out of the barrier, notifies the write flow to stop and possibly deletes *this. Called on the main executor.

Definition at line 752 of file DirectHub.cxx.

◆ report_write_error()

void DirectHubPortSelect::report_write_error ( )
inline private

Called by the write flow when it sees an error.

Called on the main executor. The assumption here is that the write flow still has entries in its queue that need to be removed. Closes the socket, and notifies the read flow to exit. Does not typically delete this, because the write flow needs to exit separately.

Definition at line 731 of file DirectHub.cxx.

◆ send()

void DirectHubPortSelect::send ( MessageAccessor< uint8_t[]> *  msg)
inline override

Synchronous output routine called by the hub.

Todo:
we should try to collect the bytes into a buffer first before enqueueing them.

Definition at line 563 of file DirectHub.cxx.

◆ shutdown()

void DirectHubPortSelect::shutdown ( )
inline private

Called on the main executor when a read error wants to cancel the write flow.

Before calling, fd_ must be -1.

Definition at line 617 of file DirectHub.cxx.

◆ write_done()

Action DirectHubPortSelect::write_done ( )
inline private

Definition at line 677 of file DirectHub.cxx.

◆ write_flow_exit()

void DirectHubPortSelect::write_flow_exit ( )
inline private

Marks the write flow as exited. May delete this.

Definition at line 781 of file DirectHub.cxx.

Friends And Related Symbol Documentation

◆ DirectHubReadFlow

Definition at line 527 of file DirectHub.cxx.

Member Data Documentation

◆ currentHead_

BufferPtr<OutputDataEntry> DirectHubPortSelect::currentHead_
private

The buffer that is taken out of the queue while flushing.

Definition at line 844 of file DirectHub.cxx.

◆ fd_

int DirectHubPortSelect::fd_
private

File descriptor for input/output.

Definition at line 872 of file DirectHub.cxx.

◆ hub_

DirectHubInterface<uint8_t[]>* DirectHubPortSelect::hub_
private

Parent hub where output data is coming from.

Definition at line 870 of file DirectHub.cxx.

◆ nextToSize_

unsigned DirectHubPortSelect::nextToSize_
private

Size_ parameter matching nextToWrite_.

Definition at line 850 of file DirectHub.cxx.

◆ nextToSkip_

unsigned DirectHubPortSelect::nextToSkip_
private

Skip_ parameter matching nextToWrite_.

Definition at line 848 of file DirectHub.cxx.

◆ nextToWrite_

DataBuffer* DirectHubPortSelect::nextToWrite_
private

Data we are currently writing to a buffer.

Definition at line 846 of file DirectHub.cxx.
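
As an illustration of how a buffer plus skip and size values can describe the bytes still to be written, here is a small sketch. It assumes that nextToSkip_ is an offset into the buffer and nextToSize_ a byte count, which this page does not state explicitly. The function write_window and the WriteFn callback are hypothetical; the callback stands in for a write to the file descriptor so that short writes can be shown without a real fd. The documented class writes asynchronously through write_repeated() and selectHelper_, whereas this sketch loops synchronously.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical writer callback: tries to write up to len bytes and returns
// how many were accepted (never more than len), or a value <= 0 on error.
using WriteFn = std::function<long(const char *data, size_t len)>;

// Writes the bytes [skip, skip + size) of buf, repeating after short writes
// until the whole window is written or the writer reports an error.
// Returns the number of bytes actually written.
size_t write_window(
    const std::string &buf, size_t skip, size_t size, const WriteFn &write_fn)
{
    if (skip > buf.size())
    {
        return 0; // window starts past the end of the buffer
    }
    if (size > buf.size() - skip)
    {
        size = buf.size() - skip; // clamp the window to the buffer
    }
    size_t written = 0;
    while (written < size)
    {
        long ret = write_fn(buf.data() + skip + written, size - written);
        if (ret <= 0)
        {
            break; // error: stop and report what was written so far
        }
        written += static_cast<size_t>(ret);
    }
    return written;
}
```

For example, with a writer that accepts at most two bytes per call, write_window("0123456789", 3, 5, writer) calls the writer three times and delivers the bytes "34567".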

◆ notRunning_

uint8_t DirectHubPortSelect::notRunning_
private

1 if the state flow is paused, waiting for the notification.

Definition at line 864 of file DirectHub.cxx.

◆ onError_

Notifiable* DirectHubPortSelect::onError_ = nullptr
private

This notifiable will be called before exiting.

Definition at line 874 of file DirectHub.cxx.

◆ pendingQueue_

QueueType DirectHubPortSelect::pendingQueue_
private

Time when the last buffer flush has happened. Not used yet.

Contains buffers of OutputDataEntries to write.

Definition at line 857 of file DirectHub.cxx.

◆ pendingTail_

OutputDataEntry* DirectHubPortSelect::pendingTail_ = nullptr
private

Last tail pointer in the pendingQueue.

If queue is empty, nullptr. Protected by pendingQueue_.lock().

Definition at line 860 of file DirectHub.cxx.
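
The invariant stated above (the tail pointer is nullptr exactly when the queue is empty, and it is only touched under the queue's lock) can be sketched as follows. The types PendingEntry and PendingQueueSketch are hypothetical, and std::list with std::mutex replaces the OpenMRN queue and lock types, whose interfaces are not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical entry type; the real OutputDataEntry is not shown here.
struct PendingEntry
{
    std::string bytes;
};

// Hypothetical queue that tracks a tail pointer and a pending byte count.
class PendingQueueSketch
{
public:
    void push(std::string bytes)
    {
        std::lock_guard<std::mutex> guard(lock_);
        totalPendingSize_ += bytes.size();
        queue_.push_back(PendingEntry {std::move(bytes)});
        tail_ = &queue_.back(); // std::list keeps element addresses stable
    }

    // Removes the oldest entry. Returns false if the queue is empty.
    bool pop(PendingEntry *out)
    {
        std::lock_guard<std::mutex> guard(lock_);
        if (queue_.empty())
        {
            return false;
        }
        *out = std::move(queue_.front());
        queue_.pop_front();
        totalPendingSize_ -= out->bytes.size();
        if (queue_.empty())
        {
            tail_ = nullptr; // maintain the invariant: empty queue, null tail
        }
        return true;
    }

    bool tail_is_null()
    {
        std::lock_guard<std::mutex> guard(lock_);
        return tail_ == nullptr;
    }

    size_t total_pending_size()
    {
        std::lock_guard<std::mutex> guard(lock_);
        return totalPendingSize_;
    }

private:
    std::mutex lock_;               // protects all members below
    std::list<PendingEntry> queue_; // entries waiting to be written
    PendingEntry *tail_ = nullptr;  // last entry, or nullptr if empty
    size_t totalPendingSize_ = 0;   // sum of bytes in queue_
};
```

The tail pointer is never handed out to callers in this sketch, because dereferencing it outside the lock would race with pop().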

◆ readFlowPending_

uint8_t DirectHubPortSelect::readFlowPending_
private

1 if the read flow is still running.

Definition at line 866 of file DirectHub.cxx.

◆ selectHelper_

StateFlowSelectHelper DirectHubPortSelect::selectHelper_ {this}
private

Helper object for performing asynchronous writes.

Definition at line 852 of file DirectHub.cxx.

◆ totalPendingSize_

size_t DirectHubPortSelect::totalPendingSize_ = 0
private

Total number of bytes in the pendingQueue.

Definition at line 862 of file DirectHub.cxx.

◆ totalWritten_

size_t DirectHubPortSelect::totalWritten_ {0}
private

Total number of bytes written to the port.

Definition at line 841 of file DirectHub.cxx.

◆ writeFlowPending_

uint8_t DirectHubPortSelect::writeFlowPending_
private

1 if the write flow is still running.

Definition at line 868 of file DirectHub.cxx.


The documentation for this class was generated from the following file: