Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
Queue.hxx
Go to the documentation of this file.
1
34#ifndef _UTILS_QUEUE_HXX_
35#define _UTILS_QUEUE_HXX_
36
37#include <cstdint>
38#include <cstdlib>
39#include <cstdarg>
40
41#include "openmrn_features.h"
44#include "os/OS.hxx"
45#include "utils/Atomic.hxx"
46#include "utils/MultiMap.hxx"
47#include "utils/QMember.hxx"
48#include "utils/macros.h"
49
50namespace openlcb
51{
52class AsyncIfTest;
53}
54
55
58struct Result
59{
63 : item(NULL)
64 , index(0)
65 {
66 }
67
73 : item(item)
74 , index(index)
75 {
76 }
77
81 {
82 }
83
85 unsigned index;
86};
87
89{
90public:
91};
92
97class Q : private Atomic
98{
99public:
103 : head(NULL)
104 , tail(NULL)
105 , count(0)
106 {
107 }
108
112 {
113 }
114
115 Atomic *lock()
116 {
117 return this;
118 }
119
124 void insert(QMember *item, unsigned index = 0)
125 {
126 AtomicHolder h(this);
127 HASSERT(item->next == nullptr);
128 HASSERT(item != tail);
129 if (head == NULL)
130 {
131 head = tail = item;
132 }
133 else
134 {
135 tail->next = item;
136 tail = item;
137 }
138 item->next = NULL;
139 ++count;
140 }
141
146 void insert_locked(QMember *item, unsigned index = 0)
147 {
148 HASSERT(item->next == nullptr);
149 HASSERT(item != tail);
150 if (head == NULL)
151 {
152 head = tail = item;
153 }
154 else
155 {
156 tail->next = item;
157 tail = item;
158 }
159 item->next = NULL;
160 ++count;
161 }
162
167 QMember *next(unsigned index)
168 {
169 return next().item;
170 }
171
177 {
178 AtomicHolder h(this);
179 return next_locked();
180 }
181
187 {
188 if (head == NULL)
189 {
190 return Result();
191 }
192 --count;
193 QMember *qm = head;
194 if (head == tail)
195 {
196 tail = NULL;
197 }
198 head = (qm->next);
199 qm->next = NULL;
200
201 return Result(qm, 0);
202 }
203
208 size_t pending(unsigned index)
209 {
210 return pending();
211 };
212
216 size_t pending()
217 {
218 return count;
219 }
220
225 bool empty(unsigned index)
226 {
227 return empty();
228 }
229
233 bool empty()
234 {
235 return (head == NULL);
236 }
237
238private:
241
244
246 size_t count;
247
249};
250
254{
255public:
259 : waiting(true)
260 {
261 }
262
266 {
267 }
268
273 void insert(QMember *item, unsigned index = 0)
274 {
275 Executable *executable = NULL;
276 {
277 AtomicHolder h(impl_.lock());
278 if (waiting)
279 {
280 if (impl_.empty())
281 {
282 waiting = false;
283 impl_.insert(item);
284 }
285 else
286 {
287 executable = static_cast<Executable *>(impl_.next().item);
288 }
289 }
290 else
291 {
292 impl_.insert(item);
293 }
294 }
295 if (executable)
296 {
297 executable->alloc_result(item);
298 }
299 }
300
306 {
307 QMember *qm = NULL;
308 {
309 AtomicHolder h(impl_.lock());
310 if (waiting)
311 {
312 impl_.insert(flow);
313 }
314 else
315 {
316 qm = impl_.next(0);
317 if (qm == NULL)
318 {
319 impl_.insert(flow);
320 waiting = true;
321 }
322 }
323 }
324 if (qm)
325 {
326 flow->alloc_result(qm);
327 }
328 }
329
334 QMember *next(unsigned index)
335 {
336 return next().item;
337 }
338
344 {
345 AtomicHolder h(impl_.lock());
346 return next_locked();
347 }
348
354 {
355 return waiting ? Result() : impl_.next();
356 }
357
362 size_t pending(unsigned index)
363 {
364 return pending();
365 };
366
370 size_t pending()
371 {
372 AtomicHolder h(impl_.lock());
373 return waiting ? 0 : impl_.pending();
374 }
375
380 bool empty(unsigned index)
381 {
382 return empty();
383 }
384
388 bool empty()
389 {
390 AtomicHolder h(impl_.lock());
391 return waiting ? true : impl_.empty();
392 }
393
394private:
397
400
402};
403
405template <class T> class TypedQAsync : public QAsync
406{
407public:
412 {
413 return BlockingWait(this).result();
414 }
415
417 void typed_insert(T* entry) {
418 insert(entry);
419 }
420
422
423private:
427 {
428 public:
431 {
432 parent->next_async(this);
434 }
435
438 {
439 return result_;
440 }
441
442 private:
444 {
445 result_ = static_cast<T *>(item);
446 n_.notify();
447 }
448
450 {
451 DIE("Unexpected call to Run() in BlockingWait");
452 }
453
454 private:
459 };
460};
461
465template <unsigned ITEMS> class QList
466{
467public:
471 : list()
472 {
473 }
474
478 {
479 }
480
481 typedef ::Result Result;
482
483 Atomic *lock()
484 {
485 return list[0].lock();
486 }
487
492 void insert(QMember *item, unsigned index)
493 {
494 AtomicHolder h(lock());
495 insert_locked(item, index);
496 }
497
502 void insert_locked(QMember *item, unsigned index)
503 {
504 if (index >= ITEMS)
505 {
506 index = ITEMS - 1;
507 }
508 list[index].insert_locked(item);
509 }
510
515 QMember *next(unsigned index)
516 {
517 AtomicHolder h(lock());
518 return list[index].next_locked().item;
519 }
520
525 {
526 AtomicHolder h(lock());
527 return next_locked();
528 }
529
530 Result next_locked()
531 {
532 for (unsigned i = 0; i < ITEMS; ++i)
533 {
534 QMember *result = list[i].next_locked().item;
535 if (result)
536 {
537 return Result(result, i);
538 }
539 }
540 return Result();
541 }
542
547 size_t pending(unsigned index)
548 {
549 return list[index].pending();
550 }
551
555 size_t pending()
556 {
557 // @todo we probably don't want to keep locking and unlocking all of
558 // the queues here. There should probably be only one lock for this
559 // entire iteration.
560 size_t result = 0;
561 for (unsigned i = 0; i < ITEMS; ++i)
562 {
563 result += list[i].pending();
564 }
565 return result;
566 }
567
569 size_t size()
570 {
571 return pending();
572 }
573
578 bool empty(unsigned index)
579 {
580 return list[index].empty();
581 }
582
586 bool empty()
587 {
588 AtomicHolder h(lock());
589 for (unsigned i = 0; i < ITEMS; ++i)
590 {
591 if (!list[i].empty())
592 {
593 return false;
594 }
595 }
596 return true;
597 }
598
599private:
601 Q list[ITEMS];
602
604};
605
608template<unsigned items> using QListProtected = QList<items>;
609
610
611#if 0
615template <class T> class QueueWait : public Q <T>, public OSSem
616{
617public:
619 QueueWait()
620 : Q<T>(),
621 OSSem(0)
622 {
623 }
624
627 ~QueueWait()
628 {
629 }
630
634 void insert(T *item)
635 {
636 Q<T>::insert(item);
637 post();
638 }
639
643 T *next()
644 {
645 T *result = Q<T>::next();
646 if (result != NULL)
647 {
648 /* decrement semaphore */
649 OSSem::wait();
650 }
651 return result;
652 }
653
658 T *wait()
659 {
660 OSSem::wait();
661 T *result = Q<T>::next();
662 if(result == NULL)
663 {
664 errno = EINTR;
665 }
666 return result;
667 }
668
674 T *timedwait(long long timeout)
675 {
676 if (OSSem::timedwait(timeout) != 0)
677 {
678 errno = ETIMEDOUT;
679 return NULL;
680 }
681
682 T *result = Q<T>::next();
683 if (result == NULL)
684 {
685 errno = EINTR;
686 }
687 return result;
688 }
689
692 void wakeup()
693 {
694 post();
695 }
696
697private:
698
699 DISALLOW_COPY_AND_ASSIGN(QueueWait);
700};
701
705template <class T> class QueueProtectedWait : public QProtected <T>, public OSSem
706{
707public:
709 QueueProtectedWait()
710 : QProtected<T>(),
711 OSSem(0)
712 {
713 }
714
717 ~QueueProtectedWait()
718 {
719 }
720
724 void insert(T *item)
725 {
726 QProtected<T>::insert(item);
727 post();
728 }
729
730#if OPENMRN_FEATURE_RTOS_FROM_ISR
734 void insert_from_isr(T *item)
735 {
736 QProtected<T>::insert_locked(item);
737 post_from_isr();
738 }
739#endif // OPENMRN_FEATURE_RTOS_FROM_ISR
740
744 T *next()
745 {
746 T *result = QProtected<T>::next();
747 if (result != NULL)
748 {
749 /* decrement semaphore */
750 OSSem::wait();
751 }
752 return result;
753 }
754
759 T *wait()
760 {
761 OSSem::wait();
762 T *result = QProtected<T>::next();
763 if(result == NULL)
764 {
765 errno = EINTR;
766 }
767 return result;
768 }
769
775 T *timedwait(long long timeout)
776 {
777 if (OSSem::timedwait(timeout) != 0)
778 {
779 errno = ETIMEDOUT;
780 return NULL;
781 }
782
783 T *result = QProtected<T>::next();
784 if (result == NULL)
785 {
786 errno = EINTR;
787 }
788 return result;
789 }
790
793 void wakeup()
794 {
795 post();
796 }
797
798private:
799 DISALLOW_COPY_AND_ASSIGN(QueueProtectedWait);
800};
801
802#endif
803
808template <unsigned items>
809class QListProtectedWait : public QListProtected<items>, public OSSem
810{
811public:
815 : QListProtected<items>()
816 , OSSem(0)
817 {
818 }
819
823 {
824 }
825
830 void insert(QMember *item, unsigned index)
831 {
833 post();
834 }
835
836#if OPENMRN_FEATURE_RTOS_FROM_ISR
841 void insert_from_isr(QMember *item, unsigned index)
842 {
843 int woken = 0;
845 this->post_from_isr(&woken);
846 }
847#endif // OPENMRN_FEATURE_RTOS_FROM_ISR
848
850 typedef typename QListProtected<items>::Result Result;
851
856 {
858 if (result.item != NULL)
859 {
860 /* decrement semaphore */
861 OSSem::wait();
862 }
863 return result;
864 }
865
871 {
872 OSSem::wait();
874 if (result.item == NULL)
875 {
876 errno = EINTR;
877 }
878 return result;
879 }
880
881#if OPENMRN_FEATURE_SEM_TIMEDWAIT
887 Result timedwait(long long timeout)
888 {
889 if (OSSem::timedwait(timeout) != 0)
890 {
891 errno = ETIMEDOUT;
892 return {NULL, 0};
893 }
894
896 if (result.item == NULL)
897 {
898 errno = EINTR;
899 }
900 return result;
901 }
902#endif
903
906 void wakeup()
907 {
908 post();
909 }
910
911private:
913};
914
915#endif /* _UTILS_QUEUE_HXX_ */
static OSEvent wakeup
event used to wakeup select calls
Definition Select.cxx:40
See OSMutexLock in os/OS.hxx.
Definition Atomic.hxx:153
Lightweight locking class for protecting small critical sections.
Definition Atomic.hxx:130
An object that can be scheduled on an executor to run.
virtual void alloc_result(QMember *item)
Return the result of an alloc_async() from a memory Pool.
This class provides a counting semaphore API.
Definition OS.hxx:243
void post()
Post (increment) a semaphore.
Definition OS.hxx:260
void wait()
Wait on (decrement) a semaphore.
Definition OS.hxx:279
Asynchronous specialization of Q.
Definition Queue.hxx:254
bool waiting
true if someone is waiting for an insertion
Definition Queue.hxx:396
bool empty()
Test if the queue is empty.
Definition Queue.hxx:388
Result next()
Get an item from the front of the queue.
Definition Queue.hxx:343
void next_async(Executable *flow)
Get an item from the front of the queue.
Definition Queue.hxx:305
QAsync()
Default Constructor.
Definition Queue.hxx:258
void insert(QMember *item, unsigned index=0)
Add an item to the back of the queue.
Definition Queue.hxx:273
size_t pending(unsigned index)
Get the number of pending items in the queue.
Definition Queue.hxx:362
~QAsync()
Default destructor.
Definition Queue.hxx:265
QMember * next(unsigned index)
Get an item from the front of the queue.
Definition Queue.hxx:334
bool empty(unsigned index)
Test if the queue is empty.
Definition Queue.hxx:380
Q impl_
Implementation helper.
Definition Queue.hxx:399
size_t pending()
Get the number of pending items in the queue.
Definition Queue.hxx:370
Result next_locked()
Get an item from the front of the queue.
Definition Queue.hxx:353
A BufferQueue that adds the ability to wait on the next buffer.
Definition Queue.hxx:810
void wakeup()
Wakeup anyone waiting on the wait queue.
Definition Queue.hxx:906
Result wait()
Wait for an item from the front of the queue.
Definition Queue.hxx:870
void insert(QMember *item, unsigned index)
Add an item to the back of the queue.
Definition Queue.hxx:830
QListProtected< items >::Result Result
Translate the Result type.
Definition Queue.hxx:850
Result next()
Get an item from the front of the queue.
Definition Queue.hxx:855
QListProtectedWait()
Default Constructor.
Definition Queue.hxx:814
~QListProtectedWait()
Default destructor.
Definition Queue.hxx:822
A list of queues.
Definition Queue.hxx:466
bool empty()
Test if all the queues are empty.
Definition Queue.hxx:586
size_t pending()
Get the total number of pending items in all queues in the list.
Definition Queue.hxx:555
Result next()
Get an item from the front of the queue in priority order.
Definition Queue.hxx:524
size_t size()
Definition Queue.hxx:569
QList()
Default Constructor.
Definition Queue.hxx:470
Q list[ITEMS]
the list of queues
Definition Queue.hxx:601
void insert(QMember *item, unsigned index)
Add an item to the back of the queue.
Definition Queue.hxx:492
~QList()
Destructor.
Definition Queue.hxx:477
QMember * next(unsigned index)
Get an item from the front of the queue.
Definition Queue.hxx:515
size_t pending(unsigned index)
Get the number of pending items in the queue.
Definition Queue.hxx:547
bool empty(unsigned index)
Test if the queue is empty.
Definition Queue.hxx:578
void insert_locked(QMember *item, unsigned index)
Add an item to the back of the queue.
Definition Queue.hxx:502
Essentially a "next" pointer container.
Definition QMember.hxx:42
QMember * next
pointer to the next member in the queue
Definition QMember.hxx:65
This class implements a linked list "queue" of buffers.
Definition Queue.hxx:98
size_t pending()
Get the number of pending items in the queue.
Definition Queue.hxx:216
QMember * head
head item in queue
Definition Queue.hxx:240
void insert_locked(QMember *item, unsigned index=0)
Add an item to the back of the queue.
Definition Queue.hxx:146
Result next_locked()
Get an item from the front of the queue.
Definition Queue.hxx:186
Q()
Default Constructor.
Definition Queue.hxx:102
void insert(QMember *item, unsigned index=0)
Add an item to the back of the queue.
Definition Queue.hxx:124
size_t pending(unsigned index)
Get the number of pending items in the queue.
Definition Queue.hxx:208
Result next()
Get an item from the front of the queue.
Definition Queue.hxx:176
bool empty()
Test if the queue is empty.
Definition Queue.hxx:233
bool empty(unsigned index)
Test if the queue is empty.
Definition Queue.hxx:225
QMember * next(unsigned index)
Get an item from the front of the queue.
Definition Queue.hxx:167
~Q()
Default destructor.
Definition Queue.hxx:111
QMember * tail
tail item in queue
Definition Queue.hxx:243
size_t count
number of items in queue
Definition Queue.hxx:246
A Notifiable for synchronously waiting for a notification.
void notify() override
Implementation of notification receive.
void wait_for_notification()
Blocks the current thread until the notification is delivered.
Helper class for waiting (blocking the current thread) until a message in the queue shows up.
Definition Queue.hxx:427
BlockingWait(TypedQAsync< T > *parent)
Constructor.
Definition Queue.hxx:430
T * result_
Response of the allocation.
Definition Queue.hxx:458
void run() OVERRIDE
Entry point.
Definition Queue.hxx:449
SyncNotifiable n_
helps blocking the calling thread until the allocation is complete.
Definition Queue.hxx:456
void alloc_result(QMember *item) OVERRIDE
Return the result of an alloc_async() from a memory Pool.
Definition Queue.hxx:443
Strongly typed queue class with asynchronous access.
Definition Queue.hxx:406
void typed_insert(T *entry)
Inserts an entry at the end of the queue.
Definition Queue.hxx:417
T * next_blocking()
Definition Queue.hxx:411
#define OVERRIDE
Function attribute for virtual functions declaring that this function is overriding a function that s...
Definition macros.h:180
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DIE(MSG)
Unconditionally terminates the current process with a message.
Definition macros.h:143
#define DISALLOW_COPY_AND_ASSIGN(TypeName)
Removes default copy-constructor and assignment added by C++.
Definition macros.h:171
Result of pulling an item from the queue based on priority.
Definition Queue.hxx:59
unsigned index
index of item pulled from queue
Definition Queue.hxx:85
QMember * item
item pulled from queue
Definition Queue.hxx:84
Result()
Default Constructor.
Definition Queue.hxx:62
Result(QMember *item, unsigned index)
Explicit initializer constructor.
Definition Queue.hxx:72
~Result()
Default Destructor.
Definition Queue.hxx:80