Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
IfTcp.cxx
Go to the documentation of this file.
1
35#include "openmrn_features.h"
36
37// Only compile this if we can use select based reads in the
38// executor. TcpHubDeviceSelect is an integral part of this implementation.
39#ifdef OPENMRN_FEATURE_EXECUTOR_SELECT
40
41#include "openlcb/IfTcp.hxx"
42#include "openlcb/IfImpl.hxx"
43#include "openlcb/IfTcpImpl.hxx"
44
45namespace openlcb
46{
47
// Body of delete_local_node(Node *node); the signature (listing line 48) is
// not part of this extract. Removes a local node from this interface by
// delegating to remove_local_node_from_map().
49{
50 remove_local_node_from_map(node);
51}
52
// Body of add_owned_flow(Executable *e); the signature (listing line 53) is
// not part of this extract. Appends e to ownedFlows_, a vector of
// std::unique_ptr<Destructable>, so the interface takes over ownership of e.
54{
55 ownedFlows_.emplace_back(e);
56}
57
// Body of delete_owned_flow(Destructable *d); the signature (listing line 58)
// is not part of this extract. Searches ownedFlows_ for the entry whose raw
// pointer equals d and erases it, which also destroys the object held by that
// unique_ptr. If no entry matches, an error is logged and nothing is deleted.
59{
60 for (auto it = ownedFlows_.begin(); it != ownedFlows_.end(); ++it)
61 {
62 if (it->get() == d)
63 {
// erase() invalidates `it`, so leave the loop right away.
64 ownedFlows_.erase(it);
65 return;
66 }
67 }
// Reached only when d was not found in ownedFlows_.
68 LOG_ERROR("Deleting nonexistent owned flow.");
69}
70
// Body of matching_node(NodeHandle expected, NodeHandle actual); the signature
// (listing line 71) is not part of this extract.
// Returns true only when both handles carry a non-zero id and the ids are
// equal. If either id is zero the handles cannot be compared, so a VERBOSE
// message is logged and false is returned.
72{
73 if (expected.id && actual.id)
74 {
75 return expected.id == actual.id;
76 }
77 // Cannot reconcile.
78 LOG(VERBOSE, "Cannot reconcile expected and actual NodeHandles for "
79 "equality testing.");
80 return false;
81}
82
// Adds a network client connection to the device: creates a RemotePort helper
// and gives it a TcpHubDeviceSelect constructed from device_, fd and the
// helper itself.
// @param fd descriptor passed to the TcpHubDeviceSelect constructor.
// @param on_error optional (may be null); notified at most once from
// RemotePort::notify(), after which the stored pointer is cleared.
83void IfTcp::add_network_fd(int fd, Notifiable *on_error)
84{
85 // What are the cases we need to support:
86 //
87 // - the remote server closing the socket. on_error shall be called and the
88 // port destructed.
89 //
90 // - the IfTcp being destructed (presumably not on the main executor). We
91 // need to unregister the port, shutdown the socket, flush the data, then
92 // delete this. The destructor of HubDeviceSelect triggers the barrier
93 // callback inline.
94 //
95 // The contract with HubDeviceSelect is that by the time the on_error
96 // notifiable is called, the HubDeviceSelect has been unregistered,
97 // flushed, and can be deleted, even if we are on the main executor.
98 //
// Helper that owns the TcpHubDeviceSelect for one connection. It is handed
// to the port as the third constructor argument (presumably the port's
// error notifiable -- confirm against TcpHubDeviceSelect) and is queued on
// the parent's executor to finish its own cleanup.
99 struct RemotePort : public Executable
100 {
101 RemotePort(IfTcp *parent, Notifiable *on_error)
102 : parent_(parent)
103 , onError_(on_error)
104 {
105 }
106 ~RemotePort()
107 {
108 // auto* p = port_->write_port();
109 // LOG_ERROR("remoteport::delete %p %p", p, this);
// Set the flag before destroying the port: per the comment above, the
// port's destructor calls back inline, and notify() must not re-queue
// this object while it is being destroyed.
110 deleting_ = true;
111 port_.reset();
112 // LOG_ERROR("remoteport::delete done %p", p);
113 }
// Interface whose executor and owned-flow list are used for cleanup.
114 IfTcp *parent_;
// The connection's hub port; destroyed together with this object.
115 std::unique_ptr<TcpHubDeviceSelect> port_;
// Caller-supplied notifiable; cleared to nullptr after its first use.
116 Notifiable *onError_;
// True once the destructor has started.
117 bool deleting_{false};
118 void notify() override
119 {
120 // auto* p = port_->write_port();
121 // LOG(VERBOSE, "remoteport::notify %d %p %p", deleting_, p, this);
// Forward the notification to the caller only once.
122 if (onError_)
123 {
124 onError_->notify();
125 onError_ = nullptr;
126 }
127 if (!deleting_) // avoids duplicate destruction.
128 {
// Queue this object on the parent's executor so that run() can
// perform the deletion.
129 parent_->executor()->add(this);
130 }
131 }
132
133 void run() override
134 {
135 if (deleting_ || !port_)
136 {
137 return;
138 }
139 if (!port_->write_done())
140 {
141 // yield
142 parent_->executor()->add(this);
143 return;
144 }
// delete_owned_flow() destroys this object when it finds it in the
// parent's list; no member may be touched after this call.
145 parent_->delete_owned_flow(this);
146 }
147 };
148 RemotePort *p = new RemotePort(this, on_error);
149 p->port_.reset(new TcpHubDeviceSelect(device_, fd, p));
// NOTE(review): listing line 150 is absent from this extract. No visible
// statement puts p into ownedFlows_, which delete_owned_flow(this) in run()
// relies on -- confirm against the original file.
151}
152
// Body of class LocalMessageFilter; the class header (listing lines 153-156)
// is not part of this extract. Component that drops incoming addressed
// messages that are not destined for a local node and forwards everything
// else to the interface's dispatcher.
156{
157public:
// @param iface interface used for local node lookup and for dispatching.
158 LocalMessageFilter(If *iface)
159 : iface_(iface)
160 {
161 }
162
// Filters one incoming message.
// @param b message buffer; released with unref() when the message is dropped,
// otherwise handed on to the dispatcher.
// @param prio priority passed through unchanged to the dispatcher.
163 void send(Buffer<GenMessage> *b, unsigned prio) override
164 {
165 auto id = b->data()->dst.id;
// A non-zero destination id is looked up among the local nodes; a zero id
// skips the lookup (presumably an unaddressed/global message -- confirm).
166 if (id)
167 {
168 auto *dst = iface_->lookup_local_node(id);
169 if (dst)
170 {
// Record the resolved local node on the message.
171 b->data()->dstNode = dst;
172 }
173 else
174 {
// Destination is not a local node: release the buffer and drop it.
175 b->unref();
176 return;
177 }
178 }
179 iface_->dispatcher()->send(b, prio);
180 }
181
182private:
// Interface this filter works for; stored as a raw pointer and not deleted
// anywhere in the visible code.
183 If *iface_;
184};
185
// Creates a TCP interface.
// @param gateway_node_id node ID passed on to the TcpSendFlow.
// @param device hub flow that traffic is sent to; its service's executor is
// handed to the If base class.
// @param local_nodes_count passed to the If base class constructor.
// NOTE(review): listing lines 190-193, 197 and 199-202 are absent from this
// extract. In particular seq_ is used below with no visible initialization,
// and only `filter` is visibly handed to add_owned_flow() -- confirm against
// the original file.
186IfTcp::IfTcp(NodeID gateway_node_id, HubFlow *device, int local_nodes_count)
187 : If(device->service()->executor(), local_nodes_count)
188 , device_(device)
189{
// Receive path: TcpRecvFlow is constructed with the filter, which forwards
// surviving messages to this interface's dispatcher.
194 auto filter = new LocalMessageFilter(this);
195 add_owned_flow(filter);
196 recvFlow_ = new TcpRecvFlow(filter);
// Send path: the send flow is constructed with the device, the receive flow
// and the sequence number generator.
198 sendFlow_ = new TcpSendFlow(this, gateway_node_id, device, recvFlow_, seq_);
203}
204
// Body of the destructor ~IfTcp(); the signature (listing line 205) and
// listing line 207 are not part of this extract.
// Destroys the owned flows one at a time, always removing the last element
// of the vector, so objects are destroyed in reverse order of registration.
206{
208 while (!ownedFlows_.empty())
209 {
210 ownedFlows_.resize(ownedFlows_.size() - 1);
211 }
212}
213
218
219} // namespace openlcb
220
221#endif // OPENMRN_FEATURE_EXECUTOR_SELECT
Base class for all QMember types that hold data in an expandable format.
Definition Buffer.hxx:195
void unref()
Decrement count.
Definition Buffer.hxx:675
T * data()
get a pointer to the start of the data.
Definition Buffer.hxx:215
Base class of everything with a virtual destructor.
An object that can be scheduled on an executor to run.
virtual void add(Executable *action, unsigned priority=UINT_MAX)=0
Send a message to this Executor's queue.
void register_port(port_type *port)
Adds a new port.
Definition Hub.hxx:167
void unregister_port(port_type *port)
Removes a previously added port.
Definition Hub.hxx:174
HubPort that connects a select-aware device to a strongly typed Hub.
An object that can schedule itself on an executor to run.
virtual void notify()=0
Generic callback.
ExecutorBase * executor()
void send(MessageType *msg, unsigned priority=UINT_MAX) OVERRIDE
Sends a message to the state flow for processing.
Implementation of sequence number generator that uses the real clock.
Network interface class for a character stream link that speaks the (point-to-point) TcpTransfer protocol.
Definition IfTcp.hxx:67
TcpSendFlow * sendFlow_
Flow used for converting GenMessage into the binary representation.
Definition IfTcp.hxx:128
~IfTcp()
Destructor.
Definition IfTcp.cxx:205
bool matching_node(NodeHandle expected, NodeHandle actual) override
Definition IfTcp.cxx:71
void delete_owned_flow(Destructable *d)
Finds a given pointer in the owned flows list, deletes it and removes it from the list.
Definition IfTcp.cxx:58
void delete_local_node(Node *node) override
Removes a local node from this interface.
Definition IfTcp.cxx:48
void add_owned_flow(Executable *e) override
Transfers ownership of a module to the interface.
Definition IfTcp.cxx:53
void add_network_fd(int fd, Notifiable *on_error=nullptr)
Adds a network client connection to the device.
Definition IfTcp.cxx:83
HubFlow * device_
Where to send traffic to.
Definition IfTcp.hxx:121
ClockBaseSequenceNumberGenerator * seq_
Sequence number generator for outgoing TCP packets.
Definition IfTcp.hxx:125
std::vector< std::unique_ptr< Destructable > > ownedFlows_
Various implementation control flows that this interface owns.
Definition IfTcp.hxx:123
NodeID get_default_node_id() override
Definition IfTcp.cxx:214
TcpRecvFlow * recvFlow_
Flow for parsing incoming messages. Owned by ownedFlows_.
Definition IfTcp.hxx:130
IfTcp(NodeID gateway_node_id, HubFlow *device, int local_nodes_count)
Creates a TCP interface.
Definition IfTcp.cxx:186
Abstract class representing an OpenLCB Interface.
Definition If.hxx:185
MessageDispatchFlow * dispatcher()
Definition If.hxx:224
MessageHandler * addressedWriteFlow_
Allocator containing the addressed write flows.
Definition If.hxx:371
Node * lookup_local_node(NodeID id)
Looks up a node ID in the local nodes' registry.
Definition If.hxx:263
MessageHandler * globalWriteFlow_
Allocator containing the global write flows.
Definition If.hxx:369
Component that drops incoming addressed messages that are not destined for a local node.
Definition IfTcp.cxx:156
Base class for NMRAnet nodes conforming to the asynchronous interface.
Definition Node.hxx:52
Simple stateless translator for incoming TCP messages from binary format into the structured format.
This flow renders outgoing OpenLCB messages to their TCP stream representation.
NodeID get_gateway_node_id()
Message handler that is registered as a fallback handler in the interface's message dispatcher.
Definition IfImpl.hxx:280
This handler handles VerifyNodeId messages (both addressed and global) on the interface level.
Definition IfImpl.hxx:140
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
Definition logging.h:59
#define LOG_ERROR(message...)
Shorthand for LOG(LEVEL_ERROR, message...). See LOG.
Definition logging.h:124
uint64_t NodeID
48-bit NMRAnet Node ID type
Container of both a NodeID and NodeAlias.
NodeID id
48-bit NMRAnet Node ID