Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
StreamSender.hxx
Go to the documentation of this file.
1
36#ifndef _OPENLCB_STREAMSENDER_HXX_
37#define _OPENLCB_STREAMSENDER_HXX_
38
40#include "openlcb/CanDefs.hxx"
42#include "openlcb/IfCan.hxx"
44#include "utils/ByteBuffer.hxx"
45#include "utils/LimitedPool.hxx"
47
48namespace openlcb
49{
50
51class StreamSender : public StateFlow<ByteBuffer, QList<1>>
52{
53public:
56 {
57 }
58
77};
78
82{
83public:
84 StreamSenderCan(Service *service, IfCan *iface)
85 : StreamSender(service)
86 , ifCan_(iface)
87 , sleeping_(false)
88 , requestClose_(false)
89 , requestInit_(false)
90 {
91 }
92
106 uint8_t source_stream_id,
107 uint8_t dst_stream_id = StreamDefs::INVALID_STREAM_ID)
108 {
109 DASSERT(state_ == IDLE);
110 state_ = STARTED;
111 node_ = src;
112 dst_ = dst;
113 totalByteCount_ = 0;
114 localStreamId_ = source_stream_id;
115 dstStreamId_ = dst_stream_id;
116 HASSERT(sleeping_ == false);
118 requestInit_ = true;
119 trigger();
120 streamFlags_ = 0;
124 errorCode_ = 0;
125 return *this;
126 }
127
133 void close_stream(uint16_t error_code = 0)
134 {
135 requestClose_ = true;
136 trigger();
137 }
138
146 {
148 streamWindowSize_ = window_size;
149 return *this;
150 }
151
159 {
162 return *this;
163 }
164
167 void clear()
168 {
169 if (state_ == STATE_ERROR || state_ == CLOSING)
170 {
171 state_ = IDLE;
172 }
173 }
174
175#ifdef GTEST
178 bool shutdown()
179 {
180 if (sleeping_)
181 {
182 timer_.trigger();
183 sleeping_ = false;
184 return true;
185 }
186 return false;
187 }
188#endif
189
192 {
193 return state_;
194 }
195
197 uint16_t get_error()
198 {
199 return errorCode_;
200 }
201
204 {
205 return localStreamId_;
206 }
207
210 {
211 return dstStreamId_;
212 }
213
216 Action entry() override
217 {
218 if (requestInit_)
219 {
220 requestInit_ = 0;
221 return call_immediately(STATE(initiate_stream));
222 }
223 if (state_ == STATE_ERROR || state_ == CLOSING)
224 {
225 return release_and_exit();
226 }
229 {
230 // We ran out of the current stream window size.
231 return call_immediately(STATE(wait_for_stream_proceed));
232 }
233 if (!remaining())
234 {
235 // We ran out of the current chunk of stream payload from the
236 // source.
237 if (requestClose_ && queue_empty())
238 {
239 requestClose_ = false;
240 return call_immediately(STATE(do_close_stream));
241 }
242 return release_and_exit();
243 }
244 return call_immediately(STATE(allocate_can_buffer));
245 }
246
247private:
249 void trigger()
250 {
251 auto *b = alloc();
252 this->send(b);
253 }
254
258 {
259 // Grabs alias / node ID from the cache.
260 node_->iface()->canonicalize_handle(&dst_);
261 return allocate_and_call(node_->iface()->addressed_message_write_flow(),
263 }
264
267 {
268 auto *b = get_allocation_result(
270 b->data()->reset(Defs::MTI_STREAM_INITIATE_REQUEST, node_->node_id(),
271 dst_,
274
275 node_->iface()->dispatcher()->register_handler(
278
280 sleeping_ = true;
282 LOG(VERBOSE, "wait for stream init reply");
283 return sleep_and_call(&timer_, SEC_TO_NSEC(STREAM_INIT_TIMEOUT_SEC),
285 }
286
290 {
291 LOG(VERBOSE, "stream init reply: %s",
292 string_to_hex(message->data()->payload).c_str());
293 auto rb = get_buffer_deleter(message);
294 if (message->data()->dstNode != node_ ||
295 !node_->iface()->matching_node(dst_, message->data()->src))
296 {
297 LOG(INFO, "stream reply not for me");
298 // Not for me.
299 return;
300 }
301 const auto &payload = message->data()->payload;
302 if (payload.size() < 6 || (uint8_t)payload[4] != localStreamId_)
303 {
304 LOG(INFO, "wrong stream ID %x %x", payload[4], localStreamId_);
305 // Talking about another stream or incorrect data.
306 return;
307 }
311 streamWindowSize_ = (payload[0] << 8) | payload[1];
312 // Grabs alias / node ID from the cache.
313 node_->iface()->canonicalize_handle(&dst_);
314
315 // We save the remote alias here if we haven't got any yet.
316 if (message->data()->src.alias)
317 {
318 dst_.alias = message->data()->src.alias;
319 }
320
322 (node_->iface()->lookup_local_node_handle(dst_) != nullptr);
323
324 sleeping_ = false;
325 timer_.trigger();
326 }
327
331 {
332 LOG(VERBOSE, "stream init reply wait done");
336 if (!(streamFlags_ & StreamDefs::FLAG_ACCEPT))
337 {
339 {
340 return return_error(DatagramDefs::PERMANENT_ERROR |
342 "Stream initiate request was denied (permanent error).");
343 }
344 else
345 {
346 return return_error(Defs::ERROR_TEMPORARY |
348 "Stream initiate request was denied (temporary error).");
349 }
350 }
352 {
353 return return_error(DatagramDefs::PERMANENT_ERROR,
354 "Inconsistency: zero buffer length but "
355 "accepted stream request.");
356 }
358 node_->iface()->dispatcher()->register_handler(
360 state_ = RUNNING;
361 return entry();
362 }
363
373
376 {
377 auto *b = get_allocation_result(
379 b->data()->reset(Defs::MTI_STREAM_COMPLETE, node_->node_id(), dst_,
382
384 state_ = CLOSING;
385 return entry();
386 }
387
390 {
391 return allocate_and_call(
393 }
394
397 {
398 auto *b = get_allocation_result(ifCan_->frame_write_flow());
399
400 uint32_t can_id;
401 NodeAlias local_alias =
402 ifCan_->local_aliases()->lookup(node_->node_id());
403 NodeAlias remote_alias = dst_.alias;
405 &can_id, local_alias, remote_alias, CanDefs::STREAM_DATA);
406 auto *frame = b->data()->mutable_frame();
407 SET_CAN_FRAME_ID_EFF(*frame, can_id);
408
409 size_t len = compute_next_can_length();
410
411 frame->can_dlc = len + 1;
412 frame->data[0] = dstStreamId_;
413 memcpy(&frame->data[1], payload(), len);
414 advance(len);
415
417 {
419 }
420 else
421 {
423 }
424 return entry();
425 }
426
430 {
432 {
433 // received early stream_proceed response
434 return call_immediately(STATE(stream_proceed_timeout));
435 }
436 sleeping_ = true;
437 state_ = FULL;
438 return sleep_and_call(&timer_, SEC_TO_NSEC(STREAM_PROCEED_TIMEOUT_SEC),
440 }
441
444 {
445 auto rb = get_buffer_deleter(message);
446 if (message->data()->dstNode != node_ ||
447 !node_->iface()->matching_node(dst_, message->data()->src))
448 {
449 // Not for me.
450 return;
451 }
452
453 const auto &payload = message->data()->payload;
454 if (payload.size() < 2 || (uint8_t)payload[0] != localStreamId_)
455 {
456 // Talking about another stream or incorrect data.
457 return;
458 }
459
461
463 if (sleeping_)
464 {
465 sleeping_ = false;
466 timer_.trigger();
467 }
468 }
469
471 {
472 if (!streamWindowRemaining_) // no proceed arrived
473 {
476 return return_error(Defs::ERROR_TEMPORARY,
477 "Timed out waiting for stream proceed message.");
478 // return call_immediately(STATE(close_stream));
479 }
480 state_ = RUNNING;
481 return entry();
482 }
483
484private:
487 {
488 size_t ret = remaining();
489 // Cannot exceed CAN frame max payload.
491 {
493 }
494 // Cannot exceed remaining bytes in stream window.
495 if (ret > streamWindowRemaining_)
496 {
498 }
499 return ret;
500 }
501
503 size_t remaining()
504 {
505 return message()->data()->size_;
506 }
507
509 uint8_t *payload()
510 {
511 return message()->data()->data_;
512 }
513
517 void advance(size_t num_bytes)
518 {
519 message()->data()->advance(num_bytes);
520 totalByteCount_ += num_bytes;
521 streamWindowRemaining_ -= num_bytes;
522 }
523
524 Action return_error(uint32_t code, string message)
525 {
526 LOG(INFO, "error %x: %s", (unsigned)code, message.c_str());
527 errorCode_ = code;
529 return release_and_exit();
530 }
531
/// How many seconds to wait for a stream proceed message before giving up
/// with a timeout.
static constexpr size_t STREAM_PROCEED_TIMEOUT_SEC {20};

/// How many seconds to wait for a stream initiate reply before giving up
/// with a timeout.
static constexpr size_t STREAM_INIT_TIMEOUT_SEC {20};

/// How many bytes of payload we can copy into a single CAN frame. The first
/// data byte of each frame carries the destination stream ID.
static constexpr size_t MAX_BYTES_PAYLOAD_PER_CAN_FRAME {7};

/// How many CAN frames we should allocate at a given time.
static constexpr size_t MAX_FRAMES_IN_FLIGHT {4};
545
547 static constexpr size_t CAN_FRAME_ALLOC_SIZE =
549
551 MessageHandler::GenericHandler streamProceedHandler_ {
554 MessageHandler::GenericHandler streamInitiateReplyHandler_ {
556
561 Node *node_ {nullptr};
566 size_t totalByteCount_ {0};
575 uint8_t isLoopbackStream_ : 1;
577 uint8_t sleeping_ : 1;
579 uint8_t requestClose_ : 1;
581 uint8_t requestInit_ : 1;
583 uint8_t streamFlags_ {0};
591 uint32_t errorCode_ {0};
597 StateFlowTimer timer_ {this};
598};
599
600class StreamRendererCan : public StateFlow<ByteBuffer, QList<1>>
601{ };
602
603} // namespace openlcb
604
605#endif // _OPENLCB_STREAMSENDER_HXX_
BufferPtr< T > get_buffer_deleter(Buffer< T > *b)
Helper function to create a BufferPtr of an appropriate type without having to explicitly specify the...
Definition Buffer.hxx:272
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
OutgoingFrameHandler * loopback_frame_write_flow()
Definition CanIf.hxx:221
OutgoingFrameHandler * frame_write_flow()
Definition CanIf.hxx:214
void register_handler(HandlerType *handler, ID id, ID mask)
Adds a new handler to this dispatcher.
void unregister_handler(HandlerType *handler, ID id, ID mask)
Removes a specific instance of a handler from this dispatcher.
virtual void send(MessageType *message, unsigned priority=UINT_MAX)=0
Entry point to the flow.
Buffer< CanHubData > message_type
Stores the message template type for external reference.
MessageType * alloc()
Synchronously allocates a message buffer from the pool of this flow.
Implementation of a Pool interface that takes memory from mainBufferPool (configurable) but limits th...
Collection of related state machines that pend on incoming messages.
State flow with a given typed input queue.
Base::Action Action
Allows using Action without having StateFlowBase:: prefix in front of it.
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
MessageType * message()
NodeAlias lookup(NodeID id)
Lookup a node's alias based on its Node ID.
Implementation of the OpenLCB interface abstraction for the CAN-bus interface standard.
Definition IfCan.hxx:65
AliasCache * local_aliases()
Definition IfCan.hxx:96
virtual void canonicalize_handle(NodeHandle *h)
Canonicalizes the node handle: fills in id and/or alias from the maps the interface holds internally.
Definition If.hxx:327
MessageDispatchFlow * dispatcher()
Definition If.hxx:224
virtual Node * lookup_local_node_handle(NodeHandle handle)
Looks up a node ID in the local nodes' registry.
Definition If.hxx:279
MessageHandler * addressed_message_write_flow()
Definition If.hxx:210
virtual bool matching_node(NodeHandle expected, NodeHandle actual)=0
Base class for NMRAnet nodes conforming to the asynchronous interface.
Definition Node.hxx:52
Helper class for sending stream data to a CAN interface.
static constexpr size_t MAX_BYTES_PAYLOAD_PER_CAN_FRAME
How many bytes of payload we can copy into a single CAN frame.
LimitedPool canFramePool_
Source of buffers for outgoing CAN frames.
void clear()
Sets the stream sender to be available for reuse after a stream has been closed or reached error.
static constexpr size_t STREAM_INIT_TIMEOUT_SEC
How many seconds to wait for a stream init before we give up with a timeout.
Action wait_for_stream_proceed()
Starts sleeping until a proceed message arrives.
void trigger()
Sends an empty message to *this, thereby waking up the state machine.
StreamSenderCan & set_stream_uid(NodeID stream_uid)
Specifies the Stream UID to send in the stream initiate request.
Node * node_
Which node are we sending the outgoing data from.
uint8_t requestClose_
1 if there is a pending close request.
IfCan * ifCan_
CAN-bus interface.
static constexpr size_t CAN_FRAME_ALLOC_SIZE
How many bytes the allocation of a single CAN frame should be.
Action send_close_stream()
Sends the stream close message.
void advance(size_t num_bytes)
Consumes a certain number of bytes from the beginning of the data to send.
uint8_t sleeping_
True if we are waiting for the timer.
uint8_t isLoopbackStream_
Determines whether the stream transmission is happening to localhost.
NodeHandle dst_
Destination node that we are sending to.
uint16_t streamWindowSize_
Total stream window size.
void stream_proceed_received(Buffer< GenMessage > *message)
Callback from the handler flow.
uint8_t streamAdditionalFlags_
More flags from the remote node that we got in stream initiate reply.
Action entry() override
Start of state machine, called when a buffer of data to send arrives from the application layer.
static constexpr size_t MAX_FRAMES_IN_FLIGHT
How many CAN frames we should allocate at a given time.
void close_stream(uint16_t error_code=0)
Closes the stream when all the bytes are transferred.
Action received_init_stream()
State executed after wakeup from the stream initiate reply received handler.
uint32_t errorCode_
When the stream process fails, this variable contains an error code.
size_t totalByteCount_
How many bytes we have transmitted in this stream so far.
Action do_close_stream()
Allocates a GenMessage buffer and sends out the stream close message to the destination.
static constexpr size_t STREAM_PROCEED_TIMEOUT_SEC
How many seconds to wait for a stream proceed before we give up with a timeout.
StateFlowTimer timer_
Helper object for timeouts.
StreamSenderState get_state()
StreamSenderCan & start_stream(Node *src, NodeHandle dst, uint8_t source_stream_id, uint8_t dst_stream_id=StreamDefs::INVALID_STREAM_ID)
Initiates using the stream sender.
void stream_initiate_replied(Buffer< GenMessage > *message)
Callback from GenHandler when a stream initiate reply message arrives at the local interface.
uint8_t dstStreamId_
Stream ID at the destination node.
uint16_t streamWindowRemaining_
Remaining stream window size.
StreamSenderState state_
What state the current class is in.
uint8_t streamFlags_
Flags from the remote node that we got in stream initiate reply.
Action allocate_can_buffer()
Allocates a buffer for a CAN frame (for payload send).
Action initiate_stream()
Allocates a GenMessage buffer and sends out the stream initiate message to the destination.
Action got_frame()
Got a buffer for an output frame (payload send).
MessageHandler::GenericHandler streamProceedHandler_
Handles incoming stream proceed messages.
MessageHandler::GenericHandler streamInitiateReplyHandler_
Handles incoming stream initiate reply messages.
uint8_t requestInit_
1 if there is a pending initialize request.
StreamSenderCan & set_proposed_window_size(uint16_t window_size)
Specifies what the source should propose as window size to the destination.
Action send_init_stream()
Sends the stream initiate message.
uint8_t localStreamId_
Stream ID at the source node.
StreamSenderState
Describes the different states in the stream sender.
@ STATE_ERROR
An error occurred.
@ STARTED
The local client has started using the stream sender (via API).
@ RUNNING
Stream is open and data can be transferred.
@ IDLE
This stream sender is not in use now.
@ INITIATING
The stream initiate message was sent.
@ FULL
Stream buffer is full, waiting for proceed message.
@ CLOSING
Stream close message was sent.
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
Definition logging.h:59
static const int INFO
Loglevel that is printed by default, reporting some status information.
Definition logging.h:57
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DASSERT(x)
Debug assertion facility.
Definition macros.h:159
uint64_t NodeID
48-bit NMRAnet Node ID type
uint16_t NodeAlias
Alias to a 48-bit NMRAnet Node ID type.
#define SEC_TO_NSEC(_sec)
Convert a second value to a nanosecond value.
Definition os.h:286
static void set_datagram_fields(uint32_t *can_id, NodeAlias src, NodeAlias dst, CanFrameType can_type)
Set all the CAN ID fields for datagram or stream message.
Definition CanDefs.hxx:305
@ STREAM_DATA
stream data frame
Definition CanDefs.hxx:105
@ PERMANENT_ERROR
Permanent error occurred.
@ MTI_STREAM_PROCEED
stream flow control
@ MTI_EXACT
match mask for a single MTI
@ MTI_STREAM_INITIATE_REPLY
Stream initiate reply.
@ MTI_STREAM_INITIATE_REQUEST
Stream initiate request.
@ MTI_STREAM_COMPLETE
stream terminate connection
Container of both a NodeID and NodeAlias.
NodeAlias alias
alias to NMRAnet Node ID
static constexpr uint8_t INVALID_STREAM_ID
This value is invalid as a source or destination stream ID.
static Payload create_close_request(uint8_t src_stream_id, uint8_t dst_stream_id, uint32_t total_bytes=INVALID_TOTAL_BYTE_COUNT)
Creates the payload for a stream close message.
static Payload create_initiate_request(uint16_t max_buffer_size, bool has_ident, uint8_t src_stream_id, uint8_t dst_stream_id=INVALID_STREAM_ID)
Creates a Stream Initiate Request message payload.
static constexpr uint16_t MAX_PAYLOAD
Maximum window size for stream send.