Open Model Railroad Network (OpenMRN)
Loading...
Searching...
No Matches
SocketClient.hxx
Go to the documentation of this file.
1
35#ifndef _UTILS_SOCKET_CLIENT_HXX_
36#define _UTILS_SOCKET_CLIENT_HXX_
37
38#include <functional>
39#include <netdb.h>
40#ifndef ESP_PLATFORM
41// this doesn't exist on the ESP32 with LWiP
42#include <arpa/inet.h>
43#endif
44#include <fcntl.h>
45#include <ifaddrs.h>
46#include <array>
47
49#include "executor/Timer.hxx"
50#include "os/MDNS.hxx"
51#include "os/sleep.h"
52#include "utils/Atomic.hxx"
55
56class SocketClient : public StateFlowBase, private Atomic
57{
58public:
81 ExecutorBase *mdns_executor, std::unique_ptr<SocketClientParams> params,
82 std::function<void(int, Notifiable *)> connect_callback)
84 , callback_(connect_callback)
85 , connectExecutor_(connect_executor)
86 , mdnsExecutor_(mdns_executor)
88 , mdnsPending_(false)
89 , mdnsJoin_(false)
90 , sleeping_(false)
91 , requestShutdown_(false)
92 , isConnected_(false)
93 , fd_(-1)
94 {
95 reset_params(std::move(params));
97 }
98
102 {
103 shutdown();
104 }
105
108 {
109 void operator()(struct addrinfo *s)
110 {
111 if (s)
112 {
113 freeaddrinfo(s);
114 }
115 }
116 };
117
119 typedef std::unique_ptr<struct addrinfo, AddrInfoDeleter> AddrinfoPtr;
120
124 enum class Attempt : uint8_t
125 {
127 RECONNECT,
138 };
139
143 void reset_params(std::unique_ptr<SocketClientParams> params)
144 {
145 params_ = std::move(params);
148 }
149
152 void shutdown()
153 {
154 LOG(VERBOSE,
155 "socketclient: sync shutdown isconn %d slp %d rqshut %d mdnsP %d"
156 "mdnsJ %d isShut %d",
158 is_shutdown());
160 while (true)
161 {
162 AtomicHolder h(this);
163 if (!mdnsPending_)
164 {
165 break;
166 }
167 }
168 while (!is_terminated())
169 {
170 microsleep(1000);
171 }
172 }
173
176 {
177 AtomicHolder h(this);
178 return isConnected_;
179 }
180
183 {
184 return is_terminated();
185 }
186
190 {
191 LOG(VERBOSE,
192 "socketclient: start shutdown isconn %d slp %d rqshut %d mdnsP %d"
193 "mdnsJ %d isShut %d",
195 is_shutdown());
196 {
197 AtomicHolder h(this);
199 {
200 return;
201 }
202 requestShutdown_ = true;
203 if (sleeping_)
204 {
205 sleeping_ = false;
207 }
208 if (isConnected_)
209 {
210 isConnected_ = false;
211 notify();
212 }
213 }
214 // NOTE: It would be nice to abort any pending asynchronous tasks we
215 // have such as a connect or a getaddrinfo call running on the
216 // secondary executors. However, there is no API to do that right now.
217 }
218
227 static int connect(const char *host, int port)
228 {
229 return connect(host, integer_to_string(port).c_str());
230 }
231
240 static int connect(const char *host, const char* port_str);
241
251 static int connect(struct addrinfo *addr);
252
258 static bool address_to_string(
259 struct addrinfo *addr, string *host, int *port);
260
265 static AddrinfoPtr string_to_address(const char *host, int port)
266 {
267 return string_to_address(host, integer_to_string(port).c_str());
268 }
269
276 const char *host, const char *port_str);
277
278private:
281 {
282 unsigned ofs = 0;
283 auto search = params_->search_mode();
284 // If we only have one extra thread, we initiate mdns only at the time
285 // we are trying to connect to it. If we have two, we start the lookup
286 // at the beginning.
287 bool mdns_ahead = (connectExecutor_ != mdnsExecutor_);
288 if (mdns_ahead && search != SocketClientParams::MANUAL_ONLY)
289 {
291 }
292 if (params_->enable_last())
293 {
295 }
296 switch (search)
297 {
299 if (!mdns_ahead)
300 {
302 }
305 break;
308 if (!mdns_ahead)
309 {
311 }
313 break;
316 break;
318 if (!mdns_ahead)
319 {
321 }
323 break;
324 }
325 if (params_->one_shot())
326 {
328 }
329 else
330 {
332 }
333 HASSERT(ofs <= strategyConfig_.size());
334 }
335
338 {
341 {
342 AtomicHolder h(this);
343 strategyOffset_ = 0;
344 mdnsPending_ = false;
345 mdnsJoin_ = false;
346 isConnected_ = false;
347 mdnsAddr_.reset();
348 }
350 }
351
354 {
356 if (strategyOffset_ < strategyConfig_.size())
357 {
358 AtomicHolder h(this);
360 {
361 return exit();
362 }
364 }
365 switch (a)
366 {
367 default:
368 DIE("Unexpected action");
370 return wait_retry();
372 return failed_oneshot();
375 params_->last_host_name(), params_->last_port());
377 return start_mdns();
379 return wait_and_connect_mdns();
382 params_->manual_host_name(), params_->manual_port());
383 }
384 }
385
397 SocketClientParams::LogMessage log, string host, int port)
398 {
399 if (port <= 0 || host.empty())
400 {
402 }
403 string v = host;
404 v += ':';
405 v += integer_to_string(port);
406 params_->log_message(log, v);
407 fd_ = -1;
408 n_.reset(this);
410 [this, host, port]() { connect_blocking(host, port); }));
412 }
413
417 void connect_blocking(const string &host, int port)
418 {
419 AutoNotify an(&n_);
420 auto addr = SocketClient::string_to_address(host.c_str(), port);
421 if (params_->disallow_local() && local_test(addr.get()))
422 {
424 return;
425 }
426 fd_ = SocketClient::connect(addr.get());
427 if (fd_ >= 0)
428 {
429 params_->set_last(host.c_str(), port);
430 }
431 }
432
435 {
436 if (fd_ < 0)
437 {
438 // reconnect failed
439 return next_step();
440 }
441 else
442 {
443 // we have a connection.
444 return connected();
445 }
446 }
447
450 {
451 {
452 AtomicHolder h(this);
454 {
455 ::close(fd_);
456 fd_ = -1;
457 return exit();
458 }
459 isConnected_ = true;
460 }
461 callback_(fd_, this);
463 }
464
468 string to_string(const char *p)
469 {
470 if (!p || !*p)
471 {
472 return string();
473 }
474 return p;
475 }
476
480 {
482 mdnsAddr_.reset();
483 string srv = params_->mdns_service_name();
484 string host = params_->mdns_host_name();
485 if (!srv.empty())
486 {
487 {
488 AtomicHolder h(this);
489 mdnsPending_ = true;
490 }
492 [this, host, srv]() { mdns_lookup(host, srv); }));
493 }
495 }
496
501 void mdns_lookup(string mdns_hostname, string mdns_service)
502 {
503 int ai_ret = -1;
504 struct addrinfo hints;
505
506 memset(&hints, 0, sizeof(hints));
507 hints.ai_family = AF_INET;
508 hints.ai_socktype = SOCK_STREAM;
509 hints.ai_flags = 0;
510 hints.ai_protocol = IPPROTO_TCP;
511
512 struct addrinfo *addr = nullptr;
513 params_->log_message(SocketClientParams::MDNS_SEARCH, mdns_service);
514 ai_ret = MDNS::lookup(mdns_service.c_str(), &hints, &addr);
515 mdnsAddr_.reset(addr); // will take care of freeing it.
516 if (ai_ret != 0 || addr == nullptr)
517 {
519 // LOG(INFO, "mdns lookup for %s failed.", mdns_service.c_str());
520 }
521 else
522 {
524 }
525 {
526 AtomicHolder h(this);
527 mdnsPending_ = false;
528 if (mdnsJoin_)
529 {
530 // Flow is waiting for mdns result.
531 mdnsJoin_ = false;
532 notify();
533 }
534 }
535 }
536
541 {
542 {
543 AtomicHolder h(this);
544 if (mdnsPending_)
545 {
546 mdnsJoin_ = true;
548 }
549 }
551 }
552
557 {
558 string host;
559 int port = -1;
560 if (!mdnsAddr_.get() ||
561 !SocketClient::address_to_string(mdnsAddr_.get(), &host, &port))
562 {
563 // no address to connect to.
565 }
567 SocketClientParams::CONNECT_MDNS, std::move(host), port);
568 }
569
573 {
575 return exit();
576 }
581 {
582 {
583 AtomicHolder h(this);
585 {
586 return exit();
587 }
588 sleeping_ = true;
589 }
590 long long end_time = startTime_ + SEC_TO_NSEC(params_->retry_seconds());
591 timer_.start_absolute(end_time);
592 return wait_and_call(STATE(sleep_done));
593 }
594
595 Action sleep_done()
596 {
597 {
598 AtomicHolder h(this);
599 sleeping_ = false;
600 }
602 }
603
608 static bool local_test(struct addrinfo *addr);
609
611 long long startTime_;
614
616 std::unique_ptr<SocketClientParams> params_;
617
619 std::function<void(int, Notifiable *)> callback_ = nullptr;
620
626
628 std::array<Attempt, 5> strategyConfig_{{
630 }};
633 uint8_t strategyOffset_ : 3;
634
637 uint8_t mdnsPending_ : 1;
638
641 uint8_t mdnsJoin_ : 1;
642
644 uint8_t sleeping_ : 1;
645
647 uint8_t requestShutdown_ : 1;
648
651 uint8_t isConnected_ : 1;
652
655
657
659 int fd_;
660
662};
663
664#endif /* _UTILS_SOCKET_CLIENT_HXX */
665
#define STATE(_fn)
Turns a function name into an argument to be supplied to functions expecting a state.
Definition StateFlow.hxx:61
See OSMutexLock in os/OS.hxx.
Definition Atomic.hxx:153
Lightweight locking class for protecting small critical sections.
Definition Atomic.hxx:130
This class sends a notification in its destructor.
A BarrierNotifiable allows to create a number of child Notifiable and wait for all of them to finish.
BarrierNotifiable * reset(Notifiable *done)
Resets the barrier. Returns &*this. Asserts that is_done().
A notifiable class that calls a particular function object once when it is invoked,...
This class implements an execution of tasks pulled off an input queue.
Definition Executor.hxx:64
virtual void add(Executable *action, unsigned priority=UINT_MAX)=0
Send a message to this Executor's queue.
static int lookup(const char *service, struct addrinfo *hints, struct addrinfo **addr)
Lookup an mDNS name.
Definition MDNS.cxx:149
An object that can schedule itself on an executor to run.
Collection of related state machines that pend on incoming messages.
@ AUTO_MANUAL
Try mDNS first, then (if it failed) try manual address.
@ MANUAL_ONLY
Try only manual address, never try mDNS lookups.
@ MANUAL_AUTO
Try manual address first, then (if it failed) try mDNS.
@ AUTO_ONLY
Try only mDNS, ignore manual address.
LogMessage
Enum for sending connection status updates to the caller.
@ MDNS_NOT_FOUND
mDNS lookup failed.
@ CONNECT_FAILED_SELF
Connection dropped because target is localhost.
@ MDNS_FOUND
mDNS lookup suceeded.
@ CONNECT_MDNS
Connecting to mDNS target. Arg is host:port.
@ CONNECT_MANUAL
Connecting to manual target. Arg is hostname:port.
@ MDNS_SEARCH
Starting mDNS lookup. Argument: mdns [hostname.]service_name.
@ CONNECT_FAILED_ONESHOT
Attempt to search & connect failed, given up. No arg.
@ CONNECT_RE
Attempting to reconnect. Arg is host:port.
ExecutorBase * mdnsExecutor_
Executor for synchronous (blocking) mDNS lookup calls.
uint8_t requestShutdown_
true if an external agent requested the flow to exit.
static bool local_test(struct addrinfo *addr)
Test if a given address is local.
void prepare_strategy()
Parses the params_ configuration and fills in strategyConfig_.
Attempt
This enum represents individual states of this state flow that we can branch to.
@ FAILED_EXIT
Failed and do not start again (for one-shot mode).
@ CONNECT_MDNS
Connect to mDNS lookup result.
@ RECONNECT
Connect to the reconnect slot.
@ CONNECT_STATIC
Connect to static target.
@ INITIATE_MDNS
Start mDNS lookup.
@ WAIT_RETRY
Attempt complete. Start again.
Action connected()
State that gets called when we have a completed connection in fd_.
ExecutorBase * connectExecutor_
Executor for synchronous (blocking) connect calls. Externally owned.
Action connect_mdns()
Takes the address from the mdns lookup and connects to it if it is valid.
long long startTime_
When the last connection attempt was started.
static AddrinfoPtr string_to_address(const char *host, const char *port_str)
Converts a hostname string (or null) and port number (or service name) to a struct addrinfo.
void reset_params(std::unique_ptr< SocketClientParams > params)
Updates the parameter structure for this socket client.
uint8_t strategyOffset_
What is the next step in the strategy.
Action start_connection()
Main entry point of the connection process.
std::function< void(int, Notifiable *)> callback_
callback to call on connection success
uint8_t mdnsJoin_
true if the main flow is waiting for the mdns lookup to complete.
SocketClient(Service *service, ExecutorBase *connect_executor, ExecutorBase *mdns_executor, std::unique_ptr< SocketClientParams > params, std::function< void(int, Notifiable *)> connect_callback)
Constructor.
void shutdown()
Shutdown the client so that it can be deleted.
static bool address_to_string(struct addrinfo *addr, string *host, int *port)
Converts a struct addrinfo to a dotted-decimal notation IP address.
uint8_t isConnected_
true if we are connected and waiting for a client notification to restart.
Action try_schedule_connect(SocketClientParams::LogMessage log, string host, int port)
Helper function to schedule asynchronous connect on a separate executor.
static int connect(const char *host, int port)
Connects a tcp socket to the specified remote host:port.
Action failed_oneshot()
Last state in the connection sequence, when everything failed, but the caller wanted one shot only.
AddrinfoPtr mdnsAddr_
Holds the results of the mdns lookup. null if failed (or never ran).
std::unique_ptr< struct addrinfo, AddrInfoDeleter > AddrinfoPtr
Custom unique pointer that knows how to delete a struct addrinfo.
std::array< Attempt, 5 > strategyConfig_
Stores the sequence of operations we need to try.
uint8_t mdnsPending_
true if there is a pending mdns lookup operation.
Action wait_and_connect_mdns()
Blocks the flow until mdns lookup is complete, then connects to the resulting address.
void mdns_lookup(string mdns_hostname, string mdns_service)
Synchronous function that runs on the mdns executor.
void start_shutdown()
Request that this client shutdown and exit the other thread.
std::unique_ptr< SocketClientParams > params_
Stores the parameter structure.
Action start_mdns()
State that initiates the mdns lookup asynchronously.
Action next_step()
Execute the next step of the strategy.
int fd_
socket descriptor
StateFlowTimer timer_
Helper for sleeping.
Action connect_complete()
State that gets invoked once the reconnect attempt is complete.
Action wait_retry()
Last state in the connection sequence, when everything failed: sleeps until the timeout specified in ...
void connect_blocking(const string &host, int port)
Called on the connect executor.
static AddrinfoPtr string_to_address(const char *host, int port)
Converts a hostname string and port number to a struct addrinfo.
uint8_t sleeping_
true while we are waiting for the timer.
string to_string(const char *p)
Turns a parameter to a string.
static int connect(const char *host, const char *port_str)
Connects a tcp socket to the specified remote host:port.
~SocketClient()
Destructor.
static int connect(struct addrinfo *addr)
Connects a tcp socket to the specified remote address.
Return type for a state flow callback.
Use this timer class to deliver the timeout notification to a stateflow.
Base class for state machines.
Service * service()
Return a pointer to the service I am bound to.
bool is_terminated()
void notify() override
Wakeup call arrived.
Definition StateFlow.cxx:97
Action exit()
Terminate current StateFlow activity.
void start_flow(Callback c)
Resets the flow to the specified state and starts it.
Action call_immediately(Callback c)
Imediately call the next state upon return.
Action wait_and_call(Callback c)
Wait for resource to become available before proceeding to next state.
void start_absolute(long long expiry_time_nsec)
Starts the timer with an absolute deadline.
Definition Timer.hxx:199
void ensure_triggered()
Triggers the timer if it is not expired yet.
Definition Timer.hxx:249
#define IPPROTO_TCP
TCP Raw Socket.
Definition in.h:70
#define LOG(level, message...)
Conditionally write a message to the logging output.
Definition logging.h:99
static const int VERBOSE
Loglevel that is usually not printed, reporting debugging information.
Definition logging.h:59
#define HASSERT(x)
Checks that the value of expression x is true, else terminates the current process.
Definition macros.h:138
#define DIE(MSG)
Unconditionally terminates the current process with a message.
Definition macros.h:143
#define DISALLOW_COPY_AND_ASSIGN(TypeName)
Removes default copy-constructor and assignment added by C++.
Definition macros.h:171
void freeaddrinfo(struct addrinfo *ai)
see 'man freeaddrinfo'
long long os_get_time_monotonic(void)
Get the monotonic time since the system started.
Definition os.c:571
#define SEC_TO_NSEC(_sec)
Convert a second value to a nanosecond value.
Definition os.h:286
static void microsleep(uint32_t microseconds)
Sleep a given number of microseconds.
#define SOCK_STREAM
TCP Socket.
Definition socket.h:45
#define AF_INET
IPv4 Socket (UDP, TCP, etc...)
Definition socket.h:54
Helper structure for creating a unique_ptr for struct addrinfo pointers.
Structure to contain information about address of a service provider.
Definition netdb.h:48
int ai_socktype
Socket type.
Definition netdb.h:51
int ai_protocol
Protocol for socket.
Definition netdb.h:52
int ai_flags
Input flags.
Definition netdb.h:49
int ai_family
Protocol family for socket.
Definition netdb.h:50