36#ifndef _OPENLCB_STREAMSENDER_HXX_
37#define _OPENLCB_STREAMSENDER_HXX_
106 uint8_t source_stream_id,
225 return release_and_exit();
242 return release_and_exit();
268 auto *b = get_allocation_result(
292 string_to_hex(
message->data()->payload).c_str());
297 LOG(
INFO,
"stream reply not for me");
316 if (
message->data()->src.alias)
342 "Stream initiate request was denied (permanent error).");
346 return return_error(Defs::ERROR_TEMPORARY |
348 "Stream initiate request was denied (temporary error).");
354 "Inconsistency: zero buffer length but "
355 "accepted stream request.");
377 auto *b = get_allocation_result(
391 return allocate_and_call(
406 auto *frame = b->data()->mutable_frame();
407 SET_CAN_FRAME_ID_EFF(*frame, can_id);
411 frame->can_dlc = len + 1;
413 memcpy(&frame->data[1],
payload(), len);
476 return return_error(Defs::ERROR_TEMPORARY,
477 "Timed out waiting for stream proceed message.");
505 return message()->data()->size_;
511 return message()->data()->data_;
519 message()->data()->advance(num_bytes);
529 return release_and_exit();
BufferPtr< T > get_buffer_deleter(Buffer< T > *b)
Helper function to create a BufferPtr of an appropriate type without having to explicitly specify the...
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Base class for all QMember types that hold data in an expandable format.
OutgoingFrameHandler * loopback_frame_write_flow()
OutgoingFrameHandler * frame_write_flow()
void register_handler(HandlerType *handler, ID id, ID mask)
Adds a new handler to this dispatcher.
void unregister_handler(HandlerType *handler, ID id, ID mask)
Removes a specific instance of a handler from this dispatcher.
virtual void send(MessageType *message, unsigned priority=UINT_MAX)=0
Entry point to the flow.
Buffer< CanHubData > message_type
Stores the message template type for external reference.
MessageType * alloc()
Synchronously allocates a message buffer from the pool of this flow.
Implementation of a Pool interface that takes memory from mainBufferPool (configurable) but limits th...
Collection of related state machines that pend on incoming messages.
State flow with a given typed input queue.
Base::Action Action
Allows using Action without having StateFlowBase:: prefix in front of it.
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
NodeAlias lookup(NodeID id)
Lookup a node's alias based on its Node ID.
Implementation of the OpenLCB interface abstraction for the CAN-bus interface standard.
AliasCache * local_aliases()
virtual void canonicalize_handle(NodeHandle *h)
Canonicalizes the node handle: fills in id and/or alias from the maps the interface holds internally.
MessageDispatchFlow * dispatcher()
virtual Node * lookup_local_node_handle(NodeHandle handle)
Looks up a node ID in the local nodes' registry.
MessageHandler * addressed_message_write_flow()
virtual bool matching_node(NodeHandle expected, NodeHandle actual)=0
Base class for NMRAnet nodes conforming to the asynchronous interface.
Helper class for sending stream data to a CAN interface.
static constexpr size_t MAX_BYTES_PAYLOAD_PER_CAN_FRAME
How many bytes payload we can copy into a single CAN frame.
LimitedPool canFramePool_
Source of buffers for outgoing CAN frames.
void clear()
Sets the stream sender to be available for reuse after a stream has been closed or reached error.
static constexpr size_t STREAM_INIT_TIMEOUT_SEC
How many seconds for waiting for a stream init before we give up with a timeout.
Action wait_for_stream_proceed()
Starts sleeping until a proceed message arrives.
void trigger()
Sends an empty message to *this, thereby waking up the state machine.
StreamSenderCan & set_stream_uid(NodeID stream_uid)
Specifies the Stream UID to send in the stream initiate request.
Node * node_
Which node are we sending the outgoing data from.
uint8_t requestClose_
1 if there is a pending close request.
IfCan * ifCan_
CAN-bus interface.
static constexpr size_t CAN_FRAME_ALLOC_SIZE
How many bytes the allocation of a single CAN frame should be.
Action send_close_stream()
Sends the stream close message.
void advance(size_t num_bytes)
Consumes a certain number of bytes from the beginning of the data to send.
uint8_t sleeping_
True if we are waiting for the timer.
uint8_t isLoopbackStream_
Determines whether the stream transmission is happening to localhost.
NodeHandle dst_
Destination node that we are sending to.
uint16_t streamWindowSize_
Total stream window size.
void stream_proceed_received(Buffer< GenMessage > *message)
Callback from the handler flow.
uint8_t streamAdditionalFlags_
More flags from the remote node that we got in stream initiate reply.
Action entry() override
Start of state machine, called when a buffer of data to send arrives from the application layer.
static constexpr size_t MAX_FRAMES_IN_FLIGHT
How many CAN frames should we allocate at a given time.
void close_stream(uint16_t error_code=0)
Closes the stream when all the bytes are transferred.
Action received_init_stream()
State executed after wakeup from the stream initiate reply received handler.
uint32_t errorCode_
When the stream process fails, this variable contains an error code.
size_t totalByteCount_
How many bytes we have transmitted in this stream so far.
Action do_close_stream()
Allocates a GenMessage buffer and sends out the stream close message to the destination.
static constexpr size_t STREAM_PROCEED_TIMEOUT_SEC
How many seconds for waiting for a stream proceed before we give up with a timeout.
StateFlowTimer timer_
Helper object for timeouts.
StreamSenderState get_state()
StreamSenderCan & start_stream(Node *src, NodeHandle dst, uint8_t source_stream_id, uint8_t dst_stream_id=StreamDefs::INVALID_STREAM_ID)
Initiates using the stream sender.
size_t compute_next_can_length()
void stream_initiate_replied(Buffer< GenMessage > *message)
Callback from GenHandler when a stream initiate reply message arrives at the local interface.
uint8_t dstStreamId_
Stream ID at the destination node.
uint8_t get_src_stream_id()
uint16_t streamWindowRemaining_
Remaining stream window size.
StreamSenderState state_
What state the current class is in.
uint8_t streamFlags_
Flags from the remote node that we got in stream initiate reply.
Action allocate_can_buffer()
Allocates a buffer for a CAN frame (for payload send).
Action initiate_stream()
Allocates a GenMessage buffer and sends out the stream initiate message to the destination.
uint8_t get_dst_stream_id()
Action got_frame()
Got a buffer for an output frame (payload send).
MessageHandler::GenericHandler streamProceedHandler_
Handles incoming stream proceed messages.
MessageHandler::GenericHandler streamInitiateReplyHandler_
Handles incoming stream initiate reply messages.
uint8_t requestInit_
1 if there is a pending initialize request.
StreamSenderCan & set_proposed_window_size(uint16_t window_size)
Specifies what the source should propose as window size to the destination.
Action stream_proceed_timeout()
Action send_init_stream()
Sends the stream initiate message.
uint8_t localStreamId_
Stream ID at the source node.
StreamSenderState
Describes the different states in the stream sender.
@ STATE_ERROR
An error occurred.
@ STARTED
The local client has started using the stream sender (via API).
@ RUNNING
Stream is open and data can be transferred.
@ IDLE
This stream sender is not in use now.
@ INITIATING
The stream initiate message was sent.
@ FULL
Stream buffer is full, waiting for proceed message.
@ CLOSING
Stream close message was sent.
#define LOG(level, message...)
Conditionally write a message to the logging output.
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
static const int INFO
Loglevel that is printed by default, reporting some status information.
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
#define DASSERT(x)
Debug assertion facility.
uint64_t NodeID
48-bit NMRAnet Node ID type
uint16_t NodeAlias
Alias to a 48-bit NMRAnet Node ID type.
#define SEC_TO_NSEC(_sec)
Convert a second value to a nanosecond value.
static void set_datagram_fields(uint32_t *can_id, NodeAlias src, NodeAlias dst, CanFrameType can_type)
Set all the CAN ID fields for datagram or stream message.
@ STREAM_DATA
stream data frame
@ PERMANENT_ERROR
Permanent error occurred.
@ MTI_STREAM_PROCEED
stream flow control
@ MTI_EXACT
match mask for a single MTI
@ MTI_STREAM_INITIATE_REPLY
Stream initiate reply.
@ MTI_STREAM_INITIATE_REQUEST
Stream initiate request.
@ MTI_STREAM_COMPLETE
stream terminate connection
Container of both a NodeID and NodeAlias.
NodeAlias alias
alias to NMRAnet Node ID
static constexpr uint8_t INVALID_STREAM_ID
This value is invalid as a source or destination stream ID.
static Payload create_close_request(uint8_t src_stream_id, uint8_t dst_stream_id, uint32_t total_bytes=INVALID_TOTAL_BYTE_COUNT)
Creates the payload for a stream close message.
static Payload create_initiate_request(uint16_t max_buffer_size, bool has_ident, uint8_t src_stream_id, uint8_t dst_stream_id=INVALID_STREAM_ID)
Creates a Stream Initiate Request message payload.
static constexpr uint16_t MAX_PAYLOAD
Maximum window size for stream send.