Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
SyncStream.hxx
Go to the documentation of this file.
1
34#ifndef _UTILS_SYNCSTREAM_HXX_
35#define _UTILS_SYNCSTREAM_HXX_
36
37#include <memory>
38#include <stdint.h>
39#include <string.h>
40#include <unistd.h>
41
42#include "utils/macros.h"
43#include "utils/logging.h"
44
46{
47public:
/// Virtual destructor, so that a stream held through a SyncStream pointer
/// (for example in a std::unique_ptr<SyncStream>) is destroyed via its most
/// derived type.
48 virtual ~SyncStream()
49 {
50 }
51
59 virtual ssize_t write(const void *data, size_t len) = 0;
60
/// Called once after all data has been written to close the stream and
/// release resources. This default implementation has nothing to release: it
/// ignores the argument and reports success.
/// @param status not used by the default implementation.
/// @return always 0 here; overrides may return other values.
66 virtual int finalize(int status)
67 {
68 return 0;
69 }
70
73 ssize_t write_all(const void *data, size_t len)
74 {
75 auto *d = to_8(data);
76 size_t written = 0;
77 while (len > 0)
78 {
79 auto ret = write(d, len);
80 if (ret < 0)
81 {
82 return ret;
83 }
84 if (ret == 0)
85 {
86 return written;
87 }
88 written += ret;
89 d += ret;
90 len -= ret;
91 }
92 return written;
93 }
94
95protected:
/// Converts a read-only void pointer to an equivalent byte pointer.
/// @param d pointer to reinterpret; the address is unchanged.
/// @return the same address typed as const uint8_t*.
static const uint8_t *to_8(const void *d)
{
    const auto *bytes = static_cast<const uint8_t *>(d);
    return bytes;
}
101
/// Converts a writable void pointer to an equivalent byte pointer.
/// @param d pointer to reinterpret; the address is unchanged.
/// @return the same address typed as uint8_t*.
static uint8_t *to_8(void *d)
{
    auto *bytes = static_cast<uint8_t *>(d);
    return bytes;
}
107};
108
114{
115public:
/// @param header memory that the incoming bytes are copied into. Only the
/// pointer is stored, so the memory must stay valid while write() is used.
/// @param header_size number of bytes to accept before write() starts
/// returning 0.
116 HeaderStream(void *header, size_t header_size)
117 : data_(to_8(header))
118 , remaining_(header_size)
119 {
120 }
121
122 ssize_t write(const void *data, size_t len) override
123 {
124 if (remaining_ == 0)
125 {
126 return 0;
127 }
128 if (len > remaining_)
129 {
130 len = remaining_;
131 }
132 memcpy(data_, data, len);
133 remaining_ -= len;
134 data_ += len;
135 return len;
136 }
137
138private:
140 uint8_t *data_;
143};
144
148{
149public:
/// @param delegate stream to forward data to. It is stored in a
/// std::unique_ptr, so this object takes ownership of it.
150 WrappedStream(SyncStream *delegate)
151 : delegate_(delegate)
152 {
153 }
154
158 void set_delegate(SyncStream *delegate)
159 {
160 if (delegate_)
161 {
162 // TODO: discards error value.
163 delegate_->finalize(-1);
164 }
165 delegate_.reset(delegate);
166 }
167
168 int finalize(int status) override
169 {
170 int ret = 0;
171 if (delegate_)
172 {
173 ret = delegate_->finalize(status);
174 delegate_.reset();
175 }
176 return ret;
177 }
178
179protected:
181 std::unique_ptr<SyncStream> delegate_;
182};
183
187{
188public:
/// @param length maximum number of bytes that will be passed on to the
/// delegate.
/// @param delegate stream that receives the data; handed to the
/// WrappedStream base class constructor.
191 MaxLengthStream(size_t length, SyncStream *delegate)
192 : WrappedStream(delegate)
193 , remaining_(length)
194 {
195 }
196
198 LOG(INFO, "deleting maxlengthstream remaining=%d", (int) remaining_);
199 }
200
201 ssize_t write(const void *data, size_t len) override
202 {
203 if (remaining_ == 0)
204 {
205 return 0;
206 }
207 if (len > remaining_)
208 {
209 len = remaining_;
210 }
211 ssize_t ret = delegate_->write(data, len);
212 if (ret <= 0)
213 {
214 return ret;
215 }
216 remaining_ -= ret;
217 return ret;
218 }
219
220private:
223};
224
229{
230public:
232 unsigned min_write_length, uint8_t fill_byte, SyncStream *delegate)
233 : WrappedStream(delegate)
234 , buffer_(nullptr)
235 , bufLength_(0)
236 , minWriteLength_(min_write_length)
237 , fillByte_(fill_byte)
238 , needsFill_(1)
239 {
240 }
241
243 unsigned min_write_length, SyncStream *delegate)
244 : WrappedStream(delegate)
245 , buffer_(nullptr)
246 , bufLength_(0)
247 , minWriteLength_(min_write_length)
248 , needsFill_(0)
249 {
250 }
251
253 {
254 LOG(INFO, "deleting minwritestream l=%d", (int) minWriteLength_);
255 delete[] buffer_;
256 }
257
/// Accepts data and makes sure short writes are collected in an internal
/// buffer instead of being sent to the delegate directly.
/// NOTE(review): this listing jumps over embedded line numbers 269 and 273,
/// so two statements of this function are not visible here. The comments
/// below describe only the visible code.
/// @param data bytes to send.
/// @param len number of bytes available at data.
/// @return number of bytes taken from the caller, or the zero/negative value
/// returned by the delegate.
258 ssize_t write(const void *data, size_t len) override
259 {
260 if (len == 0)
261 return 0; // not sure what to do here
262 if (bufLength_)
263 {
264 // There is some data in the buffer. Try to complete and flush it.
265 HASSERT(buffer_);
// cp is the number of bytes of this call that get appended to the buffer.
266 size_t cp = len;
267 if (cp + bufLength_ > minWriteLength_)
268 {
270 }
271 memcpy(buffer_ + bufLength_, data, cp);
272 bufLength_ += cp;
274 {
275 // Flush
276 auto ret = delegate_->write_all(buffer_, bufLength_);
// A zero or negative result is returned as-is; bufLength_ is left unchanged
// in that case.
277 if (ret <= 0)
278 return ret;
// A partial flush is treated as a fatal error.
279 HASSERT(ret == (int)bufLength_);
280 bufLength_ = 0;
281 }
// Only the bytes appended to the buffer are reported as consumed.
282 return cp;
283 }
284 if (len < minWriteLength_)
285 {
286 // Too small write. Must copy stuff to the buffer.
287 if (!buffer_)
288 {
// NOTE(review): minWriteLength_ / 4 rounds down, so the allocation holds
// fewer than minWriteLength_ bytes unless minWriteLength_ is a multiple of
// 4. Confirm callers guarantee that, or round the element count up.
289 buffer_ = (uint8_t *)new uint32_t[minWriteLength_ / 4];
290 }
291 memcpy(buffer_, data, len);
292 bufLength_ = len;
293 return len;
294 }
// Nothing is buffered and the request is long enough: pass it to the
// delegate unchanged and return its result.
295 auto ret = delegate_->write(data, len);
296 return ret;
297 }
298
/// Sends any data still held in the buffer to the delegate and then
/// finalizes the delegate.
/// NOTE(review): this listing jumps over embedded line number 304, so the
/// arguments of the memset call are not visible here.
/// @param status value passed on to the delegate's finalize().
/// @return the negative value from the delegate's write() on error, -1 if
/// the delegate accepted none of the buffered bytes, otherwise the result of
/// the delegate's finalize().
299 int finalize(int status) override
300 {
301 if (bufLength_)
302 {
303 memset(
// A full minWriteLength_ bytes are written regardless of bufLength_. This is
// a single write() call rather than write_all(), so a short positive result
// is not retried.
305 auto ret = delegate_->write(buffer_, minWriteLength_);
306 if (ret < 0)
307 return ret;
308 if (ret == 0)
309 return -1; // dropped data.
310 }
// Calls the delegate directly instead of WrappedStream::finalize(), so
// delegate_ is not reset here.
311 return delegate_->finalize(status);
312 }
313
314private:
315 uint8_t *buffer_;
317 unsigned bufLength_;
323 uint8_t fillByte_;
325 uint8_t needsFill_;
326};
327
331public:
/// @param output string that written data is appended to. Only the pointer
/// is stored; write() dereferences it without a null check.
334 StringAppendStream(std::string* output)
335 : output_(output) {}
336
/// Changes the string that subsequent write() calls append to.
/// @param output new target string; only the pointer is stored.
337 void set_output(std::string* output) {
338 output_ = output;
339 }
340
341 ssize_t write(const void *data, size_t len) override {
342 output_->append((const char*)data, len);
343 return len;
344 }
345
346private:
347 std::string* output_;
348};
349
353{
354public:
355 ssize_t write(const void *data, size_t len) override
356 {
357 if (len == 0)
358 return 0;
359 while (true)
360 {
361 if (!delegate_)
362 {
363 return 0;
364 }
365 ssize_t ret = delegate_->write(data, len);
366 if (ret != 0)
367 {
368 // Both positive (data actually written) as well as negative
369 // (error) will be returned directly.
370 return ret;
371 }
372 on_eof();
373 }
374 }
375
376protected:
/// This function will be called by write() when the delegate returns EOF
/// (zero). The default implementation finalizes the delegate with status 0,
/// ignoring the result, and then destroys it; with no delegate left, write()
/// returns 0. Since this is virtual and delegate_ is protected, an override
/// can install a different delegate instead, and write() will retry with it.
380 virtual void on_eof()
381 {
382 delegate_->finalize(0);
383 delegate_.reset();
384 }
385
389 std::unique_ptr<SyncStream> delegate_;
390};
391
392#endif // _UTILS_SYNCSTREAM_HXX_
Stream implementation that allows running a state machine of different streams, typically alternating...
ssize_t write(const void *data, size_t len) override
Main entry point to the data consumption.
virtual void on_eof()
This function will be called when the delegate returns EOF.
std::unique_ptr< SyncStream > delegate_
Stream implementation to delegate the logic to.
Stream implementation that takes a fixed number of bytes, filling in a header structure,...
ssize_t write(const void *data, size_t len) override
Main entry point to the data consumption.
uint8_t * data_
Pointer where we need to save the incoming bytes.
size_t remaining_
How many bytes we still need to save.
Stream wrapper that limits the number of bytes sent to the child stream, and reports EOF after the gi...
ssize_t write(const void *data, size_t len) override
Main entry point to the data consumption.
size_t remaining_
How many bytes we still have to write.
MaxLengthStream(size_t length, SyncStream *delegate)
Stream wrapper that contains a small internal buffer to ensure that all writes are at least a certain...
unsigned bufLength_
Number of used bytes in the buffer.
uint8_t fillByte_
What byte to append to the stream at finalize time when we still have bytes to send onwards.
unsigned minWriteLength_
Total length of the buffer.
ssize_t write(const void *data, size_t len) override
Main entry point to the data consumption.
int finalize(int status) override
Called once after all data has been written to close the stream and release resources.
uint8_t needsFill_
Whether to do fill (1: yes, 0: no).
Simple stream implementation that appends all data to a given std::string.
StringAppendStream(std::string *output)
ssize_t write(const void *data, size_t len) override
Main entry point to the data consumption.
virtual int finalize(int status)
Called once after all data has been written to close the stream and release resources.
static const uint8_t * to_8(const void *d)
Converts a void pointer to an equivalent byte pointer.
virtual ssize_t write(const void *data, size_t len)=0
Main entry point to the data consumption.
ssize_t write_all(const void *data, size_t len)
Repeatedly writes until all data has been consumed or an error occurs.
static uint8_t * to_8(void *d)
Converts a void pointer to an equivalent byte pointer.
Helper class for defining streams that forward data to another stream internally.
void set_delegate(SyncStream *delegate)
Overrides the target where to send the incoming data onwards.
int finalize(int status) override
Called once after all data has been written to close the stream and release resources.
std::unique_ptr< SyncStream > delegate_
Where to write the data to.
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int INFO
Loglevel that is printed by default, reporting some status information.
Definition logging.h:57
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138