Open Model Railroad Network (OpenMRN)
openlcb::StreamReceiverCan Class Reference
Inheritance diagram for openlcb::StreamReceiverCan:
openlcb::StreamReceiverInterface
CallableFlow< StreamReceiveRequest >
StateFlow< MessageType, QueueType >
TypedStateFlow< MessageType, UntypedStateFlow< QueueType > >
UntypedStateFlow< QueueType >
FlowInterface< MessageType >
StateFlowWithQueue
StateFlowBase
Atomic
LinkedObject< StateFlowWithQueue >
Executable
Notifiable
QMember
Destructable

Classes

class  StreamDataHandler
 

Public Member Functions

 StreamReceiverCan (IfCan *interface, uint8_t local_stream_id)
 Constructor.
 
void send (Buffer< StreamReceiveRequest > *msg, unsigned prio=0) override
 Implements the flow interface for the request API.
 
void cancel_request () override
 Cancels the currently pending stream receive request.
 
- Public Member Functions inherited from openlcb::StreamReceiverInterface
 StreamReceiverInterface (Service *s)
 
- Public Member Functions inherited from CallableFlow< StreamReceiveRequest >
 CallableFlow (Service *s)
 Creates a callable flow.
 
- Public Member Functions inherited from StateFlow< MessageType, QueueType >
 StateFlow (Service *service)
 Constructor.
 
- Public Member Functions inherited from TypedStateFlow< MessageType, UntypedStateFlow< QueueType > >
 TypedStateFlow (Service *service)
 Constructor.
 
virtual ~TypedStateFlow ()
 Destructor.
 
void send (MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
 Sends a message to the state flow for processing.
 
- Public Member Functions inherited from UntypedStateFlow< QueueType >
 UntypedStateFlow (Service *service)
 Constructor.
 
- Public Member Functions inherited from StateFlowWithQueue
void notify () override
 Wakeup call arrived. Schedules *this on the executor.
 
bool is_waiting ()
 
- Public Member Functions inherited from StateFlowBase
void run () override
 Callback from the executor.
 
Service * service ()
 Return a pointer to the service I am bound to.
 
- Public Member Functions inherited from Executable
void test_deletion ()
 
- Public Member Functions inherited from QMember
void init ()
 Initialize a QMember, in place of a public placement construction.
 
- Public Member Functions inherited from LinkedObject< StateFlowWithQueue >
StateFlowWithQueue * link_next ()
 
- Public Member Functions inherited from FlowInterface< MessageType >
virtual Pool * pool ()
 
virtual MessageType * type_helper ()
 This function is never used in the code, but GDB can use it to infer the correct message types.
 
MessageType * alloc ()
 Synchronously allocates a message buffer from the pool of this flow.
 
void alloc_async (Executable *target)
 Asynchronously allocates a message buffer from the pool of this flow.
 

Private Member Functions

void announced_stream ()
 Helper function for send() when a stream has to start synchronously.
 
Action entry () override
 This state is not used, but it must be defined because it is pure virtual in the base class.
 
Action wait_for_wakeup ()
 
Action wakeup ()
 Root of the flow when something happens in the handlers.
 
Action init_reply ()
 Invoked when we get the stream initiate request.
 
Action init_buffer_ready ()
 
Action window_reached ()
 Invoked when the stream window runs out.
 
Action have_raw_buffer ()
 Called when the allocation of the raw buffer is successful.
 
void handle_stream_initiate (Buffer< GenMessage > *message)
 Invoked by the GenericHandler when a stream initiate message arrives.
 
void handle_bytes_received (const uint8_t *data, size_t len)
 Handles data arriving from the network.
 
void handle_stream_complete (Buffer< GenMessage > *message)
 Invoked by the GenericHandler when a stream complete message arrives.
 
void unregister_handlers ()
 Removes all handlers that are registered.
 
IfCan * if_can ()
 
Node * node ()
 

Private Attributes

MessageHandler::GenericHandler streamInitiateHandler_
 Helper class for incoming message for stream initiate.
 
MessageHandler::GenericHandler streamCompleteHandler_
 Helper class for incoming message for stream complete.
 
LimitedPool lastBufferPool_ {sizeof(RawBuffer), 2, rawBufferPool}
 This pool is used to allocate one raw buffer per stream window size.
 
ByteBufferPtr currentBuffer_
 The buffer that we are currently filling with incoming data.
 
RawBufferPtr lastBuffer_
 The buffer that will be the last one in this stream window.
 
std::unique_ptr< StreamDataHandler > dataHandler_
 Helper object that receives the actual stream CAN frames.
 
size_t totalByteCount_
 How many bytes we have transmitted in this stream so far.
 
uint16_t streamWindowRemaining_
 Remaining stream window size.
 
const uint8_t assignedStreamId_
 Unique stream ID at the destination (local) node, assigned at construction time.
 
uint8_t streamClosed_: 1
 1 if we received the stream complete message.
 
uint8_t pendingInit_: 1
 1 if we received the stream init request message.
 
uint8_t pendingCancel_: 1
 1 if we received a cancel request
 
uint8_t isWaiting_: 1
 1 if we are currently waiting for a notification
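
The four one-bit members above can share a single byte of storage. A minimal sketch of that layout follows; `ReceiverFlags` and `has_pending_event` are hypothetical names introduced for illustration, and the rule that any of the first three flags counts as a pending event is an assumption, not something this page states.

```cpp
#include <cstdint>

// Hypothetical stand-in for the status bits listed above. This is not the
// OpenMRN class, only an illustration of one-bit bit-fields.
struct ReceiverFlags
{
    uint8_t streamClosed_ : 1;  // 1 once the stream complete message arrived
    uint8_t pendingInit_ : 1;   // 1 once the stream init request arrived
    uint8_t pendingCancel_ : 1; // 1 once a cancel request arrived
    uint8_t isWaiting_ : 1;     // 1 while waiting for a notification
};

// Assumed policy for this sketch: any of the three event bits means that
// there is work to do. isWaiting_ only describes the flow's own state.
inline bool has_pending_event(const ReceiverFlags &f)
{
    return f.streamClosed_ || f.pendingInit_ || f.pendingCancel_;
}
```

Value-initializing the struct (`ReceiverFlags f{};`) clears all four bits. How the bits are packed is implementation-defined, although common compilers place them in one byte.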
 

Friends

class StreamDataHandler
 

Additional Inherited Members

- Public Types inherited from TypedStateFlow< MessageType, UntypedStateFlow< QueueType > >
typedef Base::Action Action
 Allows using Action without having StateFlowBase:: prefix in front of it.
 
- Public Types inherited from FlowInterface< MessageType >
typedef MessageType message_type
 Stores the message template type for external reference.
 
- Static Public Member Functions inherited from StateFlowBase
template<class T , typename... Args>
static void invoke_subflow_and_ignore_result (FlowInterface< Buffer< T > > *target_flow, Args &&... args)
 Calls a helper flow to perform some actions.
 
- Static Public Member Functions inherited from LinkedObject< StateFlowWithQueue >
static StateFlowWithQueue * link_head ()
 
static Atomic * head_mu ()
 Locks the list for modification (at any entry!).
 
- Static Public Member Functions inherited from FlowInterface< MessageType >
static MessageType * cast_alloc (QMember *entry)
 Down casts and initializes an asynchronous allocation result to the appropriate flow's buffer type.
 
- Protected Types inherited from CallableFlow< StreamReceiveRequest >
using Action = StateFlowBase::Action
 
- Protected Types inherited from UntypedStateFlow< QueueType >
typedef Action(StateFlowBase::* Callback) ()
 State Flow callback prototype.
 
- Protected Types inherited from StateFlowBase
typedef Action(StateFlowBase::* Callback) ()
 State Flow callback prototype.
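
The callback prototype above is a pointer to a member function that returns the next step of the flow. The following is a minimal sketch of that idea only; `MiniFlow`, its `Action` struct, and its states are hypothetical and much simpler than the real StateFlowBase, which hands control back to an executor between states instead of looping.

```cpp
// Hypothetical miniature flow. Each state is a member function that returns
// an Action naming the state to run next.
class MiniFlow
{
public:
    struct Action;
    typedef Action (MiniFlow::*Callback)();

    struct Action
    {
        Callback next; // nullptr means that the flow has finished
    };

    // Runs states back to back until one of them returns a null next state.
    // Returns the number of states that were executed.
    int run()
    {
        int steps = 0;
        Callback state = &MiniFlow::start;
        while (state)
        {
            state = (this->*state)().next;
            ++steps;
        }
        return steps;
    }

    int counter() const
    {
        return counter_;
    }

private:
    Action start()
    {
        counter_ = 1;
        return Action{&MiniFlow::finish};
    }

    Action finish()
    {
        counter_ += 1;
        return Action{nullptr};
    }

    int counter_ = 0;
};
```

In this sketch, calling `run()` on a fresh `MiniFlow` executes `start` and then `finish`.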
 
- Protected Member Functions inherited from CallableFlow< StreamReceiveRequest >
StreamReceiveRequest * request ()
 
bool has_request ()
 
Action return_ok ()
 Terminates the flow and returns the request buffer to the caller with an error code of OK (zero).
 
Action wait_and_return_ok ()
 Waits to be notified before moving onto the next state for termination.
 
Action wait_done ()
 Terminates the flow and returns the request buffer to the caller with an error code of OK (zero).
 
Action return_with_error (int error)
 Terminates the flow and returns the request buffer to the caller with a specific error code.
 
- Protected Member Functions inherited from TypedStateFlow< MessageType, UntypedStateFlow< QueueType > >
void release () OVERRIDE
 Unrefs the current buffer.
 
void return_buffer ()
 For state flows that are operated using invoke_subflow_and_wait this is a way to hand back the buffer to the caller.
 
MessageType * message ()
 
MessageType * transfer_message ()
 Releases ownership of the current message.
 
- Protected Member Functions inherited from UntypedStateFlow< QueueType >
void send (BufferBase *msg, unsigned priority=UINT_MAX)
 Sends a message to the state flow for processing.
 
QMember * queue_next (unsigned *priority) OVERRIDE
 Takes the front entry in the queue.
 
bool queue_empty () OVERRIDE
 
Action call_immediately (Callback c)
 Immediately call the next state upon return.
 
- Protected Member Functions inherited from StateFlowWithQueue
 StateFlowWithQueue (Service *service)
 Constructor.
 
Action exit ()
 Terminates the processing of this flow.
 
Action release_and_exit ()
 Terminates the processing of the current message.
 
BufferBase * message ()
 
BufferBase * transfer_message ()
 Releases ownership of the current message.
 
void reset_message (BufferBase *message, unsigned priority)
 Sets the current message being processed.
 
unsigned priority ()
 
void set_priority (unsigned priority)
 Overrides the current priority.
 
void start_flow_at_init (Callback c)
 Call this from the constructor of the child class to do some work before the main queue processing loop begins.
 
- Protected Member Functions inherited from StateFlowBase
 StateFlowBase (Service *service)
 Constructor.
 
 ~StateFlowBase ()
 Destructor.
 
void reset_flow (Callback c)
 Resets the flow to the specified state.
 
bool is_state (Callback c)
 
bool is_terminated ()
 
void start_flow (Callback c)
 Resets the flow to the specified state and starts it.
 
Action again ()
 Call the current state again via call_immediately.
 
Action exit ()
 Terminate current StateFlow activity.
 
Action delete_this ()
 Terminates the flow and deletes *this.
 
Action set_terminated ()
 Sets the flow to terminated state.
 
Action call_immediately (Callback c)
 Immediately call the next state upon return.
 
Action wait ()
 Wait for an asynchronous call.
 
Action wait_and_call (Callback c)
 Wait for resource to become available before proceeding to next state.
 
template<class T >
Action allocate_and_call (FlowInterface< Buffer< T > > *target_flow, Callback c, Pool *pool=nullptr)
 Allocates a buffer from a pool and proceed to the next state when allocation is successful.
 
Action allocate_and_call (Callback c, QAsync *queue)
 Allocates an entry from an asynchronous queue, and transitions to a state once the allocation is complete.
 
template<class T >
Buffer< T > * full_allocation_result (FlowInterface< Buffer< T > > *target_flow)
 Takes the result of the asynchronous allocation without resetting the object.
 
template<class T >
T * full_allocation_result (TypedQAsync< T > *queue)
 Takes the result of the asynchronous allocation without resetting the object.
 
template<class T >
void cast_allocation_result (T **member)
 Takes the result of the asynchronous allocation without resetting the object.
 
template<class T >
Buffer< T > * get_allocation_result (FlowInterface< Buffer< T > > *target_flow)
 Takes the result of the asynchronous allocation.
 
Action yield_and_call (Callback c)
 Place the current flow to the back of the executor, and transition to a new state after we get the CPU again.
 
Action yield ()
 Place the current flow to the back of the executor, and re-try the current state after we get the CPU again.
 
Action sleep_and_call (::Timer *timer, long long timeout_nsec, Callback c)
 Suspends execution of this control flow for a specified time.
 
template<class T , typename... Args>
Action invoke_subflow_and_wait (FlowInterface< Buffer< T > > *target_flow, Callback c, Args &&... args)
 Calls a helper flow to perform some actions.
 
Action read_repeated (StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
 Blocks until size bytes are read and then invokes the next state.
 
Action read_single (StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
 Attempts to read at most size bytes, and blocks the caller until at least one byte is read.
 
Action read_nonblocking (StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
 Attempts to read at most size bytes, and then invokes the next state, even if only zero bytes are available right now.
 
Action read_repeated_with_timeout (StateFlowTimedSelectHelper *helper, long long timeout_nsec, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
 Blocks until size bytes are read, or a timeout expires.
 
Action internal_try_read ()
 Implementation state that gets repeatedly called upon every wakeup and tries to make progress on reading.
 
Action write_repeated (StateFlowSelectHelper *helper, int fd, const void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
 Writes some data into a file descriptor, repeating the operation as necessary until all bytes are written.
 
Action internal_try_write ()
 Implementation state that gets repeatedly called upon every wakeup and tries to make progress on writing.
 
- Protected Member Functions inherited from QMember
 QMember ()
 Constructor.
 
 ~QMember ()
 Destructor.
 
- Protected Member Functions inherited from Atomic
void lock ()
 
void unlock ()
 
- Protected Member Functions inherited from LinkedObject< StateFlowWithQueue >
StateFlowWithQueue * link_this ()
 
 LinkedObject ()
 Constructor. Puts *this on the linked list.
 
 ~LinkedObject ()
 Destructor. Removes *this from the linked list.
 
StateFlowWithQueue * link_next ()
 
- Static Protected Member Functions inherited from LinkedObject< StateFlowWithQueue >
static StateFlowWithQueue * link_head ()
 
static Atomic * head_mu ()
 Locks the list for modification (at any entry!).
 
- Protected Attributes inherited from QMember
QMember * next
 pointer to the next member in the queue
 
- Protected Attributes inherited from LinkedObject< StateFlowWithQueue >
StateFlowWithQueue * link_
 Linked list pointer.
 
- Static Protected Attributes inherited from LinkedObject< StateFlowWithQueue >
static StateFlowWithQueue * head_
 Beginning of the list.
 

Detailed Description

Definition at line 49 of file StreamReceiver.hxx.

Constructor & Destructor Documentation

◆ StreamReceiverCan()

openlcb::StreamReceiverCan::StreamReceiverCan ( IfCan *  interface,
uint8_t  local_stream_id 
)

Constructor.

Parameters
interface  the CAN interface that owns this stream receiver.
local_stream_id  the local stream ID to use for the streams handled by this receiver.

Definition at line 302 of file StreamReceiver.cxx.

◆ ~StreamReceiverCan()

openlcb::StreamReceiverCan::~StreamReceiverCan ( )

Definition at line 312 of file StreamReceiver.cxx.

Member Function Documentation

◆ announced_stream()

void openlcb::StreamReceiverCan::announced_stream ( )
private

Helper function for send() when a stream has to start synchronously.

Definition at line 48 of file StreamReceiver.cxx.

◆ cancel_request()

void openlcb::StreamReceiverCan::cancel_request ( )
override virtual

Cancels the currently pending stream receive request.

The message will then be asynchronously returned using the regular mechanism with a temporary error.

Implements openlcb::StreamReceiverInterface.

Definition at line 315 of file StreamReceiver.cxx.
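
The asynchronous return described above can be sketched as follows. Everything here is hypothetical: `CancellableReceiver`, its `run()` method (which stands in for the flow being scheduled later), and `kCancelledError` (a placeholder value, not an OpenLCB error code) are assumptions made for illustration and do not reproduce the OpenMRN implementation.

```cpp
#include <functional>
#include <utility>

// Placeholder result code for this sketch only.
constexpr int kCancelledError = -1;

// Hypothetical receiver that hands a result code back through a callback.
class CancellableReceiver
{
public:
    using DoneCallback = std::function<void(int)>;

    // Starts a request. `done` is invoked once with the final result code.
    void send(DoneCallback done)
    {
        done_ = std::move(done);
        pendingCancel_ = false;
    }

    // Only records the cancellation. The request is not returned from here.
    void cancel_request()
    {
        if (done_)
        {
            pendingCancel_ = true;
        }
    }

    // Stands in for the flow running later. This is where a cancelled
    // request goes back to the caller with the error code.
    void run()
    {
        if (done_ && pendingCancel_)
        {
            DoneCallback cb = std::move(done_);
            done_ = nullptr;
            pendingCancel_ = false;
            cb(kCancelledError);
        }
    }

private:
    DoneCallback done_;
    bool pendingCancel_ = false;
};
```

The point of the split is that the caller of `cancel_request()` never sees the callback fire from inside the call; the result arrives through the same path as any other completion.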

◆ entry()

Action openlcb::StreamReceiverCan::entry ( )
inline override private virtual

This state is not used, but it must be defined because it is pure virtual in the base class.

Implements TypedStateFlow< MessageType, UntypedStateFlow< QueueType > >.

Definition at line 76 of file StreamReceiver.hxx.

◆ handle_bytes_received()

void openlcb::StreamReceiverCan::handle_bytes_received ( const uint8_t *  data,
size_t  len 
)
inline private

Handles data arriving from the network.

Definition at line 150 of file StreamReceiver.cxx.

◆ handle_stream_complete()

void openlcb::StreamReceiverCan::handle_stream_complete ( Buffer< GenMessage > *  message)
private

Invoked by the GenericHandler when a stream complete message arrives.

Parameters
message  buffer with stream complete message.

Definition at line 198 of file StreamReceiver.cxx.

◆ handle_stream_initiate()

void openlcb::StreamReceiverCan::handle_stream_initiate ( Buffer< GenMessage > *  message)
private

Invoked by the GenericHandler when a stream initiate message arrives.

Parameters
message  buffer with stream initiate message.

Definition at line 87 of file StreamReceiver.cxx.

◆ have_raw_buffer()

StateFlowBase::Action openlcb::StreamReceiverCan::have_raw_buffer ( )
private

Called when the allocation of the raw buffer is successful.

Sends off the stream proceed message.

Definition at line 401 of file StreamReceiver.cxx.

◆ if_can()

IfCan * openlcb::StreamReceiverCan::if_can ( )
inline private
Returns
the local CAN interface.

Definition at line 125 of file StreamReceiver.hxx.

◆ init_buffer_ready()

StateFlowBase::Action openlcb::StreamReceiverCan::init_buffer_ready ( )
private

Definition at line 379 of file StreamReceiver.cxx.

◆ init_reply()

StateFlowBase::Action openlcb::StreamReceiverCan::init_reply ( )
private

Invoked when we get the stream initiate request.

Initializes receive buffers and sends stream init response.

Definition at line 372 of file StreamReceiver.cxx.

◆ node()

Node * openlcb::StreamReceiverCan::node ( )
inline private
Returns
the local node pointer.

Definition at line 131 of file StreamReceiver.hxx.

◆ send()

void openlcb::StreamReceiverCan::send ( Buffer< StreamReceiveRequest > *  msg,
unsigned  prio = 0 
)
override

Implements the flow interface for the request API.

This is not based on entry() because the registration has to be synchronous with the call to send().

Definition at line 66 of file StreamReceiver.cxx.

◆ unregister_handlers()

void openlcb::StreamReceiverCan::unregister_handlers ( )
private

Removes all handlers that are registered.

Definition at line 325 of file StreamReceiver.cxx.

◆ wait_for_wakeup()

Action openlcb::StreamReceiverCan::wait_for_wakeup ( )
inline private

Definition at line 81 of file StreamReceiver.hxx.

◆ wakeup()

StateFlowBase::Action openlcb::StreamReceiverCan::wakeup ( )
private

Root of the flow when something happens in the handlers.

Definition at line 334 of file StreamReceiver.cxx.

◆ window_reached()

StateFlowBase::Action openlcb::StreamReceiverCan::window_reached ( )
private

Invoked when the stream window runs out.

May wait for the data to be consumed below the low-watermark.

Definition at line 395 of file StreamReceiver.cxx.
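
The window bookkeeping that leads to this state can be illustrated with a small counter. This is a sketch under assumptions: `WindowCounter` and its methods are hypothetical names, and clamping bytes that exceed the window is a simplification chosen here, not documented behavior.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>

// Hypothetical counter that tracks how much of the stream window is left.
class WindowCounter
{
public:
    explicit WindowCounter(uint16_t window_size)
        : windowSize_(window_size)
        , remaining_(window_size)
    {
    }

    // Accounts for `len` received bytes. Bytes beyond the current window are
    // ignored in this sketch. Returns true when the window has run out.
    bool consume(size_t len)
    {
        size_t used = std::min<size_t>(len, remaining_);
        remaining_ = static_cast<uint16_t>(remaining_ - used);
        total_ += used;
        return remaining_ == 0;
    }

    // Opens a fresh window, as would follow a stream proceed message.
    void proceed()
    {
        remaining_ = windowSize_;
    }

    uint16_t remaining() const
    {
        return remaining_;
    }

    size_t total() const
    {
        return total_;
    }

private:
    uint16_t windowSize_;
    uint16_t remaining_;
    size_t total_ = 0;
};
```

A caller would feed each received chunk to `consume()` and, once it returns true, hold off on `proceed()` until enough buffered data has been drained.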

Friends And Related Symbol Documentation

◆ StreamDataHandler

friend class StreamDataHandler
friend

Definition at line 141 of file StreamReceiver.hxx.

Member Data Documentation

◆ assignedStreamId_

const uint8_t openlcb::StreamReceiverCan::assignedStreamId_
private

Unique stream ID at the destination (local) node, assigned at construction time.

Definition at line 171 of file StreamReceiver.hxx.

◆ currentBuffer_

ByteBufferPtr openlcb::StreamReceiverCan::currentBuffer_
private

The buffer that we are currently filling with incoming data.

Definition at line 154 of file StreamReceiver.hxx.

◆ dataHandler_

std::unique_ptr<StreamDataHandler> openlcb::StreamReceiverCan::dataHandler_
private

Helper object that receives the actual stream CAN frames.

Definition at line 161 of file StreamReceiver.hxx.

◆ isWaiting_

uint8_t openlcb::StreamReceiverCan::isWaiting_
private

1 if we are currently waiting for a notification

Definition at line 180 of file StreamReceiver.hxx.

◆ lastBuffer_

RawBufferPtr openlcb::StreamReceiverCan::lastBuffer_
private

The buffer that will be the last one in this stream window.

This buffer comes from the lastBufferPool_ and functions as a throttling signal.

Definition at line 158 of file StreamReceiver.hxx.

◆ lastBufferPool_

LimitedPool openlcb::StreamReceiverCan::lastBufferPool_ {sizeof(RawBuffer), 2, rawBufferPool}
private

This pool is used to allocate one raw buffer per stream window size.

This pool therefore functions as a throttle for the data producer. It has a fixed size of 2, meaning that we allow ourselves to load up to 2x the stream window size into RAM.

Definition at line 151 of file StreamReceiver.hxx.
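
The throttling effect of a two-entry pool can be sketched with a simple counter. `CountingPool` is a hypothetical class that only counts outstanding buffers; it is assumed here that an exhausted pool refuses the request, whereas a real pool would more likely make the allocation wait until a buffer is returned.

```cpp
#include <cstddef>

// Hypothetical pool that tracks how many buffers are currently handed out.
class CountingPool
{
public:
    explicit CountingPool(size_t limit)
        : limit_(limit)
    {
    }

    // Tries to take one buffer. Returns false when the limit is reached,
    // which is the signal to stop asking the producer for more data.
    bool try_acquire()
    {
        if (inUse_ >= limit_)
        {
            return false;
        }
        ++inUse_;
        return true;
    }

    // Hands one buffer back, e.g. after the consumer has processed the data.
    void release()
    {
        if (inUse_ > 0)
        {
            --inUse_;
        }
    }

    size_t in_use() const
    {
        return inUse_;
    }

private:
    size_t limit_;
    size_t inUse_ = 0;
};
```

With a limit of 2, at most two windows' worth of data can be in flight: a third acquisition succeeds only after one of the earlier buffers has been released.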

◆ pendingCancel_

uint8_t openlcb::StreamReceiverCan::pendingCancel_
private

1 if we received a cancel request

Definition at line 178 of file StreamReceiver.hxx.

◆ pendingInit_

uint8_t openlcb::StreamReceiverCan::pendingInit_
private

1 if we received the stream init request message.

Definition at line 176 of file StreamReceiver.hxx.

◆ streamClosed_

uint8_t openlcb::StreamReceiverCan::streamClosed_
private

1 if we received the stream complete message.

Definition at line 174 of file StreamReceiver.hxx.

◆ streamCompleteHandler_

MessageHandler::GenericHandler openlcb::StreamReceiverCan::streamCompleteHandler_
private
Initial value: initialized with handle_stream_complete() as the handler callback.

Helper class for incoming message for stream complete.

Definition at line 144 of file StreamReceiver.hxx.

◆ streamInitiateHandler_

MessageHandler::GenericHandler openlcb::StreamReceiverCan::streamInitiateHandler_
private
Initial value: initialized with handle_stream_initiate() as the handler callback.

Helper class for incoming message for stream initiate.

Definition at line 137 of file StreamReceiver.hxx.

◆ streamWindowRemaining_

uint16_t openlcb::StreamReceiverCan::streamWindowRemaining_
private

Remaining stream window size.

Definition at line 167 of file StreamReceiver.hxx.

◆ totalByteCount_

size_t openlcb::StreamReceiverCan::totalByteCount_
private

How many bytes we have transmitted in this stream so far.

Definition at line 164 of file StreamReceiver.hxx.


The documentation for this class was generated from the following files: