Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
BufferPort.hxx
Go to the documentation of this file.
1
36#ifndef _UTILS_BUFFERPORT_HXX_
37#define _UTILS_BUFFERPORT_HXX_
38
39#include "utils/LimitedPool.hxx"
40
45// Added by default on GridConnect bridges.
// Hub port that collects the bytes of incoming messages in a fixed-size byte
// array (sendBuf_) and forwards them to downstream_ in batches. The visible
// code flushes the accumulated bytes when the next message would not fit,
// when the delay timer fires, or when entry() recognises a packet that
// should go out immediately.
//
// NOTE(review): the line numbers embedded in this listing skip in several
// places (e.g. 48->58, 90->92, 117->119, 154->157), so some statements and
// function headers appear to be missing. The comments below describe only
// the code that is visible.
46class BufferPort : public HubPort
47{
48public:
// Constructor (tail of the parameter list and the member initializers).
// Allocates a send buffer of buffer_bytes bytes with new[] and starts with an
// empty buffer (bufEnd_ == 0) and no timer pending.
58 unsigned buffer_bytes, long long delay_nsec)
60 , downstream_(downstream)
61 , delayNsec_(delay_nsec)
62 , sendBuf_(new char[buffer_bytes])
63 , bufSize_(buffer_bytes)
64 , bufEnd_(0)
65 , timerPending_(0)
66 {
68 }
69
71 {
// Releases the array that the constructor allocated with new[].
72 delete [] sendBuf_;
73 }
74
// Returns true only when no timer callback is pending and is_waiting()
// reports true; returns false otherwise.
75 bool shutdown() {
77 if (timerPending_) {
// A timer callback is still outstanding.
78 return false;
79 }
80 if (!is_waiting()) {
81 return false;
82 }
83 return true;
84 }
85
86private:
// Handles one incoming message: either forwards it, copies it into the send
// buffer, or flushes the send buffer first and retries.
87 Action entry() override
88 {
89 if (!tgtBuf_)
90 {
// No cached output buffer yet. The expression below selects nullptr when at
// most one outgoing packet is configured and &outputPool_ otherwise;
// presumably it is the pool argument of an allocation call whose first line
// is not shown in this listing -- TODO confirm.
92 config_gridconnect_bridge_max_outgoing_packets() <= 1
93 ? nullptr
94 : &outputPool_);
95 }
96 // Defines whether we should optimize the traffic and flush right now.
97 bool opt_flush = false;
98 // This code is OpenLCB-specific. It looks for a certain pattern in the
99 // output data stream, and if that pattern is found, inserts an extra
100 // flush right now, instead of waiting for the timeout to pass. The
101 // data sent on is never modified, so this is purely a performance
102 // optimization for OpenLCB.
// NOTE(review): index 3 is read below without checking msg().size(); a
// message consisting of only ":X" would be read past its terminator --
// confirm that callers never send such a short frame.
103 if (msg().data()[0] == ':' && msg().data()[1] == 'X') {
104 if (msg().data()[3] == 'A' || msg().data()[3] == 'D')
105 {
106 // Found datagram "only" or "end" packet.
107 opt_flush = true;
108 }
109 else if (strncmp(msg().data() + 3, "9A28", 4) == 0)
110 {
111 // Found datagram acknowledge packet.
112 opt_flush = true;
113 }
114 }
115 if (opt_flush && !bufEnd_)
116 {
117 // nothing accumulated, send off directly.
119 return exit();
120 }
// Strict comparison: a message that would fill the remaining space exactly
// is treated as not fitting and takes the flush path below.
121 if (msg().size() < (bufSize_ - bufEnd_))
122 {
123 // Fits into the buffer.
124 memcpy(sendBuf_ + bufEnd_, msg().data(), msg().size());
125 bufEnd_ += msg().size();
126 if (opt_flush)
127 {
128 flush_buffer();
129 }
130 else if (!timerPending_)
131 {
// First buffered data since the last timer callback: start the delay timer.
// While a timer is already pending, no second one is started.
132 timerPending_ = 1;
133 bufferTimer_.start(delayNsec_);
134 }
135 return release_and_exit();
136 }
137 else
138 {
// Not enough room left: send out what has been accumulated so far.
139 flush_buffer();
140 }
141
// The message is at least as large as the entire send buffer, so it can
// never be buffered.
142 if (msg().size() >= bufSize_)
143 {
144 // Cannot buffer: send off directly.
146 return exit();
147 }
148 else
149 {
150 // After flushing the buffers this will fit.
151 return again();
152 }
153 }
154
157 {
// Copies the skipMember_ field of the current incoming message into the
// cached output buffer.
159 tgtBuf_->data()->skipMember_ = message()->data()->skipMember_;
161 }
162
166 {
167 if (!bufEnd_) return; // nothing to do
// Take the cached output buffer and clear the cache, so that the next
// entry() call sees !tgtBuf_.
168 auto *b = tgtBuf_;
169 tgtBuf_ = nullptr;
// Move the accumulated bytes into the output buffer and mark the send
// buffer as empty.
170 b->data()->assign(sendBuf_, bufEnd_);
171 bufEnd_ = 0;
// Only when a message is currently being processed: hand a new child
// obtained from it to b->set_done(). Presumably this ties completion of the
// incoming message to the release of b -- TODO confirm.
172 if (message())
173 {
174 b->set_done(message()->new_child());
175 }
176 downstream_->send(b);
177 }
178
// Called by BufferTimer::timeout(): clears the pending flag and flushes
// whatever has accumulated in the send buffer.
180 void timeout()
181 {
182 timerPending_ = 0;
183 flush_buffer();
184 }
185
// Shorthand for the string held by the current message.
187 const string &msg()
188 {
189 return *message()->data();
190 }
191
// Timer that forwards its expiry to the owning BufferPort.
194 class BufferTimer : public ::Timer
195 {
196 public:
// Registers the timer with the active-timer list of the parent's executor
// and stores the parent pointer.
199 : Timer(parent->service()->executor()->active_timers())
200 , parent_(parent)
201 {
202 }
203
// Notifies the parent port and returns NONE, i.e. the timer is not restarted
// from here.
204 long long timeout() override
205 {
206 parent_->timeout();
207 return NONE;
208 }
209
210 private:
// The timer member is constructed with this port as its parent.
212 } bufferTimer_{this};
213
// Pool constructed from the size of the object tgtBuf_ points to and the
// configured maximum number of outgoing packets; presumably these are
// (item size, maximum item count) -- TODO confirm against LimitedPool.
216 LimitedPool outputPool_ {sizeof(*tgtBuf_),
217 (unsigned)config_gridconnect_bridge_max_outgoing_packets()};
// Delay handed to bufferTimer_.start() when buffering begins (nanoseconds,
// per the name).
223 long long delayNsec_;
// Array of bufSize_ bytes, owned by this object, that holds data not yet
// flushed.
225 char *sendBuf_;
// Allocated size of sendBuf_ in bytes.
227 unsigned bufSize_;
// Number of bytes currently stored in sendBuf_, i.e. the offset of the first
// free byte.
// NOTE(review): this field is 24 bits wide while bufSize_ is a full
// unsigned; a buffer of 2^24 bytes or more could not be tracked -- confirm
// that callers never request one.
229 unsigned bufEnd_ : 24;
// 1 from the moment the timer is started until its callback has run.
232 unsigned timerPending_ : 1;
233};
234
235#endif // _UTILS_BUFFERPORT_HXX_
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
Timer that triggers the parent flow when expiring.
BufferTimer(BufferPort *parent)
Constructor.
BufferPort * parent_
what to notify upon timeout.
long long timeout() override
Clients of timer should override this function.
A wrapper class around a string-based Hub Port that buffers the outgoing bytes for a specified delay ...
HubPortInterface * downstream_
Where to send output data to.
long long delayNsec_
The maximum length of time we should buffer the input data.
Action entry() override
Entry into the StateFlow activity.
Action buf_alloc_done()
State when the allocation of output buffer completed.
LimitedPool outputPool_
Pool implementation that limits the number of buffers allocatable to the configuration option.
char * sendBuf_
Temporarily stores outgoing data.
Buffer< HubData > * tgtBuf_
Caches one output buffer to fill in the buffer flush method.
const string & msg()
unsigned bufSize_
Size of the send buffer in bytes.
void timeout()
Callback from the timer.
void flush_buffer()
Sends off any data we may have accumulated in the buffer to the downstream consumer.
unsigned bufEnd_
Offset in sendBuf_ of the first unused byte.
BufferPort(Service *service, HubPortInterface *downstream, unsigned buffer_bytes, long long delay_nsec)
Constructor.
unsigned timerPending_
1 if the timer is running and there will be a timer callback coming in the future.
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
virtual void send(MessageType *message, unsigned priority=UINT_MAX)=0
Entry point to the flow.
Implementation of a Pool interface that takes memory from mainBufferPool (configurable) but limits th...
Collection of related state machines that pend on incoming messages.
Return type for a state flow callback.
Service * service()
Return a pointer to the service I am bound to.
Action allocate_and_call(FlowInterface< Buffer< T > > *target_flow, Callback c, Pool *pool=nullptr)
Allocates a buffer from a pool and proceed to the next state when allocation is successful.
Buffer< T > * get_allocation_result(FlowInterface< Buffer< T > > *target_flow)
Takes the result of the asynchronous allocation.
Action again()
Call the current state again via call_immediately.
Action exit()
Terminates the processing of this flow.
Action release_and_exit()
Terminates the processing of the current message.
State flow with a given typed input queue.
A timer that can schedule itself to run on an executor at specified times in the future.
Definition Timer.hxx:134
@ NONE
Do not restart the timer.
Definition Timer.hxx:161
MessageType * transfer_message()
Releases ownership of the current message.
Action call_immediately(Callback c)
Immediately call the next state upon return.
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138