Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
BLEHubPort.hxx
Go to the documentation of this file.
1
35#ifndef _OPENLCB_BLEHUBPORT_HXX_
36#define _OPENLCB_BLEHUBPORT_HXX_
37
38#include <functional>
39
41#include "openlcb/BLEDefs.hxx"
42#include "os/OS.hxx"
43#include "utils/DirectHub.hxx"
44
45extern DataBufferPool g_direct_hub_data_pool;
46
47namespace openlcb
48{
49
50class BLEHubPort : public DirectHubPort<uint8_t[]>,
51 private StateFlowBase,
53{
54public:
63 using SendFunction = std::function<void(const uint8_t *data, size_t len)>;
64
67 static constexpr size_t MAX_BYTES_PER_WRITE = 220;
68
81 std::unique_ptr<MessageSegmenter> segmenter, Service *ble_write_service,
82 SendFunction send_function, Notifiable *on_error = nullptr)
83 : StateFlowBase(ble_write_service)
84 , pendingShutdown_(false)
85 , waitingForAck_(false)
86 , sendFunction_(std::move(send_function))
87 , onError_(on_error)
88 , input_(this, hub, std::move(segmenter))
89 {
90 // Sets the initial state of the write flow to the stage where we read
91 // the next entry from the queue.
93 notRunning_ = true;
94
95 hub->register_port(this);
96 }
97
98 void disconnect_and_delete() override
99 {
100 {
101 AtomicHolder l(lock());
103 input_.hub_->unregister_port(this);
104 // After this, the next notify will eventually reach the shutdown
105 // and delete state.
106 pendingShutdown_ = true;
107 if (notRunning_)
108 {
109 notRunning_ = 0;
110 notify();
111 }
112 }
113 // Synchronization point that ensures that there is no currently
114 // running Executable on this executor. This ensures that there is no
115 // currently pending invocation of sendFunction_, nor will there be
116 // any future invocations after this function returns.
117 service()->executor()->sync_run([]() {});
118 }
119
120 // ===== API from the BLE stack connection =====
121
/// Called by the BLE stack when a send function is completed.
///
/// Delegates the in-flight accounting to ack_helper(), then logs the
/// number of sends that are still pending.
123 void ack()
124 {
125 ack_helper();
// NOTE: sendPending_ is read here for logging only. ack_helper() has
// already released lock() by this point, so the value is not read under
// the lock.
126 LOG(ALWAYS, "BLE ack, pend %d", sendPending_);
127 }
128
/// Called by the BLE stack when a send has failed.
///
/// The failure is accounted exactly like a completed send: ack_helper()
/// is invoked and the pending count is logged. This function itself does
/// not retry or re-queue anything.
130 void nack()
131 {
132 ack_helper();
// NOTE: sendPending_ is read here for logging only. ack_helper() has
// already released lock() by this point, so the value is not read under
// the lock.
133 LOG(ALWAYS, "BLE nack, pend %d", sendPending_);
134 }
135
/// Called by the BLE stack when input data arrives from this remote
/// endpoint.
///
/// Forwards the bytes unchanged to the inbound flow object (input_).
///
/// @param data pointer to the first received byte.
/// @param len number of bytes available at data.
143 void input_data(const uint8_t *data, size_t len)
144 {
145 input_.input_data(data, len);
146 }
147
149 void send(MessageAccessor<uint8_t[]> *msg) override
150 {
152 {
153 // Port already closed. Ignore data to send.
154 return;
155 }
156 {
157 AtomicHolder h(lock());
158 if (pendingTail_ && pendingTail_->buf_.try_append_from(msg->buf_))
159 {
160 totalPendingSize_ += msg->buf_.size();
161 // Successfully enqueued the bytes into the tail of the queue.
162 // Nothing else to do here.
163 return;
164 }
165 }
166
169 BufferType *b;
171 b->data()->buf_.reset(msg->buf_);
172 if (msg->done_)
173 {
174 b->set_done(msg->done_->new_child());
175 }
176 // Checks if we need to wake up the flow.
177 {
178 AtomicHolder h(lock());
180 {
181 // Catch race condition when port is already closed.
182 b->unref();
183 return;
184 }
186 totalPendingSize_ += msg->buf_.size();
187 pendingTail_ = b->data();
188 if (notRunning_)
189 {
190 notRunning_ = 0;
191 // When we have exactly one buffer at hand, we release the done
192 // notify of it to allow the upstream stack to generate more
193 // data quicker.
194 b->set_done(nullptr);
195 }
196 else
197 {
198 // flow already running. Skip notify.
199 return;
200 }
201 }
202 notify();
203 }
204
208 {
210 {
212 }
213 BufferType *head;
214 {
215 AtomicHolder h(lock());
216 head = static_cast<BufferType *>(pendingQueue_.next_locked().item);
217 if (!head)
218 {
219 notRunning_ = true;
220 return wait();
221 }
222 HASSERT(head);
223 if (head->data() == pendingTail_)
224 {
225 pendingTail_ = nullptr;
226 }
227 totalPendingSize_ -= head->data()->buf_.size();
228 }
229
230 currentHead_.reset(head);
231 return call_immediately(STATE(do_write));
232 }
233
234 Action do_write()
235 {
237 {
239 }
240
241 const uint8_t *read_ptr;
242 size_t num_bytes;
243 auto &b = currentHead_->data()->buf_;
244 read_ptr = b.data_read_pointer(&num_bytes);
245 if (num_bytes > MAX_BYTES_PER_WRITE)
246 {
247 num_bytes = MAX_BYTES_PER_WRITE;
248 }
249 {
250 AtomicHolder h(lock());
251 ++sendPending_;
252 }
253 LOG(INFO, "BLE send %d bytes pendcount %d queuesize %d/%d", (int)num_bytes, sendPending_, (int)totalPendingSize_, pendingQueue_.pending());
254 sendFunction_(read_ptr, num_bytes);
255 b.data_read_advance(num_bytes);
256 if (b.size())
257 {
258 return yield();
259 }
260 else
261 {
262 currentHead_.reset();
263 AtomicHolder h(lock());
265 {
267 }
268 if (sendPending_ > 1)
269 {
270 waitingForAck_ = 1;
272 }
273 else if (pendingQueue_.empty())
274 {
275 // go back to sleep
276 notRunning_ = 1;
278 }
279 else
280 {
282 }
283 }
284 }
285
289 {
290 // Synchronization with other threads that might have written
291 // pendingShutdown == true.
292 {
293 AtomicHolder h(lock());
294 }
295
296 // Releases all buffers.
297 currentHead_.reset();
298 while (auto *h = static_cast<BufferType *>(pendingQueue_.next().item))
299 {
300 h->unref();
301 }
302 return delete_this();
303 }
304
305protected:
/// Shared accounting for ack() and nack(): records that one in-flight send
/// has finished and, if the write flow was paused waiting for an ack,
/// wakes it up again.
306 void ack_helper()
307 {
308 {
// The counter and the flag below are only modified while holding lock().
309 AtomicHolder h(lock());
310 --sendPending_;
// do_write() sets waitingForAck_ when more than one send is in flight.
// Once at most one send remains outstanding, clear the flag and notify
// the flow so that it can continue.
311 if (waitingForAck_ && sendPending_ <= 1)
312 {
313 waitingForAck_ = false;
314 notify();
315 }
316 }
317 }
318
320 class InputFlow : public StateFlowBase, private Atomic
321 {
322 public:
323 InputFlow(BLEHubPort* parent, DirectHubInterface<uint8_t[]> *hub,
324 std::unique_ptr<MessageSegmenter> segmenter)
325 : StateFlowBase(hub->get_service())
326 , parent_(parent)
327 , segmenter_(std::move(segmenter))
328 , hub_(hub)
329 {
330 segmenter_->clear();
331 flowWaiting_ = true;
333 }
334
342 void input_data(const uint8_t *data, size_t len)
343 {
344 while (len)
345 {
346 uint8_t *dst = nullptr;
347 size_t free = 0;
348 if (!appendBuf_.free())
349 {
350 DataBuffer *p;
351 g_direct_hub_data_pool.alloc(&p);
353 }
355 free = appendBuf_.free();
356 if (len < free) {
357 free = len;
358 }
359 memcpy(dst, data, free);
360 len -= free;
361 data += free;
363 }
364 {
365 AtomicHolder h(this);
367 if (flowWaiting_) {
368 flowWaiting_ = false;
369 notify();
370 }
371 }
373 }
374
384
389 size_t len = 0;
390 const uint8_t* ptr = segmentBuf_.data_read_pointer(&len);
391 if (!len) {
392 AtomicHolder h(this);
393 if (transferBuf_.size()) {
395 }
396 flowWaiting_ = true;
398 }
399 ssize_t segment_size = segmenter_->segment_message(ptr, len);
400 size_t xfer = len;
401 if (segment_size) {
402 xfer = segment_size - outputBuf_.size();
403 }
405 p.reset(segmentBuf_, xfer);
408 if (segment_size) {
409 segmenter_->clear();
410 // completed data
412 } else {
413 // Need to segment more data.
414 return again();
415 }
416 }
417
421 // We expect either an inline call to our run() method or
422 // later a callback on the executor. This sequence of calls
423 // prepares for both of those options.
425 inlineCall_ = 1;
426 sendComplete_ = 0;
427 hub_->enqueue_send(this); // causes the callback
428 inlineCall_ = 0;
429 if (sendComplete_)
430 {
431 return send_done();
432 }
433 return wait();
434 }
435
441 {
442 auto *m = hub_->mutable_message();
444 //m->set_done(buf_.tail()->new_child());
445 m->source_ = parent_;
446 // This call transfers the chained head of the current buffers,
447 // taking additional references where necessary or transferring the
448 // existing reference. It adjusts the skip_ and size_ arguments in
449 // buf_ to continue from where we left off.
450 m->buf_ = std::move(outputBuf_);
451 hub_->do_send();
452 sendComplete_ = 1;
453 if (inlineCall_)
454 {
455 // do not disturb current state.
456 return wait();
457 }
458 else
459 {
460 // we were called queued; go back to running the flow on the
461 // main executor.
462 return yield_and_call(STATE(send_done));
463 }
464 }
465
466 Action send_done()
467 {
468 // Goes back to looking at more data from the transferred buffers.
470 }
471
472 private:
473 friend class BLEHubPort;
474
477
482 uint8_t inlineCall_ : 1;
484 uint8_t sendComplete_ : 1;
485
498 std::unique_ptr<MessageSegmenter> segmenter_;
501
502 };
503
508 {
510 };
514 typedef Q QueueType;
515
518 {
519 return pendingQueue_.lock();
520 }
521
526 bool notRunning_ : 1;
531
541
545
552
555
556 InputFlow input_;
557
558}; // class BLEHubPort
559
560} // namespace openlcb
561
562#endif // _OPENLCB_BLEHUBPORT_HXX_
DynamicPool * mainBufferPool
main buffer pool instance
Definition Buffer.cxx:37
AutoReleaseBuffer< T > BufferPtr
Smart pointer for buffers.
Definition Buffer.hxx:259
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
See OSMutexLock in os/OS.hxx.
Definition Atomic.hxx:153
Lightweight locking class for protecting small critical sections.
Definition Atomic.hxx:130
void set_done(BarrierNotifiable *done)
Specifies that a given BarrierNotifiable must be called when the Buffer is deallocated (unreffed to z...
Definition Buffer.hxx:97
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
void unref()
Decrement count.
Definition Buffer.hxx:675
T * data()
get a pointer to the start of the data.
Definition Buffer.hxx:215
Proxy Pool that can allocate DataBuffer objects of a certain size.
void alloc(DataBuffer **result)
Get a free item out of the pool with untyped data of the size specified in the constructor.
Specialization of the Buffer class that is designed for storing untyped data arrays.
Interface for the central part of a hub.
virtual MessageAccessor< T > * mutable_message()=0
Accessor to fill in the message payload.
virtual void enqueue_send(Executable *caller)=0
Signals that the caller wants to send a message to the hub.
virtual void unregister_port(DirectHubPort< T > *port)=0
Synchronously removes a port from this hub.
virtual void do_send()=0
Sends a message to the hub.
Interface for a downstream port of a hub (aka a target to send data to).
void sync_run(std::function< void()> fn)
Synchronously runs a closure on this executor.
Definition Executor.cxx:151
A class that keeps ownership of a chain of linked DataBuffer references.
bool try_append_from(const LinkedDataBufferPtr &o, bool add_link=false)
Attempt to combine *this with o into a single LinkedDataBufferPtr this.
uint8_t * data_write_pointer()
void data_read_advance(size_t len)
Advances the head pointer.
void reset(const LinkedDataBufferPtr &o, ssize_t size=-1)
Takes a reference of o, taking a prefix of len size (or all the data).
unsigned size() const
void append_empty_buffer(DataBuffer *buf)
Adds an empty buffer to the end of this buffer chain.
void data_write_advance(size_t len)
Advances the tail pointer after a write occurred into the tail.
size_t free() const
const uint8_t * data_read_pointer(size_t *len)
Retrieves a pointer where data can be read out of the buffer.
An object that can schedule itself on an executor to run.
void alloc(Buffer< BufferType > **result, Executable *flow=NULL)
Get a free item out of the pool.
Definition Buffer.hxx:292
This class implements a linked list "queue" of buffers.
Definition Queue.hxx:98
void insert_locked(QMember *item, unsigned index=0)
Add an item to the back of the queue.
Definition Queue.hxx:146
Result next_locked()
Get an item from the front of the queue.
Definition Queue.hxx:186
size_t pending(unsigned index)
Get the number of pending items in the queue.
Definition Queue.hxx:208
bool empty(unsigned index)
Test if the queue is empty.
Definition Queue.hxx:225
QMember * next(unsigned index)
Get an item from the front of the queue.
Definition Queue.hxx:167
Collection of related state machines that pend on incoming messages.
ExecutorBase * executor()
Return type for a state flow callback.
Base class for state machines.
Action yield_and_call(Callback c)
Place the current flow to the back of the executor, and transition to a new state after we get the CP...
Service * service()
Return a pointer to the service I am bound to.
void notify() override
Wakeup call arrived.
Definition StateFlow.cxx:97
StateFlowBase()
Default constructor.
Action yield()
Place the current flow to the back of the executor, and re-try the current state after we get the CPU...
Action delete_this()
Terminates the flow and deletes *this.
Action wait()
Wait for an asynchronous call.
Action again()
Call the current state again via call_immediately.
Action call_immediately(Callback c)
Immediately call the next state upon return.
Action wait_and_call(Callback c)
Wait for resource to become available before proceeding to next state.
State flow that handles data arriving from this bluetooth connection.
LinkedDataBufferPtr segmentBuf_
Buffer for the data being segmented.
bool flowWaiting_
True if the flow is paused and waiting for more data to arrive.
Action segment_head()
Takes the head of segmentBuf_, and performs the message segmentation on it.
uint8_t inlineCall_
1 if we got the send callback inline from the read_done.
DirectHubInterface< uint8_t[]> * hub_
Parent hub where output data is coming from.
Action take_input()
Moves over data from the other thread which is in the transferBuf_.
LinkedDataBufferPtr outputBuf_
Buffer for one sent message. This is the output of the segmenter.
Action send_callback()
This is the callback state that is invoked inline by the hub.
LinkedDataBufferPtr transferBuf_
Buffer for transferring data from the input thread to the service thread.
BLEHubPort * parent_
Owning outside flow.
uint8_t sendComplete_
1 if the run callback actually happened inline.
void input_data(const uint8_t *data, size_t len)
Called by the BLE stack when input data arrives from this remote endpoint.
LinkedDataBufferPtr appendBuf_
Current buffer for the input data. Owned by the input thread.
Action send_output()
Called when one full message is segmented into outputBuf_.
std::unique_ptr< MessageSegmenter > segmenter_
Implementation (and state) of the business logic that segments incoming bytes into messages that shal...
bool notRunning_
1 if the state flow is paused, waiting for the notification.
void send(MessageAccessor< uint8_t[]> *msg) override
Synchronous output routine called by the hub.
std::function< void(const uint8_t *data, size_t len)> SendFunction
This function needs to be implemented by the application that has the specific BLE stack.
static constexpr size_t MAX_BYTES_PER_WRITE
How big can a single attribute write be? ESP's BLE implementation says 600 bytes.
Action shutdown_and_exit()
Invoked after pendingShutdown == true.
SendFunction sendFunction_
Function object used to send out actual data.
void ack()
Called by the BLE stack, when a send function is completed.
size_t totalPendingSize_
Total number of bytes in the pendingQueue.
bool pendingShutdown_
True if we have an error and we are trying to shut down.
void nack()
Called by the BLE stack, when a send has failed.
QueueType pendingQueue_
Contains buffers of OutputDataEntries to write.
void disconnect_and_delete() override
Notifies the protocol engine that the connection has been terminated.
Buffer< OutputDataEntry > BufferType
Type of buffers we are enqueuing for output.
void input_data(const uint8_t *data, size_t len)
Called by the BLE stack when input data arrives from this remote endpoint.
OutputDataEntry * pendingTail_
Last tail pointer in the pendingQueue.
bool waitingForAck_
true if the write flow is paused waiting for the BLE stack to ack the data.
Action read_queue()
Entry point to the flow, when an outgoing message got into the queue and we are woken up.
BLEHubPort(DirectHubInterface< uint8_t[]> *hub, std::unique_ptr< MessageSegmenter > segmenter, Service *ble_write_service, SendFunction send_function, Notifiable *on_error=nullptr)
Constructor.
Notifiable * onError_
This notifiable will be called before exiting.
BufferPtr< OutputDataEntry > currentHead_
The buffer that is taken out of the queue while flushing.
int sendPending_
Number of in-flight messages sent but not acknowledged.
Q QueueType
Type of the queue used to keep the output buffer queue.
Shared base class for protocol implementation on a per-BLE-connection basis.
Definition BLEDefs.hxx:42
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int INFO
Loglevel that is printed by default, reporting some status information.
Definition logging.h:57
static const int ALWAYS
Loglevel that is always printed.
Definition logging.h:49
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
Typed message class.
Definition DirectHub.hxx:93
QMember * item
item pulled from queue
Definition Queue.hxx:84
Holds the necessary information we need to keep in the queue about a single output entry.