Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
ScheduledQueue.hxx
Go to the documentation of this file.
1
36#ifndef _UTILS_SCHEDULEDQUEUE_HXX_
37#define _UTILS_SCHEDULEDQUEUE_HXX_
38
39#include "os/OS.hxx"
40#include "utils/Fixed16.hxx"
41#include "utils/Queue.hxx"
42#include "utils/logging.h"
43
48{
49public:
58 ScheduledQueue(unsigned num_bands, const Fixed16 *strides)
59 : numBands_(num_bands)
60 , bands_(new Band[num_bands])
61 {
62 for (unsigned i = 0; i < numBands_; i++)
63 {
64 bands_[i].stride_ = strides[i];
65 bands_[i].currentToken_ -= strides[i];
66 }
67 }
68
// NOTE(review): the signature line of this body is missing from the listing;
// presumably this is the destructor ~ScheduledQueue() -- confirm against the
// original header.
// Releases the band array that the constructor allocated with new[].
71 {
72 delete[] bands_;
73 }
74
// NOTE(review): the signature line of this body is missing from the listing;
// presumably this is `Result next()` -- confirm against the original header.
// Holds the mutex returned by lock() for the duration of the call and
// forwards to next_locked(), returning its result unchanged.
79 {
80 OSMutexLock h(lock());
81 return next_locked();
82 }
83
// NOTE(review): the signature line of this body is missing from the listing;
// presumably this is `Result next_locked()` -- confirm against the original
// header. This body takes no lock itself.
//
// Returns Result(0, 0) when nothing is pending. Otherwise it returns one
// queued item, with Result::index set to the band it was taken from, and
// decrements numPending_.
88 {
89 if (!numPending_)
90 {
91 // Empty queue.
92 return Result(0, 0);
93 }
94 // Execute the priority based scheduling algorithm.
// Bands are visited in increasing index order; the first band that holds at
// least one whole token and has a queued item wins.
95 for (unsigned i = 0; i < numBands_; ++i)
96 {
// NOTE(review): listing line 97 is absent from this extract, so no statement
// that adds tokens to the band is visible here -- check the original file
// before reasoning about how currentToken_ grows.
98 if (bands_[i].currentToken_.trunc() >= 1)
99 {
100 Result ret = bands_[i].queue_.next_locked();
101 if (ret.item)
102 {
// Taking an item costs the band exactly one token.
103 ret.index = i;
104 --numPending_;
105 bands_[i].currentToken_ -= 1;
106 return ret;
107 }
108 else
109 {
110 // This queue has a token but is empty. We remove
111 // fractional tokens and keep searching onwards in the
112 // priorities.
// The token count is set to exactly 1 here.
113 bands_[i].currentToken_ = 1;
114 }
115 }
116 }
117 // Fallen off at the end. We go backwards to find any queue with
118 // nonempty members.
// This pass runs from the last band down to band 0 and ignores token counts.
119 for (int i = numBands_ - 1; i >= 0; --i)
120 {
121 if (!bands_[i].queue_.empty())
122 {
123 Result ret = bands_[i].queue_.next_locked();
// A band served by this fallback pass has its token count reset to zero.
124 bands_[i].currentToken_ = 0;
125 ret.index = i;
126 --numPending_;
127 return ret;
128 }
129 }
// Reached only if numPending_ was nonzero but every band's queue reported
// empty, i.e. the pending counter and the queues disagree.
130 DIE("Unexpected nonempty queue");
131 return Result(0, 0);
132 }
133
// NOTE(review): the signature line of this body is missing from the listing;
// presumably this is `OSMutex *lock()` -- confirm against the original header.
// Returns the address of the member lock_. insert() and next() acquire this
// lock before calling the corresponding _locked() functions.
141 {
142 return &lock_;
143 }
144
150 void insert(QMember *item, unsigned prio)
151 {
152 OSMutexLock h(lock());
153 return insert_locked(item, prio);
154 }
155
161 void insert_locked(QMember *item, unsigned prio)
162 {
163 HASSERT(prio < numBands_);
164 ++numPending_;
165 bands_[prio].queue_.insert_locked(item);
166 }
167
171 size_t pending(unsigned prio)
172 {
173 HASSERT(prio < numBands_);
174 return bands_[prio].queue_.pending();
175 };
176
179 size_t pending() const
180 {
181 return numPending_;
182 }
183
185 bool empty() const
186 {
187 return numPending_ == 0;
188 }
189
191 unsigned num_prio() const
192 {
193 return numBands_;
194 }
195
196private:
/// This structure contains information about one priority band.
// NOTE(review): listing lines 200-207 (the member declarations) are missing
// from this extract. Other code in this class reads and writes the members
// stride_, currentToken_ and queue_ of Band -- confirm their declarations
// against the original header.
198 struct Band
199 {
208 };
209
212
/// How many priority bands we have. Initialized from the constructor's
/// num_bands argument.
214 unsigned numBands_;
215
/// How many queue entries are pending over all bands. Incremented by
/// insert_locked() and decremented each time next_locked() returns an item.
217 unsigned numPending_ {0};
218
221};
222
223#endif // _UTILS_SCHEDULEDQUEUE_HXX_
Class to allow convenient locking and unlocking of mutexes in a C context.
Definition OS.hxx:494
This class provides a mutex API.
Definition OS.hxx:427
Essentially a "next" pointer container.
Definition QMember.hxx:42
This class implements a linked list "queue" of buffers.
Definition Queue.hxx:98
void insert_locked(QMember *item, unsigned index=0)
Add an item to the back of the queue.
Definition Queue.hxx:146
Result next_locked()
Get an item from the front of the queue.
Definition Queue.hxx:186
size_t pending(unsigned index)
Get the number of pending items in the queue.
Definition Queue.hxx:208
ScheduledQueue is a queue with multiple priorities, where each priority is a FIFO list.
~ScheduledQueue()
Destructor.
size_t pending() const
Get the number of pending items in the queue (all bands total)
OSMutex lock_
Protects insert and next operations.
void insert(QMember *item, unsigned prio)
Adds an entry to the queue.
Band * bands_
The actual priority bands.
Result next()
Get an item from the queue.
unsigned numPending_
How many queue entries are pending.
size_t pending(unsigned prio)
Get the number of pending items in the given priority band.
bool empty() const
unsigned num_prio() const
unsigned numBands_
How many priority bands we have.
OSMutex * lock()
The caller must acquire this lock before using any of the _locked() functions.
Result next_locked()
Get an item from the queue; the caller must already hold the lock returned by lock().
ScheduledQueue(unsigned num_bands, const Fixed16 *strides)
Constructor.
void insert_locked(QMember *item, unsigned prio)
Adds an entry to the queue.
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DIE(MSG)
Unconditionally terminates the current process with a message.
Definition macros.h:143
Result of pulling an item from the queue based on priority.
Definition Queue.hxx:59
unsigned index
index of item pulled from queue
Definition Queue.hxx:85
QMember * item
item pulled from queue
Definition Queue.hxx:84
This structure contains information about one priority band.
Fixed16 currentToken_
How many tokens we have right now.
Fixed16 stride_
How many tokens we add each call.
Q queue_
Holds the queue for this priority band.