Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
GridConnectHub.cxx
Go to the documentation of this file.
1
34//#define LOGLEVEL VERBOSE
35
36#include "openmrn_features.h"
37
39
41#include "can_frame.h"
42#include "nmranet_config.h"
43#include "utils/Buffer.hxx"
44#include "utils/BufferPort.hxx"
45#include "utils/HubDevice.hxx"
46#if OPENMRN_FEATURE_EXECUTOR_SELECT
47#include "utils/HubDeviceSelect.hxx"
48#endif
49#include "utils/Hub.hxx"
51#include "utils/gc_format.h"
52
56{
57public:
64 GCAdapter(HubFlow *gc_side, CanHubFlow *can_side, bool double_bytes)
65 : parser_(can_side->service(), can_side, &formatter_)
66 , formatter_(can_side->service(), gc_side, &parser_, double_bytes)
67 {
68 gc_side->register_port(&parser_);
69 can_side->register_port(&formatter_);
70 isRegistered_ = 1;
71 }
72
82 GCAdapter(HubFlow *gc_side_read, HubFlow *gc_side_write,
83 CanHubFlow *can_side, bool double_bytes)
84 : parser_(can_side->service(), can_side, &formatter_)
85 , formatter_(can_side->service(), gc_side_write, &parser_, double_bytes)
86 {
87 gc_side_read->register_port(&parser_);
88 can_side->register_port(&formatter_);
89 isRegistered_ = 1;
90 }
91
92 virtual ~GCAdapter()
93 {
94 unregister();
95 }
96
108
110 {
111 unregister();
114 }
115
120 {
121 public:
131 HubPort *skip_member, int double_bytes)
133 , delayPort_(service, destination, config_gridconnect_buffer_size(),
134 USEC_TO_NSEC(config_gridconnect_buffer_delay_usec()))
136 , skipMember_(skip_member)
137 , double_bytes_(double_bytes)
138 {
139 const int cnt = config_gridconnect_bridge_max_outgoing_packets();
140 if (cnt > 1)
141 {
142 ownedPool_.reset(
143 new LimitedPool(sizeof(Buffer<CanHubData>), cnt));
144 pool_ = ownedPool_.get();
145 }
146 else
147 {
149 }
150 }
151
154 {
155 return destination_;
156 }
157
160 bool shutdown()
161 {
162 const int cnt = config_gridconnect_bridge_max_outgoing_packets();
163 bool state_delay = delayPort_.shutdown();
164 bool state_pool = true;
165 if (ownedPool_)
166 {
167 state_pool = (int(ownedPool_->free_items()) == cnt);
168 }
169 return state_delay && state_pool;
170 }
171
174 Pool *pool() override
175 {
176 return pool_;
177 }
178
179 Action entry() override
180 {
181 LOG(VERBOSE, "can packet arrived: %" PRIx32,
182 GET_CAN_FRAME_ID_EFF(*message()->data()));
183 char *end =
185 size_t size = (end - dbuf_);
186 if (size)
187 {
188 Buffer<HubData> *target_buffer = nullptr;
190 mainBufferPool->alloc(&target_buffer);
191 target_buffer->data()->skipMember_ = skipMember_;
194 target_buffer->data()->resize(size);
195 memcpy((char *)target_buffer->data()->data(), dbuf_, size);
196 target_buffer->set_done(bn_.reset(this));
197 delayPort_.send(target_buffer, 0);
198 release();
199 return wait_and_call(STATE(buffer_accepted));
200 }
201 else
202 {
203 LOG(INFO, "gc generate failed.");
204 }
205 return release_and_exit();
206 }
207
208 Action buffer_accepted()
209 {
210 return exit();
211 }
212
213 private:
218 std::unique_ptr<LimitedPool> ownedPool_;
222 char dbuf_[56];
231 };
232
237 {
238 public:
249 , skipMember_(skip_member)
250 {
251 int max_frames_to_parse =
252 config_gridconnect_bridge_max_incoming_packets();
253 if (max_frames_to_parse > 1) {
254 frameAllocator_.reset(new LimitedPool(
255 sizeof(CanHubFlow::buffer_type), max_frames_to_parse));
256 }
257 }
258
263 {
264 int max_frames_to_parse =
265 config_gridconnect_bridge_max_incoming_packets();
266 if (max_frames_to_parse > 1)
267 {
268 if (frameAllocator_->free_items() < (size_t)max_frames_to_parse)
269 {
270 return false;
271 }
272 }
273 return is_waiting();
274 }
275
278 {
279 return destination_;
280 }
281
283 Action entry() override
284 {
285 inBuf_ = message()->data()->data();
286 inBufSize_ = message()->data()->size();
288 }
289
293 {
294 while (inBufSize_--)
295 {
296 char c = *inBuf_++;
298 {
299 // End of frame. Allocate an output buffer and parse the
300 // frame.
301 return allocate_and_call(destination_, STATE(parse_to_output_frame), frameAllocator_.get());
302 }
303 }
304 // Will notify the caller.
305 return release_and_exit();
306 }
307
312 {
315 {
316 b->data()->skipMember_ = skipMember_;
317 destination_->send(b);
318 }
319 else
320 {
321 // Releases the buffer.
322 b->unref();
323 }
325 }
326
327 private:
330
332 const char *inBuf_;
335
336 // Allocator to get the frame from. If NULL, the target's default
337 // buffer pool will be used.
338 std::unique_ptr<LimitedPool> frameAllocator_;
339
340 // ==== static data ====
341
346 };
347
348private:
354 unsigned isRegistered_ : 1;
355};
356
358 CanHubFlow *can_side,
359 bool double_bytes)
360{
361 return new GCAdapter(gc_side, can_side, double_bytes);
362}
363
365 HubFlow *gc_side_write,
366 CanHubFlow *can_side,
367 bool double_bytes)
368{
369 return new GCAdapter(gc_side_read, gc_side_write, can_side, double_bytes);
370}
371
376{
381 Impl(CanHubFlow *can_hub, bool timestamped)
382 : canHub_(can_hub)
383 , timestamped_(timestamped)
384 {
385 canHub_->register_port(this);
386 }
387
388 ~Impl()
389 {
391 }
392
397 void send(Buffer<CanHubData> *message, unsigned priority) OVERRIDE
398 {
400 char str[40];
401 char* p = gc_format_generate(message->data(), str, false);
402 *p = 0;
403 if (timestamped_)
404 {
405#if defined(__linux__) || defined(__MACH__)
406 struct timeval tv;
407 gettimeofday(&tv, nullptr);
408 struct tm t;
409 localtime_r(&tv.tv_sec, &t);
410 fprintf(stderr, "%04d-%02d-%02d %02d:%02d:%02d:%06ld [%p] ",
411 t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min,
412 t.tm_sec, (long)tv.tv_usec, message->data()->skipMember_);
413#endif
414 }
415 fprintf(stderr, "%s", str);
416 if (config_gc_generate_newlines() != 1)
417 {
418 fprintf(stderr, "\n");
419 }
420 }
421
426};
427
428GcPacketPrinter::GcPacketPrinter(CanHubFlow *can_hub, bool timestamped) : impl_(new Impl(can_hub, timestamped))
429{
430}
431
432GcPacketPrinter::~GcPacketPrinter()
433{
434}
435
441struct GcHubPort : public Executable
442{
451 GcHubPort(CanHubFlow *can_hub, int fd, Notifiable *on_exit, bool use_select)
452 : gcHub_(can_hub->service())
453 , bridge_(
454 GCAdapterBase::CreateGridConnectAdapter(&gcHub_, can_hub, false))
455 , onExit_(on_exit)
456 {
457 LOG(VERBOSE, "gchub port %p", (Executable *)this);
458 if (use_select) {
459#ifndef OPENMRN_FEATURE_EXECUTOR_SELECT
460 DIE("select is not supported");
461#else
462 gcWrite_.reset(new HubDeviceSelect<HubFlow>(&gcHub_, fd, this));
463#endif
464 } else {
465 gcWrite_.reset(new FdHubPort<HubFlow>(&gcHub_, fd, this));
466 }
467 }
468 virtual ~GcHubPort()
469 {
470 }
471
484 std::unique_ptr<GCAdapterBase> bridge_;
488 std::unique_ptr<FdHubPortInterface> gcWrite_;
492
495 {
496 /* We would like to delete *this but we cannot do that in this
497 * callback, because we don't know what executor we are running
498 * on. Deleting on the write executor would cause a deadlock for
499 * example. */
500 gcHub_.service()->executor()->add(this);
501 }
502
504 {
505 if (!bridge_->shutdown() || !gcHub_.is_waiting())
506 {
507 // Yield.
508 gcHub_.service()->executor()->add(this);
509 return;
510 }
511 LOG(INFO, "GCHubPort: Shutting down gridconnect port %d. (%p)",
512 gcWrite_->fd(), bridge_.get());
513 if (onExit_) {
514 onExit_->notify();
515 onExit_ = nullptr;
516 }
517 /* We get this call when something is wrong with the FDs and we need to
518 * close the connection. It is guaranteed that by the time we got this
519 * call the device is unregistered from the char bridge, and the
520 * service thread is ready to be stopped. */
521 delete this;
522 }
523};
524
526 CanHubFlow *can_hub, int fd, Notifiable *on_exit, bool use_select)
527{
528 new GcHubPort(can_hub, fd, on_exit, use_select);
529}
DynamicPool * mainBufferPool
main buffer pool instance
Definition Buffer.cxx:37
std::unique_ptr< Buffer< T >, BufferDelete< T > > AutoReleaseBuffer
This class will automatically unref a Buffer when going out of scope.
Definition Buffer.hxx:256
void create_gc_port_for_can_hub(CanHubFlow *can_hub, int fd, Notifiable *on_exit, bool use_select)
Creates a new port on a CAN hub in gridconnect format for a select-compatible file descriptor.
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
A BarrierNotifiable allows to create a number of child Notifiable and wait for all of them to finish.
BarrierNotifiable * reset(Notifiable *done)
Resets the barrier. Returns &*this. Asserts that is_done().
void set_done(BarrierNotifiable *done)
Specifies that a given BarrierNotifiable must be called when the Buffer is deallocated (unreffed to z...
Definition Buffer.hxx:97
A wrapper class around a string-based Hub Port that buffers the outgoing bytes for a specified delay ...
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
T * data()
get a pointer to the start of the data.
Definition Buffer.hxx:215
An object that can be scheduled on an executor to run.
virtual void add(Executable *action, unsigned priority=UINT_MAX)=0
Send a message to this Executor's queue.
HubPort that connects a raw device to a strongly typed Hub.
Publicly visible API for the gridconnect-to-CAN bridge.
static GCAdapterBase * CreateGridConnectAdapter(HubFlow *gc_side, CanHubFlow *can_side, bool double_bytes)
This function connects an ASCII (GridConnect-format) CAN adapter to a binary CAN adapter,...
HubPort (on a CAN-typed hub) that turns a binary CAN packet into a string-formatted CAN packet,...
Pool * pool() override
The dispatcher will be using this pool to allocate frames when a hub needs to make a copy for an outg...
int double_bytes_
Non-zero if doubling was requested.
Pool * pool_
The allocation buffer pool to use for outgoing frames.
std::unique_ptr< LimitedPool > ownedPool_
If we want frame limits, this pool can do that for us.
Action entry() override
Entry into the StateFlow activity.
HubPort * skipMember_
The pipe member that should be sent as "source".
BinaryToGCMember(Service *service, HubFlow *destination, HubPort *skip_member, int double_bytes)
Constructor.
char dbuf_[56]
Destination buffer (characters).
BarrierNotifiable bn_
Helper object.
BufferPort delayPort_
Helper class that assembles larger outgoing packets from the individual packets by delaying data a li...
HubFlow * destination_
Pipe to send data to.
HubPort (on a string hub) that turns a gridconnect-formatted CAN packet into a binary CAN packet,...
size_t inBufSize_
The remaining number of characters in inBuf_.
GcStreamParser streamSegmenter_
Holds the state of the incoming characters and the boundary.
Action parse_more_data()
Matches the incoming characters to the pattern to form incoming frames.
Action entry() override
Takes more characters from the pending incoming buffer.
CanHubPortInterface * skipMember_
The pipe member that should be sent as "source".
Action parse_to_output_frame()
Takes the completed frame in cbuf_, parses it into the allocation result (a can pipe buffer) and send...
const char * inBuf_
The incoming characters.
CanHubFlow * destination_
Pipe to send data to.
GCToBinaryMember(Service *service, CanHubFlow *destination, CanHubPort *skip_member)
Constructor.
Actual implementation for the gridconnect bridge between a string-typed Hub and a CAN-frame-typed Hub...
GCToBinaryMember parser_
PipeMember doing the parsing.
GCAdapter(HubFlow *gc_side, CanHubFlow *can_side, bool double_bytes)
Constructor.
BinaryToGCMember formatter_
PipeMember doing the formatting.
bool shutdown() OVERRIDE
Unregisters *this from the pipes.
unsigned isRegistered_
1 if the flows are registered.
void unregister()
GCAdapter(HubFlow *gc_side_read, HubFlow *gc_side_write, CanHubFlow *can_side, bool double_bytes)
Constructor.
GcPacketPrinter(CanHubFlow *can_hub, bool timestamped)
constructor
Parses a sequence of characters; finds GridConnect protocol packet boundaries in the sequence of pack...
bool consume_byte(char c)
Adds the next character from the source stream.
bool parse_frame_to_output(struct can_frame *output_frame)
Parses the current contents of the frame buffer to a can_frame struct.
void register_port(port_type *port)
Adds a new port.
Definition Hub.hxx:167
void unregister_port(port_type *port)
Removes a previously added port.
Definition Hub.hxx:174
HubPort that connects a select-aware device to a strongly typed Hub.
Implementation of a Pool interface that takes memory from mainBufferPool (configurable) but limits th...
An object that can schedule itself on an executor to run.
virtual void notify()=0
Generic callback.
Pool of previously allocated, but currently unused, items.
Definition Buffer.hxx:278
void alloc(Buffer< BufferType > **result, Executable *flow=NULL)
Get a free item out of the pool.
Definition Buffer.hxx:292
Collection of related state machines that pend on incoming messages.
ExecutorBase * executor()
Return type for a state flow callback.
Service * service()
Return a pointer to the service I am bound to.
Action allocate_and_call(FlowInterface< Buffer< T > > *target_flow, Callback c, Pool *pool=nullptr)
Allocates a buffer from a pool and proceed to the next state when allocation is successful.
Buffer< T > * get_allocation_result(FlowInterface< Buffer< T > > *target_flow)
Takes the result of the asynchronous allocation.
Action wait_and_call(Callback c)
Wait for resource to become available before proceeding to next state.
Action exit()
Terminates the processing of this flow.
Action release_and_exit()
Terminates the processing of the current message.
State flow with a given typed input queue.
void release() OVERRIDE
Unrefs the current buffer.
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
Action call_immediately(Callback c)
Immediately call the next state upon return.
char * gc_format_generate(const struct can_frame *can_frame, char *buf, int double_format)
Formats a can frame in the GridConnect protocol.
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
Definition logging.h:59
static const int INFO
Loglevel that is printed by default, reporting some status information.
Definition logging.h:57
#define OVERRIDE
Function attribute for virtual functions declaring that this function is overriding a function that s...
Definition macros.h:180
#define DIE(MSG)
Unconditionally terminates the current process with a message.
Definition macros.h:143
#define USEC_TO_NSEC(_usec)
Convert a microsecond value to a nanosecond value.
Definition os.h:250
Implementation class that adds a device to a CAN hub with dynamic translation of the packets to/from ...
std::unique_ptr< GCAdapterBase > bridge_
Translates packets between the can-hub of the device and the char-hub of this port.
void notify() OVERRIDE
Callback in case the connection is closed due to error.
GcHubPort(CanHubFlow *can_hub, int fd, Notifiable *on_exit, bool use_select)
Constructor.
void run() OVERRIDE
Entry point.
std::unique_ptr< FdHubPortInterface > gcWrite_
Reads the characters from the char-hub and sends them to the fd.
HubFlow gcHub_
This hub sees the character-based representation of the packets.
Notifiable * onExit_
If not null, this notifiable will be called when the device is closed.
Implementation for the gridconnect bridge.
bool timestamped_
Whether we are printing timestamps of the packets.
void send(Buffer< CanHubData > *message, unsigned priority) OVERRIDE
Overridden entry method to send binary data to this hub.
Impl(CanHubFlow *can_hub, bool timestamped)
Constructor.
CanHubFlow * canHub_
Which hub are we registered to.