diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index ecf016802320628e54bb08fd72912ba223775cd4..2f3be5c1e7ecc2ccb0bec8f9dea94d7bc1267c9d 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -268,6 +268,11 @@ public: */ void bootstrap(const InfoHash& id, const SockAddr& address); + /** + * Insert known nodes to the routing table by using the received msgpack + */ + void nodeInsertionCallback(msgpack::object&& sbuf, SockAddr&& add); + /** * Clear the list of bootstrap added using bootstrap(const std::string&, const std::string&). */ @@ -539,8 +544,9 @@ private: std::string pushToken_; /** PeerDiscovery Parameters */ - std::unique_ptr<PeerDiscovery> peerDiscovery4_; - std::unique_ptr<PeerDiscovery> peerDiscovery6_; + std::unique_ptr<PeerDiscovery> peerDiscovery_; + NetId current_node_netid_; + }; } diff --git a/include/opendht/peer_discovery.h b/include/opendht/peer_discovery.h index 98bb349d80286f111e644b9c176eabed499ccc7d..304554a1acacc9281fbef1942133e353cea4e41d 100644 --- a/include/opendht/peer_discovery.h +++ b/include/opendht/peer_discovery.h @@ -32,20 +32,19 @@ class OPENDHT_PUBLIC PeerDiscovery { public: - using PeerDiscoveredCallback = std::function<void(const InfoHash&, const SockAddr&)>; - - PeerDiscovery(sa_family_t domain, in_port_t port); + using ServiceDiscoveredCallback = std::function<void(msgpack::object&&, SockAddr&&)>; + PeerDiscovery(in_port_t port); ~PeerDiscovery(); /** * startDiscovery - Keep Listening data from the sender until node is joinned or stop is called */ - void startDiscovery(PeerDiscoveredCallback callback); + void startDiscovery(const std::string &type, ServiceDiscoveredCallback callback); /** - * startPublish - Keeping sending data until node is joinned or stop is called + * startPublish - Keeping sending data until node is joinned or stop is called - msgpack */ - void startPublish(const dht::InfoHash &nodeId, in_port_t port_to_send); + void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf); /** * Thread Stopper @@ -53,69 +52,29 @@ public: void stop(); /** - * Configure the sockopt to be able to listen multicast group - */ - static void socketJoinMulticast(int sockfd, sa_family_t family); - - /** - * Join the threads - */ - void join() { - if(running_listen.joinable()) running_listen.join(); - if(running_send.joinable()) running_send.join(); - } - -private: - std::mutex mtx_; - std::condition_variable cv_; - bool running_ {true}; - sa_family_t domain_ {AF_UNSPEC}; - int port_; - int sockfd_ {-1}; - int stop_writefd_ {-1}; - - SockAddr sockAddrSend_; - std::array<uint8_t,dht::InfoHash::size() + sizeof(in_port_t)> data_send_; - - //Thread export to be joined - std::thread running_listen; - std::thread running_send; - dht::InfoHash nodeId_; - - /** - * Multicast Socket Initialization, accept IPV4, IPV6 - */ - static int initialize_socket(sa_family_t domain); - - /** - * Send messages - */ - void sendTo(uint8_t *buf,size_t buf_size); - - /** - * Receive messages + * Remove possible callBack to discovery */ - SockAddr recvFrom(uint8_t *buf, size_t &buf_size); + void stopDiscovery(const std::string &type); /** - * Send thread loop + * Remove different serivce message to send */ - void sender_thread(); + void stopPublish(const std::string &type); /** - * Listener thread loop + * Configure the sockopt to be able to listen multicast group */ - void listener_thread(PeerDiscoveredCallback callback); + static void socketJoinMulticast(int sockfd, sa_family_t family); /** - * Listener Parameters Setup + * Join the threads */ - void listener_setup(); + void join(); - /** - * Sender Parameters Setup - */ - void sender_setup(const dht::InfoHash& nodeId, in_port_t port_to_send); +private: + class DomainPeerDiscovery; + std::unique_ptr<DomainPeerDiscovery> peerDiscovery4_; + std::unique_ptr<DomainPeerDiscovery> peerDiscovery6_; }; } diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 23e2782ccad816ea30edf1ee3fd736f6ed9466fd..1a34bcc00e98756f2f7e6100c455ea35e54d1bd4 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -38,6 +38,7 @@ namespace dht { constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD; static constexpr size_t RX_QUEUE_MAX_SIZE = 1024 * 16; static constexpr in_port_t PEER_DISCOVERY_PORT = 8888; +static const std::string PEER_DISCOVERY_DHT_SERVICE = "dht"; struct DhtRunner::Listener { size_t tokenClassicDht {0}; @@ -48,6 +49,14 @@ struct DhtRunner::Listener { Where w; }; +class OPENDHT_PUBLIC NodeInsertionPack{ +public: + dht::InfoHash nodeid_; + in_port_t node_port_; + dht::NetId nid_; + MSGPACK_DEFINE(nodeid_, node_port_, nid_) +}; + DhtRunner::DhtRunner() : dht_() #ifdef OPENDHT_PROXY_CLIENT , dht_via_proxy_() @@ -143,32 +152,24 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: }); if (config.peer_discovery or config.peer_publish) { - try { - peerDiscovery4_.reset(new PeerDiscovery(AF_INET, PEER_DISCOVERY_PORT)); - } catch(const std::exception& e){ - std::cerr << "Can't start peer discovery (IPv4): " << e.what() << std::endl; - } - try { - peerDiscovery6_.reset(new PeerDiscovery(AF_INET6, PEER_DISCOVERY_PORT)); - } catch(const std::exception& e) { - std::cerr << "Can't start peer discovery (IPv6): " << e.what() << std::endl; - } + peerDiscovery_.reset(new PeerDiscovery(PEER_DISCOVERY_PORT)); } + if (config.peer_discovery) { - using sig = void (DhtRunner::*)(const InfoHash&, const SockAddr&); - if (peerDiscovery4_) - peerDiscovery4_->startDiscovery(std::bind(static_cast<sig>(&DhtRunner::bootstrap), this, - std::placeholders::_1,std::placeholders::_2)); - - if (peerDiscovery6_) - peerDiscovery6_->startDiscovery(std::bind(static_cast<sig>(&DhtRunner::bootstrap), this, - std::placeholders::_1,std::placeholders::_2)); + if (peerDiscovery_) + peerDiscovery_->startDiscovery(PEER_DISCOVERY_DHT_SERVICE, + std::bind(&DhtRunner::nodeInsertionCallback, this, std::placeholders::_1,std::placeholders::_2)); } if (config.peer_publish) { - if (peerDiscovery4_) - peerDiscovery4_->startPublish(dht_->getNodeId(), getBoundPort(AF_INET)); - if (peerDiscovery6_) - peerDiscovery6_->startPublish(dht_->getNodeId(), getBoundPort(AF_INET6)); + current_node_netid_ = config.dht_config.node_config.network; + NodeInsertionPack adc; + adc.nid_ = current_node_netid_; + adc.node_port_ = getBoundPort(); + adc.nodeid_ = dht_->getNodeId(); + msgpack::sbuffer sbuf_node; + msgpack::pack(sbuf_node, adc); + if (peerDiscovery_) + peerDiscovery_->startPublish(PEER_DISCOVERY_DHT_SERVICE, sbuf_node); } } @@ -192,8 +193,7 @@ DhtRunner::join() running = false; cv.notify_all(); bootstrap_cv.notify_all(); - if (peerDiscovery4_) peerDiscovery4_->stop(); - if (peerDiscovery6_) peerDiscovery6_->stop(); + if (peerDiscovery_) peerDiscovery_->stop(); if (dht_thread.joinable()) dht_thread.join(); @@ -202,8 +202,7 @@ DhtRunner::join() if (rcv_thread.joinable()) rcv_thread.join(); - if (peerDiscovery4_) peerDiscovery4_->join(); - if (peerDiscovery6_) peerDiscovery6_->join(); + if (peerDiscovery_) peerDiscovery_->join(); { std::lock_guard<std::mutex> lck(storage_mtx); @@ -949,6 +948,20 @@ DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address) cv.notify_all(); } +void +DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr&& add) +{ + try { + auto v = obj.as<NodeInsertionPack>(); + add.setPort(v.node_port_); + if(v.nodeid_ != dht_->getNodeId() && current_node_netid_ == v.nid_){ + bootstrap(v.nodeid_, add); + } + } catch(const msgpack::type_error &e){ + std::cerr << "Msgpack Info Invalid: " << e.what() << '\n'; + } +} + void DhtRunner::bootstrap(const std::vector<NodeExport>& nodes) { @@ -988,8 +1001,7 @@ DhtRunner::findCertificate(InfoHash hash, std::function<void(const std::shared_p void DhtRunner::resetDht() { - peerDiscovery4_.reset(); - peerDiscovery6_.reset(); + peerDiscovery_.reset(); #ifdef OPENDHT_PROXY_CLIENT listeners_.clear(); dht_via_proxy_.reset(); diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index a521c40ba2c456f68bfb3191c2db6b67b3417514..60fcb7b6757d34ccc711abfe7f4f016540d32996 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -18,6 +18,7 @@ #include "peer_discovery.h" #include "network_utils.h" +#include "utils.h" #ifdef _WIN32 #include <Ws2tcpip.h> // needed for ip_mreq definition for multicast @@ -40,13 +41,111 @@ namespace dht { constexpr char MULTICAST_ADDRESS_IPV4[10] = "224.0.0.1"; constexpr char MULTICAST_ADDRESS_IPV6[8] = "ff05::2"; // Site-local multicast -PeerDiscovery::PeerDiscovery(sa_family_t domain, in_port_t port) + +class PeerDiscovery::DomainPeerDiscovery +{ +public: + DomainPeerDiscovery(sa_family_t domain, in_port_t port); + ~DomainPeerDiscovery(); + + /** + * startDiscovery - Keep Listening data from the sender until node is joinned or stop is called + */ + void startDiscovery(const std::string &type, ServiceDiscoveredCallback callback); + + /** + * startPublish - Keeping sending data until node is joinned or stop is called - msgpack + */ + void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf); + + /** + * Thread Stopper + */ + void stop(); + + /** + * Remove possible callBack to discovery + */ + void stopDiscovery(const std::string &type); + + /** + * Remove different serivce message to send + */ + void stopPublish(const std::string &type); + + /** + * Configure the sockopt to be able to listen multicast group + */ + static void socketJoinMulticast(int sockfd, sa_family_t family); + + /** + * Join the threads + */ + void join() { + if(running_listen_.joinable()) running_listen_.join(); + if(running_send_.joinable()) running_send_.join(); + } +private: + //dmtx_ for callbackmap_ and drunning_ (write) + std::mutex dmtx_; + //mtx_ for messages_ and lrunning (listen) + std::mutex mtx_; + std::condition_variable cv_; + bool lrunning_ {false}; + bool drunning_ {false}; + sa_family_t domain_ {AF_UNSPEC}; + int port_; + int sockfd_ {-1}; + int stop_writefd_ {-1}; + + SockAddr sockAddrSend_; + + //Thread export to be joined + std::thread running_listen_; + std::thread running_send_; + + msgpack::sbuffer sbuf_; + msgpack::sbuffer rbuf_; + std::map<std::string, msgpack::sbuffer> messages_; + std::map<std::string, ServiceDiscoveredCallback> callbackmap_; + + /** + * Multicast Socket Initialization, accept IPV4, IPV6 + */ + static int initialize_socket(sa_family_t domain); + + /** + * Receive messages + */ + std::pair<SockAddr, Blob> recvFrom(); + + /** + * Listener pack thread loop + */ + void listenerpack_thread(); + + /** + * Listener Parameters Setup + */ + void listener_setup(); + + /** + * Sender Parameters Setup + */ + void sender_setup(); + /** + * Sender Parameters Setup + */ + void messages_reload(); +}; + +PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(sa_family_t domain, in_port_t port) : domain_(domain), port_(port), sockfd_(initialize_socket(domain)) { socketJoinMulticast(sockfd_, domain); } -PeerDiscovery::~PeerDiscovery() +PeerDiscovery::DomainPeerDiscovery::~DomainPeerDiscovery() { if (sockfd_ != -1) close(sockfd_); @@ -57,7 +156,7 @@ PeerDiscovery::~PeerDiscovery() } int -PeerDiscovery::initialize_socket(sa_family_t domain) +PeerDiscovery::DomainPeerDiscovery::initialize_socket(sa_family_t domain) { #ifdef _WIN32 WSADATA wsaData; @@ -75,7 +174,7 @@ PeerDiscovery::initialize_socket(sa_family_t domain) } void -PeerDiscovery::listener_setup() +PeerDiscovery::DomainPeerDiscovery::listener_setup() { SockAddr sockAddrListen_; sockAddrListen_.setFamily(domain_); @@ -99,7 +198,7 @@ PeerDiscovery::listener_setup() } void -PeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) +PeerDiscovery::DomainPeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) { switch (family) { @@ -152,31 +251,28 @@ PeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) } void -PeerDiscovery::sendTo(uint8_t *buf, size_t buf_size) +PeerDiscovery::DomainPeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) { - ssize_t nbytes = sendto( - sockfd_, - (char*)buf, - buf_size, - 0, - sockAddrSend_.get(), - sockAddrSend_.getLength() - ); - if (nbytes < 0) { - throw std::runtime_error(std::string("Error sending packet: ") + strerror(errno)); + std::unique_lock<std::mutex> lck(dmtx_); + callbackmap_[type] = callback; + if (not drunning_){ + drunning_ = true; + listener_setup(); + running_listen_ = std::thread(&DomainPeerDiscovery::listenerpack_thread, this); } } -SockAddr -PeerDiscovery::recvFrom(uint8_t *buf, size_t& buf_size) +std::pair<SockAddr, Blob> +PeerDiscovery::DomainPeerDiscovery::recvFrom() { sockaddr_storage storeage_recv; socklen_t sa_len = sizeof(storeage_recv); + std::array<uint8_t, 64 * 1024> recv; ssize_t nbytes = recvfrom( sockfd_, - (char*)buf, - buf_size, + recv.data(), + recv.size(), 0, (sockaddr*)&storeage_recv, &sa_len @@ -184,46 +280,13 @@ PeerDiscovery::recvFrom(uint8_t *buf, size_t& buf_size) if (nbytes < 0) { throw std::runtime_error(std::string("Error receiving packet: ") + strerror(errno)); } - - buf_size = nbytes; + SockAddr ret {storeage_recv, sa_len}; - return ret; + return {ret, Blob(recv.begin(), recv.end())}; } -void -PeerDiscovery::sender_setup(const dht::InfoHash& nodeId, in_port_t port_to_send) -{ - nodeId_ = nodeId; - // Setup sender address - sockAddrSend_.setFamily(domain_); - sockAddrSend_.setAddress(domain_ == AF_INET ? MULTICAST_ADDRESS_IPV4 : MULTICAST_ADDRESS_IPV6); - sockAddrSend_.setPort(port_); - - // Setup sent data - std::copy_n(nodeId.cbegin(), nodeId.size(), data_send_.begin()); - auto portAddr = reinterpret_cast<in_port_t*>(data_send_.data() + dht::InfoHash::size()); - *portAddr = htons(port_to_send); -} - -void -PeerDiscovery::sender_thread() -{ - while(true) { - try { - sendTo(data_send_.data(), data_send_.size()); - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - { - std::unique_lock<std::mutex> lck(mtx_); - if (cv_.wait_for(lck,std::chrono::seconds(3),[&]{ return !running_; })) - break; - } - } -} - -void -PeerDiscovery::listener_thread(PeerDiscoveredCallback callback) +void +PeerDiscovery::DomainPeerDiscovery::listenerpack_thread() { int stopfds_pipe[2]; #ifndef _WIN32 @@ -235,7 +298,7 @@ PeerDiscovery::listener_thread(PeerDiscoveredCallback callback) int stop_readfd = stopfds_pipe[0]; stop_writefd_ = stopfds_pipe[1]; - while(true) { + while (true) { fd_set readfds; FD_ZERO(&readfds); @@ -245,8 +308,8 @@ PeerDiscovery::listener_thread(PeerDiscoveredCallback callback) int data_coming = select(sockfd_ > stop_readfd ? sockfd_ + 1 : stop_readfd + 1, &readfds, nullptr, nullptr, nullptr); { - std::unique_lock<std::mutex> lck(mtx_); - if (not running_) + std::unique_lock<std::mutex> lck(dmtx_); + if (not drunning_) break; } @@ -263,23 +326,22 @@ PeerDiscovery::listener_thread(PeerDiscoveredCallback callback) recv(stop_readfd, (char*)buf.data(), buf.size(), 0); } - std::array<uint8_t,dht::InfoHash::size() + sizeof(in_port_t)> data_receive; - size_t data_receive_size = data_receive.size(); - auto from = recvFrom(data_receive.data(), data_receive_size); + auto rcv = recvFrom(); + msgpack::object_handle oh = msgpack::unpack(reinterpret_cast<char*>(rcv.second.data()), rcv.second.size()); + msgpack::object obj = oh.get(); - // Data_receive_size as a value-result member will hlep to filter packs - if(data_receive_size != data_receive.size()){ - // std::cerr << "Received invalid peer discovery packet" << std::endl; + if (obj.type != msgpack::type::MAP) continue; - } - - dht::InfoHash nodeId; - std::copy_n(data_receive.begin(), dht::InfoHash::size(), nodeId.begin()); - auto portAddr = reinterpret_cast<in_port_t*>(data_receive.data() + dht::InfoHash::size()); - auto port = ntohs(*portAddr); - if (nodeId != nodeId_){ - from.setPort(port); - callback(nodeId, from); + for (unsigned i = 0; i < obj.via.map.size; i++) { + auto& o = obj.via.map.ptr[i]; + if (o.key.type != msgpack::type::STR) + continue; + auto key = o.key.as<std::string>(); + std::unique_lock<std::mutex> lck(dmtx_); + auto callback = callbackmap_.find(key); + if (callback != callbackmap_.end()){ + callback->second(std::move(o.val), std::move(rcv.first)); + } } } } @@ -291,26 +353,82 @@ PeerDiscovery::listener_thread(PeerDiscoveredCallback callback) } } -void -PeerDiscovery::startDiscovery(PeerDiscoveredCallback callback) +void +PeerDiscovery::DomainPeerDiscovery::sender_setup() { - listener_setup(); - running_listen = std::thread(&PeerDiscovery::listener_thread, this, callback); + // Setup sender address + sockAddrSend_.setFamily(domain_); + sockAddrSend_.setAddress(domain_ == AF_INET ? MULTICAST_ADDRESS_IPV4 : MULTICAST_ADDRESS_IPV6); + sockAddrSend_.setPort(port_); } -void -PeerDiscovery::startPublish(const dht::InfoHash& nodeId, in_port_t port_to_send) +void +PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) { - sender_setup(nodeId, port_to_send); - running_send = std::thread(&PeerDiscovery::sender_thread, this); + //Set up New Sending pack + msgpack::sbuffer pack_buf_c; + pack_buf_c.write(pack_buf.data(),pack_buf.size()); + + std::unique_lock<std::mutex> lck(mtx_); + messages_[type] = std::move(pack_buf_c); + messages_reload(); + if (not lrunning_) { + lrunning_ = true; + sender_setup(); + running_send_ = std::thread([this](){ + std::unique_lock<std::mutex> lck(mtx_); + while (lrunning_) { + ssize_t nbytes = sendto( + sockfd_, + sbuf_.data(), + sbuf_.size(), + 0, + sockAddrSend_.get(), + sockAddrSend_.getLength() + ); + if (nbytes < 0) { + std::cerr << "Error sending packet: " << strerror(errno) << std::endl; + } + if (cv_.wait_for(lck,std::chrono::seconds(3),[&]{ return !lrunning_; })) + break; + } + }); + } +} + +void +PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string &type) +{ + { + std::unique_lock<std::mutex> lck(dmtx_); + auto it = callbackmap_.find(type); + if(it != callbackmap_.end()){ + callbackmap_.erase(it); + } + } +} + +void +PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type) +{ + std::unique_lock<std::mutex> lck(mtx_); + auto it = messages_.find(type); + if(it != messages_.end()){ + messages_.erase(it); + } + messages_reload(); } void -PeerDiscovery::stop() +PeerDiscovery::DomainPeerDiscovery::stop() { { std::unique_lock<std::mutex> lck(mtx_); - running_ = false; + lrunning_ = false; + } + { + std::unique_lock<std::mutex> lck(dmtx_); + drunning_ = false; } cv_.notify_all(); if (stop_writefd_ != -1) { @@ -320,4 +438,94 @@ PeerDiscovery::stop() } } +void +PeerDiscovery::DomainPeerDiscovery::messages_reload() +{ + sbuf_.clear(); + msgpack::packer<msgpack::sbuffer> pk(&sbuf_); + pk.pack_map(messages_.size()); + for (const auto& m : messages_) { + pk.pack(m.first); + sbuf_.write(m.second.data(), m.second.size()); + } +} + +PeerDiscovery::PeerDiscovery(in_port_t port) +{ + try { + peerDiscovery4_.reset(new DomainPeerDiscovery(AF_INET, port)); + } catch(const std::exception& e){ + peerDiscovery4_.reset(nullptr); + std::cerr << "Can't start peer discovery (IPv4): " << e.what() << std::endl; + } + try { + peerDiscovery6_.reset(new DomainPeerDiscovery(AF_INET6, port)); + } catch(const std::exception& e) { + peerDiscovery6_.reset(nullptr); + std::cerr << "Can't start peer discovery (IPv6): " << e.what() << std::endl; + } + +} + +PeerDiscovery::~PeerDiscovery(){} + +/** + * startDiscovery - Keep Listening data from the sender until node is joinned or stop is called +*/ +void +PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) +{ + if(peerDiscovery4_) peerDiscovery4_->startDiscovery(type, callback); + if(peerDiscovery6_) peerDiscovery6_->startDiscovery(type, callback); +} + +/** + * startPublish - Keeping sending data until node is joinned or stop is called - msgpack +*/ +void +PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) +{ + if(peerDiscovery4_) peerDiscovery4_->startPublish(type, pack_buf); + if(peerDiscovery6_) peerDiscovery6_->startPublish(type, pack_buf); +} + +/** + * Thread Stopper +*/ +void +PeerDiscovery::stop() +{ + if(peerDiscovery4_) peerDiscovery4_->stop(); + if(peerDiscovery6_) peerDiscovery6_->stop(); +} + +/** + * Remove possible callBack to discovery +*/ +void +PeerDiscovery::stopDiscovery(const std::string &type) +{ + if(peerDiscovery4_) peerDiscovery4_->stopDiscovery(type); + if(peerDiscovery6_) peerDiscovery6_->stopDiscovery(type); +} + +/** + * Remove different serivce message to send +*/ +void +PeerDiscovery::stopPublish(const std::string &type) +{ + if(peerDiscovery4_) peerDiscovery4_->stopPublish(type); + if(peerDiscovery6_) peerDiscovery6_->stopPublish(type); +} + +/** + * Join the threads +*/ +void +PeerDiscovery::join() { + if(peerDiscovery4_) peerDiscovery4_->join(); + if(peerDiscovery6_) peerDiscovery6_->join(); +} + } diff --git a/tests/peerdiscoverytester.cpp b/tests/peerdiscoverytester.cpp index 4f3b0535a760d0ace3d83148621aa11abad65b16..2454f2324bfec696b5f55986d7c96d432fd96c9b 100644 --- a/tests/peerdiscoverytester.cpp +++ b/tests/peerdiscoverytester.cpp @@ -18,71 +18,86 @@ */ #include "peerdiscoverytester.h" +#include "opendht/value.h" namespace test { +class NodeInsertion{ +public: + dht::InfoHash nodeid_; + in_port_t node_port_; + dht::NetId nid_; + MSGPACK_DEFINE(nodeid_, node_port_, nid_) +}; + +class TestPack{ +public: + int num; + char cha; + std::string str; + MSGPACK_DEFINE(num, cha, str) +}; + CPPUNIT_TEST_SUITE_REGISTRATION(PeerDiscoveryTester); void PeerDiscoveryTester::setUp(){} -void PeerDiscoveryTester::testTransmission_ipv4(){ +void PeerDiscoveryTester::testTransmission(){ // Node for getnode id + const std::string type {"dht"}; + const std::string test_type {"pdd"}; dht::InfoHash data_n = dht::InfoHash::get("applepin"); int port = 2222; in_port_t port_n = 50000; - try{ - dht::PeerDiscovery test_n(AF_INET, port); - dht::PeerDiscovery test_s(AF_INET, port); - try{ - test_s.startDiscovery([&](const dht::InfoHash& node, const dht::SockAddr& addr){ - CPPUNIT_ASSERT_EQUAL(data_n, node); - CPPUNIT_ASSERT_EQUAL(port_n, addr.getPort()); - }); - - test_n.startPublish(data_n,port_n); - std::this_thread::sleep_for(std::chrono::seconds(5)); - test_n.stop(); - test_s.stop(); - test_n.join(); - test_s.join(); - } catch(std::exception &exception){ - perror(exception.what()); - CPPUNIT_ASSERT(false); - } - } catch(std::exception &exception){ - perror(exception.what()); - } -} + msgpack::sbuffer sbuf; + NodeInsertion adc; + adc.nid_ = 10; + adc.node_port_ = port_n; + adc.nodeid_ = data_n; + msgpack::pack(sbuf,adc); -void PeerDiscoveryTester::testTransmission_ipv6(){ + msgpack::sbuffer pbuf; + TestPack pdd; + pdd.num = 100; + pdd.cha = 'a'; + pdd.str = "apple"; + msgpack::pack(pbuf,pdd); - // Node for getnode id - dht::InfoHash data_n = dht::InfoHash::get("applepin"); - int port = 3333; - in_port_t port_n = 50001; try{ - dht::PeerDiscovery test_n(AF_INET6, port); - dht::PeerDiscovery test_s(AF_INET6, port); - + dht::PeerDiscovery test_n(port); + dht::PeerDiscovery test_s(port); try{ - test_s.startDiscovery([&](const dht::InfoHash& node, const dht::SockAddr& addr){ - CPPUNIT_ASSERT_EQUAL(data_n, node); - CPPUNIT_ASSERT_EQUAL(port_n, addr.getPort()); + test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr&& add){ + auto v = obj.as<NodeInsertion>(); + CPPUNIT_ASSERT_EQUAL(v.node_port_, port_n); + CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n); }); - test_n.startPublish(data_n,port_n); + test_s.startDiscovery(test_type,[&](msgpack::object&& obj, dht::SockAddr&& add){ + auto v = obj.as<TestPack>(); + CPPUNIT_ASSERT_EQUAL(v.num, 100); + CPPUNIT_ASSERT_EQUAL(v.cha, 'a'); + }); + test_n.startPublish(type, sbuf); std::this_thread::sleep_for(std::chrono::seconds(5)); + test_n.startPublish(test_type, pbuf); + std::this_thread::sleep_for(std::chrono::seconds(5)); + test_n.stopPublish(test_type); + + std::this_thread::sleep_for(std::chrono::seconds(10)); test_n.stop(); test_s.stop(); test_n.join(); test_s.join(); } catch(std::exception &exception){ + perror(exception.what()); CPPUNIT_ASSERT(false); } - } catch(std::exception &exception) { + } catch(std::exception &exception){ + perror(exception.what()); } } diff --git a/tests/peerdiscoverytester.h b/tests/peerdiscoverytester.h index b1f1dd9e8b004a8ec0666eb8f840080deca2c6cc..a682689038d3bda198ceabb41eb2f4326b28572d 100644 --- a/tests/peerdiscoverytester.h +++ b/tests/peerdiscoverytester.h @@ -26,11 +26,10 @@ namespace test { -class PeerDiscoveryTester : public CppUnit::TestFixture { +class OPENDHT_PUBLIC PeerDiscoveryTester : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(PeerDiscoveryTester); - CPPUNIT_TEST(testTransmission_ipv4); - CPPUNIT_TEST(testTransmission_ipv6); + CPPUNIT_TEST(testTransmission); CPPUNIT_TEST_SUITE_END(); public: @@ -45,12 +44,7 @@ class PeerDiscoveryTester : public CppUnit::TestFixture { /** * Test Multicast Transmission Ipv4 */ - void testTransmission_ipv4(); - /** - * Test Multicast Transmission Ipv6 - */ - void testTransmission_ipv6(); - + void testTransmission(); }; } // namespace test