Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
DirectHub.cxx
Go to the documentation of this file.
1
37// #define LOGLEVEL VERBOSE
38
39#include "utils/DirectHub.hxx"
40
41#include <algorithm>
42#include <fcntl.h>
43#include <sys/socket.h>
44#include <sys/types.h>
45#include <vector>
46
49#include "nmranet_config.h"
50#include "utils/logging.h"
52
57 config_directhub_port_incoming_buffer_size());
62
67{
68public:
69 typedef Q QueueType;
70
72 : Service(e)
73 , busy_(0)
74 {
75 }
76
79 {
80 return pendingSend_.lock();
81 }
82
88 {
89 {
90 AtomicHolder h(lock());
91 if (busy_)
92 {
108 return;
109 }
110 busy_ = 1;
111 }
112 caller->run();
113 }
114
118 void on_done()
119 {
120 // De-queues the next entry.
121 Result deq;
122 {
123 AtomicHolder h(lock());
124 if (pendingSend_.empty())
125 {
126 busy_ = 0;
127 return;
128 }
129 deq = pendingSend_.next();
130 }
131 // Schedules it on the executor.
132 executor()->add(static_cast<Executable *>(deq.item), deq.index);
133 }
134
136 unsigned busy_ : 1;
139};
140
141template <class T>
143 protected StateFlowBase,
144 private Atomic
145{
146public:
148 : StateFlowBase(service)
149 {
150 }
151
153 {
154 delete service();
155 }
156
158 {
159 return service();
160 }
161
162 void register_port(DirectHubPort<T> *port) override
163 {
164 AtomicHolder h(this);
165 ports_.push_back(port);
166 }
167
170 {
172 unregister_port(port, &n);
174 }
175
179 void unregister_port(DirectHubPort<T> *port, Notifiable *done) override
180 {
181 // By enqueueing on the service we ensure that the state flow is not
182 // processing any packets while the code below is running.
183 service()->enqueue_caller(new CallbackExecutable([this, port, done]() {
184 {
185 AtomicHolder h(this);
186 ports_.erase(std::remove(ports_.begin(), ports_.end(), port),
187 ports_.end());
188 }
189 done->notify();
190 service()->on_done();
191 }));
192 }
193
194 void enqueue_send(Executable *caller) override
195 {
196 service()->enqueue_caller(caller);
197 }
198
200 {
201 return &msg_;
202 }
203
// Delivers msg_ to every registered port accepted by should_send_to(), then
// clears msg_ and calls service()->on_done().
// NOTE(review): the declaration of `p` (listing line 209) is not visible in
// this extract.
204 void do_send() override
205 {
206 unsigned next_port = 0;
207 while (true)
208 {
210 {
211
// ports_ is only read while holding the Atomic lock of *this; the lock is
// released at the end of this scope, before the port's send() is invoked.
// The size is re-checked on every iteration.
212 AtomicHolder h(this);
213 if (next_port >= ports_.size())
214 {
215 break;
216 }
217 p = ports_[next_port];
218 ++next_port;
219 }
220 if (should_send_to(p))
221 {
222 p->send(&msg_);
223 }
224 }
225 msg_.clear();
226 service()->on_done();
227 }
228
233 {
234 return static_cast<HubSource *>(p) != msg_.source_;
235 }
236
237private:
238 DirectHubService *service()
239 {
240 return static_cast<DirectHubService *>(StateFlowBase::service());
241 }
242
244 std::vector<DirectHubPort<T> *> ports_;
245
248}; // class DirectHubImpl
249
252{
253 auto *s = new DirectHubService(e);
254 auto *dh = new DirectHubImpl<uint8_t[]>(s);
255 return dh;
256}
257
262class DirectHubPortSelect : public DirectHubPort<uint8_t[]>,
263 private StateFlowBase
264{
265private:
268 {
269 public:
271 std::unique_ptr<MessageSegmenter> segmenter)
272 : StateFlowBase(parent->service())
273 , parent_(parent)
274 , segmenter_(std::move(segmenter))
275 {
276 segmenter_->clear();
277 }
278
// Starts the read flow.
// NOTE(review): the statement in the body (listing line 282) is not visible
// in this extract.
280 void start()
281 {
283 }
284
289 {
290 auto *e = this->service()->executor();
291 if (e->is_selected(&helper_))
292 {
293 // We're waiting in select on reads, we can cancel right now.
294 e->unselect(&helper_);
296 buf_.reset();
300 }
301 // Else we're waiting for the regular progress to wake up the
302 // flow. It will check fd_ < 0 to exit.
303 }
304
305 private:
309 {
310 QMember *bn = pendingLimiterPool_.next().item;
311 if (bn)
312 {
314 return get_read_buffer();
315 }
316 else
317 {
320 }
321 }
322
333
336 {
337 DataBuffer *p;
338 LOG(VERBOSE, "read flow %p (fd %d): notif %p alloc() %u", this,
340 (unsigned)mainBufferPool->total_size());
341 // Since there is a limit on how many bufferNotifiable_'s can be,
342 // and they are uniquely assigned to the buffers, we know that this
343 // synchronous allocation can only happen for a few buffers
344 // only. The buffers will get recycled through the main buffer pool
345 // exactly at the time when the bufferNotifiable_ comes back to the
346 // pendingLimiterPool_.
348 if (buf_.head())
349 {
351 }
352 else
353 {
354 buf_.reset(p);
355 }
357 bufferNotifiable_ = nullptr;
358 return do_some_read();
359 }
360
// Issues a single read from the parent's fd into the free space at the tail
// of buf_ and continues in read_done(). If the fd is already closed
// (negative), drops buf_ and waits instead.
// NOTE(review): listing lines 366 and 368 are not visible in this extract.
361 Action do_some_read()
362 {
363 if (parent_->fd_ < 0)
364 {
365 // Socket closed, terminate and exit.
367 buf_.reset();
369 return wait();
370 }
// The requested size is buf_.free(); read_done() relies on this to compute
// how many bytes arrived.
371 return read_single(&helper_, parent_->fd_,
372 buf_.data_write_pointer(), buf_.free(), STATE(read_done));
373 }
374
// Continuation of do_some_read(). On a read error, logs it, drops buf_ and
// waits. Otherwise runs the segmenter over the bytes that arrived and
// continues in eval_segment().
// NOTE(review): listing lines 381 and 383 are not visible in this extract.
375 Action read_done()
376 {
377 if (helper_.hasError_)
378 {
379 LOG(INFO, "%p: Error reading from fd %d: (%d) %s", parent_,
380 parent_->fd_, errno, strerror(errno));
382 buf_.reset();
384 return wait();
385 }
// Bytes read = requested size (buf_.free()) minus what is still outstanding.
386 size_t bytes_arrived = buf_.free() - helper_.remaining_;
// The segmenter is called before the write pointer is advanced, so
// data_write_pointer() still points at the first newly arrived byte.
387 segmentSize_ = segmenter_->segment_message(
388 buf_.data_write_pointer(), bytes_arrived);
389 buf_.data_write_advance(bytes_arrived);
390 return eval_segment();
391 }
392
396 {
397 if (segmentSize_ > 0)
398 {
399 // Complete message.
400 segmenter_->clear();
402 }
403 else
404 {
405 return incomplete_message();
406 }
407 }
408
412 {
413 uint8_t *ptr;
414 unsigned available;
415 auto *n =
416 buf_.head()->get_read_pointer(buf_.skip(), &ptr, &available);
417 HASSERT(!n); // We must be at the tail.
418 segmentSize_ = segmenter_->segment_message(ptr, available);
419 return eval_segment();
420 }
421
425 {
426 if (!buf_.free())
427 {
429 }
430 return call_immediately(STATE(do_some_read));
431 }
432
436 {
437 // We expect either an inline call to our run() method or
438 // later a callback on the executor. This sequence of calls
439 // prepares for both of those options.
441 inlineCall_ = 1;
442 sendComplete_ = 0;
443 parent_->hub_->enqueue_send(this); // causes the callback
444 inlineCall_ = 0;
445 if (sendComplete_)
446 {
447 return send_done();
448 }
449 return wait();
450 }
451
457 {
458 auto *m = parent_->hub_->mutable_message();
459 m->set_done(buf_.tail()->new_child());
460 m->source_ = parent_;
461 // This call transfers the chained head of the current buffers,
462 // taking additional references where necessary or transferring the
463 // existing reference. It adjusts the skip_ and size_ arguments in
464 // buf_ to continue from where we left off.
466 parent_->hub_->do_send();
467 sendComplete_ = 1;
468 if (inlineCall_)
469 {
470 // do not disturb current state.
471 return wait();
472 }
473 else
474 {
475 // we were called queued; go back to running the flow on the
476 // main executor.
478 }
479 }
480
482 {
483 if (buf_.size())
484 {
485 // We still have unused data in the current buffer. We have to
486 // segment that and send it to the hub.
487 return call_head_segmenter();
488 }
489 if (buf_.free())
490 {
491 // We still have space in the current buffer. We can read more
492 // data into that space.
493 return do_some_read();
494 }
495 else
496 {
499 buf_.reset();
500 return alloc_for_read();
501 }
502 }
503
511 uint16_t inlineCall_ : 1;
513 uint16_t sendComplete_ : 1;
517 (unsigned)config_directhub_port_max_incoming_packets()};
524 std::unique_ptr<MessageSegmenter> segmenter_;
525 } readFlow_;
526
527 friend class DirectHubReadFlow;
528
529public:
// Constructor. Puts the fd into non-blocking mode, parks the write flow,
// registers this port with the hub and starts the read flow.
// @param hub the hub this port registers itself with.
// @param fd file descriptor used for input/output.
// @param segmenter handed over to the read flow.
// @param on_error stored in onError_; may be nullptr.
// NOTE(review): listing lines 535-536 (further member initializers) are not
// visible in this extract.
530 DirectHubPortSelect(DirectHubInterface<uint8_t[]> *hub, int fd,
531 std::unique_ptr<MessageSegmenter> segmenter,
532 Notifiable *on_error = nullptr)
533 : StateFlowBase(hub->get_service())
534 , readFlow_(this, std::move(segmenter))
537 , hub_(hub)
538 , fd_(fd)
539 , onError_(on_error)
540 {
541#ifdef __WINNT__
542 unsigned long par = 1;
543 ioctlsocket(fd_, FIONBIO, &par);
544#else
// NOTE(review): F_SETFL is called with a fixed flag set and no preceding
// F_GETFL; confirm that replacing the existing status flags is intended.
545 ::fcntl(fd, F_SETFL, O_RDWR | O_NONBLOCK);
546#endif
547
548 // Sets the initial state of the write flow to the stage where we read
549 // the next entry from the queue.
550 wait_and_call(STATE(read_queue));
// send() checks this flag to decide whether it has to notify() the flow.
551 notRunning_ = 1;
552
553 hub_->register_port(this);
554 readFlow_.start();
555 LOG(VERBOSE, "%p create fd %d", this, fd_);
556 }
557
559 {
560 }
561
// Synchronous output routine called by the hub. Either appends the message
// bytes to the current tail entry of the pending queue, or creates a new
// queue entry and wakes up the write flow if it is parked.
// @param msg the message to output; its buf_ and done_ members are read.
// NOTE(review): listing lines 580, 581, 583 and 598 are not visible in this
// extract (presumably the allocation of `b` and its insertion into
// pendingQueue_ -- confirm against the original file).
563 void send(MessageAccessor<uint8_t[]> *msg) override
564 {
// Unlocked early-out; fd_ is checked again under the lock further below.
565 if (fd_ < 0)
566 {
567 // Port already closed. Ignore data to send.
568 return;
569 }
570 {
571 AtomicHolder h(lock());
572 if (pendingTail_ && pendingTail_->buf_.try_append_from(msg->buf_))
573 {
574 // Successfully enqueued the bytes into the tail of the queue.
575 // Nothing else to do here.
576 return;
577 }
578 }
579
582 BufferType *b;
584 b->data()->buf_.reset(msg->buf_);
// Chains a child of the message's done barrier onto the new entry.
585 if (msg->done_)
586 {
587 b->set_done(msg->done_->new_child());
588 }
589 // Checks if we need to wake up the flow.
590 {
591 AtomicHolder h(lock());
592 if (fd_ < 0)
593 {
594 // Catch race condition when port is already closed.
595 b->unref();
596 return;
597 }
599 totalPendingSize_ += msg->buf_.size();
600 pendingTail_ = b->data();
601 if (notRunning_)
602 {
603 notRunning_ = 0;
604 }
605 else
606 {
607 // flow already running. Skip notify.
608 return;
609 }
610 }
// Reached only when the flow was parked; notify() is called outside the lock.
611 notify();
612 }
613
614private:
// Asks the write flow to terminate. The fd must already be closed (fd_ < 0);
// this is asserted. If the write flow is parked, the port is unregistered
// from the hub right away, with this object as the done notifiable.
// NOTE(review): listing line 627 is not visible in this extract.
617 void shutdown()
618 {
619 HASSERT(fd_ < 0);
620 {
621 AtomicHolder h(lock());
622 if (notRunning_)
623 {
624 // Queue is empty, waiting for new entries. There will be no new
625 // entries because fd_ < 0.
626 hub_->unregister_port(this, this);
628 }
629 // Else eventually we will get to check_for_new_message() which will
630 // flush the queue, unregister the port and exit.
631 }
632 }
633
634 Action read_queue()
635 {
636 BufferType *head;
637 {
638 AtomicHolder h(lock());
639 head = static_cast<BufferType *>(pendingQueue_.next_locked().item);
640 HASSERT(head);
641 if (head->data() == pendingTail_)
642 {
643 pendingTail_ = nullptr;
644 }
645 }
646 currentHead_.reset(head);
647 nextToWrite_ = currentHead_->data()->buf_.head();
648 nextToSkip_ = currentHead_->data()->buf_.skip();
649 nextToSize_ = currentHead_->data()->buf_.size();
650 return do_write();
651 }
652
// Writes the next piece of the current queue entry to the fd and continues
// in write_done(). If the fd is already closed, skips the write and goes to
// check_for_new_message().
// NOTE(review): listing lines 658 and 663 are not visible in this extract;
// `data` and `len` are presumably filled in by line 663 -- confirm against
// the original file.
653 Action do_write()
654 {
655 if (fd_ < 0)
656 {
657 // fd closed. Drop data to the floor.
659 return check_for_new_message();
660 }
661 uint8_t *data;
662 unsigned len;
// Never write more than what remains of the current entry.
664 if (len > nextToSize_)
665 {
666 len = nextToSize_;
667 }
668 nextToSkip_ = 0;
// The byte counters are updated before the write is issued.
669 nextToSize_ -= len;
670 totalPendingSize_ -= len;
671 totalWritten_ += len;
672 LOG(VERBOSE, "write %u total %zu", (unsigned)len, totalWritten_);
673 return write_repeated(
674 &selectHelper_, fd_, data, len, STATE(write_done));
675 }
676
// Continuation of do_write(). Keeps writing while the current entry has
// bytes left (nextToSize_ != 0), otherwise moves on to
// check_for_new_message().
// NOTE(review): listing lines 679 (the condition guarding the error branch)
// and 684 are not visible in this extract.
677 Action write_done()
678 {
680 {
681 LOG(INFO, "%p: Error writing to fd %d: (%d) %s", this, fd_, errno,
682 strerror(errno));
683 // will close fd and notify the reader flow to exit.
685 // Flushes the queue of messages. fd_ == -1 now so no write will be
686 // attempted.
687 return check_for_new_message();
688 }
689 if (nextToSize_)
690 {
691 return do_write();
692 }
693 return check_for_new_message();
694 }
695
// Releases the entry that was just handled and decides how to continue:
// with entries pending, goes straight to read_queue(); with an empty queue,
// parks the flow (notRunning_ = 1) until notified. If the fd is closed and
// the queue is empty, the port is unregistered from the hub.
// NOTE(review): listing line 706 is not visible in this extract.
696 Action check_for_new_message()
697 {
698 currentHead_.reset();
// The lock is held until the function returns, including during the
// unregister_port() call below.
699 AtomicHolder h(lock());
700 if (pendingQueue_.empty())
701 {
702 if (fd_ < 0)
703 {
704 // unregisters the port. All the queue has been flushed now.
705 hub_->unregister_port(this, this);
707 }
708 notRunning_ = 1;
709 return wait_and_call(STATE(read_queue));
710 }
711 else
712 {
713 return call_immediately(STATE(read_queue));
714 }
715 }
716
719 {
721 currentHead_.reset();
723 return wait();
724 }
725
732 {
733 int close_fd = -1;
734 {
735 AtomicHolder h(lock());
736 if (fd_ >= 0)
737 {
738 std::swap(fd_, close_fd);
739 }
740 }
741 if (close_fd >= 0)
742 {
743 ::close(close_fd);
744 }
745 readFlow_.read_shutdown();
746 }
747
753 {
754 int close_fd = -1;
755 {
756 AtomicHolder h(lock());
757 if (fd_ >= 0)
758 {
759 std::swap(fd_, close_fd);
760 }
761 }
762 if (close_fd >= 0)
763 {
764 ::close(close_fd);
765 }
766 // take read barrier
768 // kill write flow
769 shutdown();
770 }
771
775 {
776 LOG(VERBOSE, "%p exit read", this);
777 flow_exit(true);
778 }
779
782 {
783 LOG(VERBOSE, "%p exit write", this);
784 flow_exit(false);
785 }
786
// Records that one of the two flows has exited. When neither
// writeFlowPending_ nor readFlowPending_ is set anymore, notifies onError_
// (if not null) and deletes this object.
// @param read selects which of the two branches below is taken.
// NOTE(review): the bodies of both branches (listing lines 798 and 802) are
// not visible in this extract; presumably they clear readFlowPending_ and
// writeFlowPending_ respectively -- confirm against the original file.
791 void flow_exit(bool read)
792 {
793 bool del = false;
794 {
795 AtomicHolder h(lock());
796 if (read)
797 {
799 }
800 else
801 {
803 }
804 if (writeFlowPending_ == 0 && readFlowPending_ == 0)
805 {
806 del = true;
807 }
808 }
// The notification and the deletion happen after the lock scope has closed.
809 if (del)
810 {
811 if (onError_)
812 {
813 onError_->notify();
814 }
815 delete this;
816 }
817 }
818
821 {
822 return pendingQueue_.lock();
823 }
824
829 {
831 };
832
833 friend class DirectHubReadFlow;
834
838 typedef Q QueueType;
839
841 size_t totalWritten_ {0};
842
848 unsigned nextToSkip_;
850 unsigned nextToSize_;
854 // long long lastWriteTimeNsec_ = 0;
855
864 uint8_t notRunning_ : 1;
872 int fd_;
875};
876
877extern DirectHubPortSelect *g_last_direct_hub_port;
878DirectHubPortSelect *g_last_direct_hub_port = nullptr;
879
880void create_port_for_fd(DirectHubInterface<uint8_t[]> *hub, int fd,
881 std::unique_ptr<MessageSegmenter> segmenter, Notifiable *on_error)
882{
883 g_last_direct_hub_port =
884 new DirectHubPortSelect(hub, fd, std::move(segmenter), on_error);
885}
886
888{
889public:
895 DirectGcTcpHub(DirectHubInterface<uint8_t[]> *gc_hub, int port);
897
900 {
901 return tcpListener_.is_started();
902 }
903
904private:
909 void OnNewConnection(int fd);
910
915};
916
918{
919#if 0
920 uint32_t rcvbuf;
921 socklen_t len = sizeof(rcvbuf);
922 int ret = getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &len);
923 if (ret >= 0)
924 {
925 LOG(ALWAYS, "Socket rcvbuf %u", (unsigned)rcvbuf);
926 }
927#endif
929 std::unique_ptr<MessageSegmenter>(create_gc_message_segmenter()));
930}
931
933 : gcHub_(gc_hub)
934 , tcpListener_(port,
935 std::bind(
936 &DirectGcTcpHub::OnNewConnection, this, std::placeholders::_1))
937{
938}
939
// Destructor.
// NOTE(review): the statement in the body (listing line 942) is not visible
// in this extract.
940DirectGcTcpHub::~DirectGcTcpHub()
941{
943}
944
945void create_direct_gc_tcp_hub(DirectHubInterface<uint8_t[]> *hub, int port)
946{
947 new DirectGcTcpHub(hub, port);
948}
DynamicPool * mainBufferPool
main buffer pool instance
Definition Buffer.cxx:37
AutoReleaseBuffer< T > BufferPtr
Smart pointer for buffers.
Definition Buffer.hxx:259
DirectHubInterface< uint8_t[]> * create_hub(ExecutorBase *e)
Temporary function to instantiate the hub.
DataBufferPool g_direct_hub_kbyte_pool(1024)
This object forwards allocations to mainBufferPool.
void create_port_for_fd(DirectHubInterface< uint8_t[]> *hub, int fd, std::unique_ptr< MessageSegmenter > segmenter, Notifiable *on_error)
Creates a hub port of byte stream type reading/writing a given fd.
DataBufferPool g_direct_hub_data_pool(config_directhub_port_incoming_buffer_size())
This object forwards allocations to mainBufferPool.
void create_direct_gc_tcp_hub(DirectHubInterface< uint8_t[]> *hub, int port)
Creates a new GridConnect listener on a given TCP port.
MessageSegmenter * create_gc_message_segmenter()
Creates a message segmenter for gridconnect data.
int fcntl(int fd, int cmd,...)
Manipulate a file descriptor.
Definition Fileio.cxx:494
int bind(int socket, const struct sockaddr *address, socklen_t address_len)
Bind a name to a socket.
Definition Socket.cxx:159
int getsockopt(int socket, int level, int option_name, void *option_value, socklen_t *option_len)
Get the socket options.
Definition Socket.cxx:272
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
A block of BarrierNotifiable objects, with an asynchronous allocation call.
BarrierNotifiable * initialize(QMember *entry)
Turns an allocated entry from the QAsync into a usable BarrierNotifiable.
See OSMutexLock in os/OS.hxx.
Definition Atomic.hxx:153
Lightweight locking class for protecting small critical sections.
Definition Atomic.hxx:130
A BarrierNotifiable allows creating a number of child Notifiables and waiting for all of them to finish.
BarrierNotifiable * new_child()
Creates a new child notifiable of the current done notifiable.
Definition Buffer.hxx:108
void set_done(BarrierNotifiable *done)
Specifies that a given BarrierNotifiable must be called when the Buffer is deallocated (unreffed to z...
Definition Buffer.hxx:97
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
void unref()
Decrement count.
Definition Buffer.hxx:675
T * data()
get a pointer to the start of the data.
Definition Buffer.hxx:215
A notifiable class that calls a particular function object once when it is invoked,...
Proxy Pool that can allocate DataBuffer objects of a certain size.
void alloc(DataBuffer **result)
Get a free item out of the pool with untyped data of the size specified in the constructor.
Specialization of the Buffer class that is designed for storing untyped data arrays.
uint8_t * data()
DataBuffer * get_read_pointer(unsigned skip, uint8_t **ptr, unsigned *available)
Helper function to read out data from a linked data buffer.
void OnNewConnection(int fd)
Callback when a new connection arrives.
DirectGcTcpHub(DirectHubInterface< uint8_t[]> *gc_hub, int port)
Constructor.
DirectHubInterface< uint8_t[]> * gcHub_
Direct GridConnect hub.
SocketListener tcpListener_
Helper object representing the listening on the socket.
void do_send() override
Sends a message to the hub.
void enqueue_send(Executable *caller) override
Signals that the caller wants to send a message to the hub.
MessageAccessor< T > msg_
The message we are trying to send.
void register_port(DirectHubPort< T > *port) override
Adds a port to this hub.
void unregister_port(DirectHubPort< T > *port, Notifiable *done) override
Removes a port from this hub.
MessageAccessor< T > * mutable_message() override
Accessor to fill in the message payload.
bool should_send_to(DirectHubPort< T > *p)
Filters a message going towards a specific output port.
void unregister_port(DirectHubPort< T > *port) override
Synchronously unregisters a port.
std::vector< DirectHubPort< T > * > ports_
Stores the registered output ports. Protected by Atomic *this.
Service * get_service() override
Interface for the central part of a hub.
virtual MessageAccessor< T > * mutable_message()=0
Accessor to fill in the message payload.
virtual void register_port(DirectHubPort< T > *port)=0
Adds a port to this hub.
virtual void enqueue_send(Executable *caller)=0
Signals that the caller wants to send a message to the hub.
virtual void unregister_port(DirectHubPort< T > *port)=0
Synchronously removes a port from this hub.
virtual void do_send()=0
Sends a message to the hub.
State flow that reads the FD and sends the read data to the direct hub.
uint16_t inlineCall_
1 if we got the send callback inline from the read_done.
AsyncNotifiableBlock pendingLimiterPool_
Pool of BarrierNotifiables that limit the amount of inflight bytes we have.
Action send_prefix()
Called to send a given prefix segment to the hub.
void read_shutdown()
Requests the read port to shut down.
Action barrier_allocated()
Intermediate step if asynchronous allocation was necessary for the read barrier.
DirectHubPortSelect * parent_
Pointer to the owning port.
std::unique_ptr< MessageSegmenter > segmenter_
Implementation (and state) of the business logic that segments incoming bytes into messages that shal...
uint16_t sendComplete_
1 if the run callback actually happened inline.
Action send_callback()
This is the callback state that is invoked inline by the hub.
ssize_t segmentSize_
Output of the last segmenter call.
BarrierNotifiable * bufferNotifiable_
Barrier notifiable to keep track of the buffer's contents.
void start()
Starts the current flow.
Action incomplete_message()
Called when the segmenter says that we need to read more bytes to complete the current message.
StateFlowSelectHelper helper_
Helper object for Select.
Action alloc_for_read()
Root of the read flow.
LinkedDataBufferPtr buf_
Current buffer that we are filling.
Action call_head_segmenter()
Clears the segmenter and starts segmenting from the beginning of the buf_.
Action get_read_buffer()
Invoked when we have a bufferNotifiable_ from the barrier pool.
Action eval_segment()
Checks the segmenter output; if it indicates a complete message, clears the segmenter and sends off t...
Connects a (bytes typed) hub to an FD.
void flow_exit(bool read)
Marks a flow to be exited, and once both are exited, notifies done and deletes this.
void send(MessageAccessor< uint8_t[]> *msg) override
Synchronous output routine called by the hub.
OutputDataEntry * pendingTail_
Last tail pointer in the pendingQueue.
uint8_t notRunning_
1 if the state flow is paused, waiting for the notification.
StateFlowSelectHelper selectHelper_
Helper object for performing asynchronous writes.
unsigned nextToSkip_
Skip_ parameter matching nextToWrite_.
uint8_t writeFlowPending_
1 if the write flow is still running.
Action report_and_exit()
Terminates the flow, reporting to the barrier.
QueueType pendingQueue_
Time when the last buffer flush has happened. Not used yet.
Notifiable * onError_
This notifiable will be called before exiting.
void shutdown()
Called on the main executor when a read error wants to cancel the write flow.
int fd_
File descriptor for input/output.
void write_flow_exit()
Marks the write flow as exited. May delete this.
uint8_t readFlowPending_
1 if the read flow is still running.
DirectHubInterface< uint8_t[]> * hub_
Parent hub where output data is coming from.
void report_read_error()
Callback from the ReadFlow when the read call has seen an error.
void report_write_error()
Called by the write flow when it sees an error.
Q QueueType
Type of the queue used to keep the output buffer queue.
unsigned nextToSize_
Size_ parameter matching nextToWrite_.
void read_flow_exit()
Callback from the read flow that it has exited.
size_t totalPendingSize_
Total number of bytes in the pendingQueue.
Buffer< OutputDataEntry > BufferType
Type of buffers we are enqueuing for output.
DataBuffer * nextToWrite_
Data we are currently writing to a buffer.
size_t totalWritten_
total number of bytes written to the port.
BufferPtr< OutputDataEntry > currentHead_
The buffer that is taken out of the queue while flushing.
Interface for a downstream port of a hub (aka a target to send data to).
virtual void send(MessageAccessor< T > *msg)=0
Send some data out on this port.
A single service class that is shared between all interconnected DirectHub instances.
Definition DirectHub.cxx:67
Atomic * lock()
Definition DirectHub.cxx:78
void enqueue_caller(Executable *caller)
Adds a caller to the waiting list of who wants to send traffic to the hub.
Definition DirectHub.cxx:87
void on_done()
This function must be called at the end of the enqueued functions in order to properly clear the busy...
QueueType pendingSend_
List of callers that are waiting for the busy_ lock.
unsigned busy_
1 if there is any message being processed right now.
An object that can be scheduled on an executor to run.
virtual void run()=0
Entry point.
This class implements an execution of tasks pulled off an input queue.
Definition Executor.hxx:64
virtual void add(Executable *action, unsigned priority=UINT_MAX)=0
Send a message to this Executor's queue.
void unselect(Selectable *job)
Removes a job from the select loop.
Definition Executor.cxx:359
Empty class that can be used as a pointer for identifying where a piece of data came from.
Definition DirectHub.hxx:48
A class that keeps ownership of a chain of linked DataBuffer references.
bool try_append_from(const LinkedDataBufferPtr &o, bool add_link=false)
Attempt to combine *this with o into a single LinkedDataBufferPtr this.
uint8_t * data_write_pointer()
DataBuffer * tail() const
unsigned skip() const
void reset(const LinkedDataBufferPtr &o, ssize_t size=-1)
Takes a reference of o, taking a prefix of len size (or all the data).
unsigned size() const
LinkedDataBufferPtr transfer_head(size_t len)
Transfers the ownership of the prefix of this buffer.
void append_empty_buffer(DataBuffer *buf)
Adds an empty buffer to the end of this buffer chain.
DataBuffer * head() const
void data_write_advance(size_t len)
Advances the tail pointer after a write occurred into the tail.
size_t free() const
An object that can schedule itself on an executor to run.
virtual void notify()=0
Generic callback.
size_t total_size()
Definition Buffer.hxx:281
void alloc(Buffer< BufferType > **result, Executable *flow=NULL)
Get a free item out of the pool.
Definition Buffer.hxx:292
void next_async(Executable *flow)
Get an item from the front of the queue.
Definition Queue.hxx:305
QMember * next(unsigned index)
Get an item from the front of the queue.
Definition Queue.hxx:334
Essentially a "next" pointer container.
Definition QMember.hxx:42
This class implements a linked list "queue" of buffers.
Definition Queue.hxx:98
void insert_locked(QMember *item, unsigned index=0)
Add an item to the back of the queue.
Definition Queue.hxx:146
Result next_locked()
Get an item from the front of the queue.
Definition Queue.hxx:186
bool empty(unsigned index)
Test if the queue is empty.
Definition Queue.hxx:225
QMember * next(unsigned index)
Get an item from the front of the queue.
Definition Queue.hxx:167
Collection of related state machines that pend on incoming messages.
ExecutorBase * executor()
void shutdown()
Shuts down the socket listener.
Return type for a state flow callback.
Base class for state machines.
Action yield_and_call(Callback c)
Place the current flow to the back of the executor, and transition to a new state after we get the CP...
Action read_single(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Attempts to read at most size bytes, and blocks the caller until at least one byte is read.
Service * service()
Return a pointer to the service I am bound to.
void notify() override
Wakeup call arrived.
Definition StateFlow.cxx:97
StateFlowBase()
Default constructor.
void start_flow(Callback c)
Resets the flow to the specified state and starts it.
Action wait()
Wait for an asynchronous call.
Action call_immediately(Callback c)
Immediately call the next state upon return.
Action set_terminated()
Sets the flow to terminated state.
Action wait_and_call(Callback c)
Wait for resource to become available before proceeding to next state.
void cast_allocation_result(T **member)
Takes the result of the asynchronous allocation without resetting the object.
Action write_repeated(StateFlowSelectHelper *helper, int fd, const void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Writes some data into a file descriptor, repeating the operation as necessary until all bytes are wri...
A Notifiable for synchronously waiting for a notification.
void wait_for_notification()
Blocks the current thread until the notification is delivered.
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
Definition logging.h:59
static const int INFO
Loglevel that is printed by default, reporting some status information.
Definition logging.h:57
static const int ALWAYS
Loglevel that is always printed.
Definition logging.h:49
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define SO_RCVBUF
socket option for receive buffer size
Definition socket.h:76
uint32_t socklen_t
type of sockaddr length
Definition socket.h:89
#define SOL_SOCKET
socket option category
Definition socket.h:64
Holds the necessary information we need to keep in the queue about a single output entry.
Typed message class.
Definition DirectHub.hxx:93
Result of pulling an item from the queue based on priority.
Definition Queue.hxx:59
unsigned index
index of item pulled from queue
Definition Queue.hxx:85
QMember * item
item pulled from queue
Definition Queue.hxx:84
Use this class to read from an fd using select() in a state flow.
unsigned remaining_
Number of bytes still outstanding to read.
unsigned hasError_
1 if there was an error reading or writing.