Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
IfTcpImpl.hxx
Go to the documentation of this file.
1
35#ifndef _OPENLCB_IFTCPIMPL_HXX_
36#define _OPENLCB_IFTCPIMPL_HXX_
37
38#include "openlcb/If.hxx"
39#include "utils/Hub.hxx"
40#include "utils/HubDeviceSelect.hxx"
41
42namespace openlcb
43{
44
48{
49public:
/// Renders a TCP message into a single buffer, ready to transmit.
///
/// @param msg message to render; its MTI, source node ID, destination node
/// ID (addressed MTIs only) and payload are copied into the output.
/// @param gateway_node_id written into the header at HDR_GATEWAY_OFS.
/// @param sequence written into the header at HDR_TIMESTAMP_OFS, using the
/// same 48-bit encoding as a node ID.
/// @param tgt output buffer; its previous content is overwritten. Must not
/// be null (enforced with HASSERT).
56 static void render_tcp_message(const GenMessage &msg,
57 NodeID gateway_node_id, long long sequence, string *tgt)
58 {
// Addressed MTIs carry a destination node ID in front of the payload.
59 bool has_dst = Defs::get_mti_address(msg.mti);
60 HASSERT(tgt);
61 string &target = *tgt;
// Pre-sizes the output with zero bytes; all fields are written in place
// below. NOTE(review): listing line 63 (the rest of this size expression)
// is not visible in this view.
62 target.assign(HDR_LEN + msg.payload.size() +
64 '\0');
// Only the "carries an OpenLCB message" bit is set in the flags field.
65 uint16_t flags = FLAGS_OPENLCB_MSG;
66 error_to_data(flags, &target[HDR_FLAG_OFS]);
// The size field is three bytes, most significant byte first, and counts
// the bytes that follow the end of the size field.
67 unsigned sz = target.size() - HDR_SIZE_END;
68 target[HDR_SIZE_OFS] = (sz >> 16) & 0xff;
69 target[HDR_SIZE_OFS + 1] = (sz >> 8) & 0xff;
70 target[HDR_SIZE_OFS + 2] = sz & 0xff;
71 node_id_to_data(gateway_node_id, &target[HDR_GATEWAY_OFS]);
// The sequence number reuses the node ID renderer for its byte layout.
72 node_id_to_data(sequence, &target[HDR_TIMESTAMP_OFS]);
// Message part (after the fixed header): MTI, source, then either the
// destination followed by the payload, or the payload directly.
73 error_to_data(msg.mti, &target[HDR_LEN + MSG_MTI_OFS]);
74 node_id_to_data(msg.src.id, &target[HDR_LEN + MSG_SRC_OFS]);
75 if (has_dst)
76 {
77 node_id_to_data(msg.dst.id, &target[HDR_LEN + MSG_DST_OFS]);
78 memcpy(&target[HDR_LEN + MSG_ADR_PAYLOAD_OFS], msg.payload.data(),
79 msg.payload.size());
80 }
81 else
82 {
83 memcpy(&target[HDR_LEN + MSG_GLOBAL_PAYLOAD_OFS],
84 msg.payload.data(), msg.payload.size());
85 }
86 }
87
96 static int get_tcp_message_len(const void *data, size_t len)
97 {
98 if (len < HDR_SIZE_END)
99 {
100 return -1;
101 }
102 const uint8_t *src = static_cast<const uint8_t *>(data);
103 uint32_t sz = src[HDR_SIZE_OFS];
104 sz <<= 8;
105 sz |= src[HDR_SIZE_OFS + 1];
106 sz <<= 8;
107 sz |= src[HDR_SIZE_OFS + 2];
108 return sz + HDR_SIZE_END;
109 }
110
117 static bool parse_tcp_message(const string &src, GenMessage *tgt)
118 {
119 int expected_size = get_tcp_message_len(src.data(), src.size());
120 if (expected_size > (int)src.size() || expected_size < MIN_MESSAGE_SIZE)
121 {
122 LOG(WARNING, "Incomplete or incorrectly formatted TCP message.");
123 return false;
124 }
125 uint16_t flags = data_to_error(&src[0]);
126 if ((flags & FLAGS_OPENLCB_MSG) == 0)
127 {
128 return false;
129 }
130 tgt->clear();
131 HASSERT((flags & FLAGS_CHAINING) == 0);
132 tgt->flagsDst = 0;
133 tgt->flagsSrc = 0;
134 if (flags & FLAGS_FRAGMENT_NOT_FIRST)
135 {
136 tgt->set_flag_dst(GenMessage::DSTFLAG_NOT_FIRST_MESSAGE);
137 }
138 if (flags & FLAGS_FRAGMENT_NOT_LAST)
139 {
140 tgt->set_flag_dst(GenMessage::DSTFLAG_NOT_LAST_MESSAGE);
141 }
142 const char *msg = &src[HDR_LEN];
143 // We have already checked the size to be long enough for a global
144 // message with 0 bytes payload. So source address and MTI is ok to
145 // parse now.
146 tgt->mti = (Defs::MTI)data_to_error(msg + MSG_MTI_OFS);
147 tgt->src.clear();
148 tgt->dst.clear();
149 tgt->src.id = data_to_node_id(msg + MSG_SRC_OFS);
150 // now contains the length of the message after the header.
151 int payload_bytes = expected_size - HDR_LEN;
152 int payload_ofs;
153 if (Defs::get_mti_address(tgt->mti))
154 {
155 payload_ofs = MSG_ADR_PAYLOAD_OFS;
156 tgt->dst.id = data_to_node_id(msg + MSG_DST_OFS);
157 }
158 else
159 {
160 payload_ofs = MSG_GLOBAL_PAYLOAD_OFS;
161 }
162 payload_bytes -= payload_ofs;
163 if (payload_bytes < 0)
164 {
165 LOG(WARNING, "TCP message to short.");
166 return false;
167 }
168 tgt->payload.assign(msg + payload_ofs, payload_bytes);
169 return true;
170 }
171
175 static unsigned guess_priority(const string &tcp_payload)
176 {
177 if (tcp_payload.size() < ABS_MTI_OFS + 2)
178 {
179 return UINT_MAX;
180 }
181 auto mti = (Defs::MTI)data_to_error(tcp_payload.data() + ABS_MTI_OFS);
182 return Defs::mti_priority(mti);
183 }
184
// Constants describing the binary layout of a TCP message.
// NOTE(review): most enumerators of this enum (e.g. HDR_FLAG_OFS,
// HDR_SIZE_OFS, HDR_SIZE_END, MSG_MTI_OFS, the FLAGS_* bits) are not visible
// in this listing; only the three below are.
185 enum
186 {
201
// Offset in the header of the timestamp (seq no) field.
// Presumably 2 flag bytes + 3 size bytes + 6 gateway node ID bytes -- confirm
// against the missing enumerators.
211 HDR_TIMESTAMP_OFS = 2 + 3 + 6,
// Total length of the fixed header (the above plus 6 bytes of timestamp).
213 HDR_LEN = 2 + 3 + 6 + 6,
214
// Offset in the message (=after the header) of the dst node ID field (for
// addressed MTI only). Presumably 2 MTI bytes + 6 source node ID bytes.
221 MSG_DST_OFS = 2 + 6,
225
229
234
238 };
239
240private:
243};
244
248{
249public:
/// Returns the next strictly monotonic sequence number.
251 virtual long long get_sequence_number() = 0;
252};
253
257{
258public:
260 long long get_sequence_number() override
261 {
262 long long current_time_ms = os_get_time_monotonic() / 1000000;
263 if (current_time_ms > sequence_)
264 {
265 sequence_ = current_time_ms;
266 }
267 else
268 {
269 ++sequence_;
270 }
271 return sequence_;
272 }
273
274private:
276 long long sequence_ = 0;
277};
278
282{
283public:
296 TcpSendFlow(If *service, NodeID gateway_node_id,
297 HubPortInterface *send_target, HubPortInterface *skip_member,
298 SequenceNumberGenerator *sequence)
299 : MessageStateFlowBase(service)
300 , sendTarget_(send_target)
301 , skipMember_(skip_member)
302 , gatewayId_(gateway_node_id)
303 , sequenceNumberGenerator_(sequence)
304 {
305 }
306
309 {
// Body of get_gateway_node_id(): hands back the gateway node ID that was
// stored by the constructor. NOTE(review): the signature line is not
// visible in this listing.
310 return gatewayId_;
311 }
312
313private:
316 Action entry() override
317 {
318 auto id = nmsg()->dst.id;
319 if (id)
320 {
321 auto *n = iface()->lookup_local_node(id);
322 if (n)
323 {
324 // Addressed message loopback.
325 auto *b = transfer_message();
326 b->data()->dstNode = n;
327 iface()->dispatcher()->send(b, priority());
328 return release_and_exit();
329 }
330 }
331 return allocate_and_call(sendTarget_, STATE(render_src_message));
332 }
333
339 {
// Body of render_src_message(): callback state after allocation succeeded.
// NOTE(review): the signature line and listing lines 341-342 (presumably
// the statement that fills the buffer with the rendered message) are not
// visible in this view.
340 auto b = get_buffer_deleter(get_allocation_result(sendTarget_));
343 b->data()->skipMember_ = skipMember_;
// Ownership of the output buffer moves to the send target.
344 sendTarget_->send(b.release(), nmsg()->priority());
345
346 // Checks and performs global loopback.
// A zero destination ID means the message is global; it is then also handed
// to the interface's own dispatcher.
347 if (!nmsg()->dst.id)
348 {
349 iface()->dispatcher()->send(transfer_message(), priority());
350 }
351 return release_and_exit();
352 }
353
356 {
// Body of nmsg(): shortcut to the payload object of the message currently
// being processed. NOTE(review): the signature line is not visible here.
357 return message()->data();
358 }
359
362 {
// Body of iface(): the service this flow is bound to, cast to the interface
// type. The constructor takes the service as an If*, which this cast relies
// on. NOTE(review): the signature line is not visible here.
363 return static_cast<If *>(service());
364 }
365
375};
376
381{
382public:
390 HubPortInterface *skipMember)
391 : StateFlowBase(s)
392 , dst_(dst)
393 , skipMember_(skipMember)
394 {
// Constructor. dst is where parsed messages are sent to; skipMember is what
// parsed messages get as their skipMember_. NOTE(review): the first
// signature line and listing line 398 are not visible in this view.
// The device must already have a valid file descriptor.
395 HASSERT(s->fd() >= 0);
// Start with an empty read buffer.
396 bufEnd_ = 0;
397 bufOfs_ = 0;
399 }
400
/// Stops listening and terminates the flow.
// NOTE(review): listing lines 410-411 are not visible in this view, so the
// statements that follow the unselect block are unknown from here.
402 void shutdown()
403 {
404 auto *e = this->service()->executor();
// If a read is still pending in the select loop, remove it and mark the
// helper as having no bytes outstanding.
405 if (e->is_selected(&helper_))
406 {
407 e->unselect(&helper_);
408 helper_.remaining_ = 0;
409 }
412 }
413
414private:
417 {
// Body of device(): the service this flow is bound to, cast to the hub port
// service type that the constructor received. NOTE(review): the signature
// line is not visible here.
418 return static_cast<FdHubPortService *>(service());
419 }
420
425 {
// Body of start_msg(): in this state the internal buffer (bufOfs_) points
// at the beginning of a message. Resets the assembly buffer and marks the
// expected length as unknown before parsing. NOTE(review): the signature
// line is not visible here.
426 msg_.clear();
427 expectedLen_ = -1;
428 return parse_bytes();
429 }
430
438 {
// Body of parse_bytes(): moves bytes from the read buffer (buffer_, between
// bufOfs_ and bufEnd_) into the assembly buffer msg_ until expectedLen_
// bytes are collected.
// NOTE(review): the signature line and listing lines 441, 449-450, 461, 465
// and 485-486 are not visible in this view; the statements on them
// (presumably the transitions to reading more bytes and the length guess)
// cannot be confirmed from here.
// Read buffer is empty.
439 if (bufEnd_ <= bufOfs_)
440 {
442 }
443 int available = bufEnd_ - bufOfs_;
444 if (expectedLen_ < 0)
445 {
446 // We have bytes in the read buffer but don't know the expected
447 // size yet, so none in the assembly buffer. Let's guess at the
448 // desired size of the assembly buffer.
451 if (expectedLen_ < 0)
452 {
453 // failed. Not enough bytes. Maybe move existing bytes to the
454 // beginning of the buffer and try to read more bytes.
// Compaction only happens once the unread data starts in the second half
// of the read buffer.
455 if (bufOfs_ > (READ_BUFFER_SIZE / 2))
456 {
457 memmove(buffer_, buffer_ + bufOfs_, available);
458 bufEnd_ -= bufOfs_;
459 bufOfs_ = 0;
460 }
462 }
463 }
464 // now: we have an expected length.
466 if (msg_.empty())
467 {
468 msg_.reserve(expectedLen_);
469 }
470 // Copy some bytes to the assembly buffer.
// Never take more than the message still needs, nor more than is buffered.
471 int needed = expectedLen_ - msg_.size();
472 if (needed > available)
473 {
474 needed = available;
475 }
476 msg_.append((const char *)(buffer_ + bufOfs_), needed);
477 bufOfs_ += needed;
478 if (msg_.size() >= (unsigned)expectedLen_)
479 {
480 // we're done.
481 return send_entry();
482 }
483 else
484 {
487 }
488 }
489
494 {
// Body of read_more_bytes(): helper state to call the stateflow kernel to
// read bytes from the fd into the internal buffer.
// NOTE(review): the signature line and listing line 501 (the remaining
// arguments of read_single) are not visible in this view.
// When everything buffered has been consumed, restart at the front of the
// read buffer.
495 if (bufEnd_ <= bufOfs_)
496 {
497 bufOfs_ = 0;
498 bufEnd_ = 0;
499 }
// New bytes are placed behind the data that is already buffered.
500 return read_single(&helper_, device()->fd(), buffer_ + bufEnd_,
502 }
503
507 {
// Body of read_done(): callback state when the kernel read is completed.
// NOTE(review): the signature line and listing lines 509-512 and 515 are
// not visible in this view (presumably error reporting and the bufEnd_
// update -- confirm against the full file).
// A read error ends this flow.
508 if (helper_.hasError_)
509 {
513 return exit();
514 }
516 return parse_bytes();
517 }
518
525
529 {
// Body of have_alloc_msg(): sends an assembled message to the destination
// flow, using the buffer allocated from dst_.
// NOTE(review): the signature line and listing lines 534-536 and 538 are
// not visible in this view.
530 auto *b = get_allocation_result(dst_);
// The priority has to be computed before msg_ is moved out of.
531 auto prio = TcpDefs::guess_priority(msg_);
532 b->data()->assign(std::move(msg_));
533 b->data()->skipMember_ = skipMember_;
537 dst_->send(b, prio);
539 }
540
544 {
// Body of notify_barrier(): calls into the parent flow's barrier notify,
// but makes sure to only do this once. barrierOwned_ is the one-shot guard.
// NOTE(review): the signature line and listing line 548 (presumably the
// actual notify call) are not visible in this view.
545 if (barrierOwned_)
546 {
547 barrierOwned_ = false;
549 }
550 }
551
// We attempt to read this many bytes in one go from the FD.
553 static constexpr unsigned READ_BUFFER_SIZE = 300;
// If we have to guess at the size of the packet, we start by allocating
// this many bytes. NOTE(review): listing line 557 (the value) is not
// visible in this view.
556 static constexpr unsigned DEFAULT_PACKET_SIZE =
// NOTE(review): presumably the number of bytes needed before the message
// length can be determined -- confirm against the usage in parse_bytes.
558 static constexpr unsigned MIN_SIZE_GUESS = TcpDefs::HDR_SIZE_END;
// What priority to use for reads from fds.
560 static constexpr unsigned READ_PRIO = Selectable::MAX_PRIO;
561
// true iff pending parent->barrier_.notify()
565 uint8_t barrierOwned_{1};
// Offset of the end in the read buffer.
567 uint16_t bufEnd_;
// First active byte (offset of the beginning) in the read buffer.
569 uint16_t bufOfs_;
// Assembly buffer. Holds the captured message so far.
574 string msg_;
// Helper for reading from the fd using select() in this state flow.
579 StateFlowSelectHelper helper_{this};
580};
581
582using TcpHubDeviceSelect = HubDeviceSelect<HubFlow, FdToTcpParser>;
583
589{
590public:
594 : target_(target)
595 {
// Constructor body is empty; target is the flow (interface) where the
// parsed GenMessage objects are passed on. NOTE(review): the signature line
// is not visible in this listing.
596 }
597
602 void send(Buffer<HubData> *data, unsigned prio) override
603 {
604 auto src = get_buffer_deleter(data);
605 auto dst = get_buffer_deleter(target_->alloc());
606 dst->set_done(data->new_child());
607 if (TcpDefs::parse_tcp_message(*src->data(), dst->data()))
608 {
609 target_->send(dst.release(), prio);
610 }
611 }
612
613private:
616};
617
618} // namespace openlcb
619
620#endif // _OPENLCB_IFTCPIMPL_HXX_
BufferPtr< T > get_buffer_deleter(Buffer< T > *b)
Helper function to create a BufferPtr of an appropriate type without having to explicitly specify the...
Definition Buffer.hxx:272
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
void notify() override
Implementation of the barrier semantics.
BarrierNotifiable * new_child()
Creates a new child notifiable of the current done notifiable.
Definition Buffer.hxx:108
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
Base class of everything with a virtual destructor.
void unselect(Selectable *job)
Removes a job from the select loop.
Definition Executor.cxx:359
Shared base class for thread-based and select-based hub devices.
Definition Hub.hxx:243
virtual void report_read_error()=0
Callback from the readflow when it encounters an error.
BarrierNotifiable barrier_
This notifiable will be called (if not NULL) upon read or write error.
Definition Hub.hxx:266
virtual void send(MessageType *message, unsigned priority=UINT_MAX)=0
Entry point to the flow.
MessageType * alloc()
Synchronously allocates a message buffer from the pool of this flow.
HubPort that connects a select-aware device to a strongly typed Hub.
@ MAX_PRIO
Largest priority we accept (otherwise we clip).
ExecutorBase * executor()
Return type for a state flow callback.
Base class for state machines.
Action read_single(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Attempts to read at most size_t bytes, and blocks the caller until at least one byte is read.
Service * service()
Return a pointer to the service I am bound to.
Action allocate_and_call(FlowInterface< Buffer< T > > *target_flow, Callback c, Pool *pool=nullptr)
Allocates a buffer from a pool and proceed to the next state when allocation is successful.
Action exit()
Terminate current StateFlow activity.
void start_flow(Callback c)
Resets the flow to the specified state and starts it.
Buffer< T > * get_allocation_result(FlowInterface< Buffer< T > > *target_flow)
Takes the result of the asynchronous allocation.
Action call_immediately(Callback c)
Immediately call the next state upon return.
Action set_terminated()
Sets the flow to terminated state.
Base::Action Action
Allows using Action without having StateFlowBase:: prefix in front of it.
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
MessageType * transfer_message()
Releases ownership of the current message.
MessageType * message()
Implementation of sequence number generator that uses the real clock.
long long sequence_
Sequence number of last sent message.
This flow is listening to data from a TCP connection, segments the incoming data into TcpMessages bas...
Action start_msg()
In this state the internal buffer (bufOfs_) points at the beginning of a message.
uint16_t bufOfs_
First active byte (offset of the beginning) in the read buffer.
HubPortInterface * skipMember_
Parsed messages will be initialized to this skipMember_.
static constexpr unsigned READ_BUFFER_SIZE
We attempt to read this many bytes in one go from the FD.
static constexpr unsigned READ_PRIO
What priority to use for reads from fds.
int expectedLen_
How many bytes we think the current message will be.
Action read_more_bytes()
Helper state to call the stateflow kernel to read bytes from the fd into the internal buffer.
FdToTcpParser(FdHubPortService *s, HubPortInterface *dst, HubPortInterface *skipMember)
Constructor.
HubPortInterface * dst_
Where to send parsed messages to.
uint8_t buffer_[READ_BUFFER_SIZE]
Temporary buffer to read into from the FD.
void notify_barrier()
Calls into the parent flow's barrier notify, but makes sure to only do this once in the lifetime of *...
Action parse_bytes()
Intermediate state where we are.
void shutdown()
Stops listening and terminates the flow.
string msg_
Assembly buffer. Holds the captured message so far.
Action have_alloc_msg()
Sends an assembled message (2) to the destination flow.
Action read_done()
Callback state when the kernel read is completed.
uint16_t bufEnd_
Offset of the end in the read buffer.
FdHubPortService * device()
static constexpr unsigned DEFAULT_PACKET_SIZE
If we have to guess at the size of the packet, we start by allocating this many bytes.
Action send_entry()
Sends an assembled message (1) to the destination flow.
uint8_t barrierOwned_
true iff pending parent->barrier_.notify()
Abstract class representing an OpenLCB Interface.
Definition If.hxx:185
MessageDispatchFlow * dispatcher()
Definition If.hxx:224
Node * lookup_local_node(NodeID id)
Looks up a node ID in the local nodes' registry.
Definition If.hxx:263
Virtual clock interface.
virtual long long get_sequence_number()=0
Returns the next strictly monotonic sequence number.
Static class for constants and utilities related to the TCP transport protocol.
Definition IfTcpImpl.hxx:48
static unsigned guess_priority(const string &tcp_payload)
static void render_tcp_message(const GenMessage &msg, NodeID gateway_node_id, long long sequence, string *tgt)
Renders a TCP message into a single buffer, ready to transmit.
Definition IfTcpImpl.hxx:56
TcpDefs()
No usable constructor; this is a static-only class.
@ HDR_SIZE_OFS
Offset in the header of the size field.
@ MSG_DST_OFS
Offset in the message (=after the header) of the dst node ID field (for addressed MTI only).
@ HDR_TIMESTAMP_OFS
Offset in the header of the timestamp (seq no) field.
@ HDR_SIZE_END
Offset in the header of the end of the size field.
@ MSG_SRC_OFS
Offset in the message (=after the header) of the src node ID field.
@ MSG_MTI_OFS
Offset in the message (=after the header) of the MTI field.
@ HDR_FLAG_OFS
Offset in the header of the flags field.
@ HDR_GATEWAY_OFS
Offset in the header of the source gateway field.
@ MSG_ADR_PAYLOAD_OFS
Offset in the message (=after the header) of the message payload (for addressed MTI only).
@ FLAGS_FRAGMENT_NOT_LAST
"not last" fragmenting bit in the flags field.
@ MSG_GLOBAL_PAYLOAD_OFS
Offset in the message (=after the header) of the message payload (for global MTI only).
@ FLAGS_FRAGMENT_NOT_FIRST
"not first" fragmenting bit in the flags field.
@ HDR_LEN
Total length of the fixed header.
@ FLAGS_RESVD1_ZERO_CHECK
Reserved bits in the flags field. Check as zero.
@ FLAGS_CHAINING
Chaining bit in the uint16 flags field.
@ FLAGS_OPENLCB_MSG
Bit in the uint16 flags field in the header that signals frames that carry an OpenLCB message.
@ MIN_ADR_MESSAGE_SIZE
Minimum length of a valid message that has an addressed MTI.
@ FLAGS_RESVD2_IGNORED
Reserved bits in the flags field. Ignore (by the standard).
@ ABS_MTI_OFS
Offset from the header of the MTI field in the message.
@ MIN_MESSAGE_SIZE
Minimum length of a valid message.
static bool parse_tcp_message(const string &src, GenMessage *tgt)
Parses a TCP message format (from binary payload) into a general OpenLCB message.
static int get_tcp_message_len(const void *data, size_t len)
Guesses the length of a tcp message from looking at the prefix of the payload.
Definition IfTcpImpl.hxx:96
Simple stateless translator for incoming TCP messages from binary format into the structured format.
void send(Buffer< HubData > *data, unsigned prio) override
Entry point for the incoming (binary) data.
MessageHandler * target_
Flow(interface) where to pass on the parsed GenMessage.
TcpRecvFlow(MessageHandler *target)
This flow renders outgoing OpenLCB messages to their TCP stream representation.
NodeID get_gateway_node_id()
Action entry() override
Handler where dequeueing of messages to be sent starts.
NodeID gatewayId_
Populated into the source gateway field of the outgoing messages.
HubPortInterface * skipMember_
This value will be populated to the skipMember_ field.
Action render_src_message()
Callback state after allocation succeeded.
GenMessage * nmsg()
HubPortInterface * sendTarget_
Where to send the rendered messages to.
SequenceNumberGenerator * sequenceNumberGenerator_
Responsible for generating the sequence numbers of the outgoing messages.
TcpSendFlow(If *service, NodeID gateway_node_id, HubPortInterface *send_target, HubPortInterface *skip_member, SequenceNumberGenerator *sequence)
Constructor.
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int WARNING
Loglevel that is always printed, reporting a warning or a retryable error.
Definition logging.h:55
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DASSERT(x)
Debug assertion facility.
Definition macros.h:159
void node_id_to_data(NodeID id, void *data)
Convenience function to render a 48-bit NMRAnet node ID into an existing buffer.
Definition If.cxx:52
NodeID data_to_node_id(const void *d)
Converts 6 bytes of big-endian data to a node ID.
Definition If.cxx:59
uint64_t NodeID
48-bit NMRAnet Node ID type
uint16_t data_to_error(const void *data)
Parses an error code from a payload object at a given pointer.
Definition If.cxx:84
void error_to_data(uint16_t error_code, void *data)
Writes an error code into a payload object at a given pointer.
Definition If.cxx:78
long long os_get_time_monotonic(void)
Get the monotonic time since the system started.
Definition os.c:571
Use this class to read from an fd using select() in a state flow.
unsigned remaining_
Number of bytes still outstanding to read.
unsigned hasError_
1 if there was an error reading or writing.
static unsigned int mti_priority(MTI mti)
Get the MTI priority (value 0 through 3).
MTI
Known Message type indicators.
static bool get_mti_address(MTI mti)
Get the MTI address present value field.
This class is used in the dispatching of incoming or outgoing NMRAnet messages to the message handler...
Definition If.hxx:72
@ DSTFLAG_NOT_FIRST_MESSAGE
Signals to the stack that we need to set the continuation bits in the outgoing message to indicate th...
Definition If.hxx:163
@ DSTFLAG_NOT_LAST_MESSAGE
Signals to the stack that we need to set the continuation bits in the outgoing message to indicate th...
Definition If.hxx:167
NodeHandle dst
Destination node.
Definition If.hxx:106
NodeHandle src
Source node.
Definition If.hxx:104
Defs::MTI mti
OpenLCB MTI of the incoming message.
Definition If.hxx:108
string payload
Data content in the message body.
Definition If.hxx:113
NodeID id
48-bit NMRAnet Node ID
void clear()
Resets node handle to global (broadcast) handle.