Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
DatagramImpl.hxx
1
35#include "openlcb/Datagram.hxx"
37
38namespace openlcb
39{
40
43extern long long DATAGRAM_RESPONSE_TIMEOUT_NSEC;
44
53 public StateFlowBase,
54 public LinkedObject<DatagramClientImpl>
55{
56public:
63 , sendFlow_(send_flow)
64 , listener_(this)
65 , isSleeping_(0)
66 , sendPending_(0)
67 {
68 }
69
/// Accepts a datagram buffer for sending.
///
/// If the buffer's MTI is unset (zero), it is filled in with
/// Defs::MTI_DATAGRAM; any other MTI value fails the HASSERT below.
/// The result code is reset to OPERATION_PENDING and the buffer is stored
/// via reset_message().
///
/// @param b buffer holding the datagram message to send.
/// @param priority requested priority; forwarded to reset_message().
70 void write_datagram(Buffer<GenMessage> *b, unsigned priority) OVERRIDE
71 {
72 if (!b->data()->mti)
73 {
// Callers may leave the MTI empty; default it to the datagram MTI.
74 b->data()->mti = Defs::MTI_DATAGRAM;
75 }
// Only datagram messages are accepted by this client.
76 HASSERT(b->data()->mti == Defs::MTI_DATAGRAM);
// Discards any previous result bits and marks the operation as in progress.
77 result_ = OPERATION_PENDING;
78 reset_message(b, priority);
// NOTE(review): the listing skips number 79 here, so a statement appears to
// be missing from this extract -- confirm against the original file.
80 }
81
85 {
86 DIE("Canceling datagram send operation is not yet implemented.");
87 }
88
89private:
/// Records a new datagram to be sent.
///
/// @param b buffer with the datagram message; the pointer is stored in
/// message_.
/// @param priority requested priority; clamped by set_priority() before being
/// stored.
93 void reset_message(Buffer<GenMessage> *b, unsigned priority)
94 {
95 set_priority(priority);
96 message_ = b;
97 }
98
102 {
103 iface()->canonicalize_handle(&message_->data()->src);
104 iface()->canonicalize_handle(&message_->data()->dst);
105 src_ = message_->data()->src;
106 dst_ = message_->data()->dst;
107 return acquire_srcdst_lock();
108 }
109
114 {
115 // First check if there is another datagram client sending a datagram
116 // to the same target node.
117 {
119 for (DatagramClientImpl *c =
121 c; c = c->LinkedObject<DatagramClientImpl>::link_next())
122 {
123 // this will catch c == this.
124 if (!c->sendPending_) continue;
125 if (c->src_.id != src_.id) continue;
126 if (!iface()->matching_node(c->dst_, dst_))
127 continue;
128 // Now: there is another datagram client sending a datagram to
129 // this destination. We need to wait for that transaction to
130 // complete.
131 c->waitingClients_.push_front(this);
132 return wait();
133 }
134 }
135
136 return do_send();
137 }
138
142 {
143 auto *b = message_;
144 message_ = nullptr;
145 // These two statements transfer the barrier's ownership from the
146 // BufferBase to our pointer variable.
147 done_ = b->new_child();
148 b->set_done(nullptr);
149
150 register_handlers();
151 // Transfers ownership.
153
154 isSleeping_ = 1;
156 STATE(timeout_waiting_for_dg_response));
157 }
158
// MTI / mask pairs passed to the dispatcher when registering and
// unregistering listener_ (see register_handlers() and
// unregister_response_handler()).
// NOTE(review): the listing skips numbers 161, 162, 166 and 169, so the
// definitions of MTI_1a, MTI_1b, MTI_2b and MTI_3 are not visible in this
// extract -- confirm their values against the original file.
159 enum
160 {
// ~(a ^ b) has a 1 bit wherever the two MTI values agree; presumably this
// lets a single (MTI, MASK) registration match both MTI_1a and MTI_1b.
163 MASK_1 = ~(MTI_1a ^ MTI_1b),
164 MTI_1 = MTI_1a,
165 MTI_2a = Defs::MTI_DATAGRAM_OK,
// Same construction as MASK_1, for the MTI_2a / MTI_2b pair.
167 MASK_2 = ~(MTI_2a ^ MTI_2b),
168 MTI_2 = MTI_2a,
// The third registration uses the exact-match mask.
170 MASK_3 = Defs::MTI_EXACT,
171 };
172
/// Resets the response-tracking flags and registers listener_ with the
/// interface's dispatcher for the three MTI/mask pairs defined in the enum
/// above.
173 void register_handlers()
174 {
// No response seen yet and not yet sleeping on the timer.
175 hasResponse_ = 0;
176 isSleeping_ = 0;
// Marks that the handlers are registered; acquire_srcdst_lock() skips
// clients whose sendPending_ is zero.
177 sendPending_ = 1;
178 iface()->dispatcher()->register_handler(&listener_, MTI_1, MASK_1);
179 iface()->dispatcher()->register_handler(&listener_, MTI_2, MASK_2);
180 iface()->dispatcher()->register_handler(&listener_, MTI_3, MASK_3);
181 }
182
188 {
189 result_ |= PERMANENT_ERROR | DST_NOT_FOUND;
190 unregister_response_handler();
191 return call_immediately(STATE(datagram_finalize));
192 }
193
/// Flow state for the case where no datagram response arrived.
///
/// Logs the destination node ID, clears the sleeping flag, unregisters the
/// response handlers and adds PERMANENT_ERROR | TIMEOUT to the result.
///
/// @return action that continues immediately in datagram_finalize().
194 Action timeout_waiting_for_dg_response()
195 {
196 LOG(INFO,
197 "CanDatagramWriteFlow: No datagram response arrived from "
198 "destination %012" PRIx64 ".",
199 dst_.id);
// The timer sleep is over.
200 isSleeping_ = 0;
// Also clears sendPending_ and wakes up the next waiting client, if any.
201 unregister_response_handler();
202 result_ |= PERMANENT_ERROR | TIMEOUT;
203 return call_immediately(STATE(datagram_finalize));
204 }
205
/// Unregisters listener_ from the dispatcher (mirroring register_handlers()),
/// clears sendPending_, and wakes up the first client queued in
/// waitingClients_, if there is one.
206 void unregister_response_handler()
207 {
208 iface()->dispatcher()->unregister_handler(&listener_, MTI_1, MASK_1);
209 iface()->dispatcher()->unregister_handler(&listener_, MTI_2, MASK_2);
210 iface()->dispatcher()->unregister_handler(&listener_, MTI_3, MASK_3);
211 sendPending_ = 0;
212 if (!waitingClients_.empty())
213 {
// Entries are pushed as DatagramClientImpl* in acquire_srcdst_lock(), which
// is what the downcast from the queue's element type relies on.
214 DatagramClientImpl *c =
215 static_cast<DatagramClientImpl *>(waitingClients_.pop_front());
216 // Hands off all waiting clients to c.
// The swap below would discard nothing only if c's own queue is empty.
217 HASSERT(c->waitingClients_.empty());
218 std::swap(waitingClients_, c->waitingClients_);
// Schedules c on the executor (see notify()).
219 c->notify();
220 }
221 }
222
/// Final flow state: clears the OPERATION_PENDING bit from the result,
/// notifies the saved done_ notifiable (if any) and terminates the flow.
///
/// @return the terminated action.
223 Action datagram_finalize()
224 {
// NOTE(review): the listing skips number 225 here, so a statement may be
// missing from this extract -- confirm against the original file.
// The operation must still be marked as in progress when we get here.
226 HASSERT(result_ & OPERATION_PENDING);
227 result_ &= ~OPERATION_PENDING;
228 if (done_)
229 {
230 done_->notify();
// Cleared so the same notifiable is not notified a second time.
231 done_ = nullptr;
232 }
233 return set_terminated();
234 }
235
239 {
240 public:
242 : parent_(parent)
243 {
244 }
245
/// Receives a message from the dispatcher, passes its contents to the parent
/// client's handle_response() and then releases the buffer.
///
/// @param buffer incoming message buffer; unref'ed before returning.
/// @param priority not used by this implementation.
246 void send(message_type *buffer, unsigned priority = UINT_MAX) OVERRIDE
247 {
248 parent_->handle_response(buffer->data());
249 buffer->unref();
250 }
251
252 private:
253 DatagramClientImpl *parent_;
254 };
255
260 {
261 // LOG(INFO, "%p: Incoming response to datagram: mti %x from %x", this,
262 // (int)message->mti, (int)message->src.alias);
263
264 // Check for reboot (unaddressed message) first.
266 {
267 if (message->payload.size() != 6)
268 {
269 // Malformed message inbound.
270 return;
271 }
272 NodeHandle rebooted(message->src);
273 rebooted.id = buffer_to_node_id(message->payload);
274 if (iface()->matching_node(dst_, rebooted))
275 {
276 // Destination node has rebooted. Kill datagram flow.
277 result_ |= DST_REBOOT;
279 }
280 return; // everything else below is for addressed message
281 }
282
283 // First we check that the response is for this source node.
284 if (!iface()->matching_node(message->dst, src_))
285 {
286 LOG(VERBOSE, "wrong dst");
287 return;
288 }
289 // We also check that the source of the response is our destination.
290 if (!iface()->matching_node(message->src, dst_))
291 {
292 LOG(VERBOSE, "wrong src");
293 return;
294 }
295
296 uint16_t error_code = 0;
297 uint8_t payload_length = 0;
298 const uint8_t *payload = nullptr;
299 if (!message->payload.empty())
300 {
301 payload =
302 reinterpret_cast<const uint8_t *>(message->payload.data());
303 payload_length = message->payload.size();
304 }
305 if (payload_length >= 2)
306 {
307 error_code = (((uint16_t)payload[0]) << 8) | payload[1];
308 }
309
310 switch (message->mti)
311 {
314 {
315 if (payload_length >= 4)
316 {
317 uint16_t return_mti = payload[2];
318 return_mti <<= 8;
319 return_mti |= payload[3];
320 if (return_mti != Defs::MTI_DATAGRAM)
321 {
322 // This must be a rejection of some other
323 // message. Ignore.
324 LOG(VERBOSE, "wrong rejection mti");
325 return;
326 }
327 }
328 } // fall through
330 {
331 result_ &= ~0xffff;
332 result_ |= error_code;
333 // Ensures that an error response is visible in the flags.
334 if (!(result_ & (PERMANENT_ERROR | RESEND_OK)))
335 {
336 result_ |= PERMANENT_ERROR;
337 }
338 break;
339 }
341 {
342 if (payload_length)
343 {
344 result_ &= ~(0xff << RESPONSE_FLAGS_SHIFT);
345 result_ |= payload[0] << RESPONSE_FLAGS_SHIFT;
346 }
347 result_ |= OPERATION_SUCCESS;
348 break;
349 }
350 default:
351 // Ignore message.
352 LOG(VERBOSE, "unknown mti");
353 return;
354 } // switch response MTI
356 } // handle_message
357
361 {
362 // Avoids duplicate wakeups on the timer.
363 unregister_response_handler();
364 hasResponse_ = 1;
365 if (isSleeping_)
366 {
367 // Stops waiting for response and notifies the current flow.
368 timer_.trigger();
369 isSleeping_ = 0;
370 }
373 LOG(VERBOSE, "restarting at datagram finalize");
374 reset_flow(STATE(datagram_finalize));
375 }
376
/// Schedules this flow on the service's executor using the stored priority_
/// value.
379 void notify() override
380 {
381 service()->executor()->add(this, priority_);
382 }
383
/// Stores the flow priority, clamped to MAX_PRIORITY.
///
/// @param p requested priority; values above MAX_PRIORITY are stored as
/// MAX_PRIORITY.
386 void set_priority(unsigned p)
387 {
// priority_ is a 24-bit bit field and MAX_PRIORITY is (1 << 24) - 1, so the
// clamped value always fits.
388 priority_ = std::min((unsigned)MAX_PRIORITY, p);
389 }
390
393 {
394 return static_cast<If *>(service());
395 }
396
419 unsigned isSleeping_ : 1;
420 unsigned hasResponse_ : 1;
423 unsigned sendPending_ : 1;
425 unsigned priority_ : 24;
428 static constexpr unsigned MAX_PRIORITY = (1 << 24) - 1;
429}; // class DatagramClientImpl
430
431} // namespace openlcb
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
See OSMutexLock in os/OS.hxx.
Definition Atomic.hxx:153
A BarrierNotifiable allows creating a number of child Notifiables and waiting for all of them to finish.
void notify() override
Implementation of the barrier semantics.
BarrierNotifiable * new_child()
Call this for each child task.
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
void register_handler(HandlerType *handler, ID id, ID mask)
Adds a new handler to this dispatcher.
void unregister_handler(HandlerType *handler, ID id, ID mask)
Removes a specific instance of a handler from this dispatcher.
virtual void add(Executable *action, unsigned priority=UINT_MAX)=0
Send a message to this Executor's queue.
virtual void send(MessageType *message, unsigned priority=UINT_MAX)=0
Entry point to the flow.
Using this class as a base class will cause the given class to have all its instances linked up in a ...
ExecutorBase * executor()
Return type for a state flow callback.
Use this timer class to deliver the timeout notification to a stateflow.
Base class for state machines.
Service * service()
Return a pointer to the service I am bound to.
void start_flow(Callback c)
Resets the flow to the specified state and starts it.
Action wait()
Wait for an asynchronous call.
void reset_flow(Callback c)
Resets the flow to the specified state.
Action call_immediately(Callback c)
Immediately call the next state upon return.
Action set_terminated()
Sets the flow to terminated state.
Action sleep_and_call(::Timer *timer, long long timeout_nsec, Callback c)
Suspends execution of this control flow for a specified time.
void trigger()
This will wakeup the timer prematurely, immediately.
Definition Timer.hxx:237
A simple, fast, type-safe single-linked queue class with non-virtual methods.
T * pop_front()
Removes the entry at the front of the queue.
This object is registered to receive response messages at the interface level.
void send(message_type *buffer, unsigned priority=UINT_MAX) OVERRIDE
Entry point to the flow.
Datagram client implementation for CANbus-based datagram protocol.
unsigned isSleeping_
1 when we are in the sleep call waiting for the datagram Ack or Reject message.
MessageHandler * sendFlow_
Addressed datagram send flow from the interface. Externally owned.
BarrierNotifiable * done_
This notifiable is saved from the datagram buffer.
DatagramClientImpl(If *iface, MessageHandler *send_flow)
Constructor.
void notify() override
Overrides the default notify implementation to make sure we obey the priority values.
void cancel() OVERRIDE
Requests cancelling the datagram send operation.
Action do_send()
Hands off the datagram to the send flow.
TypedQueue< Executable > waitingClients_
List of other datagram clients that are trying to send to the same target node.
unsigned sendPending_
1 when we have the handlers registered.
StateFlowTimer timer_
Helper object for sleep.
void set_priority(unsigned p)
Sets the stateflow priority.
ReplyListener listener_
Instance of the listener object.
NodeHandle dst_
Destination of the datagram we are currently sending.
Action start_send()
Entry point to the flow processing.
void write_datagram(Buffer< GenMessage > *b, unsigned priority) OVERRIDE
Triggers sending a datagram.
void reset_message(Buffer< GenMessage > *b, unsigned priority)
Equivalent to enqueuing a new datagram to send.
void handle_response(GenMessage *message)
Callback when a matching response comes in on the bus.
static constexpr unsigned MAX_PRIORITY
Constant used to clamp the incoming priority value to something that fits in the priority_ bit field.
Buffer< GenMessage > * message_
Datagram message we are trying to send now. We own it.
unsigned priority_
Priority in the executor.
void stop_waiting_for_response()
To be called from the handler.
NodeHandle src_
Source of the datagram we are currently sending.
Action acquire_srcdst_lock()
Ensures that there is no other datagram client with the same src:dst pair.
Use this class to send datagrams.
Definition Datagram.hxx:79
Abstract class representing an OpenLCB Interface.
Definition If.hxx:185
virtual void canonicalize_handle(NodeHandle *h)
Canonicalizes the node handle: fills in id and/or alias from the maps the interface holds internally.
Definition If.hxx:327
MessageDispatchFlow * dispatcher()
Definition If.hxx:224
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
Definition logging.h:59
static const int INFO
Loglevel that is printed by default, reporting some status information.
Definition logging.h:57
#define OVERRIDE
Function attribute for virtual functions declaring that this function is overriding a function that s...
Definition macros.h:180
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DIE(MSG)
Unconditionally terminates the current process with a message.
Definition macros.h:143
long long DATAGRAM_RESPONSE_TIMEOUT_NSEC
Defines how long the datagram client flow should wait for the datagram ack/nack response message.
Definition Datagram.cxx:42
NodeID buffer_to_node_id(const string &buf)
Converts a 6-byte-long buffer to a node ID.
Definition If.cxx:66
@ MTI_DATAGRAM_REJECTED
datagram rejected by receiver
@ MTI_EXACT
match mask for a single MTI
@ MTI_DATAGRAM_OK
datagram received okay
@ MTI_INITIALIZATION_COMPLETE
initialization complete
@ MTI_OPTIONAL_INTERACTION_REJECTED
rejected request
@ MTI_TERMINATE_DUE_TO_ERROR
terminate due to some error
@ MTI_DATAGRAM
datagram
This class is used in the dispatching of incoming or outgoing NMRAnet messages to the message handler...
Definition If.hxx:72
NodeHandle dst
Destination node.
Definition If.hxx:106
NodeHandle src
Source node.
Definition If.hxx:104
Defs::MTI mti
OpenLCB MTI of the incoming message.
Definition If.hxx:108
string payload
Data content in the message body.
Definition If.hxx:113
Container of both a NodeID and NodeAlias.
NodeID id
48-bit NMRAnet Node ID