Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
DataBuffer.hxx
Go to the documentation of this file.
1
36#ifndef _UTILS_DATABUFFER_HXX_
37#define _UTILS_DATABUFFER_HXX_
38
39#include "utils/Buffer.hxx"
41#include "utils/macros.h"
42
43#ifdef GTEST
44// #define DEBUG_DATA_BUFFER_FREE
45#endif
46
47class DataBufferPool;
48
49#ifdef DEBUG_DATA_BUFFER_FREE
50class DataBuffer;
51static void check_db_ownership(DataBuffer *p);
52#endif
53
58class DataBuffer : public Buffer<uint8_t[]>
59{
60public:
/// Overrides the size of the data buffer.
/// @param s the new value stored into size_.
65 void set_size(uint16_t s)
66 {
67 size_ = s;
68 }
69
73 {
74 QMember::next = n;
75 }
76
79 {
80 return static_cast<DataBuffer *>(QMember::next);
81 }
82
/// @return pointer to the first byte of the payload array (data_).
84 uint8_t *data()
85 {
86 return static_cast<uint8_t *>(&data_[0]);
87 }
88
91 {
92 return static_cast<DataBuffer *>(Buffer<uint8_t[]>::ref());
93 }
94
/// Acquires one reference to all blocks of this buffer.
///
/// Walks the chain starting at this block and calls ref() on every block
/// until total_size bytes (counted using each block's size()) are covered.
/// The loop is a do-while, so this block is always referenced at least once.
/// Terminates the process (HASSERT) if the chain ends before total_size
/// bytes have been covered.
/// @param total_size number of bytes to cover, counted from the start of
/// this block.
/// @param tail if non-null, receives the block in which the covered range
/// ends.
/// @param tail_size if non-null, receives how many of the covered bytes
/// fall into the tail block.
/// @return this.
102 DataBuffer *ref_all(unsigned total_size, DataBuffer **tail = nullptr,
103 unsigned *tail_size = nullptr)
104 {
105 DataBuffer *curr = this;
106 do
107 {
108 HASSERT(curr);
109 curr->ref();
110 if (total_size > curr->size())
111 {
// The range continues past this block; consume it entirely.
112 total_size -= curr->size();
113 }
114 else
115 {
// The range ends inside (or exactly at the end of) this block.
116 if (tail)
117 {
118 *tail = curr;
119 }
120 if (tail_size)
121 {
122 *tail_size = total_size;
123 }
124 total_size = 0;
125 }
126 curr = curr->next();
127 } while (total_size > 0);
128 return this;
129 }
130
/// Releases one reference to all blocks of this buffer.
///
/// Walks the chain starting at this block and calls unref() on every block
/// that is needed to cover total_size bytes (counted using each block's
/// size()). Terminates the process (HASSERT) if the chain ends early.
/// @param total_size number of bytes covered, counted from the start of
/// this block.
136 void unref_all(unsigned total_size)
137 {
138 DataBuffer *curr = this;
139 while (true)
140 {
141 HASSERT(curr);
142 if (total_size > curr->size())
143 {
// The link and the size are read before unref(): DataBufferPool::free()
// clears the next pointer and rewrites size_ when a block is released.
144 DataBuffer *next = curr->next();
145 total_size -= curr->size();
146 curr->unref();
147 curr = next;
148 }
149 else
150 {
// Last block of the covered range.
151 curr->unref();
152 break;
153 }
154 }
155 }
156
166 unsigned skip, uint8_t **ptr, unsigned *available)
167 {
168 DataBuffer *curr = this;
169 while (curr->size() <= skip)
170 {
171 skip -= curr->size();
172 curr = curr->next();
173 HASSERT(curr);
174 }
175 *ptr = curr->data() + skip;
176 *available = curr->size() - skip;
177 return curr->next();
178 }
179
180#ifdef DEBUG_DATA_BUFFER_FREE
/// Debug-only variant of unref(), compiled when DEBUG_DATA_BUFFER_FREE is
/// defined. If this call would drop the last reference (references() == 1),
/// check_db_ownership() is run on this block first.
/// NOTE(review): listing line 187 is not visible here; presumably it
/// forwards to Buffer<uint8_t[]>::unref() -- confirm against the source.
181 void unref()
182 {
183 if (references() == 1)
184 {
185 check_db_ownership(this);
186 }
188 }
189#endif
190
191private:
192 friend class DataBufferPool;
193
195 : Buffer<uint8_t[]>((Pool *)p)
196 {
197 }
198}; // class DataBuffer
199
/// std::unique_ptr holding a DataBuffer, using BufferDelete<uint8_t[]> as
/// the deleter.
200using DataBufferPtr = std::unique_ptr<DataBuffer, BufferDelete<uint8_t[]>>;
201
203
206#ifdef DEBUG_DATA_BUFFER_FREE
207 : public LinkedObject<LinkedDataBufferPtr>
208#endif
209{
210public:
212 {
213 }
214
216 {
217 reset();
218 }
219
222 : head_(o.head_)
223 , tail_(o.tail_)
224 , size_(o.size_)
225 , skip_(o.skip_)
226 , free_(o.free_)
227 {
228 o.clear();
229 }
230
234 {
235 reset();
236 head_ = o.head_;
237 tail_ = o.tail_;
238 size_ = o.size_;
239 skip_ = o.skip_;
240 free_ = o.free_;
241 o.clear();
242 }
243
/// Copy assignment is not permitted.
247 void operator=(const LinkedDataBufferPtr &) = delete;
248
/// Takes a reference of o, covering a prefix of size bytes (or all the
/// data). The previous content of this object is released first.
///
/// Because reset() runs before o is read, passing *this as o leaves this
/// object empty.
/// @param o the buffer whose blocks are referenced.
/// @param size number of bytes to take from the beginning of o's data. A
/// negative value, or a value larger than o's size, selects all of o's
/// data.
255 void reset(const LinkedDataBufferPtr &o, ssize_t size = -1)
256 {
257 reset();
258 if (size < 0)
259 {
260 size = o.size_;
261 }
262 if ((size_t)size > o.size_)
263 {
264 size = o.size_;
265 }
266 if (!size)
267 {
268 // Nothing to copy, this will be an empty buffer.
269 return;
270 }
271 skip_ = o.skip_;
272 size_ = size;
273 // Takes references, keeping the tail and tail size.
274 unsigned tail_size = 0;
275 head_ = o.head_->ref_all(o.skip_ + size, &tail_, &tail_size);
276 HASSERT(tail_size > 0);
// A negative free_ stores the negated end offset within the tail block.
277 free_ = -tail_size;
278 }
279
/// Clears the current contents and replaces it with the empty buf.
///
/// buf becomes both head and tail. No additional reference is acquired on
/// buf in this function.
/// @param buf the buffer to use. Its size() on entry is recorded as the
/// free byte count, then its size is set to zero.
283 void reset(DataBuffer *buf)
284 {
285 reset();
286 head_ = tail_ = buf;
287 skip_ = 0;
288 free_ = buf->size();
289 size_ = 0;
290 buf->set_size(0);
291 }
292
/// Set to a single data buffer.
///
/// The previous content is released first. No additional reference is
/// acquired on buf in this function.
/// @param buf the buffer that becomes both head and tail.
/// @param skip number of bytes to skip at the beginning of buf.
/// @param size number of data bytes, counted after the skipped bytes.
298 void reset(DataBuffer *buf, unsigned skip, unsigned size)
299 {
300 reset();
301 head_ = buf;
302 skip_ = skip;
303 size_ = size;
// Stored negative: the negated end offset (skip + size) within buf.
304 free_ = -int(skip + size);
305 tail_ = buf;
306 }
307
312 {
313 if (!head_)
314 {
315 reset(buf);
316 return;
317 }
318 HASSERT(free_ >= 0); // appendable
319 HASSERT(tail_);
320 // Note: if free_ was > 0, there were some unused bytes in the tail
321 // buffer. However, as part of the append operation, we lose these
322 // bytes as capacity. The new free part will be only in the newly
323 // appended tail_ buffer. This is because free_ can never span more
324 // than one buffer.
325 free_ = buf->size();
326 buf->set_size(0);
327 HASSERT(!tail_->next());
328 tail_->set_next(buf);
329 tail_ = buf;
330 }
331
/// Deallocates the current content (by releasing the references).
///
/// Leaves this object empty. If there is a head buffer, one reference is
/// released on every block covering skip_ + size_ bytes.
333 void reset()
334 {
335 if (head_)
336 {
// Save what is needed for the release, then clear the members before
// dropping the references.
// NOTE(review): presumably so that the DEBUG_DATA_BUFFER_FREE ownership
// check does not find this object still pointing at the chain -- confirm.
337 auto *h = head_;
338 size_t len = size_ + skip_;
339 clear();
340 h->unref_all(len);
341 return;
342 }
343 else
344 {
345 clear();
346 }
347 }
348
352 {
353 if (!tail_)
354 {
355 return nullptr;
356 }
357 return tail_->data() + tail_->size();
358 }
359
/// Advances the tail pointer after a write occurred into the tail.
///
/// Terminates the process (HASSERT) if free_ is negative or len exceeds
/// free_.
/// @param len number of bytes that were written. It is subtracted from
/// free_ and added to both the tail buffer's size and size_.
363 void data_write_advance(size_t len)
364 {
365 HASSERT(free_ >= 0 && ((int)len <= free_));
366 free_ -= len;
367 tail_->set_size(tail_->size() + len);
368 size_ += len;
369 }
370
/// Retrieves a pointer where data can be read out of the buffer.
/// @param len receives the number of bytes that can be read at the
/// returned pointer. This is capped at size_, and is set to 0 when there
/// is no head buffer or no data.
/// @return pointer to the first unread byte, or nullptr when there is no
/// head buffer or no data.
376 const uint8_t *data_read_pointer(size_t *len)
377 {
378 if (!head_ || !size_)
379 {
380 *len = 0;
381 return nullptr;
382 }
383 unsigned avail = 0;
384 uint8_t *p;
385 head_->get_read_pointer(skip_, &p, &avail);
// The block may hold more bytes than this object covers.
386 if (avail > size_)
387 {
388 avail = size_;
389 }
390 *len = avail;
391 return p;
392 }
393
/// Advances the head pointer.
///
/// Adds len to skip_ and subtracts it from size_, then releases (unref)
/// every head block that has been skipped completely. Terminates the
/// process (HASSERT) if len is larger than size().
/// @param len number of bytes that were consumed from the front.
397 void data_read_advance(size_t len)
398 {
399 HASSERT(len <= size());
400 skip_ += len;
401 size_ -= len;
402 while (head_ && skip_ >= head_->size())
403 {
404 if (head_ == tail_)
405 {
406 if (free() > 0)
407 {
408 // We can still write into this buffer, do not unref it.
409 break;
410 }
411 else
412 {
413 // We're ending up with an empty linkedbuffer.
414 auto *b = head_;
415 clear();
416 b->unref();
417 return;
418 }
419 }
// Drop the fully consumed head block and move on to the next one. The
// members are updated before the reference is released.
420 skip_ -= head_->size();
421 auto *b = head_;
422 auto *next_head = head_->next();
423 head_ = next_head;
424 if (!head_)
425 {
426 tail_ = nullptr;
427 }
428 b->unref();
429 }
430 }
431
434 {
435 return head_;
436 }
437
440 {
441 return tail_;
442 }
443
/// @return skip_, the number of bytes to skip in the head buffer.
445 unsigned skip() const
446 {
447 return skip_;
448 }
449
/// @return size_, the number of bytes filled in. Note that size_ is a
/// size_t while the return type is unsigned.
451 unsigned size() const
452 {
453 return size_;
454 }
455
/// @return the value of free_ when it is non-negative; 0 when free_ is
/// negative.
458 size_t free() const
459 {
460 if (free_ < 0)
461 {
462 return 0;
463 }
464 return free_;
465 }
466
476 {
478 ret.head_ = head_;
479 ret.tail_ = tail_;
480 ret.skip_ = skip_;
481 ret.size_ = len;
482
483 HASSERT(tail_); // always true when we have a buffer
484 HASSERT(len <= size_);
485
486 size_t bytes_left = size_ - len;
487
488 // tail_->size() is the previously used bytes in the tail buffer. The
489 // number of bytes not transferred shall fit into this. There must be
490 // however at least one byte in the tail buffer that *was* transferred.
491 HASSERT(bytes_left < tail_->size());
492
493 size_t bytes_transferred_from_tail_buffer = tail_->size() - bytes_left;
494
495 // Since the tail is now in both the transferred chain as well as in
496 // the current chain, it needs an extra ref. We keep that ref.
497 head_ = tail_->ref();
498 size_ = bytes_left;
499 skip_ = bytes_transferred_from_tail_buffer;
500 HASSERT(skip_ > 0);
501 // Saves the end offset of the tail buffer in ret.
502 ret.free_ = -bytes_transferred_from_tail_buffer;
503 // this->free_ remains as is.
504 return ret;
505 }
506
/// Appends all content in this buffer to an std::string.
///
/// NOTE(review): `head` is not declared in the visible listing (line 511
/// is elided); presumably it is a local DataBuffer pointer initialized
/// from head_ -- confirm against the source.
/// @param recvd the string to append to. Capacity for size_ additional
/// bytes is reserved up front.
509 void append_to(std::string *recvd) const
510 {
512 unsigned skip = skip_;
513 size_t len = size_;
514 recvd->reserve(recvd->size() + len);
515 while (len > 0)
516 {
517 uint8_t *ptr;
518 unsigned available;
519 head = head->get_read_pointer(skip, &ptr, &available);
// Do not copy more than the bytes this object covers.
520 if (available > len)
521 {
522 available = len;
523 }
524 recvd->append((char *)ptr, available);
525 len -= available;
// Only the first block is read with a non-zero offset.
526 skip = 0;
527 }
528 }
529
/// Attempt to combine *this with o into a single LinkedDataBufferPtr this.
///
/// Every `return false` below happens before any member of this object is
/// modified. With add_link set, the tail block's next pointer may be
/// written; that path always continues to the successful append.
/// @param o the buffer whose data should follow the data of this object.
/// On success this object takes its own references on o's blocks.
/// @param add_link if true and the tail block has no next block, the tail
/// block is linked to o.head().
/// @return true if o was empty or its data was appended; false if the two
/// buffers cannot be combined (this object is writeable, or the data is
/// not back-to-back).
538 bool try_append_from(const LinkedDataBufferPtr &o, bool add_link = false)
539 {
540 if (!o.size())
541 {
542 return true; // zero bytes, nothing to do.
543 }
544 if (!size_)
545 {
546 // We are empty, so anything can be appended.
547 reset(o);
548 return true;
549 }
550 if (free_ >= 0)
551 {
552 // writeable buffer, cannot append.
553 return false;
554 }
555 HASSERT(o.head());
556 if (o.head() != tail_) // Buffer does not start in the same chain where
557 // we end.
558 {
559 HASSERT(tail_); // else we went into the !size_ branch above
560
561 // Checks if the end of the tail buffer is already reached. This
562 // means that we don't depend on the value of free_ anymore for
563 // correctness. We also check that o starts at the beginning of the
564 // head buffer.
565 if (tail_->size() == (size_t)-free_ && o.skip() == 0)
566 {
567 if (tail_->next() == o.head())
568 {
569 // link already exists
570 }
571 else if (add_link && tail_->next() == nullptr)
572 {
573 tail_->set_next(o.head());
574 }
575 else
576 {
577 return false;
578 }
579 }
580 else
581 {
582 return false;
583 }
584 }
585 else if (-free_ != (int)o.skip())
586 {
587 // Not back-to-back.
588 return false;
589 }
590 // Now we're good, so take over the extra buffers.
591 // Acquire extra references
592 o.head_->ref_all(o.skip() + o.size());
593 if (tail_ == o.head())
594 {
595 HASSERT(o.head_->references() > 1);
596 // Release duplicate reference between the two chains.
597 o.head_->unref();
598 }
599 tail_ = o.tail_;
600 if (o.free_ < 0)
601 {
602 free_ = o.free_;
603 }
604 else
605 {
// o is writeable: record the end of the used bytes in its tail block as
// a negated end offset.
606 free_ = -tail_->size();
607 }
608 size_ += o.size_;
609 return true;
610 }
611
612private:
/// Internal helper function of constructors and reset functions.
///
/// Sets head_ and tail_ to nullptr and skip_, free_ and size_ to zero. No
/// reference is released here.
616 void clear()
617 {
618 head_ = tail_ = nullptr;
619 skip_ = free_ = size_ = 0;
620 }
621
/// First buffer in the chain. This is the root of the ownership.
623 DataBuffer *head_ {nullptr};
/// Last buffer in the chain. This is where we can extend the owned bytes.
625 DataBuffer *tail_ {nullptr};
/// How many bytes we have filled in (counting starts at head.data() +
/// skip_).
628 size_t size_ {0};
/// How many bytes to skip in the head buffer.
630 uint16_t skip_ {0};
/// If >= 0: How many free bytes are there in the tail buffer. If < 0: the
/// negated end offset of the owned data within the tail buffer.
635 int16_t free_ {0};
636};
637
638#ifdef DEBUG_DATA_BUFFER_FREE
/// Debug helper, compiled only when DEBUG_DATA_BUFFER_FREE is defined.
///
/// While holding LinkedDataBufferPtr::head_mu(), walks every
/// LinkedDataBufferPtr reachable through link_head() / link_next() and
/// terminates the process (HASSERT) if any block covered by that object's
/// skip() + size() bytes is b.
/// @param b the buffer that must not be covered by any LinkedDataBufferPtr.
639void check_db_ownership(DataBuffer *b)
640{
641 AtomicHolder h(LinkedDataBufferPtr::head_mu());
642 for (LinkedDataBufferPtr *l = LinkedDataBufferPtr::link_head(); l;
643 l = l->link_next())
644 {
// Number of bytes, counted from the start of the head block, that this
// LinkedDataBufferPtr covers.
645 ssize_t total = l->skip() + l->size();
646 for (DataBuffer *curr = l->head(); total > 0;)
647 {
648 HASSERT(curr != b);
649 total -= curr->size();
650 curr = curr->next();
651 }
652 }
653}
654#endif
655
658class DataBufferPool : public Pool
659{
660public:
663 {
664 HASSERT(payload_size <= 65535u - sizeof(BufferBase));
665 }
666
667#ifdef GTEST
/// Available only when GTEST is defined.
/// @return pointer to payloadSize_, through which the payload size can be
/// read or changed.
670 uint16_t *payload_size_override()
671 {
672 return &payloadSize_;
673 }
674#endif
675
/// Number of free items in the pool.
/// @return what the base pool reports for allocations of alloc_size()
/// bytes.
677 size_t free_items() override
678 {
679 return base_pool()->free_items(alloc_size());
680 }
681
/// Number of free items in the pool for a given allocation size.
/// @param size the allocation size, passed on to the base pool unchanged.
/// @return what the base pool reports for that size.
685 size_t free_items(size_t size) override
686 {
687 return base_pool()->free_items(size);
688 }
689
/// Get a free item out of the pool with untyped data of the size specified
/// in the constructor.
///
/// Requests alloc_size() bytes from the base pool. When that succeeds, a
/// DataBuffer is constructed in place and its size_ is set to
/// payload_size().
/// NOTE(review): listing line 697 (inside the DEBUG_BUFFER_MEMORY section)
/// is not visible here; presumably it records the allocation source --
/// confirm against the source.
/// @param result receives the new buffer, or whatever null value the base
/// pool returned.
694 void alloc(DataBuffer **result)
695 {
696#ifdef DEBUG_BUFFER_MEMORY
698 alloc:
699#endif
700 *result = static_cast<DataBuffer *>(
701 base_pool()->alloc_untyped(alloc_size(), nullptr));
702 if (*result)
703 {
704 new (*result) DataBuffer(this);
705 (*result)->size_ = payload_size();
706 }
707 }
708
/// Get a free item out of the pool with untyped data of the size specified
/// in the constructor.
/// @param result smart pointer that is reset to the newly allocated buffer
/// (or to the null value produced by the raw-pointer alloc()).
713 void alloc(DataBufferPtr *result)
714 {
715 DataBuffer *b;
716 alloc(&b);
717 result->reset(b);
718 }
719
720private:
/// Internal helper function used by the default Buffer alloc
/// implementation. Not supported by this pool: calling it unconditionally
/// terminates the process (DIE).
/// @param size unused.
/// @param flow unused.
723 BufferBase *alloc_untyped(size_t size, Executable *flow) override
724 {
725 DIE("DataBufferPool does not support this type of allocation.");
726 }
727
/// Function called when a buffer refcount reaches zero.
///
/// Resets the bookkeeping fields of the item and hands it back to the base
/// pool.
/// @param item the buffer to release.
729 void free(BufferBase *item) override
730 {
731 // Restores the correct size for assigning it to the right freelist
732 // bucket.
733 item->size_ = alloc_size();
734 // Clears the next pointer as we are not using these for queues.
735 item->next = nullptr;
736 base_pool()->free(item);
737 }
738
741 {
742 return mainBufferPool;
743 }
744
/// @return the number of bytes requested from the base pool for one
/// buffer: sizeof(BufferBase) plus payloadSize_.
746 uint16_t alloc_size()
747 {
748 return sizeof(BufferBase) + payloadSize_;
749 }
750
/// @return payloadSize_, the number of bytes that need to be stored in
/// each buffer.
753 uint16_t payload_size()
754 {
755 return payloadSize_;
756 }
757
/// Number of bytes that need to be stored in each buffer.
759 uint16_t payloadSize_;
760};
761
762#endif // _UTILS_DATABUFFER_HXX_
DynamicPool * mainBufferPool
main buffer pool instance
Definition Buffer.cxx:37
void * g_current_alloc
This pointer will be saved for debugging the current allocation source.
See OSMutexLock in os/OS.hxx.
Definition Atomic.hxx:153
Abstract base class for all Buffers.
Definition Buffer.hxx:85
uint16_t size_
size of data in bytes
Definition Buffer.hxx:143
size_t size()
Definition Buffer.hxx:121
uint16_t references()
Definition Buffer.hxx:89
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
friend class DataBuffer
Allow DataBuffer access to our constructor.
Definition Buffer.hxx:240
uint8_t data_
user data
Definition Buffer.hxx:243
Buffer< T > * ref()
Add another reference to the buffer.
Definition Buffer.hxx:203
void unref()
Decrement count.
Definition Buffer.hxx:675
Proxy Pool that can allocate DataBuffer objects of a certain size.
uint16_t payloadSize_
Number of bytes that need to be stored in each buffer.
size_t free_items(size_t size) override
Number of free items in the pool for a given allocation size.
size_t free_items() override
Number of free items in the pool.
void alloc(DataBuffer **result)
Get a free item out of the pool with untyped data of the size specified in the constructor.
uint16_t alloc_size()
void alloc(DataBufferPtr *result)
Get a free item out of the pool with untyped data of the size specified in the constructor.
Pool * base_pool()
void free(BufferBase *item) override
Function called when a buffer refcount reaches zero.
uint16_t payload_size()
BufferBase * alloc_untyped(size_t size, Executable *flow) override
Internal helper function used by the default Buffer alloc implementation.
Specialization of the Buffer class that is designed for storing untyped data arrays.
uint8_t * data()
DataBuffer * ref()
void unref_all(unsigned total_size)
Releases one reference to all blocks of this buffer.
void set_size(uint16_t s)
Overrides the size of the data buffer.
DataBuffer * next()
DataBuffer * ref_all(unsigned total_size, DataBuffer **tail=nullptr, unsigned *tail_size=nullptr)
Acquires one reference to all blocks of this buffer.
DataBuffer * get_read_pointer(unsigned skip, uint8_t **ptr, unsigned *available)
Helper function to read out data from a linked data buffer.
void set_next(DataBuffer *n)
Sets the linking pointer of the DataBuffer to a target buffer.
An object that can be scheduled on an executor to run.
A class that keeps ownership of a chain of linked DataBuffer references.
bool try_append_from(const LinkedDataBufferPtr &o, bool add_link=false)
Attempt to combine *this with o into a single LinkedDataBufferPtr this.
void reset()
Deallocates the current content (by releasing the references).
uint8_t * data_write_pointer()
int16_t free_
If >= 0: How many free bytes are there in the tail buffer.
void data_read_advance(size_t len)
Advances the head pointer.
void reset(DataBuffer *buf)
Clears the current contents and replaces it with the empty buf.
DataBuffer * tail() const
unsigned skip() const
size_t size_
How many bytes we have filled in (counting starts at head.data() + skip_).
LinkedDataBufferPtr(LinkedDataBufferPtr &&o)
Move constructor. Takes the ownership that o has. Leaves o as empty.
void reset(const LinkedDataBufferPtr &o, ssize_t size=-1)
Takes a reference of o, covering a prefix of size bytes (or all the data).
DataBuffer * head_
First buffer in the chain. This is the root of the ownership.
unsigned size() const
LinkedDataBufferPtr transfer_head(size_t len)
Transfers the ownership of the prefix of this buffer.
void append_empty_buffer(DataBuffer *buf)
Adds an empty buffer to the end of this buffer chain.
void reset(DataBuffer *buf, unsigned skip, unsigned size)
Set to a single data buffer.
void append_to(std::string *recvd) const
Appends all content in this buffer to an std::string.
DataBuffer * head() const
void data_write_advance(size_t len)
Advances the tail pointer after a write occurred into the tail.
void operator=(LinkedDataBufferPtr &&o)
Move assignment operator.
DataBuffer * tail_
Last buffer in the chain. This is where we can extend the owned bytes.
size_t free() const
void clear()
Internal helper function of constructors and reset functions.
uint16_t skip_
How many bytes to skip in the head buffer.
const uint8_t * data_read_pointer(size_t *len)
Retrieves a pointer where data can be read out of the buffer.
LinkedDataBufferPtr(const LinkedDataBufferPtr &)=delete
We do not permit default copy operation.
Using this class as a base class will cause the given class to have all its instances linked up in a ...
Pool of previously allocated, but currently unused, items.
Definition Buffer.hxx:278
virtual void free(BufferBase *item)=0
Release an item back to the free pool.
virtual BufferBase * alloc_untyped(size_t size, Executable *flow)=0
Untyped buffer allocation method, used by descendants.
virtual size_t free_items()=0
Number of free items in the pool.
friend class DataBufferPool
DataBufferPool proxies to a base Pool.
Definition Buffer.hxx:385
friend class BufferBase
Allow BufferBase to access this class.
Definition Buffer.hxx:381
QMember * next
pointer to the next member in the queue
Definition QMember.hxx:65
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DIE(MSG)
Unconditionally terminates the current process with a message.
Definition macros.h:143