Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
HubDevice.hxx
Go to the documentation of this file.
1
34#ifndef _UTILS_HUBDEVICE_HXX_
35#define _UTILS_HUBDEVICE_HXX_
36
37#include <unistd.h>
38
39#include "openmrn_features.h"
40#include "utils/Hub.hxx"
42
43template <class Data> class FdHubWriteFlow;
44
49{
50public:
51#ifdef ESP_PLATFORM
52 // TODO: shrink these if possible
54 static const int kWriteThreadStackSize = 2048;
56 static const int kReadThreadStackSize = 2048;
57#else
59 static const int kWriteThreadStackSize = 1000;
61 static const int kReadThreadStackSize = 1000;
62#endif // ESP_PLATFORM
63
78
79 virtual ~FdHubPortBase()
80 {
81 }
82
90 static void fill_thread_name(char *buf, char mode, int fd);
91
92protected:
98 const char *fill_thread_name(char mode, int fd) {
100 return threadName_;
101 }
102
106 virtual void unregister_write_port() = 0;
107
111 {
112 {
113 AtomicHolder h(this);
114 if (hasError_)
115 {
116 return;
117 }
118 else
119 {
120 hasError_ = 1;
121 }
122 }
123 ::close(fd_);
125 }
126
129 {
130 public:
133 {
134 }
135
137 virtual int unit() = 0;
140 virtual int buf_size() = 0;
142 virtual void send_message(const void *buf, int size) = 0;
144 {
146 uint8_t buf[buf_size()];
147 while (true)
148 {
149 int done = 0;
150 while (done < unit())
151 {
152 {
154 if (port_->hasError_)
155 {
156 return NULL;
157 }
158 }
159 ssize_t ret =
160 ::read(port_->fd_, &buf[done], buf_size() - done);
161 if (ret > 0)
162 {
163 done += ret;
164 continue;
165 }
166 if ((ret < 0) && (errno == EINTR || errno == EAGAIN))
167 {
168 continue;
169 }
170// Now: we have an error.
171#if OPENMRN_FEATURE_BSD_SOCKETS_REPORT_EOF_ERROR
172 if (!ret)
173 {
174 LOG_ERROR("EOF reading fd %d", port_->fd_);
175 }
176 else
177 {
178 LOG_ERROR("Error reading fd %d: (%d) %s", port_->fd_,
179 errno, strerror(errno));
180 }
181#endif // OPENMRN_FEATURE_BSD_SOCKETS_REPORT_EOF_ERROR
183 return NULL;
184 }
185 send_message(buf, done);
186 }
187 }
188
189 protected:
192 };
193
194protected:
195 template <class Data> friend class FdHubWriteFlow;
196
198 char threadName_[30];
207 unsigned hasError_ : 1;
210 unsigned writeExitEnqueued_ : 1;
211};
212
216template <class Data>
217class FdHubWriteFlow : public StateFlow<Buffer<Data>, QList<1>>
218{
219public:
222 : StateFlow<Buffer<Data>, QList<1>>(&parent->writeService_)
223 , port_(parent)
224 {
225 }
226
229 {
230 const uint8_t *buf =
231 reinterpret_cast<const uint8_t *>(this->message()->data()->data());
232 size_t size = this->message()->data()->size();
233 while (size > 0)
234 {
235 {
237 if (port_->hasError_)
238 {
239 if (port_->writeExitEnqueued_ && this->queue_empty())
240 {
241 StateFlowBase::Action a = this->set_terminated();
242 this->release();
243 return a;
244 }
245 return this->release_and_exit();
246 }
247 }
248 ssize_t ret = ::write(port_->fd_, buf, size);
249 if (ret > 0)
250 {
251 size -= ret;
252 buf += ret;
253 continue;
254 }
255// now: we have an error.
256#if OPENMRN_FEATURE_BSD_SOCKETS_REPORT_EOF_ERROR
257 if (!ret)
258 {
259 LOG_ERROR("EOF writing fd %d", port_->fd_);
260 }
261 else
262 {
263 LOG_ERROR("Error writing fd %d: (%d) %s", port_->fd_, errno,
264 strerror(errno));
265 }
266#endif // OPENMRN_FEATURE_BSD_SOCKETS_REPORT_EOF_ERROR
268 break;
269 }
270 return this->release_and_exit();
271 }
272
275};
276
287template <class HFlow> class FdHubPort : public FdHubPortBase
288{
289public:
296 FdHubPort(HFlow *hub, int fd, Notifiable *done)
297 : FdHubPortBase(fd, done)
298 , hub_(hub)
299 , writeFlow_(this)
300 , readThread_(this)
301 {
302 hub_->register_port(&writeFlow_);
303 }
304
306 {
308 }
309
311 {
312 hub_->unregister_port(&writeFlow_);
313 /* We put an empty message at the end of the queue. This will cause
314 * wait until all pending messages are dealt with, and then ping the
315 * barrier notifiable, commencing the shutdown. */
316 auto *b = writeFlow_.alloc();
317 b->set_done(&barrier_);
318 writeFlow_.send(b);
319 }
320
323 {
324 public:
327 {
328 init();
329 start(port->fill_thread_name('R', port->fd_), 0,
330 port->kReadThreadStackSize);
331 }
332
333 ~ReadThread() {
334 delete semaphores_;
335 }
336
339 {
340 return static_cast<FdHubPort<HFlow> *>(port_);
341 }
342
345 {
346 return kUnit;
347 }
351 {
352 return kBufSize;
353 }
355 void send_message(const void *buf, int size) OVERRIDE;
356
357 private:
359 void init();
362
364 static const int kUnit;
367 static const int kBufSize;
368 };
369
370private:
371 friend class ReadThread;
372
374 HFlow *hub_;
379};
380
381#endif // _UTILS_HUBDEVICE_HXX_
See OSMutexLock in os/OS.hxx.
Definition Atomic.hxx:153
Lightweight locking class for protecting small critical sections.
Definition Atomic.hxx:130
This class sends a notification in its destructor.
A BarrierNotifiable allows to create a number of child Notifiable and wait for all of them to finish.
BarrierNotifiable * new_child()
Call this for each child task.
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
void shutdown()
Terminates the executor thread.
Definition Executor.cxx:437
Implementation the ExecutorBase with a specific number of priority bands.
Definition Executor.hxx:266
Read thread implementation with template-inspecific methods.
void * entry() OVERRIDE
User entry point for the created thread.
ReadThreadBase(FdHubPortBase *port)
Constructor.
virtual void send_message(const void *buf, int size)=0
Sends off a buffer.
FdHubPortBase * port_
Parent port.
Template-nonspecific base class for FdHubPort.
Definition HubDevice.hxx:49
static void fill_thread_name(char *buf, char mode, int fd)
Puts the desired thread name for the read or write thread.
Definition HubDevice.cxx:96
unsigned hasError_
If this is 1, the fd has been closed.
void report_error()
Call when an IO error is encountered.
static const int kReadThreadStackSize
How many bytes of stack should we allocate to the read thread's stack.
Definition HubDevice.hxx:61
FdHubPortBase(int fd, Notifiable *done)
Constructor.
Definition HubDevice.hxx:68
char threadName_[30]
Temporary buffer used for rendering thread names.
Service writeService_
Service for the write flow.
static const int kWriteThreadStackSize
How many bytes of stack should we allocate to the write thread's stack.
Definition HubDevice.hxx:59
Executor< 1 > writeThread_
This executor is running the writes.
BarrierNotifiable barrier_
This barrier will be notified when both read and write thread has exited.
const char * fill_thread_name(char mode, int fd)
Puts the desired thread name for the read or write thread.
Definition HubDevice.hxx:98
virtual void unregister_write_port()=0
Removes the write flow from the hub's registration.
unsigned writeExitEnqueued_
If this is 1, we have already enqueued the request to exit the write flow.
Shared base class for thread-based and select-based hub devices.
Definition Hub.hxx:219
int fd_
The device file descriptor.
Definition Hub.hxx:233
Thread performing the read operations on the device.
int buf_size() OVERRIDE
int unit() OVERRIDE
void init()
Initializes the semaphore notifiables.
static const int kUnit
This is the minimum number of bytes that we will send.
SemaphoreNotifiableBlock * semaphores_
If non-null, one slot will be acquired for each incoming message.
void send_message(const void *buf, int size) OVERRIDE
Sends off a buffer.
ReadThread(FdHubPort< HFlow > *port)
Constructor.
static const int kBufSize
We will allocate this many bytes for read buffer.
FdHubPort< HFlow > * port()
HubPort that connects a raw device to a strongly typed Hub.
FdHubPort(HFlow *hub, int fd, Notifiable *done)
Constructor.
HFlow * hub_
Parent hub to send the data to / read the data from.
FdHubWriteFlow< typename HFlow::value_type > writeFlow_
StateFlow that is performing the actual writes.
void unregister_write_port() OVERRIDE
Removes the write flow from the hub's registration.
ReadThread readThread_
An OSThread child that is performing the reads.
State flow for writing data to an fd.
FdHubPortBase * port_
The owning port.
StateFlowBase::Action entry() OVERRIDE
Handles the next incoming entry.
FdHubWriteFlow(FdHubPortBase *parent)
Constructor.
MessageType * alloc()
Synchronously allocates a message buffer from the pool of this flow.
An object that can schedule itself on an executor to run.
This class provides a threading API.
Definition OS.hxx:46
void start(const char *name, int priority, size_t stack_size)
Starts the thread.
Definition OS.hxx:78
A list of queues.
Definition Queue.hxx:466
A block of BarrierNotifiable objects, with a synchronous allocation call.
Collection of related state machines that pend on incoming messages.
Return type for a state flow callback.
State flow with a given typed input queue.
void release() OVERRIDE
Unrefs the current buffer.
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
MessageType * message()
#define LOG_ERROR(message...)
Shorthand for LOG(LEVEL_ERROR, message...). See LOG.
Definition logging.h:124
#define OVERRIDE
Function attribute for virtual functions declaring that this funciton is overriding a funciton that s...
Definition macros.h:180