Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
Stream.cxx
Go to the documentation of this file.
1
34#include "openlcb/Stream.hxx"
35#include "openlcb/NMRAnetNode.hxx"
36#include "openlcb/NMRAnetMessageID.hxx"
37#include "utils/macros.h"
38
39namespace openlcb
40{
41
49{
50 StreamHandle handle = NULL;
51
54 {
55 Metadata* metadata = new Metadata;
57 metadata->size = MAX_BUFFER_SIZE;
58 metadata->count = 0;
59 metadata->srcID = 0;
60 metadata->nodeHandle = dst;
62
63 if (last != NULL)
64 {
65 metadata->srcID = last->key + 1;
66 while (outboundTree.find(metadata->srcID) != NULL)
67 {
68 metadata->srcID++;
69 }
70 }
71
72 Buffer *buffer = buffer_alloc(6);
73 uint8_t *data = (uint8_t*)buffer->start();
74
75 data[0] = (MAX_BUFFER_SIZE >> 8) & 0xFF;
76 data[1] = (MAX_BUFFER_SIZE >> 0) & 0xFF;
77 data[2] = 0;
78 data[3] = 0;
79 data[4] = metadata->srcID;
80 data[5] = 0;
81
82 buffer->advance(6);
83
85 new RBTree<uint8_t, Metadata*>::Node(metadata->srcID, metadata);
86
87 outboundTree.insert(node);
88
90
91 if (timeout)
92 {
93 metadata->state = WAITING_O;
94
96 int result = sem.timedwait(timeout);
98
99 if (result != 0 || metadata->state != CONNECTED_O)
100 {
101 outboundTree.remove(node);
102 metadata->data->destroy();
103 delete node;
104 delete metadata;
105 }
106 else
107 {
108 count++;
109 handle = (StreamHandle)metadata;
110 }
112 }
113 else
114 {
115 metadata->state = PENDING_O;
116 count++;
117 handle = (StreamHandle)metadata;
118 }
119 }
121
122 return handle;
123}
124
129{
130 Metadata *metadata = (Metadata*)handle;
131
133 if (metadata->data->items() == 0)
134 {
135 /* no more data left to send. Hooray! */
136 Buffer *buffer = buffer_alloc(4);
137 uint8_t *data = (uint8_t*)buffer->start();
138 data[0] = metadata->srcID;
139 data[1] = metadata->dstID;
140 data[2] = 0;
141 data[3] = 0;
142 buffer->advance(4);
143
144 write(Defs::MTI_STREAM_COMPLETE, metadata->nodeHandle, buffer);
145
146 /* no more data left in the buffer */
147 RBTree<uint8_t, Metadata*>::Node *node = outboundTree.remove(metadata->srcID);
148 metadata->data->destroy();
149 delete metadata;
150 delete node;
151 }
152 else
153 {
154 /* we have some data left to send, but mark us as closed */
155 metadata->state = CLOSED_O;
156 }
158}
159
166ssize_t Stream::swrite(StreamHandle handle, const void *buf, size_t size)
167{
168 Metadata *metadata = (Metadata*)handle;
169
170 ssize_t result = 0;
171
173 if (metadata->data->items() || metadata->count >= metadata->size)
174 {
175 /* there is already some data in flight, so lets just add to the back
176 * of the buffer.
177 */
178 result = metadata->data->put((uint8_t*)buf, size);
179 }
180 else
181 {
182 size_t buffer_size;
183 if (size > (metadata->size - metadata->count))
184 {
185 buffer_size = metadata->size - metadata->count;
186 }
187 else
188 {
189 buffer_size = size;
190 }
191
192 Buffer *buffer = buffer_alloc(buffer_size + 2);
193 uint8_t *data = (uint8_t*)buffer->start();
194 memcpy(data, buf, buffer_size);
195 data[buffer_size + 0] = metadata->srcID;
196 data[buffer_size + 1] = metadata->dstID;
197 buffer->advance(buffer_size + 2);
198 write(Defs::MTI_STREAM_DATA, metadata->nodeHandle, buffer);
199
200 result = buffer_size;
201 size -= buffer_size;
202 metadata->count += buffer_size;
203
204 if (size)
205 {
206 result += metadata->data->put((uint8_t*)buf + buffer_size, size);
207 }
208 }
210
211 return result;
212}
213
220ssize_t Stream::sread(StreamHandle handle, void *buf, size_t size)
221{
222 Metadata *metadata = (Metadata*)handle;
223
224 ssize_t result = 0;
225
227 size_t space = metadata->data->space();
228 result = metadata->data->get((uint8_t*)buf, size);
229
230 if (space < metadata->size && metadata->data->space() >= metadata->size)
231 {
232 /* proceed since we made room for the next data set */
233 Buffer *buffer = buffer_alloc(4);
234 uint8_t *data = (uint8_t*)buffer->start();
235 data[0] = metadata->srcID;
236 data[1] = metadata->dstID;
237 data[2] = 0;
238 data[3] = 0;
239 buffer->advance(4);
240
241 metadata->count = 0;
242
243 write(Defs::MTI_STREAM_PROCEED, metadata->nodeHandle, buffer);
244 }
246
247 return result;
248}
249
255{
256 if (count < CHANNELS_PER_NODE && buffer->used() >= 6)
257 {
258 uint8_t *data = (uint8_t*)buffer->start();
259
260 size_t buffer_size = (data[0] << 8) + data[1];
261
262 if (MAX_BUFFER_SIZE < buffer_size)
263 {
264 buffer_size = MAX_BUFFER_SIZE;
265 }
266
267 Metadata* metadata = new Metadata;
268 metadata->protocol = DISCOVER_1;
270 metadata->nodeHandle = src;
271 metadata->size = buffer_size;
272 metadata->count = 0;
273 metadata->state = CONNECTED_I;
274 metadata->srcID = data[4];
275 metadata->dstID = 0;
276
278
279 if (last != NULL)
280 {
281 metadata->dstID = last->key + 1;
282 while (inboundTree.find(metadata->dstID) != NULL)
283 {
284 metadata->dstID++;
285 }
286 }
287
288 /* reuse buffer to reply since we are otherwise done with it */
289 buffer->zero();
290
291 data[0] = (buffer_size >> 8) & 0xFF;
292 data[1] = (buffer_size >> 0) & 0xFF;
293 data[2] = ACCEPT;
294 data[3] = 0;
295 data[4] = metadata->srcID;
296 data[5] = metadata->dstID;
297
298 buffer->advance(6);
299
301
303 new RBTree<uint8_t, Metadata*>::Node(metadata->dstID, metadata);
304
305 inboundTree.insert(node);
306
307 buffer = buffer_alloc(sizeof(IdStreamType));
308 IdStreamType* type = (IdStreamType*)buffer->start();
309 type->stream = (StreamHandle)node->value;
310 buffer->id(ID_STREAM_NEW_CONNECTION);
311 buffer->advance(sizeof(IdStreamType));
312
313 rx_queue()->insert(buffer);
314 }
315}
316
322{
323 uint8_t *data = (uint8_t*)buffer->start();
324
325 size_t buffer_size = (data[0] << 8) + data[1];
326
328
329 HASSERT(node);
330
331 if (node->value->size > buffer_size)
332 {
333 node->value->size = buffer_size;
334 }
335
336 node->value->dstID = data[5];
337
338 buffer = buffer_alloc(sizeof(IdStreamType));
339 IdStreamType* type = (IdStreamType*)buffer->start();
340 type->stream = (StreamHandle)node->value;
341 buffer->id(ID_STREAM_COMPLETED_CONNECTION);
342 buffer->advance(sizeof(IdStreamType));
343
344 if (node->value->state == WAITING_O)
345 {
346 node->value->state = CONNECTED_O;
347 sem.post();
348 }
349 else
350 {
351 node->value->state = CONNECTED_O;
352 rx_queue()->insert(buffer);
353 }
354}
355
361{
362 bool wakeup;
363 uint8_t *data = (uint8_t*)buffer->start();
364
365 RBTree<uint8_t, Metadata*>::Node *node = inboundTree.find(data[buffer->used() - 1]);
366
367 /* we only wakeup if starting from zero, otherwise, the node already knows
368 * there is data available, so no need to re-notify it.
369 */
370 if (node->value->data->items() == 0)
371 {
372 wakeup = true;
373 }
374 else
375 {
376 wakeup = false;
377 }
378
379 /* last two bytes represent source and destination ID */
380 size_t len = buffer->size() - buffer->available();
381
382 /* some sanity tests that will help us debug this */
383 HASSERT(node->value->state == CONNECTED_I);
384 HASSERT(len > 2);
385 HASSERT(node->value->data->space() >= (len - 2));
386
387 while ((node->value->protocol & DISCOVER_MASK) != DISCOVERED && len > 2)
388 {
389 int shift = (node->value->protocol >> DISCOVER_SHIFT) - 1;
390 node->value->protocol |= (*data) << (shift * 8);
391 data++;
392 len--;
393 node->value->protocol-= DISCOVERED_DEC;
394 }
395
396 node->value->count += (len - 2);
397 HASSERT(node->value->count <= node->value->size);
398
399 node->value->data->put(data, len - 2);
400
401 /* release buffer, we are done with it */
402 buffer->free();
403
404 /* this operation must come before proceed because proceed will release the
405 * critical Node::mutex and cause a race condition.
406 */
407 if (wakeup && (node->value->protocol & DISCOVER_MASK) == DISCOVERED)
408 {
409 buffer = buffer_alloc(sizeof(IdStreamType));
410 IdStreamType* type = (IdStreamType*)buffer->start();
411 type->stream = (StreamHandle)node->value;
412 buffer->id(ID_STREAM_DATA_POSTED);
413 buffer->advance(sizeof(IdStreamType));
414
415 rx_queue()->insert(buffer);
416 }
417
418 if (node->value->count == node->value->size &&
419 node->value->data->space() >= MAX_BUFFER_SIZE)
420 {
421 /* proceed since we already have room for the next data set */
422 buffer = buffer_alloc(4);
423 data = (uint8_t*)buffer->start();
424 data[0] = node->value->srcID;
425 data[1] = node->value->dstID;
426 data[2] = 0;
427 data[3] = 0;
428 buffer->advance(4);
429
430 node->value->count = 0;
431
432 write(Defs::MTI_STREAM_PROCEED, src, buffer);
433 }
434}
435
441{
442 uint8_t *data = (uint8_t*)buffer->start();
443
445
446 node->value->count = 0;
447
448 buffer->free();
449
450 if (node->value->data->items())
451 {
452 size_t segment_size;
453 if (node->value->data->items() < node->value->size)
454 {
455 segment_size = node->value->data->items();
456 }
457 else
458 {
459 segment_size = node->value->size;
460 }
461
462 buffer = buffer_alloc(segment_size + 2);
463 data = (uint8_t*)buffer->start();
464 node->value->data->get(data, segment_size);
465 data[segment_size + 0] = node->value->srcID;
466 data[segment_size + 1] = node->value->dstID;
467 buffer->advance(segment_size + 2);
468 node->value->count = segment_size;
469
470 write(Defs::MTI_STREAM_DATA, src, buffer);
471 }
472}
473
479{
480 uint8_t *data = (uint8_t*)buffer->start();
481
482 RBTree<uint8_t, Metadata*>::Node *node = inboundTree.find(data[2]);
483
484 if (node->value->data->items() == 0)
485 {
486 /* no more data left in the buffer */
487 outboundTree.remove(node);
488 node->value->data->destroy();
489 delete node->value;
490 delete node;
491 }
492 else
493 {
494 /* we have some data left, give the application a chance to get it */
495 node->value->state = CLOSED_I;
496 }
497}
498
505{
506 switch (mti)
507 {
508 default:
509 HASSERT(0);
510 break;
512 initiate_request(src, data);
513 break;
515 initiate_reply(src, data);
516 break;
518 handle_data(src, data);
519 break;
521 proceed(src, data);
522 break;
524 complete(src, data);
525 break;
526 }
527}
528
529};
530
static OSEvent wakeup
event used to wakeup select calls
Definition Select.cxx:40
size_t size()
Definition Buffer.hxx:121
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
static OSMutex mutex
mutual exclusion for fileio
Definition Devtab.hxx:264
void lock()
Lock a mutex.
Definition OS.hxx:446
void unlock()
Unlock a mutex.
Definition OS.hxx:453
void post()
Post (increment) a semaphore.
Definition OS.hxx:260
size_t get(T *buf, size_t items)
remove a number of items from the buffer.
void destroy()
Destroy an existing ring buffer instance.
size_t space()
space left in buffer of buffer in number items.
size_t put(const T *buf, size_t items)
Insert a number of items to the buffer.
static RingBuffer * create(size_t size)
Factory method to create a ring buffer instance.
size_t items()
Number of items in the buffer.
void initiate_reply(NodeHandle src, Buffer *buffer)
Handle incoming stream initiate reply messages.
Definition Stream.cxx:321
void sclose(StreamHandle handle)
Close a stream.
Definition Stream.cxx:128
void * StreamHandle
Stream handle type.
Definition Stream.hxx:59
ssize_t sread(StreamHandle handle, void *buf, size_t size)
Read data from a stream.
Definition Stream.cxx:220
@ ACCEPT
initiate request accepted
Definition Stream.hxx:120
void initiate_request(NodeHandle src, Buffer *buffer)
Handle incoming stream initiate request messages.
Definition Stream.cxx:254
virtual int write(Defs::MTI mti, NodeHandle dst, Buffer *data)=0
Write a message from a node.
RBTree< uint8_t, Metadata * > inboundTree
tree for keeping track of inbound streams
Definition Stream.hxx:215
void packet(Defs::MTI mti, NodeHandle src, Buffer *data)
Handle incoming stream messages.
Definition Stream.cxx:504
OSSem sem
wait for completion semaphore
Definition Stream.hxx:221
ssize_t swrite(StreamHandle handle, const void *buf, size_t size)
Write data to a stream.
Definition Stream.cxx:166
static const uint16_t MAX_BUFFER_SIZE
Max buffer size for stream transmission segments.
Definition Stream.hxx:56
RBTree< uint8_t, Metadata * > outboundTree
tree for keeping track of outbound streams
Definition Stream.hxx:212
void handle_data(NodeHandle src, Buffer *buffer)
Handle incoming stream data messages.
Definition Stream.cxx:360
virtual BufferQueueWait * rx_queue()=0
Get handle to the receive queue for incoming NMRAnet messages.
size_t count
number of active streams for this node
Definition Stream.hxx:218
void proceed(NodeHandle src, Buffer *buffer)
Handle incoming stream proceed messages.
Definition Stream.cxx:440
void complete(NodeHandle src, Buffer *buffer)
Handle incoming stream complete messages.
Definition Stream.cxx:478
StreamHandle sopen(NodeHandle dst, long long timeout)
Open a stream.
Definition Stream.cxx:48
static const size_t CHANNELS_PER_NODE
number of simultaneous open streams per virtual node instance
Definition Stream.hxx:53
@ CONNECTED_I
connection made, inbound
Definition Stream.hxx:133
@ CLOSED_I
connection closed, inbound
Definition Stream.hxx:134
@ CONNECTED_O
connection made, outbound
Definition Stream.hxx:129
@ PENDING_O
connection pending, outbound
Definition Stream.hxx:127
@ WAITING_O
waiting on a pending connection, outbound
Definition Stream.hxx:128
@ CLOSED_O
connection closed, outbound
Definition Stream.hxx:130
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
The metadata for a tree node.
Definition RBTree.hxx:82
Key key
key by which to sort the node
Definition RBTree.hxx:86
Value value
value of the node
Definition RBTree.hxx:87
MTI
Known Message type indicators.
@ MTI_STREAM_PROCEED
stream flow control
@ MTI_STREAM_INITIATE_REPLY
Stream initiate reply.
@ MTI_STREAM_INITIATE_REQUEST
Stream initiate request.
@ MTI_STREAM_COMPLETE
stream terminate connection
@ MTI_STREAM_DATA
stream data
Container of both a NodeID and NodeAlias.
Stream metadata, often cast to StreamHandle.
Definition Stream.hxx:155
uint64_t protocol
stream protocol
Definition Stream.hxx:156
State state
Stream state.
Definition Stream.hxx:162
size_t count
count received before proceed required
Definition Stream.hxx:161
uint8_t dstID
unique destination ID
Definition Stream.hxx:164
NodeHandle nodeHandle
node we are communicating with
Definition Stream.hxx:158
size_t size
max buffer size
Definition Stream.hxx:160
uint8_t srcID
unique source ID
Definition Stream.hxx:163
RingBuffer< uint8_t > * data
stream data
Definition Stream.hxx:157