35#include "openlcb/NMRAnetNode.hxx"
36#include "openlcb/NMRAnetMessageID.hxx"
72 Buffer *buffer = buffer_alloc(6);
73 uint8_t *data = (uint8_t*)buffer->start();
79 data[4] = metadata->
srcID;
96 int result =
sem.timedwait(timeout);
136 Buffer *buffer = buffer_alloc(4);
137 uint8_t *data = (uint8_t*)buffer->start();
138 data[0] = metadata->
srcID;
139 data[1] = metadata->
dstID;
178 result = metadata->
data->
put((uint8_t*)buf, size);
183 if (size > (metadata->
size - metadata->
count))
185 buffer_size = metadata->
size - metadata->
count;
192 Buffer *buffer = buffer_alloc(buffer_size + 2);
193 uint8_t *data = (uint8_t*)buffer->start();
194 memcpy(data, buf, buffer_size);
195 data[buffer_size + 0] = metadata->
srcID;
196 data[buffer_size + 1] = metadata->
dstID;
197 buffer->advance(buffer_size + 2);
200 result = buffer_size;
202 metadata->
count += buffer_size;
206 result += metadata->
data->
put((uint8_t*)buf + buffer_size, size);
228 result = metadata->
data->
get((uint8_t*)buf, size);
230 if (space < metadata->size && metadata->
data->
space() >= metadata->
size)
233 Buffer *buffer = buffer_alloc(4);
234 uint8_t *data = (uint8_t*)buffer->start();
235 data[0] = metadata->
srcID;
236 data[1] = metadata->
dstID;
256 if (count < CHANNELS_PER_NODE && buffer->used() >= 6)
258 uint8_t *data = (uint8_t*)buffer->start();
260 size_t buffer_size = (data[0] << 8) + data[1];
271 metadata->
size = buffer_size;
274 metadata->
srcID = data[4];
291 data[0] = (buffer_size >> 8) & 0xFF;
292 data[1] = (buffer_size >> 0) & 0xFF;
295 data[4] = metadata->
srcID;
296 data[5] = metadata->
dstID;
310 buffer->id(ID_STREAM_NEW_CONNECTION);
323 uint8_t *data = (uint8_t*)buffer->start();
325 size_t buffer_size = (data[0] << 8) + data[1];
331 if (node->
value->size > buffer_size)
333 node->
value->size = buffer_size;
336 node->
value->dstID = data[5];
341 buffer->id(ID_STREAM_COMPLETED_CONNECTION);
363 uint8_t *data = (uint8_t*)buffer->start();
370 if (node->
value->data->items() == 0)
380 size_t len = buffer->
size() - buffer->available();
387 while ((node->
value->protocol & DISCOVER_MASK) != DISCOVERED && len > 2)
389 int shift = (node->
value->protocol >> DISCOVER_SHIFT) - 1;
390 node->
value->protocol |= (*data) << (shift * 8);
393 node->
value->protocol-= DISCOVERED_DEC;
396 node->
value->count += (len - 2);
399 node->
value->data->put(data, len - 2);
407 if (
wakeup && (node->
value->protocol & DISCOVER_MASK) == DISCOVERED)
412 buffer->id(ID_STREAM_DATA_POSTED);
418 if (node->
value->count == node->
value->size &&
422 buffer = buffer_alloc(4);
423 data = (uint8_t*)buffer->start();
424 data[0] = node->
value->srcID;
425 data[1] = node->
value->dstID;
430 node->
value->count = 0;
442 uint8_t *data = (uint8_t*)buffer->start();
446 node->
value->count = 0;
450 if (node->
value->data->items())
453 if (node->
value->data->items() < node->
value->size)
455 segment_size = node->
value->data->items();
459 segment_size = node->
value->size;
462 buffer = buffer_alloc(segment_size + 2);
463 data = (uint8_t*)buffer->start();
464 node->
value->data->get(data, segment_size);
465 data[segment_size + 0] = node->
value->srcID;
466 data[segment_size + 1] = node->
value->dstID;
467 buffer->advance(segment_size + 2);
468 node->
value->count = segment_size;
480 uint8_t *data = (uint8_t*)buffer->start();
484 if (node->
value->data->items() == 0)
488 node->
value->data->destroy();
static OSEvent wakeup
event used to wake up select calls
Base class for all QMember types that hold data in an expandable format.
static OSMutex mutex
mutual exclusion for fileio
void unlock()
Unlock a mutex.
void post()
Post (increment) a semaphore.
size_t get(T *buf, size_t items)
remove a number of items from the buffer.
void destroy()
Destroy an existing ring buffer instance.
size_t space()
space left in the buffer, in number of items.
size_t put(const T *buf, size_t items)
Insert a number of items to the buffer.
static RingBuffer * create(size_t size)
Factory method to create a ring buffer instance.
size_t items()
Number of items in the buffer.
void initiate_reply(NodeHandle src, Buffer *buffer)
Handle incoming stream initiate reply messages.
void sclose(StreamHandle handle)
Close a stream.
void * StreamHandle
Stream handle type.
ssize_t sread(StreamHandle handle, void *buf, size_t size)
Read data from a stream.
@ ACCEPT
initiate request accepted
void initiate_request(NodeHandle src, Buffer *buffer)
Handle incoming stream initiate request messages.
virtual int write(Defs::MTI mti, NodeHandle dst, Buffer *data)=0
Write a message from a node.
RBTree< uint8_t, Metadata * > inboundTree
tree for keeping track of inbound streams
void packet(Defs::MTI mti, NodeHandle src, Buffer *data)
Handle incoming stream messages.
OSSem sem
wait for completion semaphore
ssize_t swrite(StreamHandle handle, const void *buf, size_t size)
Write data to a stream.
static const uint16_t MAX_BUFFER_SIZE
Max buffer size for stream transmission segments.
RBTree< uint8_t, Metadata * > outboundTree
tree for keeping track of outbound streams
void handle_data(NodeHandle src, Buffer *buffer)
Handle incoming stream data messages.
virtual BufferQueueWait * rx_queue()=0
Get handle to the receive queue for incoming NMRAnet messages.
size_t count
number of active streams for this node
void proceed(NodeHandle src, Buffer *buffer)
Handle incoming stream proceed messages.
void complete(NodeHandle src, Buffer *buffer)
Handle incoming stream complete messages.
StreamHandle sopen(NodeHandle dst, long long timeout)
Open a stream.
static const size_t CHANNELS_PER_NODE
number of simultaneous open streams per virtual node instance
@ CONNECTED_I
connection made, inbound
@ CLOSED_I
connection closed, inbound
@ CONNECTED_O
connection made, outbound
@ PENDING_O
connection pending, outbound
@ WAITING_O
waiting on a pending connection, outbound
@ CLOSED_O
connection closed, outbound
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
The metadata for a tree node.
Key key
key by which to sort the node
Value value
value of the node
MTI
Known Message type indicators.
@ MTI_STREAM_PROCEED
stream flow control
@ MTI_STREAM_INITIATE_REPLY
Stream initiate reply.
@ MTI_STREAM_INITIATE_REQUEST
Stream initiate request.
@ MTI_STREAM_COMPLETE
stream terminate connection
@ MTI_STREAM_DATA
stream data
Container of both a NodeID and NodeAlias.