35#ifndef _OPENLCB_MEMORYCONFIGSTREAM_HXX_
36#define _OPENLCB_MEMORYCONFIGSTREAM_HXX_
66 uint8_t dst_stream_id, uint32_t ofs, uint32_t len,
112 Action initiate_stream()
120 Action wait_for_started()
132 LOG(
INFO,
"failed to start stream: 0x%04x", err);
140 Action alloc_buffer()
142 return allocate_and_call<RawData>(
146 Action have_raw_buffer()
150 RawBufferPtr raw_buffer(get_allocation_result<RawData>(
nullptr));
152 sendBuffer_->data()->set_from(std::move(raw_buffer), 0);
176 MemorySpace::errorcode_t err;
185 if (err == MemoryConfigDefs::ERROR_OUT_OF_BOUNDS)
197 LOG(
INFO,
"error reading input stream: %04x", err);
204 Action wait_for_close()
230 StreamTransport *stream_transport()
286 uint8_t cmd = bytes[1];
288 switch (cmd & MemoryConfigDefs::COMMAND_MASK)
300 Action handle_read_stream()
302 size_t len =
message()->data()->payload.size();
315 size_t stream_data_offset = 6;
318 ++stream_data_offset;
320 if (len < stream_data_offset + 2)
325 uint8_t dst_stream_id = bytes[stream_data_offset + 1];
326 uint32_t num_bytes_to_read = 0xFFFFFFFFu;
327 LOG(
INFO,
"dst stream id %02x", dst_stream_id);
328 if (len >= stream_data_offset + 6)
330 memcpy(&num_bytes_to_read, bytes + stream_data_offset + 2, 4);
331 num_bytes_to_read = be32toh(num_bytes_to_read);
339 std::placeholders::_1));
351 Action read_started()
354 size_t response_data_offset = 6;
357 ++response_data_offset;
359 response_.reserve(response_data_offset + 6);
360 response_.resize(response_data_offset + 2);
362 response_bytes[0] = DATAGRAM_ID;
368 response_.resize(response_data_offset + 2);
369 response_bytes[response_data_offset] = error >> 8;
370 response_bytes[response_data_offset + 1] = error & 0xff;
374 response_.resize(response_data_offset + 2);
375 response_bytes[response_data_offset] =
377 response_bytes[response_data_offset + 1] =
379 if (
message()->data()->
payload.size() >= response_data_offset + 6)
381 response_.resize(response_data_offset + 6);
382 memcpy(response_bytes + response_data_offset + 2,
383 in_bytes() + response_data_offset + 2, 4);
386 return respond_ok(DatagramClient::REPLY_PENDING);
394 if (space_number < 0)
401 "MemoryConfig: asked node 0x%012" PRIx64
" for unknown space "
402 "%d. Source {0x%012" PRIx64
", %03x}",
403 message()->data()->dst->node_id(), space_number,
409 LOG(
WARNING,
"MemoryConfig: Global space %d rejected node.",
Pool * rawBufferPool
Use this BufferPool to allocate raw buffers.
BufferPtr< T > get_buffer_deleter(Buffer< T > *b)
Helper function to create a BufferPtr of an appropriate type without having to explicitly specify the...
Buffer< RawData > RawBuffer
Buffers of this type will be allocated from the rawBufferPool to hold the payloads of untyped data st...
BufferPtr< RawData > RawBufferPtr
Holds a raw buffer.
BufferPtr< ByteChunk > ByteBufferPtr
Buffer pointer type for references.
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
MessageType * alloc()
Synchronously allocates a message buffer from the pool of this flow.
Implementation of a Pool interface that takes memory from mainBufferPool (configurable) but limits th...
Return type for a state flow callback.
Use this timer class to deliver the timeout notification to a stateflow.
Base class for state machines.
Action allocate_and_call(FlowInterface< Buffer< T > > *target_flow, Callback c, Pool *pool=nullptr)
Allocates a buffer from a pool and proceed to the next state when allocation is successful.
Action delete_this()
Terminates the flow and deletes *this.
Buffer< T > * full_allocation_result(FlowInterface< Buffer< T > > *target_flow)
Takes the result of the asynchronous allocation without resetting the object.
void start_flow(Callback c)
Resets the flow to the specified state and starts it.
Action wait()
Wait for an asynchronous call.
Action again()
Call the current state again via call_immediately.
Action call_immediately(Callback c)
Immediately call the next state upon return.
Action wait_and_call(Callback c)
Wait for resource to become available before proceeding to next state.
Action sleep_and_call(::Timer *timer, long long timeout_nsec, Callback c)
Suspends execution of this control flow for a specified time.
void notify() override
Wakeup call arrived. Schedules *this on the executor.
Handler * lookup(Node *node, uint32_t id)
Finds a handler for a particular node and particular messageID.
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
Action call_immediately(Callback c)
Immediately call the next state upon return.
Action respond_reject(uint16_t error_code)
Sends a DATAGRAM_REJECT response to the datagram originator node.
Action respond_ok(uint8_t flags)
Sends a DATAGRAM_OK response to the datagram originator node.
const uint8_t * payload()
StreamTransport * stream_transport()
const uint8_t * in_bytes()
int get_space_number()
Returns the memory space number, or -1 if the incoming datagram is of incorrect format.
void set_address_and_space()
Copies the address and memory space information from the incoming datagram to the outgoing datagram p...
address_t get_address()
Returns the address from the incoming datagram.
Implementation of the Memory Access Configuration Protocol for OpenLCB.
void set_stream_handler(DatagramHandlerFlow *stream_handler)
This will be called by the constructor of the stream handler plugin.
Handler for the stream read/write commands in the memory config protocol (server side).
MemorySpace * get_space()
Looks up the memory space for the current datagram.
MemoryConfigHandler * parent_
Parent object from which we are getting commands forwarded.
uint16_t streamErrorCode_
OpenLCB error code from the stream start.
void stream_start_cb(uint16_t error)
Callback from the stream flow that tells us whether the stream was successfully started (or not).
MemorySpaceStreamReadFlow * readFlow_
The flow that we created for reading the memory space into the stream.
Action entry() override
Entry into the StateFlow activity.
This is a self-owned flow which reads a memory space into a stream.
std::function< void(uint16_t)> CallbackFn
This callback function will be called with an error code, or 0 on success.
CallbackFn startedCb_
callback to invoke after start is successful.
StateFlowTimer timer_
Helper object for waiting.
NodeHandle dst_
Address to which we are sending the stream.
uint8_t get_src_stream_id()
MemorySpaceStreamReadFlow(Node *node, MemorySpace *space, NodeHandle dst, uint8_t dst_stream_id, uint32_t ofs, uint32_t len, CallbackFn started_cb)
Constructor.
uint8_t srcStreamId_
Source stream ID on the sending node.
uint32_t ofs_
Next byte to read.
uint8_t dstStreamId_
Destination stream ID on the target node.
LimitedPool sendBufferPool_
This pool is used to allocate raw buffers to read data into from the memory space.
uint32_t len_
How many bytes are left to read.
MemorySpace * space_
Memory space we are reading.
uint8_t get_dst_stream_id()
ByteBufferPtr sendBuffer_
We keep reading into this buffer from the memory space.
Node * node_
Node from which we are sending the stream.
Abstract base class for the address spaces exported via the Memory Config Protocol.
virtual size_t read(address_t source, uint8_t *dst, size_t len, errorcode_t *error, Notifiable *again)=0
virtual bool set_node(Node *node)
Specifies which node the next operation pertains to.
static const errorcode_t ERROR_AGAIN
This error code signals that the operation was only partially completed, the again notify was used an...
Base class for NMRAnet nodes conforming to the asynchronous interface.
Helper class for sending stream data to a CAN interface.
void clear()
Sets the stream sender to be available for reuse after a stream has been closed or reached error.
void close_stream(uint16_t error_code=0)
Closes the stream when all the bytes are transferred.
StreamSenderState get_state()
StreamSenderCan & start_stream(Node *src, NodeHandle dst, uint8_t source_stream_id, uint8_t dst_stream_id=StreamDefs::INVALID_STREAM_ID)
Initiates using the stream sender.
uint8_t get_dst_stream_id()
@ STATE_ERROR
An error occurred.
@ RUNNING
Stream is open and data can be transferred.
@ CLOSING
Stream close message was sent.
TypedQAsync< StreamSender > * sender_allocator()
Stream sender flows.
uint8_t get_send_stream_id()
#define LOG(level, message...)
Conditionally write a message to the logging output.
static const int WARNING
Loglevel that is always printed, reporting a warning or a retryable error.
static const int INFO
Loglevel that is printed by default, reporting some status information.
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
#define MSEC_TO_NSEC(_msec)
Convert a millisecond value to a nanosecond value.
@ COMMAND_READ_STREAM
command to read data using a stream
@ COMMAND_READ_STREAM_FAILED
failed to read data using a stream
@ COMMAND_READ_STREAM_REPLY
reply to read data using a stream
Container of both a NodeID and NodeAlias.
static constexpr uint8_t INVALID_STREAM_ID
This value is invalid as a source or destination stream ID.