diff --git a/include/opendht/log.h b/include/opendht/log.h index 232d92ab24c634eb3db24b5525cb8f1e7b615f85..82747a941e35fab1ac80f09f08f8a7b601835a2e 100644 --- a/include/opendht/log.h +++ b/include/opendht/log.h @@ -71,13 +71,13 @@ OPENDHT_PUBLIC void printLog(std::ostream &s, char const *m, va_list args); OPENDHT_PUBLIC -std::unique_ptr<Logger> getStdLogger(); +std::shared_ptr<Logger> getStdLogger(); OPENDHT_PUBLIC -std::unique_ptr<Logger> getFileLogger(const std::string &path); +std::shared_ptr<Logger> getFileLogger(const std::string &path); OPENDHT_PUBLIC -std::unique_ptr<Logger> getSyslogLogger(const char* name); +std::shared_ptr<Logger> getSyslogLogger(const char* name); OPENDHT_PUBLIC void enableLogging(dht::DhtRunner &dht); diff --git a/include/opendht/peer_discovery.h b/include/opendht/peer_discovery.h index 4fcdfafa29c4112bd41b887c932399da49a03608..00d91345c0c9a15ef9b6ca9a665ba4ef28cdab8e 100644 --- a/include/opendht/peer_discovery.h +++ b/include/opendht/peer_discovery.h @@ -22,10 +22,13 @@ #include "def.h" #include "sockaddr.h" #include "infohash.h" +#include "log_enable.h" #include <thread> -#include <mutex> -#include <condition_variable> + +namespace asio { +class io_context; +} namespace dht { @@ -35,7 +38,7 @@ public: static constexpr in_port_t DEFAULT_PORT = 8888; using ServiceDiscoveredCallback = std::function<void(msgpack::object&&, SockAddr&&)>; - PeerDiscovery(in_port_t port = DEFAULT_PORT); + PeerDiscovery(in_port_t port = DEFAULT_PORT, std::shared_ptr<asio::io_context> ioContext = {}, std::shared_ptr<Logger> logger = {}); ~PeerDiscovery(); /** @@ -51,7 +54,7 @@ public: } /** - * startPublish - Keeping sending data until node is joinned or stop is called - msgpack + * startPublish - Keeping sending data until node is joinned or stop is called */ void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf); void startPublish(sa_family_t domain, const std::string &type, const msgpack::sbuffer &pack_buf); @@ -79,20 +82,12 @@ public: bool stopPublish(const std::string &type); bool stopPublish(sa_family_t domain, const std::string &type); - /** - * Configure the sockopt to be able to listen multicast group - */ - static void joinMulticast(int sockfd, sa_family_t family); - - /** - * Join the threads - */ - void join(); - private: class DomainPeerDiscovery; std::unique_ptr<DomainPeerDiscovery> peerDiscovery4_; std::unique_ptr<DomainPeerDiscovery> peerDiscovery6_; + std::shared_ptr<asio::io_context> ioContext_; + std::thread ioRunnner_; }; } diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index b7f1041ddb4a0dd25663e26a5f1faf4e3795b0ac..ff0fd48d3025c5270de590ee7aeb9b24f828ed5d 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -307,9 +307,6 @@ DhtRunner::join() if (bootstrap_thread.joinable()) bootstrap_thread.join(); - if (peerDiscovery_) - peerDiscovery_->join(); - { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops = decltype(pending_ops)(); diff --git a/src/log.cpp b/src/log.cpp index b63988174a20c203f1619ccbdcf74a260714485d..5f4d266f9a06b52d5773f256d71da9979b46467c 100644 --- a/src/log.cpp +++ b/src/log.cpp @@ -56,9 +56,9 @@ printLog(std::ostream& s, char const *m, va_list args) { s << std::endl; } -std::unique_ptr<Logger> +std::shared_ptr<Logger> getStdLogger() { - return std::unique_ptr<Logger>(new Logger( + return std::make_shared<Logger>( [](char const *m, va_list args) { std::cerr << red; printLog(std::cerr, m, args); @@ -70,22 +70,22 @@ getStdLogger() { std::cout << def; }, [](char const *m, va_list args) { printLog(std::cout, m, args); } - )); + ); } -std::unique_ptr<Logger> +std::shared_ptr<Logger> getFileLogger(const std::string &path) { auto logfile = std::make_shared<std::ofstream>(); logfile->open(path, std::ios::out); - return std::unique_ptr<Logger>(new Logger( + return std::make_shared<Logger>( [=](char const *m, va_list args) { printLog(*logfile, m, args); }, [=](char const *m, va_list args) { printLog(*logfile, m, args); }, [=](char const *m, va_list args) { printLog(*logfile, m, args); } - )); + ); } -std::unique_ptr<Logger> +std::shared_ptr<Logger> getSyslogLogger(const char* name) { #ifndef _WIN32 struct Syslog { @@ -103,13 +103,13 @@ getSyslogLogger(const char* name) { logfile = std::make_shared<Syslog>(name); opened_logfile = logfile; } - return std::unique_ptr<Logger>(new Logger( + return std::make_shared<Logger>( [logfile](char const *m, va_list args) { vsyslog(LOG_ERR, m, args); }, [logfile](char const *m, va_list args) { vsyslog(LOG_WARNING, m, args); }, [logfile](char const *m, va_list args) { vsyslog(LOG_INFO, m, args); } - )); + ); #else - return std::unique_ptr<Logger>(new Logger()); + return std::make_shared<Logger>(); #endif } diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index e19c57a32d2eb3e4bc9f4b9c4d23a19d620b6101..39395286b70c401ab1a8fdad55f785a006d7f74c 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -2,6 +2,7 @@ * Copyright (C) 2014-2019 Savoir-faire Linux Inc. * Author(s) : Mingrui Zhang <mingrui.zhang@savoirfairelinux.com> * Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com> + * Adrien Béraud <adrien.beraud@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -21,402 +22,170 @@ #include "network_utils.h" #include "utils.h" -#ifdef _WIN32 -#include <Ws2tcpip.h> // needed for ip_mreq definition for multicast -#include <Windows.h> -#include <cstring> -#if defined(_MSC_VER) -#include <BaseTsd.h> -typedef SSIZE_T ssize_t; -#endif -#define close(x) closesocket(x) -#else -#include <sys/types.h> -#include <unistd.h> -#endif -#include <fcntl.h> - -#ifndef IPV6_JOIN_GROUP -#define IPV6_JOIN_GROUP IPV6_ADD_MEMBERSHIP -#endif +#include <asio/io_context.hpp> +#include <asio/ip/udp.hpp> +#include <asio/ip/multicast.hpp> +#include <asio/steady_timer.hpp> +#include <asio/streambuf.hpp> + +using namespace std::chrono_literals; namespace dht { // Organization-local Scope multicast -constexpr char MULTICAST_ADDRESS_IPV4[12] = "239.192.0.1"; -constexpr char MULTICAST_ADDRESS_IPV6[10] = "ff08::101"; +constexpr char MULTICAST_ADDRESS_IPV4[] = "239.192.0.1"; +constexpr char MULTICAST_ADDRESS_IPV6[] = "ff08::101"; class PeerDiscovery::DomainPeerDiscovery { public: - DomainPeerDiscovery(sa_family_t domain, in_port_t port); + DomainPeerDiscovery(asio::ip::udp domain, in_port_t port, Sp<asio::io_context> ioContext = {}, Sp<Logger> logger = {}); ~DomainPeerDiscovery(); - /** - * startDiscovery - Keep Listening data from the sender until node is joinned or stop is called - */ void startDiscovery(const std::string &type, ServiceDiscoveredCallback callback); - - /** - * startPublish - Keeping sending data until node is joinned or stop is called - msgpack - */ void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf); - /** - * Thread Stopper - */ void stop(); - /** - * Remove possible callBack to discovery - */ bool stopDiscovery(const std::string &type); - - /** - * Remove different serivce message to send - */ bool stopPublish(const std::string &type); - /** - * Configure the sockopt to be able to listen multicast group - */ - static void joinMulticast(int sockfd, sa_family_t family); - - /** - * Join the threads - */ - void join() { - if(listening_.joinable()) listening_.join(); - if(sending_.joinable()) sending_.join(); - } private: + Sp<Logger> logger_; //dmtx_ for callbackmap_ and drunning_ (write) std::mutex dmtx_; //mtx_ for messages_ and lrunning (listen) std::mutex mtx_; - std::condition_variable cv_; - bool lrunning_ {false}; - bool drunning_ {false}; - sa_family_t domain_ {AF_UNSPEC}; - int port_; - int sockFd_ {-1}; - int stopWriteFd_ {-1}; + std::shared_ptr<asio::io_context> ioContext_; + asio::ip::udp::socket sockFd_; + asio::ip::udp::endpoint sockAddrSend_; + asio::steady_timer publishTimer_; - SockAddr sockAddrSend_; - - //Thread export to be joined - std::thread listening_; - std::thread sending_; + std::array<char, 64 * 1024> receiveBuf_; + asio::ip::udp::endpoint receiveFrom_; msgpack::sbuffer sbuf_; std::map<std::string, msgpack::sbuffer> messages_; std::map<std::string, ServiceDiscoveredCallback> callbackmap_; + bool lrunning_ {false}; + bool drunning_ {false}; - /** - * Multicast Socket Initialization, accept IPV4, IPV6 - */ - static int init(sa_family_t domain); - - /** - * Receive messages - */ - std::pair<SockAddr, msgpack::object_handle> recvFrom(); - - /** - * Listener pack thread loop - */ void loopListener(); - - /** - * Listener Parameters Setup - */ - void setupListener(); - - /** - * Sender Parameters Setup - */ - void setupSender(); - /** - * Sender Parameters Setup - */ void reloadMessages(); void stopDiscovery(); void stopPublish(); + void publish(); }; -PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(sa_family_t domain, in_port_t port) - : domain_(domain), port_(port), sockFd_(init(domain)) +PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(asio::ip::udp domain, in_port_t port, Sp<asio::io_context> ioContext, Sp<Logger> logger) + : logger_(logger) + , ioContext_(ioContext) + , sockFd_(*ioContext_, domain) + , sockAddrSend_(asio::ip::address::from_string(domain.family() == AF_INET ? MULTICAST_ADDRESS_IPV4 + : MULTICAST_ADDRESS_IPV6), port) + , publishTimer_(*ioContext) { - joinMulticast(sockFd_, domain); - setupListener(); - setupSender(); + sockFd_.set_option(asio::ip::multicast::join_group(sockAddrSend_.address())); + sockFd_.set_option(asio::ip::udp::socket::reuse_address(true)); + sockFd_.bind({domain, port}); } PeerDiscovery::DomainPeerDiscovery::~DomainPeerDiscovery() { stop(); - join(); - if (sockFd_ != -1) - close(sockFd_); - -#ifdef _WIN32 - WSACleanup(); -#endif -} - -int -PeerDiscovery::DomainPeerDiscovery::init(sa_family_t domain) -{ -#ifdef _WIN32 - WSADATA wsaData; - if (WSAStartup(0x0101, &wsaData)) { - throw std::runtime_error(std::string("Can't initialize Winsock2 ") + strerror(errno)); - } -#endif - - int sockfd = socket(domain, SOCK_DGRAM, 0); - if (sockfd < 0) { - throw std::runtime_error(std::string("Socket Creation Error: ") + strerror(errno)); - } - net::setNonblocking(sockfd); - return sockfd; -} - -void -PeerDiscovery::DomainPeerDiscovery::setupListener() -{ - SockAddr sockAddrListen_; - sockAddrListen_.setFamily(domain_); - sockAddrListen_.setPort(port_); - sockAddrListen_.setAny(); - - unsigned int opt = 1; - if (setsockopt(sockFd_, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt)) < 0) { - std::cerr << "setsockopt SO_REUSEADDR failed: " << strerror(errno) << std::endl; - } -#ifdef SO_REUSEPORT - if (setsockopt(sockFd_, SOL_SOCKET, SO_REUSEPORT, (char*)&opt, sizeof(opt)) < 0) { - std::cerr << "setsockopt SO_REUSEPORT failed: " << strerror(errno) << std::endl; - } -#endif - - // bind to receive address - if (bind(sockFd_, sockAddrListen_.get(), sockAddrListen_.getLength()) < 0){ - throw std::runtime_error(std::string("Error binding socket: ") + strerror(errno)); - } -} - -void -PeerDiscovery::DomainPeerDiscovery::joinMulticast(int sockfd, sa_family_t family) -{ - switch (family) - { - case AF_INET:{ - ip_mreq config_ipv4; - - //This option can be used to set the interface for sending outbound - //multicast datagrams from the sockets application. - config_ipv4.imr_interface.s_addr = htonl(INADDR_ANY); - if( setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_IF, (char*)&config_ipv4.imr_interface, sizeof( struct in_addr )) < 0 ) { - throw std::runtime_error(std::string("Bound Network Interface IPv4 Error: ") + strerror(errno)); - } - - //The IP_MULTICAST_TTL socket option allows the application to primarily - //limit the lifetime of the packet in the Internet and prevent it from circulating indefinitely - unsigned char ttl4 = 20; - if( setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, (char*)&ttl4, sizeof( ttl4 )) < 0 ) { - throw std::runtime_error(std::string("TTL Sockopt Error: ") + strerror(errno)); - } - - // config the listener to be interested in joining in the multicast group - config_ipv4.imr_multiaddr.s_addr = inet_addr(MULTICAST_ADDRESS_IPV4); - config_ipv4.imr_interface.s_addr = htonl(INADDR_ANY); - if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&config_ipv4, sizeof(config_ipv4)) < 0){ - throw std::runtime_error(std::string(" Member Addition IPv4 Error: ") + strerror(errno)); - } - break; - } - case AF_INET6: { - ipv6_mreq config_ipv6; - - /* unsigned int outif = 0; - if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_IF, &outif, sizeof(outif)) < 0) { - std::cerr << "Can't assign multicast interface: " << strerror(errno) << std::endl; - } */ - - unsigned int ttl6 = 20; - if( setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, (char*)&ttl6, sizeof( ttl6 )) < 0 ) { - throw std::runtime_error(std::string("Hop Count Set Error: ") + strerror(errno)); - } - - config_ipv6.ipv6mr_interface = 0; - inet_pton(AF_INET6, MULTICAST_ADDRESS_IPV6, &config_ipv6.ipv6mr_multiaddr); - if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_JOIN_GROUP, (char*)&config_ipv6, sizeof(config_ipv6)) < 0){ - throw std::runtime_error(std::string("Member Addition IPv6 Error: ") + strerror(errno)); - } - break; - } - } + sockFd_.close(); } void PeerDiscovery::DomainPeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) { - std::unique_lock<std::mutex> lck(dmtx_); + std::lock_guard<std::mutex> lck(dmtx_); callbackmap_[type] = callback; if (not drunning_) { - if (listening_.joinable()) - listening_.join(); drunning_ = true; - listening_ = std::thread(&DomainPeerDiscovery::loopListener, this); + ioContext_->post(std::bind(&PeerDiscovery::DomainPeerDiscovery::loopListener, this)); } } -std::pair<SockAddr, msgpack::object_handle> -PeerDiscovery::DomainPeerDiscovery::recvFrom() -{ - sockaddr_storage sa; - socklen_t sa_len = sizeof(sa); - - std::array<uint8_t, 64 * 1024> buf; - ssize_t nbytes = recvfrom( - sockFd_, - (char*)buf.data(), buf.size(), - 0, - (sockaddr*)&sa, &sa_len - ); - if (nbytes < 0) - throw std::runtime_error(std::string("Error receiving packet: ") + strerror(errno)); - - return { SockAddr { sa, sa_len }, msgpack::unpack(reinterpret_cast<char*>(buf.data()), nbytes) }; -} - void PeerDiscovery::DomainPeerDiscovery::loopListener() { - int stopfds_pipe[2]; -#ifndef _WIN32 - if (pipe(stopfds_pipe) == -1) - throw std::runtime_error(std::string("Can't open pipe: ") + strerror(errno)); -#else - net::udpPipe(stopfds_pipe); -#endif - int stop_readfd = stopfds_pipe[0]; - stopWriteFd_ = stopfds_pipe[1]; - - while (true) { - { - std::lock_guard<std::mutex> lck(dmtx_); - if (not drunning_) - break; - } - - fd_set readfds; - - FD_ZERO(&readfds); - FD_SET(stop_readfd, &readfds); - FD_SET(sockFd_, &readfds); - - int data_coming = select(std::max(sockFd_, stop_readfd) + 1, &readfds, nullptr, nullptr, nullptr); - - { - std::lock_guard<std::mutex> lck(dmtx_); - if (not drunning_) - break; + std::lock_guard<std::mutex> lck(dmtx_); + if (not drunning_) + return; + sockFd_.async_receive_from(asio::buffer(receiveBuf_), receiveFrom_, [this](const asio::error_code& error, size_t bytes) { + if (error == asio::error::operation_aborted) + return; + if (error) { + if (logger_) + logger_->e("Error receiving message: %s", error.message().c_str()); } - - if (data_coming < 0) { - if(errno != EINTR) { - perror("Select Error"); - std::this_thread::sleep_for( std::chrono::seconds(1) ); - } - } else if (data_coming > 0) { - if (FD_ISSET(stop_readfd, &readfds)) { - std::array<uint8_t, 64 * 1024> buf; - recv(stop_readfd, (char*)buf.data(), buf.size(), 0); - } - - try { - auto rcv = recvFrom(); - msgpack::object obj = rcv.second.get(); - - if (obj.type != msgpack::type::MAP) + try { + auto rcv = msgpack::unpack(receiveBuf_.data(), bytes); + msgpack::object obj = rcv.get(); + + if (obj.type != msgpack::type::MAP) + throw msgpack::type_error{}; + for (unsigned i = 0; i < obj.via.map.size; i++) { + auto& o = obj.via.map.ptr[i]; + if (o.key.type != msgpack::type::STR) continue; - for (unsigned i = 0; i < obj.via.map.size; i++) { - auto& o = obj.via.map.ptr[i]; - if (o.key.type != msgpack::type::STR) - continue; - auto key = o.key.as<std::string>(); - ServiceDiscoveredCallback cb; - { - std::lock_guard<std::mutex> lck(dmtx_); + auto key = o.key.as<std::string>(); + ServiceDiscoveredCallback cb; + { + std::lock_guard<std::mutex> lck(dmtx_); + if (drunning_) { auto callback = callbackmap_.find(key); if (callback != callbackmap_.end()) cb = callback->second; - } - if (cb) - cb(std::move(o.val), std::move(rcv.first)); + } else + return; } - } catch (const std::exception& e) { - std::cerr << "Error receiving packet: " << e.what() << std::endl; + if (cb) + cb(std::move(o.val), SockAddr{ receiveFrom_.data(), (socklen_t)receiveFrom_.size() }); } + } catch (const std::exception& e) { + if (logger_) + logger_->e("Error receiving packet: %s", e.what()); } - } - if (stop_readfd != -1) - close(stop_readfd); - if (stopWriteFd_ != -1) { - close(stopWriteFd_); - stopWriteFd_ = -1; - } + loopListener(); + }); } void -PeerDiscovery::DomainPeerDiscovery::setupSender() +PeerDiscovery::DomainPeerDiscovery::publish() { - // Setup sender address - sockAddrSend_.setFamily(domain_); - sockAddrSend_.setAddress(domain_ == AF_INET ? MULTICAST_ADDRESS_IPV4 : MULTICAST_ADDRESS_IPV6); - sockAddrSend_.setPort(port_); + std::lock_guard<std::mutex> lck(mtx_); + if (lrunning_) { + sockFd_.async_send_to(asio::buffer(sbuf_.data(), sbuf_.size()), sockAddrSend_, [logger=logger_](const asio::error_code& ec, size_t){ + if (ec and ec != asio::error::operation_aborted) { + if (logger) + logger->w("Error sending packet: %s", ec.message().c_str()); + } + }); + publishTimer_.expires_after(3s); + publishTimer_.async_wait(std::bind(&PeerDiscovery::DomainPeerDiscovery::publish, this)); + } } void PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) { - //Set up New Sending pack msgpack::sbuffer pack_buf_c; pack_buf_c.write(pack_buf.data(),pack_buf.size()); - std::unique_lock<std::mutex> lck(mtx_); + std::lock_guard<std::mutex> lck(mtx_); messages_[type] = std::move(pack_buf_c); reloadMessages(); if (not lrunning_) { - if (sending_.joinable()) - sending_.join(); lrunning_ = true; - sending_ = std::thread([this](){ - std::unique_lock<std::mutex> lck(mtx_); - while (lrunning_) { - ssize_t nbytes = sendto( - sockFd_, - sbuf_.data(), - sbuf_.size(), - 0, - sockAddrSend_.get(), - sockAddrSend_.getLength() - ); - if (nbytes < 0) { - std::cerr << "Error sending packet: " << strerror(errno) << std::endl; - } - if (cv_.wait_for(lck,std::chrono::seconds(3),[&]{ return !lrunning_; })) - break; - } - }); + ioContext_->post(std::bind(&PeerDiscovery::DomainPeerDiscovery::publish, this)); } } bool -PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string &type) +PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string& type) { std::lock_guard<std::mutex> lck(dmtx_); if (callbackmap_.erase(type) > 0) { @@ -428,7 +197,7 @@ PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string &type) } bool -PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type) +PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string& type) { std::lock_guard<std::mutex> lck(mtx_); if (messages_.erase(type) > 0) { @@ -444,20 +213,8 @@ PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type) void PeerDiscovery::DomainPeerDiscovery::stopDiscovery() { -#ifdef _WIN32 -#define write(s, b, f) send((s), (b), (f), 0) -#endif - if (drunning_) { + if (drunning_) drunning_ = false; - if (stopWriteFd_ != -1) { - if (write(stopWriteFd_, "\0", 1) == -1) { - std::cerr << "Can't send stop message: " << strerror(errno) << std::endl; - } - } - } -#ifdef _WIN32 -#undef write -#endif } void @@ -465,7 +222,7 @@ PeerDiscovery::DomainPeerDiscovery::stopPublish() { if (lrunning_) { lrunning_ = false; - cv_.notify_all(); + publishTimer_.cancel(); } } @@ -473,11 +230,11 @@ void PeerDiscovery::DomainPeerDiscovery::stop() { { - std::unique_lock<std::mutex> lck(dmtx_); + std::lock_guard<std::mutex> lck(dmtx_); stopDiscovery(); } { - std::unique_lock<std::mutex> lck(mtx_); + std::lock_guard<std::mutex> lck(mtx_); stopPublish(); } } @@ -494,38 +251,65 @@ PeerDiscovery::DomainPeerDiscovery::reloadMessages() } } -PeerDiscovery::PeerDiscovery(in_port_t port) +PeerDiscovery::PeerDiscovery(in_port_t port, Sp<asio::io_context> ioContext, Sp<Logger> logger) { + if (not ioContext) { + ioContext = std::make_shared<asio::io_context>(); + ioContext_ = ioContext; + ioRunnner_ = std::thread([logger, ioContext] { + try { + if (logger) + logger->d("[peerdiscovery] starting io_context"); + auto work = asio::make_work_guard(*ioContext); + ioContext->run(); + if (logger) + logger->d("[peerdiscovery] io_context stopped"); + } + catch (const std::exception& ex){ + if (logger) + logger->e("[peerdiscovery] run error: %s", ex.what()); + } + }); + } + try { - peerDiscovery4_.reset(new DomainPeerDiscovery(AF_INET, port)); + peerDiscovery4_.reset(new DomainPeerDiscovery(asio::ip::udp::v4(), port, ioContext, logger)); } catch(const std::exception& e){ - std::cerr << "Can't start peer discovery (IPv4): " << e.what() << std::endl; + if (logger) + logger->e("[peerdiscovery] can't start IPv4: %s", e.what()); } try { - peerDiscovery6_.reset(new DomainPeerDiscovery(AF_INET6, port)); + peerDiscovery6_.reset(new DomainPeerDiscovery(asio::ip::udp::v6(), port, ioContext, logger)); } catch(const std::exception& e) { - std::cerr << "Can't start peer discovery (IPv6): " << e.what() << std::endl; + if (logger) + logger->e("[peerdiscovery] can't start IPv6: %s", e.what()); } } -PeerDiscovery::~PeerDiscovery(){} +PeerDiscovery::~PeerDiscovery() { + stop(); + if (ioContext_) + ioContext_->stop(); + if (ioRunnner_.joinable()) + ioRunnner_.join(); +} void -PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) +PeerDiscovery::startDiscovery(const std::string& type, ServiceDiscoveredCallback callback) { - if(peerDiscovery4_) peerDiscovery4_->startDiscovery(type, callback); - if(peerDiscovery6_) peerDiscovery6_->startDiscovery(type, callback); + if (peerDiscovery4_) peerDiscovery4_->startDiscovery(type, callback); + if (peerDiscovery6_) peerDiscovery6_->startDiscovery(type, callback); } void -PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) +PeerDiscovery::startPublish(const std::string& type, const msgpack::sbuffer& pack_buf) { - if(peerDiscovery4_) peerDiscovery4_->startPublish(type, pack_buf); - if(peerDiscovery6_) peerDiscovery6_->startPublish(type, pack_buf); + if (peerDiscovery4_) peerDiscovery4_->startPublish(type, pack_buf); + if (peerDiscovery6_) peerDiscovery6_->startPublish(type, pack_buf); } void -PeerDiscovery::startPublish(sa_family_t domain, const std::string &type, const msgpack::sbuffer &pack_buf) +PeerDiscovery::startPublish(sa_family_t domain, const std::string& type, const msgpack::sbuffer& pack_buf) { if (domain == AF_INET) { if (peerDiscovery4_) peerDiscovery4_->startPublish(type, pack_buf); @@ -537,8 +321,8 @@ PeerDiscovery::startPublish(sa_family_t domain, const std::string &type, const m void PeerDiscovery::stop() { - if(peerDiscovery4_) peerDiscovery4_->stop(); - if(peerDiscovery6_) peerDiscovery6_->stop(); + if (peerDiscovery4_) peerDiscovery4_->stop(); + if (peerDiscovery6_) peerDiscovery6_->stop(); } bool @@ -558,7 +342,7 @@ PeerDiscovery::stopPublish(const std::string &type) } bool -PeerDiscovery::stopPublish(sa_family_t domain, const std::string &type) +PeerDiscovery::stopPublish(sa_family_t domain, const std::string& type) { if (domain == AF_INET) { return peerDiscovery4_ and peerDiscovery4_->stopPublish(type); @@ -568,10 +352,4 @@ PeerDiscovery::stopPublish(sa_family_t domain, const std::string &type) return false; } -void -PeerDiscovery::join() { - if(peerDiscovery4_) peerDiscovery4_->join(); - if(peerDiscovery6_) peerDiscovery6_->join(); -} - } diff --git a/tests/peerdiscoverytester.cpp b/tests/peerdiscoverytester.cpp index 0bfee88bc515b258f7e82b74e7e70995ec7363e9..7a5aaedff6728e0dc07955297fbb4e28208e2d80 100644 --- a/tests/peerdiscoverytester.cpp +++ b/tests/peerdiscoverytester.cpp @@ -22,7 +22,7 @@ namespace test { -constexpr int MULTICAST_PORT = 2222; +constexpr unsigned MULTICAST_PORT = 2222; const std::string DHT_NODE_NAME {"dht"}; const std::string JAMI_NODE_NAME {"jami"};