Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
Stream.hxx
Go to the documentation of this file.
1
34#ifndef _OPENLCB_STREAM_HXX_
35#define _OPENLCB_STREAM_HXX_
36
37#include "openlcb/NMRAnetIf.hxx"
38#include "utils/RBTree.hxx"
39#include "utils/RingBuffer.hxx"
40
41namespace openlcb
42{
43
49class Stream
50{
51public:
53 static const size_t CHANNELS_PER_NODE;
54
56 static const uint16_t MAX_BUFFER_SIZE;
57
59 typedef void *StreamHandle;
60
67 StreamHandle sopen(NodeHandle dst, long long timeout);
68
72 void sclose(StreamHandle handle);
73
80 ssize_t swrite(StreamHandle handle, const void *buf, size_t size);
81
88 ssize_t sread(StreamHandle handle, void *buf, size_t size);
89
90protected:
94 : outboundTree(),
96 count(0),
97 sem()
98 {
99 }
100
104 {
105 }
106
112 void packet(Defs::MTI mti, NodeHandle src, Buffer *data);
113
114private:
117 enum Flag
118 {
120 ACCEPT = 0x80
121 };
122
137
138 enum Protocol
139 {
140 DISCOVER_1 = 0x0600000000000000,
141 DISCOVER_2 = 0x0500000000000000,
142 DISCOVER_3 = 0x0400000000000000,
143 DISCOVER_4 = 0x0300000000000000,
144 DISCOVER_5 = 0x0200000000000000,
145 DISCOVER_6 = 0x0100000000000000,
146 DISCOVERED = 0x0000000000000000,
147 DISCOVERED_DEC = 0x0100000000000000,
148 DISCOVER_MASK = 0xFF00000000000000,
149 DISCOVER_SHIFT = 56,
150 };
151
166
174 virtual int write(Defs::MTI mti, NodeHandle dst, Buffer *data) = 0;
175
179 virtual BufferQueueWait *rx_queue() = 0;
180
185 void initiate_request(NodeHandle src, Buffer *buffer);
186
191 void initiate_reply(NodeHandle src, Buffer *buffer);
192
197 void handle_data(NodeHandle src, Buffer *buffer);
198
203 void proceed(NodeHandle src, Buffer *buffer);
204
209 void complete(NodeHandle src, Buffer *buffer);
210
212 RBTree <uint8_t, Metadata*> outboundTree;
213
215 RBTree <uint8_t, Metadata*> inboundTree;
216
218 size_t count;
219
222
224};
225
228{
230};
231
232
233};
234
235#endif // _OPENLCB_STREAM_HXX_
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
This class provides a counting semaphore API.
Definition OS.hxx:243
Implements a vanilla ring buffer.
Base service for OpenLCB streaming protocol.
Definition Stream.hxx:50
void initiate_reply(NodeHandle src, Buffer *buffer)
Handle incoming stream initiate reply messages.
Definition Stream.cxx:321
void sclose(StreamHandle handle)
Close a stream.
Definition Stream.cxx:128
~Stream()
Default destructor.
Definition Stream.hxx:103
void * StreamHandle
Stream handle type.
Definition Stream.hxx:59
ssize_t sread(StreamHandle handle, void *buf, size_t size)
Read data from a stream.
Definition Stream.cxx:220
Flag
possible flag values.
Definition Stream.hxx:118
@ ACCEPT
initiate request accepted
Definition Stream.hxx:120
@ PERMINATE_ERROR
a perminate error occured
Definition Stream.hxx:119
void initiate_request(NodeHandle src, Buffer *buffer)
Handle incoming stream initiate request messages.
Definition Stream.cxx:254
Stream()
Default constructor.
Definition Stream.hxx:93
virtual int write(Defs::MTI mti, NodeHandle dst, Buffer *data)=0
Write a message from a node.
RBTree< uint8_t, Metadata * > inboundTree
tree for keeping track of inbound streams
Definition Stream.hxx:215
void packet(Defs::MTI mti, NodeHandle src, Buffer *data)
Handle incoming stream messages.
Definition Stream.cxx:504
OSSem sem
wait for completion semaphore
Definition Stream.hxx:221
ssize_t swrite(StreamHandle handle, const void *buf, size_t size)
Write data to a stream.
Definition Stream.cxx:166
static const uint16_t MAX_BUFFER_SIZE
Max buffer size for stream transmission segments.
Definition Stream.hxx:56
RBTree< uint8_t, Metadata * > outboundTree
tree for keeping track of outbound streams
Definition Stream.hxx:212
void handle_data(NodeHandle src, Buffer *buffer)
Handle incoming stream data messages.
Definition Stream.cxx:360
virtual BufferQueueWait * rx_queue()=0
Get handle to the receive queue for incoming NMRAnet messages.
size_t count
number of active streams for this node
Definition Stream.hxx:218
void proceed(NodeHandle src, Buffer *buffer)
Handle incoming stream proceed messages.
Definition Stream.cxx:440
void complete(NodeHandle src, Buffer *buffer)
Handle incoming stream complete messages.
Definition Stream.cxx:478
StreamHandle sopen(NodeHandle dst, long long timeout)
Open a stream.
Definition Stream.cxx:48
static const size_t CHANNELS_PER_NODE
number of simultaneous open streams per virtual node instance
Definition Stream.hxx:53
State
Possible stream states.
Definition Stream.hxx:126
@ WAITING_I
waiting on a pending connection, inbound
Definition Stream.hxx:132
@ CONNECTED_I
connection made, inbound
Definition Stream.hxx:133
@ CLOSED_I
connection closed, inbound
Definition Stream.hxx:134
@ CONNECTED_O
connection made, outbound
Definition Stream.hxx:129
@ PENDING_O
connection pending, outbound
Definition Stream.hxx:127
@ WAITING_O
waiting on a pending connection, outbound
Definition Stream.hxx:128
@ CLOSED_O
connection closed, outbound
Definition Stream.hxx:130
@ PENDING_I
connection pending, inbound
Definition Stream.hxx:131
#define DISALLOW_COPY_AND_ASSIGN(TypeName)
Removes default copy-constructor and assignment added by C++.
Definition macros.h:171
MTI
Known Message type indicators.
Container of both a NodeID and NodeAlias.
Stream metadata, often cast to StreamHandle.
Definition Stream.hxx:155
uint64_t protocol
stream protocol
Definition Stream.hxx:156
State state
Stream state.
Definition Stream.hxx:162
size_t count
count received before proceed required
Definition Stream.hxx:161
uint8_t dstID
unique destination ID
Definition Stream.hxx:164
NodeHandle nodeHandle
node we are communicating with
Definition Stream.hxx:158
size_t size
max buffer size
Definition Stream.hxx:160
uint8_t srcID
unique source ID
Definition Stream.hxx:163
RingBuffer< uint8_t > * data
stream data
Definition Stream.hxx:157