diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index ceba870be5da5bd7306e6a9f534f8b7fed77a7e4..2f3be5c1e7ecc2ccb0bec8f9dea94d7bc1267c9d 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -544,8 +544,7 @@ private: std::string pushToken_; /** PeerDiscovery Parameters */ - std::unique_ptr<PeerDiscovery> peerDiscovery4_; - std::unique_ptr<PeerDiscovery> peerDiscovery6_; + std::unique_ptr<PeerDiscovery> peerDiscovery_; NetId current_node_netid_; }; diff --git a/include/opendht/peer_discovery.h b/include/opendht/peer_discovery.h index efecc50d317239bc63134476504d4adb59564209..304554a1acacc9281fbef1942133e353cea4e41d 100644 --- a/include/opendht/peer_discovery.h +++ b/include/opendht/peer_discovery.h @@ -28,17 +28,12 @@ namespace dht { -enum class PackType -{ - NodeInsertion = 0 -}; - class OPENDHT_PUBLIC PeerDiscovery { public: using ServiceDiscoveredCallback = std::function<void(msgpack::object&&, SockAddr&&)>; - PeerDiscovery(sa_family_t domain, in_port_t port); + PeerDiscovery(in_port_t port); ~PeerDiscovery(); /** @@ -74,63 +69,12 @@ public: /** * Join the threads */ - void join() { - if(running_listen_.joinable()) running_listen_.join(); - if(running_send_.joinable()) running_send_.join(); - } - - /** - * Pack Types/Callback Map - */ + void join(); private: - //dmtx_ for callbackmap_ and drunning_ (write) - std::mutex dmtx_; - //mtx_ for messages_ and lrunning (listen) - std::mutex mtx_; - std::condition_variable cv_; - bool lrunning_ {false}; - bool drunning_ {false}; - sa_family_t domain_ {AF_UNSPEC}; - int port_; - int sockfd_ {-1}; - int stop_writefd_ {-1}; - - SockAddr sockAddrSend_; - - //Thread export to be joined - std::thread running_listen_; - std::thread running_send_; - - msgpack::sbuffer sbuf_; - msgpack::sbuffer rbuf_; - std::map<std::string, msgpack::sbuffer> messages_; - std::map<std::string, ServiceDiscoveredCallback> callbackmap_; - - /** - * Multicast Socket Initialization, accept IPV4, IPV6 - */ - static int initialize_socket(sa_family_t domain); - - /** - * Receive messages - */ - SockAddr recvFrom(size_t &buf_size); - - /** - * Listener pack thread loop - */ - void listenerpack_thread(); - - /** - * Listener Parameters Setup - */ - void listener_setup(); - - /** - * Sender Parameters Setup - */ - void sender_setup(); + class DomainPeerDiscovery; + std::unique_ptr<DomainPeerDiscovery> peerDiscovery4_; + std::unique_ptr<DomainPeerDiscovery> peerDiscovery6_; }; } diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 4923df22af9869dbada35cc4805a9f43010b5171..1a34bcc00e98756f2f7e6100c455ea35e54d1bd4 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -152,24 +152,12 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: }); if (config.peer_discovery or config.peer_publish) { - try { - peerDiscovery4_.reset(new PeerDiscovery(AF_INET, PEER_DISCOVERY_PORT)); - } catch(const std::exception& e){ - std::cerr << "Can't start peer discovery (IPv4): " << e.what() << std::endl; - } - try { - peerDiscovery6_.reset(new PeerDiscovery(AF_INET6, PEER_DISCOVERY_PORT)); - } catch(const std::exception& e) { - std::cerr << "Can't start peer discovery (IPv6): " << e.what() << std::endl; - } + peerDiscovery_.reset(new PeerDiscovery(PEER_DISCOVERY_PORT)); } if (config.peer_discovery) { - if (peerDiscovery4_) - peerDiscovery4_->startDiscovery(PEER_DISCOVERY_DHT_SERVICE, - std::bind(&DhtRunner::nodeInsertionCallback, this, std::placeholders::_1,std::placeholders::_2)); - if (peerDiscovery6_) - peerDiscovery6_->startDiscovery(PEER_DISCOVERY_DHT_SERVICE, + if (peerDiscovery_) + peerDiscovery_->startDiscovery(PEER_DISCOVERY_DHT_SERVICE, std::bind(&DhtRunner::nodeInsertionCallback, this, std::placeholders::_1,std::placeholders::_2)); } if (config.peer_publish) { @@ -180,10 +168,8 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: adc.nodeid_ = dht_->getNodeId(); msgpack::sbuffer sbuf_node; msgpack::pack(sbuf_node, adc); - if (peerDiscovery4_) - peerDiscovery4_->startPublish(PEER_DISCOVERY_DHT_SERVICE, sbuf_node); - if (peerDiscovery6_) - peerDiscovery6_->startPublish(PEER_DISCOVERY_DHT_SERVICE, sbuf_node); + if (peerDiscovery_) + peerDiscovery_->startPublish(PEER_DISCOVERY_DHT_SERVICE, sbuf_node); } } @@ -207,8 +193,7 @@ DhtRunner::join() running = false; cv.notify_all(); bootstrap_cv.notify_all(); - if (peerDiscovery4_) peerDiscovery4_->stop(); - if (peerDiscovery6_) peerDiscovery6_->stop(); + if (peerDiscovery_) peerDiscovery_->stop(); if (dht_thread.joinable()) dht_thread.join(); @@ -217,8 +202,7 @@ DhtRunner::join() if (rcv_thread.joinable()) rcv_thread.join(); - if (peerDiscovery4_) peerDiscovery4_->join(); - if (peerDiscovery6_) peerDiscovery6_->join(); + if (peerDiscovery_) peerDiscovery_->join(); { std::lock_guard<std::mutex> lck(storage_mtx); @@ -1017,8 +1001,7 @@ DhtRunner::findCertificate(InfoHash hash, std::function<void(const std::shared_p void DhtRunner::resetDht() { - peerDiscovery4_.reset(); - peerDiscovery6_.reset(); + peerDiscovery_.reset(); #ifdef OPENDHT_PROXY_CLIENT listeners_.clear(); dht_via_proxy_.reset(); diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index 063a7d91f65f1ce8429d1f2f8251692af19476ca..74b466e0d985ffc7e0ab6bda86b61d99f904f561 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -18,6 +18,7 @@ #include "peer_discovery.h" #include "network_utils.h" +#include "utils.h" #ifdef _WIN32 #include <Ws2tcpip.h> // needed for ip_mreq definition for multicast @@ -36,13 +37,111 @@ namespace dht { constexpr char MULTICAST_ADDRESS_IPV4[10] = "224.0.0.1"; constexpr char MULTICAST_ADDRESS_IPV6[8] = "ff05::2"; // Site-local multicast -PeerDiscovery::PeerDiscovery(sa_family_t domain, in_port_t port) + +class PeerDiscovery::DomainPeerDiscovery +{ +public: + DomainPeerDiscovery(sa_family_t domain, in_port_t port); + ~DomainPeerDiscovery(); + + /** + * startDiscovery - Keep Listening data from the sender until node is joinned or stop is called + */ + void startDiscovery(const std::string &type, ServiceDiscoveredCallback callback); + + /** + * startPublish - Keeping sending data until node is joinned or stop is called - msgpack + */ + void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf); + + /** + * Thread Stopper + */ + void stop(); + + /** + * Remove possible callBack to discovery + */ + void stopDiscovery(const std::string &type); + + /** + * Remove different serivce message to send + */ + void stopPublish(const std::string &type); + + /** + * Configure the sockopt to be able to listen multicast group + */ + static void socketJoinMulticast(int sockfd, sa_family_t family); + + /** + * Join the threads + */ + void join() { + if(running_listen_.joinable()) running_listen_.join(); + if(running_send_.joinable()) running_send_.join(); + } +private: + //dmtx_ for callbackmap_ and drunning_ (write) + std::mutex dmtx_; + //mtx_ for messages_ and lrunning (listen) + std::mutex mtx_; + std::condition_variable cv_; + bool lrunning_ {false}; + bool drunning_ {false}; + sa_family_t domain_ {AF_UNSPEC}; + int port_; + int sockfd_ {-1}; + int stop_writefd_ {-1}; + + SockAddr sockAddrSend_; + + //Thread export to be joined + std::thread running_listen_; + std::thread running_send_; + + msgpack::sbuffer sbuf_; + msgpack::sbuffer rbuf_; + std::map<std::string, msgpack::sbuffer> messages_; + std::map<std::string, ServiceDiscoveredCallback> callbackmap_; + + /** + * Multicast Socket Initialization, accept IPV4, IPV6 + */ + static int initialize_socket(sa_family_t domain); + + /** + * Receive messages + */ + std::pair<SockAddr, Blob> recvFrom(); + + /** + * Listener pack thread loop + */ + void listenerpack_thread(); + + /** + * Listener Parameters Setup + */ + void listener_setup(); + + /** + * Sender Parameters Setup + */ + void sender_setup(); + /** + * Sender Parameters Setup + */ + void messages_reload(); +}; + +PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(sa_family_t domain, in_port_t port) : domain_(domain), port_(port), sockfd_(initialize_socket(domain)) { socketJoinMulticast(sockfd_, domain); } -PeerDiscovery::~PeerDiscovery() +PeerDiscovery::DomainPeerDiscovery::~DomainPeerDiscovery() { if (sockfd_ != -1) close(sockfd_); @@ -53,7 +152,7 @@ PeerDiscovery::~PeerDiscovery() } int -PeerDiscovery::initialize_socket(sa_family_t domain) +PeerDiscovery::DomainPeerDiscovery::initialize_socket(sa_family_t domain) { #ifdef _WIN32 WSADATA wsaData; @@ -71,7 +170,7 @@ PeerDiscovery::initialize_socket(sa_family_t domain) } void -PeerDiscovery::listener_setup() +PeerDiscovery::DomainPeerDiscovery::listener_setup() { SockAddr sockAddrListen_; sockAddrListen_.setFamily(domain_); @@ -93,7 +192,7 @@ PeerDiscovery::listener_setup() } void -PeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) +PeerDiscovery::DomainPeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) { switch (family) { @@ -146,28 +245,28 @@ PeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) } void -PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) +PeerDiscovery::DomainPeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) { std::unique_lock<std::mutex> lck(dmtx_); callbackmap_[type] = callback; if (not drunning_){ drunning_ = true; listener_setup(); - running_listen_ = std::thread(&PeerDiscovery::listenerpack_thread, this); + running_listen_ = std::thread(&DomainPeerDiscovery::listenerpack_thread, this); } } -SockAddr -PeerDiscovery::recvFrom(size_t &buf_size) +std::pair<SockAddr, Blob> +PeerDiscovery::DomainPeerDiscovery::recvFrom() { sockaddr_storage storeage_recv; socklen_t sa_len = sizeof(storeage_recv); - char recv[1024]; + std::array<uint8_t, 64 * 1024> recv; ssize_t nbytes = recvfrom( sockfd_, - recv, - 1024, + recv.data(), + recv.size(), 0, (sockaddr*)&storeage_recv, &sa_len @@ -175,15 +274,13 @@ PeerDiscovery::recvFrom(size_t &buf_size) if (nbytes < 0) { throw std::runtime_error(std::string("Error receiving packet: ") + strerror(errno)); } - - rbuf_.write(recv,nbytes); - buf_size = nbytes; + SockAddr ret {storeage_recv, sa_len}; - return ret; + return {ret, Blob(recv.begin(), recv.end())}; } void -PeerDiscovery::listenerpack_thread() +PeerDiscovery::DomainPeerDiscovery::listenerpack_thread() { int stopfds_pipe[2]; #ifndef _WIN32 @@ -195,7 +292,7 @@ PeerDiscovery::listenerpack_thread() int stop_readfd = stopfds_pipe[0]; stop_writefd_ = stopfds_pipe[1]; - while(true) { + while (true) { fd_set readfds; FD_ZERO(&readfds); @@ -223,9 +320,8 @@ PeerDiscovery::listenerpack_thread() recv(stop_readfd, buf.data(), buf.size(), 0); } - size_t recv_buf_size {0}; - auto from = recvFrom(recv_buf_size); - msgpack::object_handle oh = msgpack::unpack(rbuf_.data(), recv_buf_size); + auto rcv = recvFrom(); + msgpack::object_handle oh = msgpack::unpack(reinterpret_cast<char*>(rcv.second.data()), rcv.second.size()); msgpack::object obj = oh.get(); if (obj.type != msgpack::type::MAP) @@ -238,10 +334,9 @@ PeerDiscovery::listenerpack_thread() std::unique_lock<std::mutex> lck(dmtx_); auto callback = callbackmap_.find(key); if (callback != callbackmap_.end()){ - callback->second(std::move(o.val), std::move(from)); + callback->second(std::move(o.val), std::move(rcv.first)); } } - ::free(rbuf_.release()); } } if (stop_readfd != -1) @@ -253,7 +348,7 @@ PeerDiscovery::listenerpack_thread() } void -PeerDiscovery::sender_setup() +PeerDiscovery::DomainPeerDiscovery::sender_setup() { // Setup sender address sockAddrSend_.setFamily(domain_); @@ -261,21 +356,16 @@ PeerDiscovery::sender_setup() sockAddrSend_.setPort(port_); } -void PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) +void +PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) { //Set up New Sending pack msgpack::sbuffer pack_buf_c; pack_buf_c.write(pack_buf.data(),pack_buf.size()); std::unique_lock<std::mutex> lck(mtx_); - ::free(sbuf_.release()); messages_[type] = std::move(pack_buf_c); - msgpack::packer<msgpack::sbuffer> pk(&sbuf_); - pk.pack_map(messages_.size()); - for (const auto& m : messages_) { - pk.pack(m.first); - sbuf_.write(m.second.data(), m.second.size()); - } + messages_reload(); if (not lrunning_) { lrunning_ = true; sender_setup(); @@ -301,7 +391,7 @@ void PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer } void -PeerDiscovery::stopDiscovery(const std::string &type) +PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string &type) { { std::unique_lock<std::mutex> lck(dmtx_); @@ -313,26 +403,18 @@ PeerDiscovery::stopDiscovery(const std::string &type) } void -PeerDiscovery::stopPublish(const std::string &type) +PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type) { - { - std::unique_lock<std::mutex> lck(mtx_); - ::free(sbuf_.release()); - auto it = messages_.find(type); - if(it != messages_.end()){ - messages_.erase(it); - } - msgpack::packer<msgpack::sbuffer> pk(&sbuf_); - pk.pack_map(messages_.size()); - for (const auto& m : messages_) { - pk.pack(m.first); - sbuf_.write(m.second.data(), m.second.size()); - } + std::unique_lock<std::mutex> lck(mtx_); + auto it = messages_.find(type); + if(it != messages_.end()){ + messages_.erase(it); } + messages_reload(); } void -PeerDiscovery::stop() +PeerDiscovery::DomainPeerDiscovery::stop() { { std::unique_lock<std::mutex> lck(mtx_); @@ -350,4 +432,94 @@ PeerDiscovery::stop() } } +void +PeerDiscovery::DomainPeerDiscovery::messages_reload() +{ + sbuf_.clear(); + msgpack::packer<msgpack::sbuffer> pk(&sbuf_); + pk.pack_map(messages_.size()); + for (const auto& m : messages_) { + pk.pack(m.first); + sbuf_.write(m.second.data(), m.second.size()); + } +} + +PeerDiscovery::PeerDiscovery(in_port_t port) +{ + try { + peerDiscovery4_.reset(new DomainPeerDiscovery(AF_INET, port)); + } catch(const std::exception& e){ + peerDiscovery4_.reset(nullptr); + std::cerr << "Can't start peer discovery (IPv4): " << e.what() << std::endl; + } + try { + peerDiscovery6_.reset(new DomainPeerDiscovery(AF_INET6, port)); + } catch(const std::exception& e) { + peerDiscovery6_.reset(nullptr); + std::cerr << "Can't start peer discovery (IPv6): " << e.what() << std::endl; + } + +} + +PeerDiscovery::~PeerDiscovery(){} + +/** + * startDiscovery - Keep Listening data from the sender until node is joinned or stop is called +*/ +void +PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) +{ + if(peerDiscovery4_) peerDiscovery4_->startDiscovery(type, callback); + if(peerDiscovery6_) peerDiscovery6_->startDiscovery(type, callback); +} + +/** + * startPublish - Keeping sending data until node is joinned or stop is called - msgpack +*/ +void +PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) +{ + if(peerDiscovery4_) peerDiscovery4_->startPublish(type, pack_buf); + if(peerDiscovery6_) peerDiscovery6_->startPublish(type, pack_buf); +} + +/** + * Thread Stopper +*/ +void +PeerDiscovery::stop() +{ + if(peerDiscovery4_) peerDiscovery4_->stop(); + if(peerDiscovery6_) peerDiscovery6_->stop(); +} + +/** + * Remove possible callBack to discovery +*/ +void +PeerDiscovery::stopDiscovery(const std::string &type) +{ + if(peerDiscovery4_) peerDiscovery4_->stopDiscovery(type); + if(peerDiscovery6_) peerDiscovery6_->stopDiscovery(type); +} + +/** + * Remove different serivce message to send +*/ +void +PeerDiscovery::stopPublish(const std::string &type) +{ + if(peerDiscovery4_) peerDiscovery4_->stopPublish(type); + if(peerDiscovery6_) peerDiscovery6_->stopPublish(type); +} + +/** + * Join the threads +*/ +void +PeerDiscovery::join() { + if(peerDiscovery4_) peerDiscovery4_->join(); + if(peerDiscovery6_) peerDiscovery6_->join(); +} + } diff --git a/tests/peerdiscoverytester.cpp b/tests/peerdiscoverytester.cpp index 32ce32a79ccdbf460ef18597239ded9021b147c6..2454f2324bfec696b5f55986d7c96d432fd96c9b 100644 --- a/tests/peerdiscoverytester.cpp +++ b/tests/peerdiscoverytester.cpp @@ -42,7 +42,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION(PeerDiscoveryTester); void PeerDiscoveryTester::setUp(){} -void PeerDiscoveryTester::testTransmission_ipv4(){ +void PeerDiscoveryTester::testTransmission(){ // Node for getnode id const std::string type {"dht"}; @@ -66,67 +66,8 @@ void PeerDiscoveryTester::testTransmission_ipv4(){ msgpack::pack(pbuf,pdd); try{ - dht::PeerDiscovery test_n(AF_INET, port); - dht::PeerDiscovery test_s(AF_INET, port); - try{ - test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr&& add){ - auto v = obj.as<NodeInsertion>(); - CPPUNIT_ASSERT_EQUAL(v.node_port_, port_n); - CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n); - }); - - test_s.startDiscovery(test_type,[&](msgpack::object&& obj, dht::SockAddr&& add){ - auto v = obj.as<TestPack>(); - CPPUNIT_ASSERT_EQUAL(v.num, 100); - CPPUNIT_ASSERT_EQUAL(v.cha, 'a'); - }); - - test_n.startPublish(type, sbuf); - std::this_thread::sleep_for(std::chrono::seconds(5)); - test_n.startPublish(test_type, pbuf); - std::this_thread::sleep_for(std::chrono::seconds(5)); - test_n.stopPublish(test_type); - - std::this_thread::sleep_for(std::chrono::seconds(10)); - test_n.stop(); - test_s.stop(); - test_n.join(); - test_s.join(); - } catch(std::exception &exception){ - perror(exception.what()); - CPPUNIT_ASSERT(false); - } - } catch(std::exception &exception){ - perror(exception.what()); - } -} - -void PeerDiscoveryTester::testTransmission_ipv6(){ - - // Node for getnode id - const std::string type {"dht"}; - const std::string test_type {"pdd"}; - dht::InfoHash data_n = dht::InfoHash::get("applepin"); - int port = 2222; - in_port_t port_n = 50000; - - msgpack::sbuffer sbuf; - NodeInsertion adc; - adc.nid_ = 10; - adc.node_port_ = port_n; - adc.nodeid_ = data_n; - msgpack::pack(sbuf,adc); - - msgpack::sbuffer pbuf; - TestPack pdd; - pdd.num = 100; - pdd.cha = 'a'; - pdd.str = "apple"; - msgpack::pack(pbuf,pdd); - - try{ - dht::PeerDiscovery test_n(AF_INET6, port); - dht::PeerDiscovery test_s(AF_INET6, port); + dht::PeerDiscovery test_n(port); + dht::PeerDiscovery test_s(port); try{ test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr&& add){ auto v = obj.as<NodeInsertion>(); diff --git a/tests/peerdiscoverytester.h b/tests/peerdiscoverytester.h index b1f1dd9e8b004a8ec0666eb8f840080deca2c6cc..a682689038d3bda198ceabb41eb2f4326b28572d 100644 --- a/tests/peerdiscoverytester.h +++ b/tests/peerdiscoverytester.h @@ -26,11 +26,10 @@ namespace test { -class PeerDiscoveryTester : public CppUnit::TestFixture { +class OPENDHT_PUBLIC PeerDiscoveryTester : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(PeerDiscoveryTester); - CPPUNIT_TEST(testTransmission_ipv4); - CPPUNIT_TEST(testTransmission_ipv6); + CPPUNIT_TEST(testTransmission); CPPUNIT_TEST_SUITE_END(); public: @@ -45,12 +44,7 @@ class PeerDiscoveryTester : public CppUnit::TestFixture { /** * Test Multicast Transmission Ipv4 */ - void testTransmission_ipv4(); - /** - * Test Multicast Transmission Ipv6 - */ - void testTransmission_ipv6(); - + void testTransmission(); }; } // namespace test