Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
HubDeviceNonBlock.hxx
1
34#ifndef _UTILS_HUBDEVICENONBLOCK_HXX_
35#define _UTILS_HUBDEVICENONBLOCK_HXX_
36
37#include "openmrn_features.h"
38#ifdef OPENMRN_FEATURE_FD_CAN_DEVICE
39
40#include <unistd.h>
41#include <stdio.h>
42#include <fcntl.h>
43
45#ifdef __FreeRTOS__
46#include "freertos/can_ioctl.h"
47#else
48#include "can_ioctl.h"
49#endif
50#include "utils/Hub.hxx"
51
52#ifdef __FreeRTOS__
53extern int ioctl(int fd, unsigned long int key, ...);
54#endif // __FreeRTOS__
55
56template <class HFlow> class HubDeviceNonBlock : public Destructable, private Atomic, public Service
57{
58public:
59 HubDeviceNonBlock(HFlow *hub, const char *path)
60 : Service(hub->service()->executor())
61 , fd_(::open(path, O_RDWR | O_NONBLOCK))
62 , hub_(hub)
63 , readFlow_(this)
64 , writeFlow_(this)
65 {
66 HASSERT(fd_ >= 0);
67 hub_->register_port(write_port());
68 }
69
70 virtual ~HubDeviceNonBlock()
71 {
72 hub_->unregister_port(write_port());
73 }
74
75 HFlow *hub()
76 {
77 return hub_;
78 }
79
80 typename HFlow::port_type *write_port()
81 {
82 return &writeFlow_;
83 }
84
85 int fd()
86 {
87 return fd_;
88 }
89
90protected:
91 class ReadFlow : public StateFlowBase
92 {
93 public:
94 ReadFlow(HubDeviceNonBlock *device)
95 : StateFlowBase(device)
96 , b_(nullptr)
97 {
98 this->start_flow(STATE(allocate_buffer));
99 }
100
101 HubDeviceNonBlock *device()
102 {
103 return static_cast<HubDeviceNonBlock *>(this->service());
104 }
105
106 void notify() OVERRIDE
107 {
108 service()->executor()->add(this, 0);
109 }
110
111 void notify_from_isr() OVERRIDE
112 {
113 // We override to priority zero.
114 service()->executor()->add_from_isr(this, 0);
115 }
116
117 Action allocate_buffer()
118 {
119 return this->allocate_and_call(device()->hub(), STATE(try_read));
120 }
121
122 Action try_read()
123 {
124 b_ = this->get_allocation_result(device()->hub());
125 b_->data()->skipMember_ = device()->write_port();
126 return this->call_immediately(STATE(retry_read));
127 }
128
129 Action retry_read()
130 {
131 HASSERT(b_);
132 int ret =
133 ::read(device()->fd(), b_->data()->data(), b_->data()->size());
134 if (ret <= 0)
135 {
136 // We are blocked. There is no race condition here, because the
137 // contract of the ioctl is that if there is any data in the
138 // queuy, they will immediately call us.
139 HASSERT(::ioctl(device()->fd(), CAN_IOC_READ_ACTIVE, this) == 0);
140 return this->wait();
141 }
142 else
143 {
144 HASSERT((unsigned)ret == b_->data()->size());
145 device()->hub()->send(b_, 0);
146 b_ = nullptr;
147 return this->call_immediately(STATE(allocate_buffer));
148 }
149 }
150
151 private:
152 typename HFlow::buffer_type *b_;
153 };
154
156 class WriteFlow : public WriteFlowBase
157 {
158 public:
159 WriteFlow(HubDeviceNonBlock *dev)
160 : WriteFlowBase(dev)
161 {
162 }
163
164 HubDeviceNonBlock *device()
165 {
166 return static_cast<HubDeviceNonBlock *>(this->service());
167 }
168
169 void notify() OVERRIDE
170 {
171 this->service()->executor()->add(this, 0);
172 }
173
174 void notify_from_isr() OVERRIDE
175 {
176 this->service()->executor()->add_from_isr(this, 0);
177 }
178
180 {
181 bufferPos_ = static_cast<uint8_t*>(this->message()->data()->data());
182 len_ = this->message()->data()->size();
183 return this->call_immediately(STATE(try_write));
184 }
185
186 StateFlowBase::Action try_write()
187 {
188 ssize_t ret = ::write(device()->fd(), bufferPos_, len_);
189 if (!ret)
190 {
191 // We are blocked. There is no race condition here, because the
192 // contract of the ioctl is that if there is any data in the
193 // queuy, they will immediately call us.
194 HASSERT(::ioctl(device()->fd(), CAN_IOC_WRITE_ACTIVE, this) == 0);
195 return this->wait();
196 }
197 else if (ret > 0)
198 {
199 len_ -= ret;
200 bufferPos_ += ret;
201 if (!len_)
202 {
203 return this->release_and_exit();
204 }
205 else
206 {
207 return this->again();
208 }
209 }
210 else
211 {
212 // error!
213 DIE("Error writing.");
214 }
215 }
216
217 private:
218 uint8_t *bufferPos_;
219 ssize_t len_;
220 };
221
222protected:
224 int fd_;
225 HFlow *hub_;
226 ReadFlow readFlow_;
227 WriteFlow writeFlow_;
228};
229
230#endif // OPENMRN_FEATURE_FD_CAN_DEVICE
231#endif // _UTILS_HUBDEVICENONBLOCK_HXX_
int ioctl(int fd, unsigned long int key,...)
Request an ioctl transaction.
Definition Fileio.cxx:452
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
Lightweight locking class for protecting small critical sections.
Definition Atomic.hxx:130
Base class of everything with a virtual destructor.
Collection of related state machines that pend on incoming messages.
Return type for a state flow callback.
Base class for state machines.
State flow with a given typed input queue.
#define CAN_IOC_READ_ACTIVE
read active ioctl.
#define CAN_IOC_WRITE_ACTIVE
write active ioctl.
#define OVERRIDE
Function attribute for virtual functions declaring that this function is overriding a function that s...
Definition macros.h:180
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DIE(MSG)
Unconditionally terminates the current process with a message.
Definition macros.h:143