56 if (!
request()->streamWindowSize_)
59 config_stream_receiver_default_window_size();
94 LOG(
INFO,
"stream init not for me");
100 const auto &payload =
message->data()->payload;
101 uint16_t proposed_window;
103 if (payload.size() >= 5)
105 incoming_src_id = payload[4];
107 request()->srcStreamId_ != incoming_src_id)
109 LOG(
INFO,
"stream init ID not for me");
113 request()->srcStreamId_ = incoming_src_id;
115 if (payload.size() < 5 ||
119 LOG(
INFO,
"Incoming stream: invalid arguments.");
132 if (proposed_window <
request()->streamWindowSize_)
134 request()->streamWindowSize_ = proposed_window;
182 LOG(
WARNING,
"Unexpected stream bytes, window is negative.");
205 LOG(
INFO,
"stream complete not for me");
210 if (
message->data()->payload.size() < 2)
216 if (((uint8_t)
message->data()->payload[0]) !=
request()->srcStreamId_ ||
217 ((uint8_t)
message->data()->payload[1]) !=
request()->localStreamId_)
220 LOG(
INFO,
"stream complete different stream");
226 if (
message->data()->payload.size() >= 6)
228 memcpy(&total_size,
message->data()->payload.data() + 2, 4);
229 total_size = be32toh(total_size);
266 uint32_t frame_id = 0;
269 LOG(
VERBOSE,
"register frame ID %x", (
unsigned)frame_id);
285 if (
message->data()->can_dlc <= 0)
305 , assignedStreamId_(local_stream_id)
312StreamReceiverCan::~StreamReceiverCan()
375 return allocate_and_call<RawData>(
381 lastBuffer_.reset(get_allocation_result<RawData>(
nullptr));
392 return wait_for_wakeup();
397 return allocate_and_call<RawData>(
403 lastBuffer_.reset(get_allocation_result<RawData>(
nullptr));
408 return wait_for_wakeup();
Pool * rawBufferPool
Use this BufferPool to allocate raw buffers.
DynamicPool * mainBufferPool
main buffer pool instance
BufferPtr< T > get_buffer_deleter(Buffer< T > *b)
Helper function to create a BufferPtr of an appropriate type without having to explicitly specify the...
BufferPtr< RawData > RawBufferPtr
Holds a raw buffer.
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Base class for all QMember types that hold data in an expandable format.
Action return_ok()
Terminates the flow and returns the request buffer to the caller with an error code of OK (zero).
Action return_with_error(int error)
Terminates the flow and returns the request buffer to the caller with an specific error code.
StreamReceiveRequest * request()
FrameDispatchFlow * frame_dispatcher()
void register_handler(HandlerType *handler, ID id, ID mask)
Adds a new handler to this dispatcher.
void unregister_handler_all(HandlerType *handler)
Removes all instances of a handler from this dispatcher.
void unregister_handler(HandlerType *handler, ID id, ID mask)
Removes a specific instance of a handler from this dispatcher.
void alloc(Buffer< BufferType > **result, Executable *flow=NULL)
Get a free item out of the pool.
Return type for a state flow callback.
Action wait()
Wait for an asynchronous call.
void notify() override
Wakeup call arrived. Schedules *this on the executor.
void reset_message(BufferBase *message, unsigned priority)
Sets the current message being processed.
void return_buffer()
For state flows that are operated using invoke_subflow_and_wait this is a way to hand back the buffer...
Action call_immediately(Callback c)
Imediately call the next state upon return.
Implementation of the OpenLCB interface abstraction for the CAN-bus interface standard.
virtual void canonicalize_handle(NodeHandle *h)
Canonicalizes the node handle: fills in id and/or alias from the maps the interface holds internally.
MessageDispatchFlow * dispatcher()
void start(NodeAlias remote_alias, NodeAlias local_alias)
Starts registration for receiving stream data with the given aliases.
StreamReceiverCan * parent_
Owning stream receiver object.
void send(Buffer< CanMessageData > *message, unsigned priority) override
Handler callback for incoming messages.
void stop()
Stops receiving stream data.
void cancel_request() override
Cancels the currently pending stream receive request.
Action wakeup()
Root of the flow when something happens in the handlers.
size_t totalByteCount_
How many bytes we have transmitted in this stream so far.
void unregister_handlers()
Removes all handlers that are registered.
void handle_stream_initiate(Buffer< GenMessage > *message)
Invoked by the GenericHandler when a stream initiate message arrives.
std::unique_ptr< StreamDataHandler > dataHandler_
Helper object that receives the actual stream CAN frames.
ByteBufferPtr currentBuffer_
The buffer that we are currently filling with incoming data.
Action have_raw_buffer()
Called when the allocation of the raw buffer is successful.
LimitedPool lastBufferPool_
This pool is used to allocate one raw buffer per stream window size.
Action init_reply()
Invoked when we get the stream initiate request.
StreamReceiverCan(IfCan *interface, uint8_t local_stream_id)
Constructor.
void announced_stream()
Helper function for send() when a stream has to start synchronously.
uint8_t streamClosed_
1 if we received the stream complete message.
MessageHandler::GenericHandler streamInitiateHandler_
Helper class for incoming message for stream initiate.
uint8_t pendingInit_
1 if we received the stream init request message.
void send(Buffer< StreamReceiveRequest > *msg, unsigned prio=0) override
Implements the flow interface for the request API.
Action window_reached()
Invoked when the stream window runs out.
void handle_stream_complete(Buffer< GenMessage > *message)
Invoked by the GenericHandler when a stream complete message arrives.
uint16_t streamWindowRemaining_
Remaining stream window size.
uint8_t isWaiting_
1 if we are currently waiting for a notification
uint8_t pendingCancel_
1 if we received a cancel request
RawBufferPtr lastBuffer_
The buffer that will be the last one in this stream window.
const uint8_t assignedStreamId_
Unique stream ID at the destination (local) node, assigned at construction time.
void handle_bytes_received(const uint8_t *data, size_t len)
Handles data arriving from the network.
MessageHandler::GenericHandler streamCompleteHandler_
Helper class for incoming message for stream complete.
#define LOG(level, message...)
Conditionally write a message to the logging output.
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
static const int WARNING
Loglevel that is always printed, reporting a warning or a retryable error.
static const int INFO
Loglevel that is printed by default, reporting some status information.
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
uint16_t data_to_error(const void *data)
Parses an error code from a payload object at a given pointer.
void send_message(Node *src_node, Defs::MTI mti, Args &&...args)
Sends an OpenLCB message to the bus.
uint16_t NodeAlias
Alias to a 48-bit NMRAnet Node ID type.
static constexpr size_t MAX_SIZE
Maximum length that can be stored in a single RawBuffer.
static void set_datagram_fields(uint32_t *can_id, NodeAlias src, NodeAlias dst, CanFrameType can_type)
Set all the CAN ID fields for datagram or stream message.
@ STREAM_DATA
stream data frame
@ STREAM_DG_RECV_MASK
mask for receiving datagram and stream frames.
@ MTI_STREAM_PROCEED
stream flow control
@ MTI_EXACT
match mask for a single MTI
@ MTI_STREAM_INITIATE_REPLY
Stream initiate reply.
@ MTI_STREAM_INITIATE_REQUEST
Stream initiate request.
@ MTI_STREAM_COMPLETE
stream terminate connection
Container of both a NodeID and NodeAlias.
static constexpr uint8_t INVALID_STREAM_ID
This value is invalid as a source or destination stream ID.
static constexpr uint16_t STREAM_ERROR_INVALID_ARGS
This code is sent back in the error code field in the stream initiate reply if the stream is rejected...
static constexpr uint32_t INVALID_TOTAL_BYTE_COUNT
Supply this value to the total byte count in stream close to mark it as invalid.
static Payload create_initiate_response(uint16_t max_buffer_size, uint8_t src_stream_id, uint8_t dst_stream_id, uint16_t error_code=STREAM_ACCEPT)
Creates a Stream Initiate Reply message payload.
static Payload create_data_proceed(uint8_t src_stream_id, uint8_t dst_stream_id)
Creates a Stream Data Proceed message payload.
@ ERROR_CANCELED
The operation was canceled by the caller using cancel_request()