Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
MemoryConfigStream.hxx
Go to the documentation of this file.
1
35#ifndef _OPENLCB_MEMORYCONFIGSTREAM_HXX_
36#define _OPENLCB_MEMORYCONFIGSTREAM_HXX_
37
38#include "openlcb/If.hxx"
42
43namespace openlcb
44{
45
48{
49public:
52 using CallbackFn = std::function<void(uint16_t)>;
53
66 uint8_t dst_stream_id, uint32_t ofs, uint32_t len,
67 CallbackFn started_cb)
68 : StateFlowBase(node->iface())
69 , dst_(dst)
70 , startedCb_(std::move(started_cb))
71 , space_(space)
72 , dstStreamId_(dst_stream_id)
73 , node_(node)
74 , ofs_(ofs)
75 , len_(len)
76 {
77 LOG(INFO, "starting streamed read, dst %02x", dstStreamId_);
78 start_flow(STATE(alloc_stream));
79 }
80
83 {
84 return srcStreamId_;
85 }
86
89 {
90 return dstStreamId_;
91 }
92
93private:
94 Action alloc_stream()
95 {
96 return allocate_and_call(
97 STATE(got_sender), stream_transport()->sender_allocator());
98 }
99
101 {
102 LOG(INFO, "got sender");
103 sender_ =
104 full_allocation_result(stream_transport()->sender_allocator());
107 senderCan_ = static_cast<StreamSenderCan *>(sender_);
108 HASSERT(senderCan_);
109 return call_immediately(STATE(initiate_stream));
110 }
111
/// State entered from got_sender(). Obtains the local (send-side) stream ID
/// from the stream transport, stores it in srcStreamId_ and continues in
/// wait_for_started().
/// @return the next action for the flow executor.
112 Action initiate_stream()
113 {
114 LOG(INFO, "initiate");
115 srcStreamId_ = stream_transport()->get_send_stream_id();
// NOTE(review): listing line 116 is absent from this extract. Presumably it
// is where the sender is actually started (senderCan_->start_stream(...)) --
// confirm against the original header.
117 return call_immediately(STATE(wait_for_started));
118 }
119
120 Action wait_for_started()
121 {
122 auto state = senderCan_->get_state();
123 if (state == StreamSender::RUNNING)
124 {
125 dstStreamId_ = senderCan_->get_dst_stream_id();
126 startedCb_(0);
127 return call_immediately(STATE(alloc_buffer));
128 }
129 if (state == StreamSender::STATE_ERROR)
130 {
131 auto err = senderCan_->get_error();
132 LOG(INFO, "failed to start stream: 0x%04x", err);
133 startedCb_(err);
134 return call_immediately(STATE(done_stream));
135 }
136 return sleep_and_call(
137 &timer_, MSEC_TO_NSEC(3), STATE(wait_for_started));
138 }
139
140 Action alloc_buffer()
141 {
142 return allocate_and_call<RawData>(
143 nullptr, STATE(have_raw_buffer), &sendBufferPool_);
144 }
145
/// State entered once the raw buffer requested by alloc_buffer() is
/// available. Takes ownership of the allocated raw buffer, attaches it to
/// sendBuffer_ starting at offset 0 and continues in try_read().
/// @return the next action for the flow executor.
146 Action have_raw_buffer()
147 {
148 LOG(INFO, "have raw buf len %u", (unsigned)len_);
149
150 RawBufferPtr raw_buffer(get_allocation_result<RawData>(nullptr));
// NOTE(review): listing line 151 is absent from this extract. sendBuffer_ is
// dereferenced on the next line, so presumably the missing line creates it --
// confirm against the original header.
152 sendBuffer_->data()->set_from(std::move(raw_buffer), 0);
153 return call_immediately(STATE(try_read));
154 }
155
156 Action try_read()
157 {
158 if (!len_)
159 {
160 sender_->send(sendBuffer_.release());
161 senderCan_->close_stream();
162 return call_immediately(STATE(wait_for_close));
163 }
164 size_t free = sendBuffer_->data()->free_space();
165 if (!free)
166 {
167 sender_->send(sendBuffer_.release());
168 return call_immediately(STATE(alloc_buffer));
169 }
170 size_t cnt = len_;
171 if (free < cnt)
172 {
173 cnt = free;
174 }
175 uint8_t *ptr = sendBuffer_->data()->append_ptr();
176 MemorySpace::errorcode_t err;
177 size_t copied = space_->read(ofs_, ptr, cnt, &err, this);
178 sendBuffer_->data()->append_complete(copied);
179 ofs_ += copied;
180 len_ -= copied;
181 if (err == MemorySpace::ERROR_AGAIN)
182 {
183 return wait();
184 }
185 if (err == MemoryConfigDefs::ERROR_OUT_OF_BOUNDS)
186 {
187 sender_->send(sendBuffer_.release());
188 senderCan_->close_stream();
189 return call_immediately(STATE(wait_for_close));
190 }
191 if (!err)
192 {
193 return again();
194 }
195 else
196 {
197 LOG(INFO, "error reading input stream: %04x", err);
198 sender_->send(sendBuffer_.release());
199 senderCan_->close_stream(err);
200 return call_immediately(STATE(wait_for_close));
201 }
202 }
203
204 Action wait_for_close()
205 {
206 auto state = senderCan_->get_state();
207 if (state == StreamSender::CLOSING && senderCan_->is_waiting())
208 {
209 // Sender is done and empty.
210 return call_immediately(STATE(done_stream));
211 }
212 if (state == StreamSender::STATE_ERROR && senderCan_->is_waiting())
213 {
214 // Sender has errored and consumed / thrown away all data.
215 // There is no place really to show the error.
216 LOG(INFO, "Stream sender error: 0x%04x", senderCan_->get_error());
217 return call_immediately(STATE(done_stream));
218 }
219 return sleep_and_call(&timer_, MSEC_TO_NSEC(3), STATE(wait_for_close));
220 }
221
222 Action done_stream()
223 {
224 senderCan_->clear();
225 stream_transport()->sender_allocator()->typed_insert(sender_);
226 sender_ = nullptr;
227 return delete_this();
228 }
229
230 StreamTransport *stream_transport()
231 {
232 return node_->iface()->stream_transport();
233 }
234
257 uint32_t ofs_;
260 uint32_t len_;
263 StreamSenderCan *senderCan_;
264 StreamSender *sender_;
265};
266
270{
271public:
273 : MemoryConfigHandlerBase(parent->dg_service())
274 , parent_(parent)
275 {
277 }
278
/// Entry state for a stream command datagram forwarded by the parent
/// handler. Clears response_, masks the command byte (byte 1 of the incoming
/// datagram) with COMMAND_MASK and dispatches on it: the one visible case
/// continues in handle_read_stream(); everything else is rejected with
/// ERROR_UNIMPLEMENTED_SUBCMD.
/// @return the next action for the flow executor.
279 Action entry() override
280 {
281 LOG(INFO, "stream req");
282 // The verification of the incoming data is already done by the calling
283 // MemoryConfigHandler.
284 response_.clear();
285 const uint8_t *bytes = in_bytes();
286 uint8_t cmd = bytes[1];
287
288 switch (cmd & MemoryConfigDefs::COMMAND_MASK)
289 {
// NOTE(review): the case label (listing line 290) is absent from this
// extract; presumably MemoryConfigDefs::COMMAND_READ_STREAM -- confirm.
291 {
292 return call_immediately(STATE(handle_read_stream));
293 }
// NOTE(review): listing line 294 is absent from this extract as well.
295 }
296 return respond_reject(Defs::ERROR_UNIMPLEMENTED_SUBCMD);
297 }
298
299private:
/// Handles a read-stream request.
///
/// Rejects the datagram with ERROR_INVALID_ARGS when the payload is too short
/// and with ERROR_SPACE_NOT_KNOWN when the memory space cannot be looked up.
/// Otherwise extracts the destination stream ID and the optional byte count,
/// creates a MemorySpaceStreamReadFlow and waits to be notified before
/// continuing in read_started().
/// @return the next action for the flow executor.
300 Action handle_read_stream()
301 {
302 size_t len = message()->data()->payload.size();
303 const uint8_t *bytes = in_bytes();
304
305 if (len < 8)
306 {
307 return respond_reject(Defs::ERROR_INVALID_ARGS);
308 }
309 MemorySpace *space = get_space();
310 if (!space)
311 {
312 return respond_reject(MemoryConfigDefs::ERROR_SPACE_NOT_KNOWN);
313 }
314
// Stream parameters start at byte 6, or at byte 7 when has_custom_space()
// reports true.
315 size_t stream_data_offset = 6;
316 if (has_custom_space())
317 {
318 ++stream_data_offset;
319 }
320 if (len < stream_data_offset + 2)
321 {
322 return respond_reject(Defs::ERROR_INVALID_ARGS);
323 }
324
// Only the second of the two stream parameter bytes is read here.
325 uint8_t dst_stream_id = bytes[stream_data_offset + 1];
// Default when the request carries no byte count: 0xFFFFFFFF.
326 uint32_t num_bytes_to_read = 0xFFFFFFFFu;
327 LOG(INFO, "dst stream id %02x", dst_stream_id);
328 if (len >= stream_data_offset + 6)
329 {
// The optional 4-byte count is converted from big-endian to host order.
330 memcpy(&num_bytes_to_read, bytes + stream_data_offset + 2, 4);
331 num_bytes_to_read = be32toh(num_bytes_to_read);
332 }
// Preset to ERROR_TEMPORARY; stream_start_cb() overwrites it later.
333 streamErrorCode_ = Defs::ERROR_TEMPORARY;
334 // This object is self-owned, so it will run `delete this`.
// NOTE(review): get_space() is called a second time below although `space`
// already holds the result of the first lookup.
335 readFlow_ = new MemorySpaceStreamReadFlow(message()->data()->dst,
336 get_space(), message()->data()->src, dst_stream_id, get_address(),
337 num_bytes_to_read,
// NOTE(review): listing line 338 is absent from this extract; presumably it
// binds stream_start_cb as the started callback -- confirm.
339 std::placeholders::_1));
// stream_start_cb() calls notify(), which is what this wait is resumed by.
340 return wait_and_call(STATE(read_started));
341 }
342
345 void stream_start_cb(uint16_t error)
346 {
347 streamErrorCode_ = error;
348 notify();
349 }
350
/// State entered after stream_start_cb() has woken up this flow. Builds the
/// reply payload in response_ from streamErrorCode_ and acknowledges the
/// incoming datagram with the REPLY_PENDING flag.
/// @return the next action for the flow executor.
351 Action read_started()
352 {
353 uint16_t error = streamErrorCode_;
// Same offset rule as in handle_read_stream(): 6, or 7 when
// has_custom_space() reports true.
354 size_t response_data_offset = 6;
355 if (has_custom_space())
356 {
357 ++response_data_offset;
358 }
// The largest size used below is reserved up front; presumably so that
// response_bytes stays valid across the later resize() calls -- confirm.
359 response_.reserve(response_data_offset + 6);
360 response_.resize(response_data_offset + 2);
361 uint8_t *response_bytes = out_bytes();
362 response_bytes[0] = DATAGRAM_ID;
363 response_bytes[1] = error ? MemoryConfigDefs::COMMAND_READ_STREAM_FAILED
// NOTE(review): listing lines 364-365 (the rest of this expression) are
// absent from this extract.
366 if (error)
367 {
// Failure reply: the 16-bit error code, high byte first.
368 response_.resize(response_data_offset + 2);
369 response_bytes[response_data_offset] = error >> 8;
370 response_bytes[response_data_offset + 1] = error & 0xff;
371 }
372 else
373 {
374 response_.resize(response_data_offset + 2);
// NOTE(review): the right-hand sides of the next two assignments (listing
// lines 376 and 378) are absent from this extract.
375 response_bytes[response_data_offset] =
377 response_bytes[response_data_offset + 1] =
// If the request carried the optional 4 bytes after the stream IDs, they are
// copied unchanged into the reply.
379 if (message()->data()->payload.size() >= response_data_offset + 6)
380 {
381 response_.resize(response_data_offset + 6);
382 memcpy(response_bytes + response_data_offset + 2,
383 in_bytes() + response_data_offset + 2, 4);
384 }
385 }
386 return respond_ok(DatagramClient::REPLY_PENDING);
387 }
388
392 {
393 int space_number = get_space_number();
394 if (space_number < 0)
395 return nullptr;
396 MemorySpace *space =
397 registry()->lookup(message()->data()->dst, space_number);
398 if (!space)
399 {
400 LOG(WARNING,
401 "MemoryConfig: asked node 0x%012" PRIx64 " for unknown space "
402 "%d. Source {0x%012" PRIx64 ", %03x}",
403 message()->data()->dst->node_id(), space_number,
404 message()->data()->src.id, message()->data()->src.alias);
405 return nullptr;
406 }
407 if (!space->set_node(message()->data()->dst))
408 {
409 LOG(WARNING, "MemoryConfig: Global space %d rejected node.",
410 space_number);
411 return nullptr;
412 }
413 return space;
414 }
415
416 Registry *registry()
417 {
418 return parent_->registry();
419 ;
420 }
421
424
427
428 union
429 {
433 };
434}; // class MemoryConfigStreamHandler
435
436} // namespace openlcb
437
438#endif // _OPENLCB_MEMORYCONFIGSTREAM_HXX_
Pool * rawBufferPool
Use this BufferPool to allocate raw buffers.
Definition Buffer.cxx:38
BufferPtr< T > get_buffer_deleter(Buffer< T > *b)
Helper function to create a BufferPtr of an appropriate type without having to explicitly specify the...
Definition Buffer.hxx:272
Buffer< RawData > RawBuffer
Buffers of this type will be allocated from the rawBufferPool to hold the payloads of untyped data st...
BufferPtr< RawData > RawBufferPtr
Holds a raw buffer.
BufferPtr< ByteChunk > ByteBufferPtr
Buffer pointer type for references.
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
MessageType * alloc()
Synchronously allocates a message buffer from the pool of this flow.
Implementation of a Pool interface that takes memory from mainBufferPool (configurable) but limits th...
Return type for a state flow callback.
Use this timer class to deliver the timeout notification to a stateflow.
Base class for state machines.
Action allocate_and_call(FlowInterface< Buffer< T > > *target_flow, Callback c, Pool *pool=nullptr)
Allocates a buffer from a pool and proceed to the next state when allocation is successful.
Action delete_this()
Terminates the flow and deletes *this.
Buffer< T > * full_allocation_result(FlowInterface< Buffer< T > > *target_flow)
Takes the result of the asynchronous allocation without resetting the object.
void start_flow(Callback c)
Resets the flow to the specified state and starts it.
Action wait()
Wait for an asynchronous call.
Action again()
Call the current state again via call_immediately.
Action call_immediately(Callback c)
Immediately call the next state upon return.
Action wait_and_call(Callback c)
Wait for resource to become available before proceeding to next state.
Action sleep_and_call(::Timer *timer, long long timeout_nsec, Callback c)
Suspends execution of this control flow for a specified time.
void notify() override
Wakeup call arrived. Schedules *this on the executor.
Handler * lookup(Node *node, uint32_t id)
Finds a handler for a particular node and particular messageID.
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
Action call_immediately(Callback c)
Immediately call the next state upon return.
Action respond_reject(uint16_t error_code)
Sends a DATAGRAM_REJECT response to the datagram originator node.
Action respond_ok(uint8_t flags)
Sends a DATAGRAM_OK response to the datagram originator node.
StreamTransport * stream_transport()
Definition If.hxx:345
int get_space_number()
Returns the memory space number, or -1 if the incoming datagram is of incorrect format.
void set_address_and_space()
Copies the address and memory space information from the incoming datagram to the outgoing datagram p...
address_t get_address()
Returns the address from the incoming datagram.
Implementation of the Memory Access Configuration Protocol for OpenLCB.
void set_stream_handler(DatagramHandlerFlow *stream_handler)
This will be called by the constructor of the stream handler plugin.
Handler for the stream read/write commands in the memory config protocol (server side).
MemorySpace * get_space()
Looks up the memory space for the current datagram.
MemoryConfigHandler * parent_
Parent object from which we are getting commands forwarded.
uint16_t streamErrorCode_
OpenLCB error code from the stream start.
void stream_start_cb(uint16_t error)
Callback from the stream flow that tells us whether the stream was successfully started (or not).
MemorySpaceStreamReadFlow * readFlow_
The flow that we created for reading the memory space into the stream.
Action entry() override
Entry into the StateFlow activity.
This is a self-owned flow which reads a memory space into a stream.
std::function< void(uint16_t)> CallbackFn
This callback function will be called with an error code, or 0 on success.
CallbackFn startedCb_
Callback to invoke once the stream start has completed: called with 0 on success, or with the error code on failure.
StateFlowTimer timer_
Helper object for waiting.
NodeHandle dst_
Address to which we are sending the stream.
MemorySpaceStreamReadFlow(Node *node, MemorySpace *space, NodeHandle dst, uint8_t dst_stream_id, uint32_t ofs, uint32_t len, CallbackFn started_cb)
Constructor.
uint8_t srcStreamId_
Source stream ID, obtained from the stream transport on the sending side.
uint8_t dstStreamId_
Destination stream ID on the target node.
LimitedPool sendBufferPool_
This pool is used to allocate raw buffers to read data into from the memory space.
uint32_t len_
How many bytes are left to read.
MemorySpace * space_
Memory space we are reading.
ByteBufferPtr sendBuffer_
We keep reading into this buffer from the memory space.
Node * node_
Node from which we are sending the stream.
Abstract base class for the address spaces exported via the Memory Config Protocol.
virtual size_t read(address_t source, uint8_t *dst, size_t len, errorcode_t *error, Notifiable *again)=0
virtual bool set_node(Node *node)
Specifies which node the next operation pertains to.
static const errorcode_t ERROR_AGAIN
This error code signals that the operation was only partially completed, the again notify was used an...
Base class for NMRAnet nodes conforming to the asynchronous interface.
Definition Node.hxx:52
Helper class for sending stream data to a CAN interface.
void clear()
Sets the stream sender to be available for reuse after a stream has been closed or reached error.
void close_stream(uint16_t error_code=0)
Closes the stream when all the bytes are transferred.
StreamSenderState get_state()
StreamSenderCan & start_stream(Node *src, NodeHandle dst, uint8_t source_stream_id, uint8_t dst_stream_id=StreamDefs::INVALID_STREAM_ID)
Initiates using the stream sender.
@ STATE_ERROR
An error occurred.
@ RUNNING
Stream is open and data can be transferred.
@ CLOSING
Stream close message was sent.
TypedQAsync< StreamSender > * sender_allocator()
Stream sender flows.
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int WARNING
Loglevel that is always printed, reporting a warning or a retryable error.
Definition logging.h:55
static const int INFO
Loglevel that is printed by default, reporting some status information.
Definition logging.h:57
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define MSEC_TO_NSEC(_msec)
Convert a millisecond value to a nanosecond value.
Definition os.h:268
@ COMMAND_READ_STREAM
command to read data using a stream
@ COMMAND_READ_STREAM_FAILED
failed to read data using a stream
@ COMMAND_READ_STREAM_REPLY
reply to read data using a stream
Container of both a NodeID and NodeAlias.
static constexpr uint8_t INVALID_STREAM_ID
This value is invalid as a source or destination stream ID.