Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
StreamReceiver.cxx
Go to the documentation of this file.
1
37
38#include <endian.h>
39
40#include "nmranet_config.h"
41#include "openlcb/CanDefs.hxx"
42#include "openlcb/Defs.hxx"
44
45namespace openlcb
46{
47
49{
50 // Resets state bits.
51 streamClosed_ = 0;
52 pendingInit_ = 0;
54 isWaiting_ = 0;
55
56 if (!request()->streamWindowSize_)
57 {
58 request()->streamWindowSize_ =
59 config_stream_receiver_default_window_size();
60 }
64}
65
67{
68 reset_message(msg, prio);
69
70 if (request()->localStreamId_ == StreamDefs::INVALID_STREAM_ID)
71 {
72 request()->localStreamId_ = assignedStreamId_;
73 }
74
75 if (!request()->target_)
76 {
77 // asking for stream ID.
78 request()->localStreamId_ = assignedStreamId_;
80 return;
81 }
82
84 wait_for_wakeup();
85}
86
88{
89 auto rb = get_buffer_deleter(message);
90
91 if (message->data()->dstNode != node() ||
92 !node()->iface()->matching_node(request()->src_, message->data()->src))
93 {
94 LOG(INFO, "stream init not for me");
95 // Not for me.
96 return;
97 }
98 // Saves alias as well.
99 request()->src_ = message->data()->src;
100 const auto &payload = message->data()->payload;
101 uint16_t proposed_window;
102 uint8_t incoming_src_id = StreamDefs::INVALID_STREAM_ID;
103 if (payload.size() >= 5)
104 {
105 incoming_src_id = payload[4];
106 if (request()->srcStreamId_ != StreamDefs::INVALID_STREAM_ID &&
107 request()->srcStreamId_ != incoming_src_id)
108 {
109 LOG(INFO, "stream init ID not for me");
110 // Not for me.
111 return;
112 }
113 request()->srcStreamId_ = incoming_src_id;
114 }
115 if (payload.size() < 5 ||
116 incoming_src_id == StreamDefs::INVALID_STREAM_ID ||
117 ((proposed_window = data_to_error(&payload[0])) == 0))
118 {
119 LOG(INFO, "Incoming stream: invalid arguments.");
120 // Invalid arguments. This will synchronously allocate a buffer and
121 // send the message to the interface.
123 message->data()->src,
124 StreamDefs::create_initiate_response(0, incoming_src_id,
125 request()->localStreamId_,
127 node()->iface()->dispatcher()->unregister_handler(
130 return;
131 }
132 if (proposed_window < request()->streamWindowSize_)
133 {
134 request()->streamWindowSize_ = proposed_window;
135 }
136
137 streamWindowRemaining_ = request()->streamWindowSize_;
138 totalByteCount_ = 0;
139
140 node()->iface()->dispatcher()->register_handler(
142
145
146 pendingInit_ = 1;
147 notify();
148}
149
// Consumes `len` payload bytes starting at `data`, appending them to
// currentBuffer_ and looping until every byte has been taken. For each
// appended piece it advances totalByteCount_ and reduces
// streamWindowRemaining_, and it forwards the buffer to request()->target_
// once the buffer has no free space left or the window remaining reaches zero.
//
// NOTE(review): the embedded listing numbers skip 157, 160, 183 and 191, so
// the statement that creates currentBuffer_, the condition that selects
// lastBuffer_, whatever follows the warning in the else branch, and the
// condition guarding the final notify() are not visible in this extract.
// Confirm against the full source before relying on this description.
150void StreamReceiverCan::handle_bytes_received(const uint8_t *data, size_t len)
151{
152 while (len > 0)
153 {
154 if (!currentBuffer_)
155 {
156 // Need to allocate a new chunk first.
158 // Add an empty raw buffer to it.
159 RawBufferPtr rb;
161 {
162 // We need to use the last raw buffer.
163 rb = std::move(lastBuffer_);
164 }
165 else
166 {
167 // We need a new (middle) raw buffer.
168 rawBufferPool->alloc(&rb);
169 }
170 currentBuffer_->data()->set_from(std::move(rb), 0);
171 }
 // The result of append() is treated as the number of input bytes consumed;
 // the input pointer and remaining length are advanced by that amount, so a
 // partial append simply causes another loop iteration.
172 size_t copied = currentBuffer_->data()->append(data, len);
173 data += copied;
174 len -= copied;
175 totalByteCount_ += copied;
 // Only subtract when the result cannot go below zero; the window counter
 // is unsigned, so an unchecked subtraction would wrap around.
176 if (copied <= streamWindowRemaining_)
177 {
178 streamWindowRemaining_ -= copied;
179 }
180 else
181 {
182 LOG(WARNING, "Unexpected stream bytes, window is negative.");
184 }
185 if (!currentBuffer_->data()->free_space() || !streamWindowRemaining_)
186 {
187 // Sends off the buffer and clears currentBuffer_.
188 request()->target_->send(currentBuffer_.release());
189 }
190 } // while len > 0
192 {
193 // wake up state flow to send ack to the stream
194 notify();
195 }
196}
197
199{
200 auto rb = get_buffer_deleter(message);
201
202 if (message->data()->dstNode != node() ||
203 !node()->iface()->matching_node(request()->src_, message->data()->src))
204 {
205 LOG(INFO, "stream complete not for me");
206 // Not for me.
207 return;
208 }
209
210 if (message->data()->payload.size() < 2)
211 {
212 // Invalid arguments. Ignore.
213 return;
214 }
215
216 if (((uint8_t)message->data()->payload[0]) != request()->srcStreamId_ ||
217 ((uint8_t)message->data()->payload[1]) != request()->localStreamId_)
218 {
219 // Different stream.
220 LOG(INFO, "stream complete different stream");
221 return;
222 }
223
224 uint32_t total_size = StreamDefs::INVALID_TOTAL_BYTE_COUNT;
225
226 if (message->data()->payload.size() >= 6)
227 {
228 memcpy(&total_size, message->data()->payload.data() + 2, 4);
229 total_size = be32toh(total_size);
230 }
231
232 streamClosed_ = true;
233
234 if (total_size != StreamDefs::INVALID_TOTAL_BYTE_COUNT)
235 {
236 // We have to wait for the remaining bytes to show up.
238 }
239 else
240 {
242 }
243
245 {
246 // wake up the flow.
247 notify();
248 }
249
250 node()->iface()->dispatcher()->unregister_handler(
252}
253
255{
256public:
258 : parent_(parent)
259 { }
260
// Starts registration for receiving stream data frames exchanged between the
// two given aliases. Both aliases must be non-zero; HASSERT terminates the
// process otherwise.
//
// @param remote_alias alias of the remote node.
// @param local_alias alias of the local node.
//
// NOTE(review): listing lines 267 and 270 (the heads of the two calls whose
// argument tails appear below) are missing from this extract. The first
// presumably fills frame_id via CanDefs::set_datagram_fields and the second
// presumably registers `this` with a frame dispatcher -- confirm against the
// full source.
262 void start(NodeAlias remote_alias, NodeAlias local_alias)
263 {
264 HASSERT(remote_alias);
265 HASSERT(local_alias);
266 uint32_t frame_id = 0;
268 &frame_id, remote_alias, local_alias, CanDefs::STREAM_DATA);
269 LOG(VERBOSE, "register frame ID %x", (unsigned)frame_id);
271 this, frame_id, CanDefs::STREAM_DG_RECV_MASK);
272 }
273
// Stops receiving stream data.
// NOTE(review): the only body line (listing line 277) is missing from this
// extract, so the exact call made here cannot be confirmed.
275 void stop()
276 {
278 }
279
// Handler callback for an incoming CAN frame.
//
// Frames with no data bytes are ignored, as are frames whose first data byte
// does not equal the parent's localStreamId_. For a matching frame the
// remaining bytes (everything after the first data byte) are passed on.
//
// @param message the incoming frame; wrapped with get_buffer_deleter(), which
//        presumably releases it when `rb` goes out of scope -- TODO confirm.
// @param priority not used by the visible code.
//
// NOTE(review): listing line 293 (the head of the final call) is missing from
// this extract; it presumably invokes parent_->handle_bytes_received with the
// two arguments shown -- confirm against the full source.
281 void send(Buffer<CanMessageData> *message, unsigned priority) override
282 {
283 auto rb = get_buffer_deleter(message);
284
285 if (message->data()->can_dlc <= 0)
286 {
287 return; // no payload
288 }
 // The first data byte is compared against the local stream ID.
289 if (message->data()->data[0] != parent_->request()->localStreamId_)
290 {
291 return; // different stream
292 }
294 message->data()->data + 1, message->data()->can_dlc - 1);
295 }
296
297private:
300};
301
// Constructor.
//
// Forwards `interface` to the StreamReceiverInterface base, creates the
// StreamDataHandler helper with this object as its parent, records the stream
// ID to use for the local end, and clears all four state flags.
//
// @param interface the CAN interface, passed to the base class.
// @param local_stream_id stored in assignedStreamId_.
302StreamReceiverCan::StreamReceiverCan(IfCan *interface, uint8_t local_stream_id)
303 : StreamReceiverInterface(interface)
304 , dataHandler_(new StreamDataHandler(this))
305 , assignedStreamId_(local_stream_id)
306 , streamClosed_(0)
307 , pendingInit_(0)
308 , pendingCancel_(0)
309 , isWaiting_(0)
310{ }
311
// Destructor. The body is intentionally empty; cleanup is left to the member
// destructors (for example the std::unique_ptr holding the data handler).
312StreamReceiverCan::~StreamReceiverCan()
313{ }
314
316{
317 pendingCancel_ = 1;
318 if (isWaiting_)
319 {
320 isWaiting_ = 0;
321 notify();
322 }
323}
324
333
335{
336 isWaiting_ = 0;
337 // Checks reason for wakeup.
338 if (pendingCancel_)
339 {
341 if (currentBuffer_)
342 {
343 // Sends off the buffer and clears currentBuffer_.
344 request()->target_->send(currentBuffer_.release());
345 }
347 }
348 if (pendingInit_)
349 {
350 pendingInit_ = 0;
352 }
354 {
355 if (streamClosed_)
356 {
357 streamClosed_ = 0;
358 dataHandler_->stop();
359 if (currentBuffer_)
360 {
361 // Sends off the buffer and clears currentBuffer_.
362 request()->target_->send(currentBuffer_.release());
363 }
364 return return_ok();
365 }
366 // Need to send an ack.
368 }
369 return wait();
370}
371
373{
374 // Initialize the last buffer for the first window.
375 return allocate_and_call<RawData>(
376 nullptr, STATE(init_buffer_ready), &lastBufferPool_);
377}
378
// State reached once the raw buffer allocation has completed.
//
// Stores the allocated buffer in lastBuffer_, canonicalizes both the remote
// handle (request()->src_) and a handle built from the local node ID so that
// their aliases are available, starts the data handler with those two aliases,
// and then waits for a wakeup.
//
// NOTE(review): listing lines 388-389 are missing from this extract; line 390
// is the tail of a call whose head is not visible. It presumably sends the
// stream initiate reply -- confirm against the full source.
379StateFlowBase::Action StreamReceiverCan::init_buffer_ready()
380{
381 lastBuffer_.reset(get_allocation_result<RawData>(nullptr));
382
383 node()->iface()->canonicalize_handle(&request()->src_);
384 NodeHandle local(node()->node_id());
385 node()->iface()->canonicalize_handle(&local);
386 dataHandler_->start(request()->src_.alias, local.alias);
387
390 request()->srcStreamId_, request()->localStreamId_));
391
392 return wait_for_wakeup();
393}
394
396{
397 return allocate_and_call<RawData>(
399}
400
402{
403 lastBuffer_.reset(get_allocation_result<RawData>(nullptr));
404 streamWindowRemaining_ = request()->streamWindowSize_;
407 request()->srcStreamId_, request()->localStreamId_));
408 return wait_for_wakeup();
409}
410
411} // namespace openlcb
Pool * rawBufferPool
Use this BufferPool to allocate raw buffers.
Definition Buffer.cxx:38
DynamicPool * mainBufferPool
main buffer pool instance
Definition Buffer.cxx:37
BufferPtr< T > get_buffer_deleter(Buffer< T > *b)
Helper function to create a BufferPtr of an appropriate type without having to explicitly specify the...
Definition Buffer.hxx:272
BufferPtr< RawData > RawBufferPtr
Holds a raw buffer.
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
Action return_ok()
Terminates the flow and returns the request buffer to the caller with an error code of OK (zero).
Action return_with_error(int error)
Terminates the flow and returns the request buffer to the caller with a specific error code.
StreamReceiveRequest * request()
FrameDispatchFlow * frame_dispatcher()
Definition CanIf.hxx:208
void register_handler(HandlerType *handler, ID id, ID mask)
Adds a new handler to this dispatcher.
void unregister_handler_all(HandlerType *handler)
Removes all instances of a handler from this dispatcher.
void unregister_handler(HandlerType *handler, ID id, ID mask)
Removes a specific instance of a handler from this dispatcher.
void alloc(Buffer< BufferType > **result, Executable *flow=NULL)
Get a free item out of the pool.
Definition Buffer.hxx:292
Return type for a state flow callback.
Action wait()
Wait for an asynchronous call.
void notify() override
Wakeup call arrived. Schedules *this on the executor.
void reset_message(BufferBase *message, unsigned priority)
Sets the current message being processed.
void return_buffer()
For state flows that are operated using invoke_subflow_and_wait this is a way to hand back the buffer...
Action call_immediately(Callback c)
Immediately call the next state upon return.
Implementation of the OpenLCB interface abstraction for the CAN-bus interface standard.
Definition IfCan.hxx:65
virtual void canonicalize_handle(NodeHandle *h)
Canonicalizes the node handle: fills in id and/or alias from the maps the interface holds internally.
Definition If.hxx:327
MessageDispatchFlow * dispatcher()
Definition If.hxx:224
void start(NodeAlias remote_alias, NodeAlias local_alias)
Starts registration for receiving stream data with the given aliases.
StreamReceiverCan * parent_
Owning stream receiver object.
void send(Buffer< CanMessageData > *message, unsigned priority) override
Handler callback for incoming messages.
void stop()
Stops receiving stream data.
void cancel_request() override
Cancels the currently pending stream receive request.
Action wakeup()
Root of the flow when something happens in the handlers.
size_t totalByteCount_
How many bytes we have transmitted in this stream so far.
void unregister_handlers()
Removes all handlers that are registered.
void handle_stream_initiate(Buffer< GenMessage > *message)
Invoked by the GenericHandler when a stream initiate message arrives.
std::unique_ptr< StreamDataHandler > dataHandler_
Helper object that receives the actual stream CAN frames.
ByteBufferPtr currentBuffer_
The buffer that we are currently filling with incoming data.
Action have_raw_buffer()
Called when the allocation of the raw buffer is successful.
LimitedPool lastBufferPool_
This pool is used to allocate one raw buffer per stream window size.
Action init_reply()
Invoked when we get the stream initiate request.
StreamReceiverCan(IfCan *interface, uint8_t local_stream_id)
Constructor.
void announced_stream()
Helper function for send() when a stream has to start synchronously.
uint8_t streamClosed_
1 if we received the stream complete message.
MessageHandler::GenericHandler streamInitiateHandler_
Helper class for incoming message for stream initiate.
uint8_t pendingInit_
1 if we received the stream init request message.
void send(Buffer< StreamReceiveRequest > *msg, unsigned prio=0) override
Implements the flow interface for the request API.
Action window_reached()
Invoked when the stream window runs out.
void handle_stream_complete(Buffer< GenMessage > *message)
Invoked by the GenericHandler when a stream complete message arrives.
uint16_t streamWindowRemaining_
Remaining stream window size.
uint8_t isWaiting_
1 if we are currently waiting for a notification
uint8_t pendingCancel_
1 if we received a cancel request
RawBufferPtr lastBuffer_
The buffer that will be the last one in this stream window.
const uint8_t assignedStreamId_
Unique stream ID at the destination (local) node, assigned at construction time.
void handle_bytes_received(const uint8_t *data, size_t len)
Handles data arriving from the network.
MessageHandler::GenericHandler streamCompleteHandler_
Helper class for incoming message for stream complete.
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
Definition logging.h:59
static const int WARNING
Loglevel that is always printed, reporting a warning or a retryable error.
Definition logging.h:55
static const int INFO
Loglevel that is printed by default, reporting some status information.
Definition logging.h:57
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
uint16_t data_to_error(const void *data)
Parses an error code from a payload object at a given pointer.
Definition If.cxx:84
void send_message(Node *src_node, Defs::MTI mti, Args &&...args)
Sends an OpenLCB message to the bus.
Definition If.hxx:426
uint16_t NodeAlias
Alias to a 48-bit NMRAnet Node ID type.
static constexpr size_t MAX_SIZE
Maximum length that can be stored in a single RawBuffer.
static void set_datagram_fields(uint32_t *can_id, NodeAlias src, NodeAlias dst, CanFrameType can_type)
Set all the CAN ID fields for datagram or stream message.
Definition CanDefs.hxx:305
@ STREAM_DATA
stream data frame
Definition CanDefs.hxx:105
@ STREAM_DG_RECV_MASK
mask for receiving datagram and stream frames.
Definition CanDefs.hxx:69
@ MTI_STREAM_PROCEED
stream flow control
@ MTI_EXACT
match mask for a single MTI
@ MTI_STREAM_INITIATE_REPLY
Stream initiate reply.
@ MTI_STREAM_INITIATE_REQUEST
Stream initiate request.
@ MTI_STREAM_COMPLETE
stream terminate connection
Container of both a NodeID and NodeAlias.
static constexpr uint8_t INVALID_STREAM_ID
This value is invalid as a source or destination stream ID.
static constexpr uint16_t STREAM_ERROR_INVALID_ARGS
This code is sent back in the error code field in the stream initiate reply if the stream is rejected...
static constexpr uint32_t INVALID_TOTAL_BYTE_COUNT
Supply this value to the total byte count in stream close to mark it as invalid.
static Payload create_initiate_response(uint16_t max_buffer_size, uint8_t src_stream_id, uint8_t dst_stream_id, uint16_t error_code=STREAM_ACCEPT)
Creates a Stream Initiate Reply message payload.
static Payload create_data_proceed(uint8_t src_stream_id, uint8_t dst_stream_id)
Creates a Stream Data Proceed message payload.
@ ERROR_CANCELED
The operation was canceled by the caller using cancel_request()