57 config_directhub_port_incoming_buffer_size());
206 unsigned next_port = 0;
213 if (next_port >=
ports_.size())
271 std::unique_ptr<MessageSegmenter> segmenter)
338 LOG(
VERBOSE,
"read flow %p (fd %d): notif %p alloc() %u",
this,
358 return do_some_read();
493 return do_some_read();
517 (unsigned)config_directhub_port_max_incoming_packets()};
531 std::unique_ptr<MessageSegmenter> segmenter,
534 , readFlow_(this, std::move(segmenter))
542 unsigned long par = 1;
543 ioctlsocket(
fd_, FIONBIO, &par);
545 ::fcntl(fd, F_SETFL, O_RDWR | O_NONBLOCK);
584 b->
data()->buf_.reset(msg->buf_);
587 b->
set_done(msg->done_->new_child());
659 return check_for_new_message();
681 LOG(
INFO,
"%p: Error writing to fd %d: (%d) %s",
this,
fd_, errno,
687 return check_for_new_message();
693 return check_for_new_message();
696 Action check_for_new_message()
738 std::swap(
fd_, close_fd);
759 std::swap(
fd_, close_fd);
881 std::unique_ptr<MessageSegmenter> segmenter,
Notifiable *on_error)
883 g_last_direct_hub_port =
925 LOG(
ALWAYS,
"Socket rcvbuf %u", (
unsigned)rcvbuf);
940DirectGcTcpHub::~DirectGcTcpHub()
DynamicPool * mainBufferPool
main buffer pool instance
AutoReleaseBuffer< T > BufferPtr
Smart pointer for buffers.
DirectHubInterface< uint8_t[]> * create_hub(ExecutorBase *e)
Temporary function to instantiate the hub.
DataBufferPool g_direct_hub_kbyte_pool(1024)
This object forwards allocations to mainBufferPool.
void create_port_for_fd(DirectHubInterface< uint8_t[]> *hub, int fd, std::unique_ptr< MessageSegmenter > segmenter, Notifiable *on_error)
Creates a hub port of byte stream type reading/writing a given fd.
DataBufferPool g_direct_hub_data_pool(config_directhub_port_incoming_buffer_size())
This object forwards allocations to mainBufferPool.
void create_direct_gc_tcp_hub(DirectHubInterface< uint8_t[]> *hub, int port)
Creates a new GridConnect listener on a given TCP port.
MessageSegmenter * create_gc_message_segmenter()
Creates a message segmenter for gridconnect data.
int fcntl(int fd, int cmd,...)
Manipulate a file descriptor.
int bind(int socket, const struct sockaddr *address, socklen_t address_len)
Bind a name to a socket.
int getsockopt(int socket, int level, int option_name, void *option_value, socklen_t *option_len)
Get the socket options.
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
A block of BarrierNotifiable objects, with an asynchronous allocation call.
BarrierNotifiable * initialize(QMember *entry)
Turns an allocated entry from the QAsync into a usable BarrierNotifiable.
See OSMutexLock in os/OS.hxx.
Lightweight locking class for protecting small critical sections.
A BarrierNotifiable allows creating a number of child Notifiables and waiting for all of them to finish.
BarrierNotifiable * new_child()
Creates a new child notifiable of the current done notifiable.
void set_done(BarrierNotifiable *done)
Specifies that a given BarrierNotifiable must be called when the Buffer is deallocated (unreffed to z...
Base class for all QMember types that hold data in an expandable format.
void unref()
Decrement count.
T * data()
get a pointer to the start of the data.
A notifiable class that calls a particular function object once when it is invoked,...
Proxy Pool that can allocate DataBuffer objects of a certain size.
void alloc(DataBuffer **result)
Get a free item out of the pool with untyped data of the size specified in the constructor.
Specialization of the Buffer class that is designed for storing untyped data arrays.
DataBuffer * get_read_pointer(unsigned skip, uint8_t **ptr, unsigned *available)
Helper function to read out data from a linked data buffer.
void OnNewConnection(int fd)
Callback when a new connection arrives.
DirectGcTcpHub(DirectHubInterface< uint8_t[]> *gc_hub, int port)
Constructor.
DirectHubInterface< uint8_t[]> * gcHub_
Direct GridConnect hub.
SocketListener tcpListener_
Helper object representing the listening on the socket.
void do_send() override
Sends a message to the hub.
void enqueue_send(Executable *caller) override
Signals that the caller wants to send a message to the hub.
MessageAccessor< T > msg_
The message we are trying to send.
void register_port(DirectHubPort< T > *port) override
Adds a port to this hub.
void unregister_port(DirectHubPort< T > *port, Notifiable *done) override
Removes a port from this hub.
MessageAccessor< T > * mutable_message() override
Accessor to fill in the message payload.
bool should_send_to(DirectHubPort< T > *p)
Filters a message going towards a specific output port.
void unregister_port(DirectHubPort< T > *port) override
Synchronously unregisters a port.
std::vector< DirectHubPort< T > * > ports_
Stores the registered output ports. Protected by Atomic *this.
Service * get_service() override
Interface for the central part of a hub.
virtual MessageAccessor< T > * mutable_message()=0
Accessor to fill in the message payload.
virtual void register_port(DirectHubPort< T > *port)=0
Adds a port to this hub.
virtual void enqueue_send(Executable *caller)=0
Signals that the caller wants to send a message to the hub.
virtual void unregister_port(DirectHubPort< T > *port)=0
Synchronously removes a port from this hub.
virtual void do_send()=0
Sends a message to the hub.
State flow that reads the FD and sends the read data to the direct hub.
uint16_t inlineCall_
1 if we got the send callback inline from the read_done.
AsyncNotifiableBlock pendingLimiterPool_
Pool of BarrierNotifiables that limit the amount of inflight bytes we have.
Action send_prefix()
Called to send a given prefix segment to the hub.
void read_shutdown()
Requests the read port to shut down.
Action barrier_allocated()
Intermediate step if asynchronous allocation was necessary for the read barrier.
DirectHubPortSelect * parent_
Pointer to the owning port.
std::unique_ptr< MessageSegmenter > segmenter_
Implementation (and state) of the business logic that segments incoming bytes into messages that shal...
uint16_t sendComplete_
1 if the run callback actually happened inline.
Action send_callback()
This is the callback state that is invoked inline by the hub.
ssize_t segmentSize_
Output of the last segmenter call.
BarrierNotifiable * bufferNotifiable_
Barrier notifiable to keep track of the buffer's contents.
void start()
Starts the current flow.
Action incomplete_message()
Called when the segmenter says that we need to read more bytes to complete the current message.
StateFlowSelectHelper helper_
Helper object for Select.
Action alloc_for_read()
Root of the read flow.
LinkedDataBufferPtr buf_
Current buffer that we are filling.
Action call_head_segmenter()
Clears the segmenter and starts segmenting from the beginning of the buf_.
Action get_read_buffer()
Invoked when we have a bufferNotifiable_ from the barrier pool.
Action eval_segment()
Checks the segmenter output; if it indicates a complete message, clears the segmenter and sends off t...
Connects a (bytes typed) hub to an FD.
void flow_exit(bool read)
Marks a flow to be exited, and once both are exited, notifies done and deletes this.
void send(MessageAccessor< uint8_t[]> *msg) override
Synchronous output routine called by the hub.
OutputDataEntry * pendingTail_
Last tail pointer in the pendingQueue.
uint8_t notRunning_
1 if the state flow is paused, waiting for the notification.
StateFlowSelectHelper selectHelper_
Helper object for performing asynchronous writes.
unsigned nextToSkip_
Skip_ parameter matching nextToWrite_.
uint8_t writeFlowPending_
1 if the write flow is still running.
Action report_and_exit()
Terminates the flow, reporting to the barrier.
QueueType pendingQueue_
Time when the last buffer flush has happened. Not used yet.
Notifiable * onError_
This notifiable will be called before exiting.
void shutdown()
Called on the main executor when a read error wants to cancel the write flow.
int fd_
File descriptor for input/output.
void write_flow_exit()
Marks the write flow as exited. May delete this.
uint8_t readFlowPending_
1 if the read flow is still running.
DirectHubInterface< uint8_t[]> * hub_
Parent hub where output data is coming from.
void report_read_error()
Callback from the ReadFlow when the read call has seen an error.
void report_write_error()
Called by the write flow when it sees an error.
Q QueueType
Type of the queue used to keep the output buffer queue.
unsigned nextToSize_
Size_ parameter matching nextToWrite_.
void read_flow_exit()
Callback from the read flow that it has exited.
size_t totalPendingSize_
Total number of bytes in the pendingQueue.
Buffer< OutputDataEntry > BufferType
Type of buffers we are enqueuing for output.
DataBuffer * nextToWrite_
Data we are currently writing to a buffer.
size_t totalWritten_
total number of bytes written to the port.
BufferPtr< OutputDataEntry > currentHead_
The buffer that is taken out of the queue while flushing.
Interface for a downstream port of a hub (aka a target to send data to).
virtual void send(MessageAccessor< T > *msg)=0
Send some data out on this port.
A single service class that is shared between all interconnected DirectHub instances.
void enqueue_caller(Executable *caller)
Adds a caller to the waiting list of who wants to send traffic to the hub.
void on_done()
This function must be called at the end of the enqueued functions in order to properly clear the busy...
QueueType pendingSend_
List of callers that are waiting for the busy_ lock.
unsigned busy_
1 if there is any message being processed right now.
An object that can be scheduled on an executor to run.
virtual void run()=0
Entry point.
This class implements an execution of tasks pulled off an input queue.
virtual void add(Executable *action, unsigned priority=UINT_MAX)=0
Send a message to this Executor's queue.
void unselect(Selectable *job)
Removes a job from the select loop.
Empty class that can be used as a pointer for identifying where a piece of data came from.
A class that keeps ownership of a chain of linked DataBuffer references.
bool try_append_from(const LinkedDataBufferPtr &o, bool add_link=false)
Attempt to combine *this with o into a single LinkedDataBufferPtr this.
uint8_t * data_write_pointer()
DataBuffer * tail() const
void reset(const LinkedDataBufferPtr &o, ssize_t size=-1)
Takes a reference of o, taking a prefix of len size (or all the data).
LinkedDataBufferPtr transfer_head(size_t len)
Transfers the ownership of the prefix of this buffer.
void append_empty_buffer(DataBuffer *buf)
Adds an empty buffer to the end of this buffer chain.
DataBuffer * head() const
void data_write_advance(size_t len)
Advances the tail pointer after a write occurred into the tail.
An object that can schedule itself on an executor to run.
virtual void notify()=0
Generic callback.
void alloc(Buffer< BufferType > **result, Executable *flow=NULL)
Get a free item out of the pool.
void next_async(Executable *flow)
Get an item from the front of the queue.
QMember * next(unsigned index)
Get an item from the front of the queue.
Essentially a "next" pointer container.
This class implements a linked list "queue" of buffers.
void insert_locked(QMember *item, unsigned index=0)
Add an item to the back of the queue.
Result next_locked()
Get an item from the front of the queue.
bool empty(unsigned index)
Test if the queue is empty.
QMember * next(unsigned index)
Get an item from the front of the queue.
Collection of related state machines that pend on incoming messages.
ExecutorBase * executor()
void shutdown()
Shuts down the socket listener.
Return type for a state flow callback.
Base class for state machines.
Action yield_and_call(Callback c)
Place the current flow to the back of the executor, and transition to a new state after we get the CP...
Action read_single(StateFlowSelectHelper *helper, int fd, void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Attempts to read at most size_t bytes, and blocks the caller until at least one byte is read.
Service * service()
Return a pointer to the service I am bound to.
void notify() override
Wakeup call arrived.
StateFlowBase()
Default constructor.
void start_flow(Callback c)
Resets the flow to the specified state and starts it.
Action wait()
Wait for an asynchronous call.
Action call_immediately(Callback c)
Immediately call the next state upon return.
Action set_terminated()
Sets the flow to terminated state.
Action wait_and_call(Callback c)
Wait for resource to become available before proceeding to next state.
void cast_allocation_result(T **member)
Takes the result of the asynchronous allocation without resetting the object.
Action write_repeated(StateFlowSelectHelper *helper, int fd, const void *buf, size_t size, Callback c, unsigned priority=Selectable::MAX_PRIO)
Writes some data into a file descriptor, repeating the operation as necessary until all bytes are wri...
A Notifiable for synchronously waiting for a notification.
void wait_for_notification()
Blocks the current thread until the notification is delivered.
#define LOG(level, message...)
Conditionally write a message to the logging output.
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
static const int INFO
Loglevel that is printed by default, reporting some status information.
static const int ALWAYS
Loglevel that is always printed.
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
#define SO_RCVBUF
socket option for receive buffer size
uint32_t socklen_t
type of sockaddr length
#define SOL_SOCKET
socket option category
Holds the necessary information we need to keep in the queue about a single output entry.
Result of pulling an item from the queue based on priority.
unsigned index
index of item pulled from queue
QMember * item
item pulled from queue
Use this class to read from an fd using select() in a state flow.
unsigned remaining_
Number of bytes still outstanding to read.
unsigned hasError_
1 if there was an error reading or writing.