Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
Buffer.hxx
Go to the documentation of this file.
1
34#ifndef _UTILS_BUFFER_HXX_
35#define _UTILS_BUFFER_HXX_
36
37// Enable this to collect the pointer of all buffers live.
38//#define DEBUG_BUFFER_MEMORY
39
40#include <memory>
41#include <atomic>
42#include <new>
43#include <cstdint>
44#include <cstdlib>
45#include <cstdarg>
46
49#include "os/OS.hxx"
50#include "utils/Atomic.hxx"
51#include "utils/MultiMap.hxx"
52#include "utils/QMember.hxx"
53#include "utils/Queue.hxx"
54#include "utils/macros.h"
55
56class DynamicPool;
57class FixedPool;
58class LimitedPool;
59class Pool;
60template <class T> class Buffer;
61class BufferBase;
63
64namespace openlcb
65{
66class AsyncIfTest;
67}
68
69extern const unsigned LARGEST_BUFFERPOOL_BUCKET;
70
73
78
80extern void* g_current_alloc;
81
84class BufferBase : public QMember
85{
86public:
89 uint16_t references()
90 {
91 return count_.load();
92 }
93
98 {
99 if (done_)
100 {
101 done_->notify();
102 }
103 done_ = done;
104 }
105
109 {
110 if (done_)
111 {
112 return done_->new_child();
113 }
114 else
115 {
116 return nullptr;
117 }
118 }
119
121 size_t size()
122 {
123 return size_;
124 }
125
126protected:
131 {
132 return pool_;
133 }
134
137
141
143 uint16_t size_;
144
146 std::atomic_uint_least16_t count_;
147
153 : QMember()
154 , pool_(pool)
155 , done_(NULL)
156 , size_(size)
157 , count_(1)
158 {
159 }
160
164 {
165 if (done_)
166 {
167 done_->notify();
168 done_ = nullptr;
169 }
170 }
171
172 friend class openlcb::AsyncIfTest;
173
175 friend class Pool;
176
178 friend class DynamicPool;
179
181 friend class FixedPool;
182
184 friend class LimitedPool;
185
187 friend class DataBufferPool;
188
190};
191
194template <class T> class Buffer : public BufferBase
195{
196public:
198 typedef T value_type;
199
204 {
205 ++count_;
206 return this;
207 }
208
211 inline void unref();
212
215 T *data()
216 {
217 return &data_;
218 }
219
220private:
225 : BufferBase(sizeof(Buffer<T>), pool)
226 {
227 }
228
232 {
233 }
234
236
238 friend class Pool;
240 friend class DataBuffer;
241
244};
245
248template<typename T> struct BufferDelete {
251 if (b) b->unref();
252 }
253};
254
256template<typename T> using AutoReleaseBuffer = std::unique_ptr<Buffer<T>, BufferDelete<T>>;
259template<typename T> using BufferPtr = AutoReleaseBuffer<T>;
260
272template<typename T> BufferPtr<T> get_buffer_deleter(Buffer<T>* b) {
273 return BufferPtr<T>(b);
274}
275
277class Pool
278{
279public:
281 size_t total_size()
282 {
283 return totalSize;
284 }
285
291 template <class BufferType>
292 void alloc(Buffer<BufferType> **result, Executable *flow = NULL)
293 {
294#ifdef DEBUG_BUFFER_MEMORY
296 alloc:
297#endif
298 *result = static_cast<Buffer<BufferType> *>(
299 alloc_untyped(sizeof(Buffer<BufferType>), flow));
300 if (*result && !flow)
301 {
302 new (*result) Buffer<BufferType>(this);
303 }
304 }
305
309 template <class BufferType> void alloc(BufferPtr<BufferType> *result)
310 {
312 alloc(&p);
313 result->reset(p);
314 }
315
319 template <class BufferType> void alloc_async(Executable *flow)
320 {
321 Buffer<BufferType> *buffer;
322 alloc(&buffer, flow);
323 }
324
330 template <class BufferType>
331 static void alloc_async_init(BufferBase *base, Buffer<BufferType> **result)
332 {
333 HASSERT(base);
334 HASSERT(sizeof(Buffer<BufferType>) == base->size());
335 *result = static_cast<Buffer<BufferType> *>(base);
336 new (*result) Buffer<BufferType>(base->pool());
337 }
338
342 virtual size_t free_items() = 0;
343
348 virtual size_t free_items(size_t size) = 0;
349
350protected:
354 : totalSize(0)
355 {
356 }
357
360 virtual ~Pool()
361 {
362 }
363
369 virtual BufferBase *alloc_untyped(size_t size, Executable *flow) = 0;
370
374 virtual void free(BufferBase *item) = 0;
375
377 size_t totalSize;
378
379private:
381 friend class BufferBase;
383 friend class LimitedPool;
385 friend class DataBufferPool;
386
388 template <class T> friend class Buffer;
389
391};
392
396class Bucket : public Q
397{
398public:
406 static Bucket *init(int s, ...)
407 {
408 va_list ap, aq;
409 va_start(ap, s);
410 va_copy(aq, ap);
411 int count = 1;
412 int current = s;
413
414 while (current != 0)
415 {
416 ++count;
417 int next = va_arg(ap, int);
418 HASSERT(next > current || next == 0);
419 current = next;
420 }
421
422 Bucket *bucket = (Bucket *)malloc(sizeof(Bucket) * count);
423 Bucket *now = bucket;
424
425 new (now) Bucket(s);
426 now++;
427
428 for (int i = 1; i < count; ++i)
429 {
430 new (now) Bucket(va_arg(aq, int));
431 now++;
432 }
433
434 va_end(aq);
435 va_end(ap);
436 return bucket;
437 }
438
442 static void destroy(Bucket *bucket)
443 {
444 free(bucket);
445 }
446
450 size_t size()
451 {
452 return size_;
453 }
454
459 {
460 return pending_.next().item;
461 }
462
463private:
466 Bucket(size_t size)
467 : Q()
468 , size_(size)
469 , pending_()
470 {
471 }
472
476 {
477 }
478
479 size_t size_;
480public:
481 size_t allocCount_{0};
482private:
485};
486
490class DynamicPool : public Pool, private Atomic
491{
492public:
497 : Pool()
498 , buckets(sizes)
499 {
500 }
501
504 {
505#ifdef GTEST
506 for (unsigned i = 0; buckets[i].size() != 0; ++i)
507 {
508 // Frees all memory left in the bucket.
509 do
510 {
511 auto *p = static_cast<BufferBase *>(buckets[i].next().item);
512 if (!p)
513 {
514 break;
515 }
516 ::free(p);
517 } while (true);
518 }
519#endif
521 }
522
526 size_t free_items() override;
527
532 size_t free_items(size_t size) override;
533
534protected:
537
538private:
546 BufferBase *alloc_untyped(size_t size, Executable *flow) override;
547
551 void* alloc_large(size_t size);
554 void free_large(void* block);
555
559 void free(BufferBase *item) override;
560
564
565 friend class ForwardAllocator;
566
568};
569
572class FixedPool : public Pool, public Atomic
573{
574public:
579 bool valid(QMember *item)
580 {
581 if ((char *)item >= mempool &&
582 (char *)item < (mempool + (items * itemSize)))
583 {
584 return true;
585 }
586 return false;
587 }
588
594 FixedPool(size_t item_size, size_t items)
595 : Pool()
596 , mempool(new char[(item_size * items)])
597 , itemSize(item_size)
598 , items(items)
599 , empty(false)
600 {
601 // HASSERT(item_size != 0 && items != 0);
602 QMember *current = (QMember *)mempool;
603 for (size_t i = 0; i < items; ++i)
604 {
605 current->init();
606 queue.insert(current);
607 current = (QMember *)((char *)current + item_size);
608 }
609 }
610
613 {
614 delete[] mempool;
615 }
616
620 size_t free_items() override
621 {
622 return items - (totalSize / itemSize);
623 }
624
629 size_t free_items(size_t size) override
630 {
631 return size == itemSize ? free_items() : 0;
632 }
633
634protected:
637
639 char *mempool;
640
642 size_t itemSize;
643
645 size_t items;
646
648 bool empty;
649
650private:
659 BufferBase *alloc_untyped(size_t size, Executable *flow) override;
660
664 void free(BufferBase *item) override;
665
669
671};
672
675template <class T> void Buffer<T>::unref()
676{
677 if (count_.fetch_sub(1) == 1u)
678 {
679 this->~Buffer();
680 pool_->free(this);
681 }
682}
683
684#endif /* _UTILS_BUFFER_HXX_ */
BufferPtr< T > get_buffer_deleter(Buffer< T > *b)
Helper function to create a BufferPtr of an appropriate type without having to explicitly specify the...
Definition Buffer.hxx:272
AutoReleaseBuffer< T > BufferPtr
Smart pointer for buffers.
Definition Buffer.hxx:259
std::unique_ptr< Buffer< T >, BufferDelete< T > > AutoReleaseBuffer
This class will automatically unref a Buffer when going out of scope.
Definition Buffer.hxx:256
void * g_current_alloc
This pointer will be saved for debugging the current allocation source.
Pool * init_main_buffer_pool()
Initializes the main buffer pool.
Definition Buffer.cxx:40
const unsigned LARGEST_BUFFERPOOL_BUCKET
Ensures that the largest bucket in the main buffer pool is exactly the size of a GenMessage.
Definition If.cxx:40
DynamicPool * mainBufferPool
main buffer pool instance
Definition Buffer.cxx:37
Lightweight locking class for protecting small critical sections.
Definition Atomic.hxx:130
A BarrierNotifiable allows to create a number of child Notifiable and wait for all of them to finish.
void notify() override
Implementation of the barrier semantics.
BarrierNotifiable * new_child()
Call this for each child task.
This is a struct for storing info about a specific size item in the DynamicPool.
Definition Buffer.hxx:397
QMember * executables()
Pull out any pending Executables.
Definition Buffer.hxx:458
size_t size()
Get the size of the bucket.
Definition Buffer.hxx:450
static void destroy(Bucket *bucket)
destroy a bucket created with init.
Definition Buffer.hxx:442
~Bucket()
Destructor.
Definition Buffer.hxx:475
Q pending_
list of anyone waiting for an item in the bucket
Definition Buffer.hxx:484
size_t size_
size of entry
Definition Buffer.hxx:479
Bucket(size_t size)
Constructor.
Definition Buffer.hxx:466
static Bucket * init(int s,...)
Allocate a Bucket array off of the heap initialized with sizes.
Definition Buffer.hxx:406
size_t allocCount_
total entries allocated
Definition Buffer.hxx:481
Abstract base class for all Buffers.
Definition Buffer.hxx:85
Pool * pool_
Reference to the pool from whence this buffer came.
Definition Buffer.hxx:136
uint16_t size_
size of data in bytes
Definition Buffer.hxx:143
BufferBase(size_t size, Pool *pool)
Constructor.
Definition Buffer.hxx:152
size_t size()
Definition Buffer.hxx:121
std::atomic_uint_least16_t count_
number of references in use
Definition Buffer.hxx:146
~BufferBase()
Destructor.
Definition Buffer.hxx:163
Pool * pool()
Get a pointer to the pool that this buffer belongs to.
Definition Buffer.hxx:130
BarrierNotifiable * new_child()
Creates a new child notifiable of the current done notifiable.
Definition Buffer.hxx:108
void set_done(BarrierNotifiable *done)
Specifies that a given BarrierNotifiable must be called when the Buffer is deallocated (unreffed to z...
Definition Buffer.hxx:97
BarrierNotifiable * done_
Notifiable to call when the buffer has finished processing everywhere.
Definition Buffer.hxx:140
uint16_t references()
Definition Buffer.hxx:89
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
Buffer(Pool *pool)
Constructor.
Definition Buffer.hxx:224
T data_
user data
Definition Buffer.hxx:243
Buffer< T > * ref()
Add another reference to the buffer.
Definition Buffer.hxx:203
void unref()
Decrement count.
Definition Buffer.hxx:675
~Buffer()
Destructor.
Definition Buffer.hxx:231
T value_type
The type of payload this buffer contains.
Definition Buffer.hxx:198
T * data()
get a pointer to the start of the data.
Definition Buffer.hxx:215
Proxy Pool that can allocate DataBuffer objects of a certain size.
Specialization of the Buffer class that is designed for storing untyped data arrays.
A specialization of a pool which can allocate new elements dynamically upon request.
Definition Buffer.hxx:491
DynamicPool()
Default constructor.
void free(BufferBase *item) override
Releases an item back to the free pool.
Definition Buffer.cxx:165
size_t free_items() override
Number of free items in the pool.
Definition Buffer.cxx:57
~DynamicPool()
default destructor
Definition Buffer.hxx:503
DynamicPool(Bucket sizes[])
Constructor.
Definition Buffer.hxx:496
BufferBase * alloc_untyped(size_t size, Executable *flow) override
Get a free item out of the pool.
Definition Buffer.cxx:105
void * alloc_large(size_t size)
Allocates a large memory block directly from the heap.
Definition Buffer.cxx:154
Bucket * buckets
Free buffer queue.
Definition Buffer.hxx:536
void free_large(void *block)
Frees a large memory block allocated by alloc_large.
Definition Buffer.cxx:158
An object that can be scheduled on an executor to run.
Pool of fixed number of items which can be allocated up on request.
Definition Buffer.hxx:573
FixedPool()
Default Constructor.
void free(BufferBase *item) override
Release an item back to the free pool.
Definition Buffer.cxx:231
bool valid(QMember *item)
Used in static pools to tell if this item is a member of the pool.
Definition Buffer.hxx:579
~FixedPool()
default destructor
Definition Buffer.hxx:612
FixedPool(size_t item_size, size_t items)
Constructor for a fixed size pool.
Definition Buffer.hxx:594
size_t free_items() override
Number of free items in the pool.
Definition Buffer.hxx:620
BufferBase * alloc_untyped(size_t size, Executable *flow) override
Get a free item out of the pool.
Definition Buffer.cxx:197
size_t items
total number of items in the queue
Definition Buffer.hxx:645
size_t itemSize
item Size for fixed pools
Definition Buffer.hxx:642
bool empty
is the pool empty
Definition Buffer.hxx:648
size_t free_items(size_t size) override
Number of free items in the pool for a given allocation size.
Definition Buffer.hxx:629
char * mempool
First buffer in a pre-allocated array pool.
Definition Buffer.hxx:639
Q queue
Free buffer queue.
Definition Buffer.hxx:636
An arena allocator, which is optimized to not be able to free individual entries, only the entire all...
Implementation of a Pool interface that takes memory from mainBufferPool (configurable) but limits th...
Pool of previously allocated, but currently unused, items.
Definition Buffer.hxx:278
static void alloc_async_init(BufferBase *base, Buffer< BufferType > **result)
Cast the result of an asynchronous allocation and perform a placement new on it.
Definition Buffer.hxx:331
virtual void free(BufferBase *item)=0
Release an item back to the free pool.
Pool()
Default Constructor.
Definition Buffer.hxx:353
size_t total_size()
Definition Buffer.hxx:281
void alloc(BufferPtr< BufferType > *result)
Get a free item out of the pool.
Definition Buffer.hxx:309
virtual BufferBase * alloc_untyped(size_t size, Executable *flow)=0
Untyped buffer allocation method, used be descendants.
virtual size_t free_items()=0
Number of free items in the pool.
size_t totalSize
keep track of total allocated size of memory
Definition Buffer.hxx:377
void alloc(Buffer< BufferType > **result, Executable *flow=NULL)
Get a free item out of the pool.
Definition Buffer.hxx:292
virtual ~Pool()
default destructor.
Definition Buffer.hxx:360
void alloc_async(Executable *flow)
Get a free item out of the pool.
Definition Buffer.hxx:319
virtual size_t free_items(size_t size)=0
Number of free items in the pool for a given allocation size.
Essentially a "next" pointer container.
Definition QMember.hxx:42
void init()
Initiailize a QMember, in place of a public placement construction.
Definition QMember.hxx:46
This class implements a linked list "queue" of buffers.
Definition Queue.hxx:98
void insert(QMember *item, unsigned index=0)
Add an item to the back of the queue.
Definition Queue.hxx:124
Result next()
Get an item from the front of the queue.
Definition Queue.hxx:176
QMember * next(unsigned index)
Get an item from the front of the queue.
Definition Queue.hxx:167
size_t count
number of items in queue
Definition Queue.hxx:246
Test fixture base class with helper methods for exercising the asynchronous interface code.
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DISALLOW_COPY_AND_ASSIGN(TypeName)
Removes default copy-constructor and assignment added by C++.
Definition macros.h:171
Helper class for correctly deleting a buffer.
Definition Buffer.hxx:248
void operator()(Buffer< T > *b)
unrefs the passed-in buffer.
Definition Buffer.hxx:250