diff --git a/include/opendht/peer_discovery.h b/include/opendht/peer_discovery.h index 7aa7c3b6907a7aef5bd0a65f6b490429b12ac3fb..4fcdfafa29c4112bd41b887c932399da49a03608 100644 --- a/include/opendht/peer_discovery.h +++ b/include/opendht/peer_discovery.h @@ -1,6 +1,7 @@ /* * Copyright (C) 2014-2019 Savoir-faire Linux Inc. * Author(s) : Mingrui Zhang <mingrui.zhang@savoirfairelinux.com> + * Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -81,7 +82,7 @@ public: /** * Configure the sockopt to be able to listen multicast group */ - static void socketJoinMulticast(int sockfd, sa_family_t family); + static void joinMulticast(int sockfd, sa_family_t family); /** * Join the threads diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index 6341c250f674526d04ce08c3bc4dfaaafa9a1ce8..e19c57a32d2eb3e4bc9f4b9c4d23a19d620b6101 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -1,6 +1,7 @@ /* * Copyright (C) 2014-2019 Savoir-faire Linux Inc. * Author(s) : Mingrui Zhang <mingrui.zhang@savoirfairelinux.com> + * Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -45,7 +46,6 @@ namespace dht { constexpr char MULTICAST_ADDRESS_IPV4[12] = "239.192.0.1"; constexpr char MULTICAST_ADDRESS_IPV6[10] = "ff08::101"; - class PeerDiscovery::DomainPeerDiscovery { public: @@ -80,14 +80,14 @@ public: /** * Configure the sockopt to be able to listen multicast group */ - static void socketJoinMulticast(int sockfd, sa_family_t family); + static void joinMulticast(int sockfd, sa_family_t family); /** * Join the threads */ void join() { - if(running_listen_.joinable()) running_listen_.join(); - if(running_send_.joinable()) running_send_.join(); + if(listening_.joinable()) listening_.join(); + if(sending_.joinable()) sending_.join(); } private: //dmtx_ for callbackmap_ and drunning_ (write) @@ -99,14 +99,14 @@ private: bool drunning_ {false}; sa_family_t domain_ {AF_UNSPEC}; int port_; - int sockfd_ {-1}; - int stop_writefd_ {-1}; + int sockFd_ {-1}; + int stopWriteFd_ {-1}; SockAddr sockAddrSend_; //Thread export to be joined - std::thread running_listen_; - std::thread running_send_; + std::thread listening_; + std::thread sending_; msgpack::sbuffer sbuf_; std::map<std::string, msgpack::sbuffer> messages_; @@ -115,7 +115,7 @@ private: /** * Multicast Socket Initialization, accept IPV4, IPV6 */ - static int initialize_socket(sa_family_t domain); + static int init(sa_family_t domain); /** * Receive messages @@ -125,40 +125,40 @@ private: /** * Listener pack thread loop */ - void listenerpack_thread(); + void loopListener(); /** * Listener Parameters Setup */ - void listener_setup(); + void setupListener(); /** * Sender Parameters Setup */ - void sender_setup(); + void setupSender(); /** * Sender Parameters Setup */ - void messages_reload(); + void reloadMessages(); void stopDiscovery(); void stopPublish(); }; PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(sa_family_t domain, in_port_t port) - : domain_(domain), port_(port), sockfd_(initialize_socket(domain)) + : domain_(domain), port_(port), sockFd_(init(domain)) { - socketJoinMulticast(sockfd_, domain); - listener_setup(); - sender_setup(); + joinMulticast(sockFd_, domain); + setupListener(); + setupSender(); } PeerDiscovery::DomainPeerDiscovery::~DomainPeerDiscovery() { stop(); join(); - if (sockfd_ != -1) - close(sockfd_); + if (sockFd_ != -1) + close(sockFd_); #ifdef _WIN32 WSACleanup(); @@ -166,7 +166,7 @@ PeerDiscovery::DomainPeerDiscovery::~DomainPeerDiscovery() } int -PeerDiscovery::DomainPeerDiscovery::initialize_socket(sa_family_t domain) +PeerDiscovery::DomainPeerDiscovery::init(sa_family_t domain) { #ifdef _WIN32 WSADATA wsaData; @@ -184,7 +184,7 @@ PeerDiscovery::DomainPeerDiscovery::initialize_socket(sa_family_t domain) } void -PeerDiscovery::DomainPeerDiscovery::listener_setup() +PeerDiscovery::DomainPeerDiscovery::setupListener() { SockAddr sockAddrListen_; sockAddrListen_.setFamily(domain_); @@ -192,23 +192,23 @@ PeerDiscovery::DomainPeerDiscovery::listener_setup() sockAddrListen_.setAny(); unsigned int opt = 1; - if (setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt)) < 0) { + if (setsockopt(sockFd_, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt)) < 0) { std::cerr << "setsockopt SO_REUSEADDR failed: " << strerror(errno) << std::endl; } #ifdef SO_REUSEPORT - if (setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, (char*)&opt, sizeof(opt)) < 0) { + if (setsockopt(sockFd_, SOL_SOCKET, SO_REUSEPORT, (char*)&opt, sizeof(opt)) < 0) { std::cerr << "setsockopt SO_REUSEPORT failed: " << strerror(errno) << std::endl; } #endif // bind to receive address - if (bind(sockfd_, sockAddrListen_.get(), sockAddrListen_.getLength()) < 0){ + if (bind(sockFd_, sockAddrListen_.get(), sockAddrListen_.getLength()) < 0){ throw std::runtime_error(std::string("Error binding socket: ") + strerror(errno)); } } void -PeerDiscovery::DomainPeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) +PeerDiscovery::DomainPeerDiscovery::joinMulticast(int sockfd, sa_family_t family) { switch (family) { @@ -266,10 +266,10 @@ PeerDiscovery::DomainPeerDiscovery::startDiscovery(const std::string &type, Serv std::unique_lock<std::mutex> lck(dmtx_); callbackmap_[type] = callback; if (not drunning_) { - if (running_listen_.joinable()) - running_listen_.join(); + if (listening_.joinable()) + listening_.join(); drunning_ = true; - running_listen_ = std::thread(&DomainPeerDiscovery::listenerpack_thread, this); + listening_ = std::thread(&DomainPeerDiscovery::loopListener, this); } } @@ -281,7 +281,7 @@ PeerDiscovery::DomainPeerDiscovery::recvFrom() std::array<uint8_t, 64 * 1024> buf; ssize_t nbytes = recvfrom( - sockfd_, + sockFd_, (char*)buf.data(), buf.size(), 0, (sockaddr*)&sa, &sa_len @@ -293,7 +293,7 @@ PeerDiscovery::DomainPeerDiscovery::recvFrom() } void -PeerDiscovery::DomainPeerDiscovery::listenerpack_thread() +PeerDiscovery::DomainPeerDiscovery::loopListener() { int stopfds_pipe[2]; #ifndef _WIN32 @@ -303,7 +303,7 @@ PeerDiscovery::DomainPeerDiscovery::listenerpack_thread() net::udpPipe(stopfds_pipe); #endif int stop_readfd = stopfds_pipe[0]; - stop_writefd_ = stopfds_pipe[1]; + stopWriteFd_ = stopfds_pipe[1]; while (true) { { @@ -316,9 +316,9 @@ PeerDiscovery::DomainPeerDiscovery::listenerpack_thread() FD_ZERO(&readfds); FD_SET(stop_readfd, &readfds); - FD_SET(sockfd_, &readfds); + FD_SET(sockFd_, &readfds); - int data_coming = select(std::max(sockfd_, stop_readfd) + 1, &readfds, nullptr, nullptr, nullptr); + int data_coming = select(std::max(sockFd_, stop_readfd) + 1, &readfds, nullptr, nullptr, nullptr); { std::lock_guard<std::mutex> lck(dmtx_); @@ -365,14 +365,14 @@ PeerDiscovery::DomainPeerDiscovery::listenerpack_thread() } if (stop_readfd != -1) close(stop_readfd); - if (stop_writefd_ != -1) { - close(stop_writefd_); - stop_writefd_ = -1; + if (stopWriteFd_ != -1) { + close(stopWriteFd_); + stopWriteFd_ = -1; } } void -PeerDiscovery::DomainPeerDiscovery::sender_setup() +PeerDiscovery::DomainPeerDiscovery::setupSender() { // Setup sender address sockAddrSend_.setFamily(domain_); @@ -389,16 +389,16 @@ PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const std::unique_lock<std::mutex> lck(mtx_); messages_[type] = std::move(pack_buf_c); - messages_reload(); + reloadMessages(); if (not lrunning_) { - if (running_send_.joinable()) - running_send_.join(); + if (sending_.joinable()) + sending_.join(); lrunning_ = true; - running_send_ = std::thread([this](){ + sending_ = std::thread([this](){ std::unique_lock<std::mutex> lck(mtx_); while (lrunning_) { ssize_t nbytes = sendto( - sockfd_, + sockFd_, sbuf_.data(), sbuf_.size(), 0, @@ -435,7 +435,7 @@ PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type) if (messages_.empty()) stopPublish(); else - messages_reload(); + reloadMessages(); return true; } return false; @@ -449,8 +449,8 @@ PeerDiscovery::DomainPeerDiscovery::stopDiscovery() #endif if (drunning_) { drunning_ = false; - if (stop_writefd_ != -1) { - if (write(stop_writefd_, "\0", 1) == -1) { + if (stopWriteFd_ != -1) { + if (write(stopWriteFd_, "\0", 1) == -1) { std::cerr << "Can't send stop message: " << strerror(errno) << std::endl; } } @@ -483,7 +483,7 @@ PeerDiscovery::DomainPeerDiscovery::stop() } void -PeerDiscovery::DomainPeerDiscovery::messages_reload() +PeerDiscovery::DomainPeerDiscovery::reloadMessages() { sbuf_.clear(); msgpack::packer<msgpack::sbuffer> pk(&sbuf_); diff --git a/tests/peerdiscoverytester.cpp b/tests/peerdiscoverytester.cpp index d3de94786aa2b0cae3b12d562080b026b355508a..0bfee88bc515b258f7e82b74e7e70995ec7363e9 100644 --- a/tests/peerdiscoverytester.cpp +++ b/tests/peerdiscoverytester.cpp @@ -22,14 +22,17 @@ namespace test { -struct NodeInsertion { +constexpr int MULTICAST_PORT = 2222; +const std::string DHT_NODE_NAME {"dht"}; +const std::string JAMI_NODE_NAME {"jami"}; + +struct DhtNode { dht::InfoHash nodeid; in_port_t node_port; dht::NetId nid; MSGPACK_DEFINE(nodeid, node_port, nid) }; - -struct TestPack { +struct JamiNode { int num; char cha; std::string str; @@ -40,73 +43,69 @@ CPPUNIT_TEST_SUITE_REGISTRATION(PeerDiscoveryTester); void PeerDiscoveryTester::setUp(){} -void PeerDiscoveryTester::testTransmission() +void PeerDiscoveryTester::testMulticastToTwoNodes() { - // Node for getnode id - const std::string type {"dht"}; - const std::string test_type {"pdd"}; - constexpr int MULTICAST_PORT = 2222; - - NodeInsertion adc; - adc.nid = 10; - adc.node_port = 50000; - adc.nodeid = dht::InfoHash::get("applepin"); + DhtNode dhtNode; + dhtNode.nid = 10; + dhtNode.node_port = 50000; + dhtNode.nodeid = dht::InfoHash::get("opendht01"); - TestPack pdd; - pdd.num = 100; - pdd.cha = 'a'; - pdd.str = "apple"; + JamiNode jamiNode; + jamiNode.num = 100; + jamiNode.cha = 'a'; + jamiNode.str = "jami01"; std::mutex lock; std::condition_variable cv; - unsigned count_node {0}; - unsigned count_test {0}; - + unsigned countDht {0}; + unsigned countJami {0}; { std::unique_lock<std::mutex> l(lock); - dht::PeerDiscovery test_n(MULTICAST_PORT); - dht::PeerDiscovery test_s(MULTICAST_PORT); + dht::PeerDiscovery testDht(MULTICAST_PORT); + dht::PeerDiscovery testJami(MULTICAST_PORT); - test_s.startDiscovery<NodeInsertion>(type,[&](NodeInsertion&& v, dht::SockAddr&& add){ - CPPUNIT_ASSERT_EQUAL(adc.node_port, v.node_port); - CPPUNIT_ASSERT_EQUAL(adc.nodeid, v.nodeid); - CPPUNIT_ASSERT_EQUAL(adc.nid, v.nid); + testJami.startDiscovery<DhtNode>(DHT_NODE_NAME,[&](DhtNode&& v, dht::SockAddr&&){ + CPPUNIT_ASSERT_EQUAL(dhtNode.node_port, v.node_port); + CPPUNIT_ASSERT_EQUAL(dhtNode.nodeid, v.nodeid); + CPPUNIT_ASSERT_EQUAL(dhtNode.nid, v.nid); { std::lock_guard<std::mutex> l(lock); - count_node++; + countDht++; } cv.notify_all(); }); - test_s.startDiscovery(test_type,[&](msgpack::object&& obj, dht::SockAddr&& add){ - auto v = obj.as<TestPack>(); - CPPUNIT_ASSERT_EQUAL(pdd.num, v.num); - CPPUNIT_ASSERT_EQUAL(pdd.cha, v.cha); - CPPUNIT_ASSERT_EQUAL(pdd.str, v.str); + testJami.startDiscovery(JAMI_NODE_NAME,[&](msgpack::object&& obj, dht::SockAddr&&){ + auto v = obj.as<JamiNode>(); + CPPUNIT_ASSERT_EQUAL(jamiNode.num, v.num); + CPPUNIT_ASSERT_EQUAL(jamiNode.cha, v.cha); + CPPUNIT_ASSERT_EQUAL(jamiNode.str, v.str); { std::lock_guard<std::mutex> l(lock); - count_test++; + countJami++; } cv.notify_all(); }); - test_n.startPublish(type, adc); + testDht.startPublish(DHT_NODE_NAME, dhtNode); CPPUNIT_ASSERT(cv.wait_for(l, std::chrono::seconds(5), [&]{ - return count_node > 0; + return countDht > 0; })); - test_n.startPublish(test_type, pdd); + testDht.startPublish(JAMI_NODE_NAME, jamiNode); CPPUNIT_ASSERT(cv.wait_for(l, std::chrono::seconds(5), [&]{ - return count_node > 1 and count_test > 0; + return countDht > 1 and countJami > 0; })); + // we don't verify count values since its a continious multicasting + l.unlock(); - test_n.stopPublish(type); - test_n.stopPublish(test_type); - test_s.stopDiscovery(type); - test_s.stopDiscovery(test_type); + testDht.stopPublish(DHT_NODE_NAME); + testDht.stopPublish(JAMI_NODE_NAME); + testJami.stopDiscovery(DHT_NODE_NAME); + testJami.stopDiscovery(JAMI_NODE_NAME); } } void PeerDiscoveryTester::tearDown(){} -} // namespace test \ No newline at end of file +} // namespace test diff --git a/tests/peerdiscoverytester.h b/tests/peerdiscoverytester.h index a682689038d3bda198ceabb41eb2f4326b28572d..32df1b7a550777831f38c5c461a2339c9f546b21 100644 --- a/tests/peerdiscoverytester.h +++ b/tests/peerdiscoverytester.h @@ -29,7 +29,7 @@ namespace test { class OPENDHT_PUBLIC PeerDiscoveryTester : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(PeerDiscoveryTester); - CPPUNIT_TEST(testTransmission); + CPPUNIT_TEST(testMulticastToTwoNodes); CPPUNIT_TEST_SUITE_END(); public: @@ -42,9 +42,9 @@ class OPENDHT_PUBLIC PeerDiscoveryTester : public CppUnit::TestFixture { */ void tearDown(); /** - * Test Multicast Transmission Ipv4 + * Test Multicast on two nodes */ - void testTransmission(); + void testMulticastToTwoNodes(); }; } // namespace test