Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
EventService.cxx
1//#define LOGLEVEL VERBOSE
2#include "utils/logging.h"
3
4#include <algorithm>
5#include <vector>
6#include <endian.h>
7
9
14#include "openlcb/Defs.hxx"
16
17namespace openlcb
18{
19
20/*static*/
21EventService *EventService::instance = nullptr;
22
24{
25 HASSERT(instance == nullptr);
26 instance = this;
27 impl_.reset(new Impl(this));
28}
29
30EventService::EventService(If *iface) : Service(iface->executor())
31{
32 HASSERT(instance == nullptr);
33 instance = this;
34 impl_.reset(new Impl(this));
35 register_interface(iface);
36}
37
38EventService::~EventService()
39{
40 HASSERT(instance == this);
41 instance = nullptr;
42}
43
45{
46 impl()->ownedFlows_.emplace_back(new InlineEventIteratorFlow(
47 iface, this, EventService::Impl::MTI_VALUE_EVENT,
48 EventService::Impl::MTI_MASK_EVENT));
49 impl()->ownedFlows_.emplace_back(new EventIteratorFlow(
50 iface, this, EventService::Impl::MTI_VALUE_GLOBAL,
51 EventService::Impl::MTI_MASK_GLOBAL));
52 impl()->ownedFlows_.emplace_back(new EventIteratorFlow(
53 iface, this, EventService::Impl::MTI_VALUE_ADDRESSED_ALL,
54 EventService::Impl::MTI_MASK_ADDRESSED_ALL));
55}
56
57EventService::Impl::Impl(EventService *service) : callerFlow_(service)
58{
59#ifdef TARGET_LPC11Cxx
60 registry.reset(new VectorEventHandlers());
61#else
62 registry.reset(new TreeEventHandlers());
63#endif
64}
65
66EventService::Impl::~Impl()
67{
68}
69
71{
72 EventHandlerCall *c = message()->data();
73 if (c->epoch != EventRegistry::instance()->get_epoch())
74 {
75 // Event registry was invalidated since this call was scheduled. Ignore.
76 return call_immediately(STATE(call_done));
77 }
78 n_.reset(this);
79 (c->registry_entry->handler->*(c->fn))(*c->registry_entry, c->rep, &n_);
80 return wait_and_call(STATE(call_done));
81}
82
83StateFlowBase::Action EventCallerFlow::call_done()
84{
85 return release_and_exit();
86}
87
88EventIteratorFlow::EventIteratorFlow(If *async_if, EventService *event_service,
89 unsigned mti_value, unsigned mti_mask)
90 : IncomingMessageStateFlow(async_if)
91 , eventService_(event_service)
92 , iterator_(event_service->impl()->registry->create_iterator())
93#ifdef DEBUG_EVENT_PERFORMANCE
94 , mtiValue_(mti_value)
95#endif
96{
97 iface()->dispatcher()->register_handler(this, mti_value, mti_mask);
98}
99
100EventIteratorFlow::~EventIteratorFlow()
101{
102 iface()->dispatcher()->unregister_handler_all(this);
103 delete iterator_;
104}
105
108{
109 for (auto &f : impl()->ownedFlows_)
110 {
111 if (!f->is_waiting())
112 return true;
113 }
114 return false;
115}
116
117void DecodeRange(EventReport *r)
118{
119 uint64_t e = r->event;
120 if (e & 1)
121 {
122 r->mask = (e ^ (e + 1)) >> 1;
123 }
124 else
125 {
126 r->mask = (e ^ (e - 1)) >> 1;
127 }
128 r->event &= ~r->mask;
129}
130
132{
133 // at this point: we have the mutex.
134 LOG(VERBOSE, "GlobalFlow::HandleEvent");
135#ifdef DEBUG_EVENT_PERFORMANCE
137#endif
139 rep->src_node = nmsg()->src;
140 rep->dst_node = nmsg()->dstNode;
142 {
143 if (nmsg()->payload.size() != 8)
144 {
145 LOG(INFO, "Invalid input event message, payload length %d",
146 (unsigned)nmsg()->payload.size());
147 return release_and_exit();
148 }
149 rep->event = NetworkToEventID(nmsg()->payload.data());
150 rep->mask = 0;
151 }
152 else
153 {
154 // Message without event payload.
155 rep->event = 0;
157 rep->mask = 0xFFFFFFFFFFFFFFFFULL;
158 }
159
160 switch (nmsg()->mti)
161 {
162 case Defs::MTI_EVENT_REPORT:
164 break;
167 break;
169 DecodeRange(rep);
171 break;
173 rep->state = EventState::UNKNOWN;
175 break;
177 rep->state = EventState::VALID;
179 break;
181 rep->state = EventState::INVALID;
183 break;
185 rep->state = EventState::RESERVED;
187 break;
190 break;
192 DecodeRange(rep);
194 break;
196 rep->state = EventState::UNKNOWN;
198 break;
200 rep->state = EventState::VALID;
202 break;
204 rep->state = EventState::INVALID;
206 break;
208 rep->state = EventState::RESERVED;
210 break;
212 if (!rep->dst_node)
213 {
214 LOG(INFO, "Invalid addressed identify all message, destination "
215 "node not found");
216 return release_and_exit();
217 }
218 // fall through
221 // Reduces the priority so that we let the priority 3 event messages
222 // be processed before the global identify events makes any
223 // progress.
224 set_priority(4);
225 break;
226 default:
227 DIE("Unexpected message arrived at the global event handler.");
228 } // case
229 // The incoming message is not needed anymore.
230 incomingDone_ = message()->new_child();
231 release();
232
233 eventRegistryEpoch_ = eventService_->impl()->registry->get_epoch();
235 return yield_and_call(STATE(iterate_next));
236}
237
238StateFlowBase::Action EventIteratorFlow::iterate_next()
239{
240 if (eventRegistryEpoch_ != eventService_->impl()->registry->get_epoch())
241 {
242 // Iterators are invalidated. We need to start over. This may cause
243 // duplicate delivery of the same events.
245 eventRegistryEpoch_ = eventService_->impl()->registry->get_epoch();
247 }
248
249 EventRegistryEntry *entry = iterator_->next_entry();
250 if (!entry)
251 {
252 if (incomingDone_)
253 {
255 incomingDone_ = nullptr;
256 }
257
258#ifdef DEBUG_EVENT_PERFORMANCE
260 numProcessNsec_ += len;
261 countEvents_++;
262 if (countEvents_ >= REPORT_COUNT)
263 {
264 //long msec = numProcessNsec_ / 1000000;
265 //printf("event perf for mti %04x: %ld msec for %d events\n",
266 // mtiValue_, msec, REPORT_COUNT);
267 countEvents_ = 0;
268 numProcessNsec_ = 0;
269 }
270
271#endif
272
273 return exit();
274 }
275 return dispatch_event(entry);
276}
277
278StateFlowBase::Action EventIteratorFlow::dispatch_event(const EventRegistryEntry *entry)
279{
281 /* This could be made an asynchronous allocation. Then the pool could be
282 * made fixed size. */
283 eventService_->impl()->callerFlow_.pool()->alloc(&b, nullptr);
284 HASSERT(b);
285 b->data()->reset(entry, eventRegistryEpoch_, &eventReport_, fn_);
286 n_.reset(this);
287 b->set_done(&n_);
288 eventService_->impl()->callerFlow_.send(b, priority());
289 return wait();
290}
291
293InlineEventIteratorFlow::dispatch_event(const EventRegistryEntry *entry)
294{
296 if (eventRegistryEpoch_ != eventService_->impl()->registry->get_epoch())
297 {
298 // Will restart iteration.
299 return call_immediately(STATE(iterate_next));
300 }
301 n_.reset(this);
302 // It is required to hold on to a child to call abort_if_almost_done.
303 auto *c = n_.new_child();
305 if (n_.abort_if_almost_done())
306 {
307 // Aborted. Event handler did not do any asynchronous action.
308 return call_immediately(STATE(iterate_next));
309 }
310 else
311 {
312 c->notify();
313 return wait_and_call(STATE(iterate_next));
314 }
315}
316
317} /* namespace openlcb */
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
BarrierNotifiable * reset(Notifiable *done)
Resets the barrier. Returns &*this. Asserts that is_done().
BarrierNotifiable * new_child()
Call this for each child task.
bool abort_if_almost_done()
Checks if there is exactly one outstanding notification left in the barrier.
void set_done(BarrierNotifiable *done)
Specifies that a given BarrierNotifiable must be called when the Buffer is deallocated (unreffed to z...
Definition Buffer.hxx:97
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
T * data()
get a pointer to the start of the data.
Definition Buffer.hxx:215
void unregister_handler_all(HandlerType *handler)
Removes all instances of a handler from this dispatcher.
This class implements an execution of tasks pulled off an input queue.
Definition Executor.hxx:64
virtual Pool * pool()
virtual void notify()=0
Generic callback.
void alloc(Buffer< BufferType > **result, Executable *flow=NULL)
Get a free item out of the pool.
Definition Buffer.hxx:292
Collection of related state machines that pend on incoming messages.
static EventRegistry * instance()
Definition Singleton.hxx:77
Return type for a state flow callback.
void release() OVERRIDE
Unrefs the current buffer.
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
MessageType * message()
virtual Action entry() OVERRIDE
Entry into the StateFlow activity.
virtual void handle_identify_global(const EventRegistryEntry &registry_entry, EventReport *event, BarrierNotifiable *done)=0
Called on the need of sending out identification messages.
virtual void handle_consumer_range_identified(const EventRegistryEntry &registry_entry, EventReport *event, BarrierNotifiable *done)
Called on another node sending ConsumerRangeIdentified.
virtual void handle_producer_range_identified(const EventRegistryEntry &registry_entry, EventReport *event, BarrierNotifiable *done)
Called on another node sending ProducerRangeIdentified for this event.
virtual void handle_event_report(const EventRegistryEntry &registry_entry, EventReport *event, BarrierNotifiable *done)=0
Called on incoming EventReport messages.
virtual void handle_identify_consumer(const EventRegistryEntry &registry_entry, EventReport *event, BarrierNotifiable *done)=0
Called on another node sending IdentifyConsumer.
virtual void handle_producer_identified(const EventRegistryEntry &registry_entry, EventReport *event, BarrierNotifiable *done)
Called on another node sending ProducerIdentified for this event.
virtual void handle_consumer_identified(const EventRegistryEntry &registry_entry, EventReport *event, BarrierNotifiable *done)
Called on another node sending ConsumerIdentified for this event.
virtual void handle_identify_producer(const EventRegistryEntry &registry_entry, EventReport *event, BarrierNotifiable *done)=0
Called on another node sending IdentifyProducer.
Flow to receive incoming messages of event protocol, and dispatch them to the registered event handle...
unsigned eventRegistryEpoch_
The epoch of the event registry at the start of the iteration.
uint8_t countEvents_
How many events' cost are accumulated so far.
long long numProcessNsec_
Accumulator of how many nanoseconds processing the events took.
EventIterator * iterator_
Iterator for generating the event handlers from the registry.
Action entry() OVERRIDE
Entry into the StateFlow activity.
long long currentProcessStart_
When the processing of the current event started.
Notifiable * incomingDone_
This done notifiable holds a reference to the incoming message buffer.
EventReport eventReport_
Statically allocated structure for calling the event handlers from the main event queue.
virtual EventRegistryEntry * next_entry()=0
Steps the iteration.
virtual void clear_iteration()=0
Stops iteration and resets iteration variables.
virtual void init_iteration(EventReport *event)=0
Starts the iteration.
EventHandler * handler
Pointer to the handler.
PImpl class for the EventService.
std::unique_ptr< EventRegistry > registry
The implementation of the event registry.
std::vector< std::unique_ptr< StateFlowWithQueue > > ownedFlows_
Flows that we own.
EventCallerFlow callerFlow_
This flow will serialize calls to NMRAnetEventHandler objects.
Global Event Service.
bool event_processing_pending()
Returns true if there are outstanding events that are not yet handled.
EventService(ExecutorBase *e)
Creates a global event service with no interfaces registered.
void register_interface(If *iface)
Registers this global event handler with an interface.
Abstract class representing an OpenLCB Interface.
Definition If.hxx:185
MessageDispatchFlow * dispatcher()
Definition If.hxx:224
GenMessage * nmsg()
Returns the NMRAnet message we received.
Definition If.hxx:413
Flow to receive incoming messages of event protocol, and dispatch them to the registered event handle...
const EventRegistryEntry * currentEntry_
The handler we need to call.
EventRegistry implementation that keeps event handlers in a SortedListMap and filters the event handl...
EventRegistry implementation that keeps all event handlers in a vector and forwards every single call...
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
Definition logging.h:59
static const int INFO
Loglevel that is printed by default, reporting some status information.
Definition logging.h:57
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DIE(MSG)
Unconditionally terminates the current process with a message.
Definition macros.h:143
uint64_t NetworkToEventID(const void *data)
Takes 8 bytes (big-endian) from *data, and returns the event id they represent.
long long os_get_time_monotonic(void)
Get the monotonic time since the system started.
Definition os.c:571
@ MTI_CONSUMER_IDENTIFIED_INVALID
consumer broadcast, invalid state
@ MTI_PRODUCER_IDENTIFY
query about producers
@ MTI_EVENT_MASK
event number present mask
@ MTI_EVENTS_IDENTIFY_GLOBAL
request identify all of every node's events
@ MTI_CONSUMER_IDENTIFIED_RESERVED
reserved for future use
@ MTI_PRODUCER_IDENTIFIED_RANGE
producer broadcast about a range of producers
@ MTI_PRODUCER_IDENTIFIED_RESERVED
reserved for future use
@ MTI_CONSUMER_IDENTIFIED_UNKNOWN
consumer broadcast, validity unknown
@ MTI_PRODUCER_IDENTIFIED_VALID
producer broadcast, valid state
@ MTI_CONSUMER_IDENTIFY
query about consumers
@ MTI_EVENTS_IDENTIFY_ADDRESSED
request identify all of a node's events
@ MTI_PRODUCER_IDENTIFIED_INVALID
producer broadcast, invalid state
@ MTI_PRODUCER_IDENTIFIED_UNKNOWN
producer broadcast, validity unknown
@ MTI_CONSUMER_IDENTIFIED_VALID
consumer broadcast, valid state
@ MTI_CONSUMER_IDENTIFIED_RANGE
consumer broadcast about a range of consumers
Arguments structure for the EventCallerFlow.
Shared notification structure that is assembled for each incoming event-related message,...
EventId mask
Specifies the mask in case the request is for an event range.
EventState state
For producer/consumer identified messages, specifies the state of the producer/consumer as the sender...
NodeHandle src_node
Information about the sender of the incoming event-related OpenLCB message.
EventId event
The event ID from the incoming message.
Node * dst_node
nullptr for global messages; points to the specific virtual node for addressed events identify messag...
Node * dstNode
If the destination node is local, this value is non-NULL.
Definition If.hxx:110
NodeHandle src
Source node.
Definition If.hxx:104