Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
HubDeviceSelect.hxx
1
34#ifndef _UTILS_HUBDEVICESELECT_HXX_
35#define _UTILS_HUBDEVICESELECT_HXX_
36
37#include "openmrn_features.h"
38
39// Only compile HubDeviceSelect if the OS has implementation for
40// Executor::select.
41#ifdef OPENMRN_FEATURE_EXECUTOR_SELECT
42
43#include <unistd.h>
44#include <stdio.h>
45#include <fcntl.h>
46
48#include "utils/Hub.hxx"
49#include "utils/LimitedPool.hxx"
50
54template <class BType> struct SelectBufferInfo
55{
56};
57
59template <> struct SelectBufferInfo<HubFlow::buffer_type>
60{
63 {
64 b->data()->resize(64);
65 }
69 static void check_target_size(HubFlow::buffer_type *b, int remaining)
70 {
71 HASSERT(remaining >= 0);
72 HASSERT(remaining <= 64);
73 b->data()->resize(64 - remaining);
74 }
76 static bool needs_read_fully()
77 {
78 return false;
79 }
80
82 static bool limit_input()
83 {
84 return true;
85 }
86};
87
89template <class T>
91{
94
96 static void resize_target(buffer_type *b)
97 {
98 }
102 static void check_target_size(buffer_type *b, int remaining)
103 {
104 HASSERT(remaining == 0);
105 }
108 static bool needs_read_fully()
109 {
110 return true;
111 }
112
114 static bool limit_input()
115 {
116 return true;
117 }
118};
119
124template <>
128
131 {
132 }
136 static void check_target_size(buffer_type *b, int remaining)
137 {
138 HASSERT(remaining == 0);
139 }
142 static bool needs_read_fully()
143 {
144 return true;
145 }
146
148 static bool limit_input()
149 {
150 // We should never throttle a CAN-bus reader.
151 return false;
152 }
153};
154
156int hubdevice_incoming_packet_limit();
157
159template <class HFlow> class HubDeviceSelectReadFlow : public StateFlowBase
160{
161public:
163 typedef typename HFlow::buffer_type buffer_type;
164
169 typename HFlow::port_type *dst, typename HFlow::port_type *skip_member)
172 , b_(nullptr)
173 , dst_(dst)
174 , skipMember_(skip_member)
175 {
176 set_limit_input(shouldThrottle_);
178 }
179
180 void set_limit_input(bool should_throttle)
181 {
182 shouldThrottle_ = should_throttle;
183 int limit = hubdevice_incoming_packet_limit();
184 if (shouldThrottle_ && limit < 1000000 && !inputPool_)
185 {
186 inputPool_.reset(new LimitedPool(sizeof(buffer_type), limit));
187 }
188 }
189
192 void shutdown()
193 {
194 auto *e = this->service()->executor();
195 if (e->is_selected(&selectHelper_))
196 {
198 }
201 }
202
205 {
206 return static_cast<FdHubPortService *>(this->service());
207 }
208
211 {
212 // If inputPool is nullptr, this will use the destination flow's pool,
213 // which is typically mainBufferPool.
214 return this->allocate_and_call(dst_, STATE(try_read), inputPool_.get());
215 }
216
220 {
222 b_->data()->skipMember_ = skipMember_;
225 {
226 return this->read_repeated(&selectHelper_, device()->fd(),
227 (void *)b_->data()->data(), b_->data()->size(),
228 STATE(read_done), 0);
229 }
230 else
231 {
232 return this->read_single(&selectHelper_, device()->fd(),
233 (void *)b_->data()->data(), b_->data()->size(),
234 STATE(read_done), 0);
235 }
236 }
237
241 {
243 {
245 b_->unref();
249 return exit();
250 }
253 dst_->send(b_, 0);
254 b_ = nullptr;
256 }
257
258private:
262 {
263 if (barrierOwned_)
264 {
265 barrierOwned_ = false;
267 }
268 }
269
271 bool barrierOwned_{true};
279 std::unique_ptr<LimitedPool> inputPool_;
281 typename HFlow::port_type *dst_;
283 typename HFlow::port_type *skipMember_;
284};
285
297template <class HFlow, class ReadFlow = HubDeviceSelectReadFlow<HFlow>>
299{
300public:
301#ifndef __WINNT__
304 HFlow *hub, const char *path, Notifiable *on_error = nullptr)
306 hub->service()->executor(), ::open(path, O_RDWR | O_NONBLOCK))
307 , hub_(hub)
308 , readFlow_(this, hub, &writeFlow_)
309 , writeFlow_(this)
310 {
311 HASSERT(fd_ >= 0);
313 on_error ? on_error : EmptyNotifiable::DefaultInstance());
315 hub_->register_port(write_port());
316 isRegistered_ = true;
317 }
318#endif
319
327 HubDeviceSelect(HFlow *hub, int fd, Notifiable *on_error = nullptr)
328 : FdHubPortService(hub->service()->executor(), fd)
329 , hub_(hub)
330 , readFlow_(this, hub, &writeFlow_)
331 , writeFlow_(this)
332 {
333 HASSERT(fd_ >= 0);
335 on_error ? on_error : EmptyNotifiable::DefaultInstance());
337#ifdef __WINNT__
338 unsigned long par = 1;
339 ioctlsocket(fd_, FIONBIO, &par);
340#else
341 ::fcntl(fd, F_SETFL, O_RDWR | O_NONBLOCK);
342#endif
343 hub_->register_port(write_port());
344 isRegistered_ = true;
345 }
346
349 {
350 if (fd_ >= 0) {
352 close_fd();
353 executor()->sync_run([this]() {
354 readFlow_.shutdown();
356 });
357 }
358 bool completed = false;
359 while (!completed)
360 {
361 executor()->sync_run([this, &completed]() {
362 if (barrier_.is_done())
363 {
364 completed = true;
365 }
366 });
367 }
368 }
369
371 HFlow *hub()
372 {
373 return hub_;
374 }
375
377 typename HFlow::port_type *write_port()
378 {
379 return &writeFlow_;
380 }
381
384 {
385 LOG(VERBOSE, "HubDeviceSelect::unregister write port %p %p",
387 {
388 AtomicHolder h(this);
389 if (!isRegistered_)
390 {
391 return;
392 }
393 isRegistered_ = false;
394 }
395 hub_->unregister_port(&writeFlow_);
396 /* We put an empty message at the end of the queue. This will cause
397 * wait until all pending messages are dealt with, and then ping the
398 * barrier notifiable, commencing the shutdown. */
399 auto *b = writeFlow_.alloc();
400 b->set_done(&barrier_);
401 writeFlow_.send(b);
402 }
403
407 {
408 return writeFlow_.is_waiting();
409 }
410
411protected:
416 {
417 public:
420 : WriteFlowBase(dev)
421 {
422 }
423
426 {
427 HASSERT(this->is_waiting());
428 }
429
431 void shutdown()
432 {
433 // The fd must be set to negative already to ensure the shutdown
434 // completes successfully.
435 HASSERT(device()->fd() < 0);
436 auto* e = this->service()->executor();
437 if (!selectHelper_.is_empty() && e->is_selected(&selectHelper_)) {
439 // will make the internal_try_write exit immediately
441 // actually wake up the flow
442 this->notify();
443 }
444 }
445
448 {
449 return static_cast<HubDeviceSelect *>(this->service());
450 }
451
453 {
454 if (device()->fd() < 0) {
455 return this->release_and_exit();
456 }
457 return this->write_repeated(&selectHelper_, device()->fd(),
458 this->message()->data()->data(),
459 this->message()->data()->size(), STATE(write_done),
460 this->priority());
461 }
462
465 {
468 }
469 return this->release_and_exit();
470 }
471
472 private:
475 };
476
477protected:
478
481 void report_write_error() override
482 {
483 readFlow_.shutdown();
485 close_fd();
486 }
487
491 void report_read_error() override
492 {
494 close_fd();
495 }
496
497 void close_fd()
498 {
499 int fd = -1;
500 {
501 AtomicHolder h(this);
502 fd = fd_;
503 if (fd < 0)
504 {
505 return;
506 }
507 fd_ = -1;
508 }
509 // This is a workaround that sometimes my linux kernel gets stuck in
510 // ::read when I closed the fd like this, even though the fd is
511 // O_NONBLOCK.
512 executor()->add(new CallbackExecutable([this, fd]() {
513 ::close(fd);
514 readFlow_.shutdown();
516 }));
517 }
518
520 HFlow *hub_;
522 ReadFlow readFlow_;
529};
530
531#endif // FEATURE_EXECUTOR_SELECT
532
533
534#endif // _UTILS_HUBDEVICESELECT_HXX_
int fcntl(int fd, int cmd,...)
Manipulate a file descriptor.
Definition Fileio.cxx:494
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
See OSMutexLock in os/OS.hxx.
Definition Atomic.hxx:153
Lightweight locking class for protecting small critical sections.
Definition Atomic.hxx:130
void notify() override
Implementation of the barrier semantics.
BarrierNotifiable * reset(Notifiable *done)
Resets the barrier. Returns &*this. Asserts that is_done().
BarrierNotifiable * new_child()
Call this for each child task.
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
T * data()
get a pointer to the start of the data.
Definition Buffer.hxx:215
A notifiable class that calls a particular function object once when it is invoked,...
static Notifiable * DefaultInstance()
void sync_run(std::function< void()> fn)
Synchronously runs a closure on this executor.
Definition Executor.cxx:151
virtual void add(Executable *action, unsigned priority=UINT_MAX)=0
Send a message to this Executor's queue.
void unselect(Selectable *job)
Removes a job from the select loop.
Definition Executor.cxx:359
int fd_
The device file descriptor.
Definition Hub.hxx:233
Shared base class for thread-based and select-based hub devices.
Definition Hub.hxx:243
virtual void report_read_error()=0
Callback from the readflow when it encounters an error.
BarrierNotifiable barrier_
This notifiable will be called (if not NULL) upon read or write error.
Definition Hub.hxx:266
MessageType * alloc()
Synchronously allocates a message buffer from the pool of this flow.
Data type wrapper for sending data through a Hub.
Definition Hub.hxx:101
State flow implementing select-aware fd reads.
FdHubPortService * device()
buffer_type * b_
Buffer that we are currently filling.
HFlow::port_type * skipMember_
What should be the source port designation.
bool shouldThrottle_
True if we are throttling via a limited pool.
void shutdown()
Unregisters the current flow from the hub.
HubDeviceSelectReadFlow(FdHubPortService *device, typename HFlow::port_type *dst, typename HFlow::port_type *skip_member)
Constructor.
void notify_barrier()
Calls into the parent flow's barrier notify, but makes sure to only do this once in the lifetime of *...
std::unique_ptr< LimitedPool > inputPool_
Throttling input helper.
HFlow::buffer_type buffer_type
Buffer type.
bool barrierOwned_
true iff pending parent->barrier_.notify()
HFlow::port_type * dst_
Where do we forward the messages we created.
Action read_done()
Called when the stateflow read call(s) are completed.
StateFlowSelectHelper selectHelper_
Helper object for read/write FD asynchronously.
Action allocate_buffer()
Allocates a new buffer for incoming data.
Action try_read()
Attempts to read into the current buffer from the target fd.
State flow implementing select-aware fd writes.
StateFlowBase::StateFlowSelectHelper selectHelper_
Helper class for asynchronous writes.
WriteFlow(HubDeviceSelect *dev)
Constructor.
StateFlowBase::Action entry() OVERRIDE
Entry into the StateFlow activity.
StateFlowBase::Action write_done()
State flow call.
void shutdown()
Unregisters this object from the flows.
HubPort that connects a select-aware device to a strongly typed Hub.
HubDeviceSelect(HFlow *hub, int fd, Notifiable *on_error=nullptr)
Creates a select-aware hub port for the opened device specified by ‘fd’.
HubDeviceSelect(HFlow *hub, const char *path, Notifiable *on_error=nullptr)
Creates a select-aware hub port for the device specified by ‘path’.
ReadFlow readFlow_
StateFlow for reading data from the fd. Woken when data arrives.
void report_write_error() override
The assumption here is that the write flow still has entries in its queue that need to be removed.
HFlow::port_type * write_port()
void unregister_write_port()
Removes the current write port from the registry of the source hub.
StateFlow< typename HFlow::buffer_type, QList< 1 > > WriteFlowBase
Base stateflow for the WriteFlow.
bool isRegistered_
True when the write flow is registered in the hub.
WriteFlow writeFlow_
StateFlow for writing data to the fd.
virtual ~HubDeviceSelect()
If the barrier has not been called yet, will notify it inline.
HFlow * hub_
Hub whose data we are trying to send.
void report_read_error() override
Callback from the ReadFlow when the read call has seen an error.
Implementation of a Pool interface that takes memory from mainBufferPool (configurable) but limits th...
An object that can schedule itself on an executor to run.
bool is_empty()
ExecutorBase * executor()
Return type for a state flow callback.
Base class for state machines.
Action read_repeated(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Blocks until size bytes are read and then invokes the next state.
Action read_single(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Attempts to read at most size_t bytes, and blocks the caller until at least one byte is read.
Service * service()
Return a pointer to the service I am bound to.
Action allocate_and_call(FlowInterface< Buffer< T > > *target_flow, Callback c, Pool *pool=nullptr)
Allocates a buffer from a pool and proceed to the next state when allocation is successful.
Action exit()
Terminate current StateFlow activity.
void start_flow(Callback c)
Resets the flow to the specified state and starts it.
Buffer< T > * get_allocation_result(FlowInterface< Buffer< T > > *target_flow)
Takes the result of the asynchronous allocation.
Action call_immediately(Callback c)
Imediately call the next state upon return.
Action set_terminated()
Sets the flow to terminated state.
Action write_repeated(StateFlowSelectHelper *helper, int fd, const void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Writes some data into a file descriptor, repeating the operation as necessary until all bytes are wri...
void notify() override
Wakeup call arrived. Schedules *this on the executor.
Action release_and_exit()
Terminates the processing of the current message.
State flow with a given typed input queue.
Container for an arbitrary structure to pass through a Hub.
Definition Hub.hxx:47
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
Definition logging.h:59
#define OVERRIDE
Function attribute for virtual functions declaring that this funciton is overriding a funciton that s...
Definition macros.h:180
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
static void resize_target(buffer_type *b)
CAN buffers do not need to be resized.
Buffer< CanHubData > buffer_type
Helper type for declaring the payload buffer type.
static void check_target_size(buffer_type *b, int remaining)
a CAN buffer is only okay if the entire buffer was read in in one go.
static void resize_target(buffer_type *b)
struct buffers do not need to be resized.
Buffer< HubContainer< StructContainer< T > > > buffer_type
Helper type for declaring the payload buffer type.
static void check_target_size(buffer_type *b, int remaining)
a struct buffer is only okay if the entire buffer was read in in one go.
static void resize_target(HubFlow::buffer_type *b)
Preps a buffer for receiving data.
static void check_target_size(HubFlow::buffer_type *b, int remaining)
Clears out all potential empty space left after a buffer has been partially filled.
Generic template for the buffer traits.
Use this class to read from an fd using select() in a state flow.
unsigned remaining_
Number of bytes still outstanding to read.
unsigned hasError_
1 if there was an error reading of writing.