diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 34677c16fa56fd15552a32c92e8e20e7107ceba5..c593732b81a440513f8d7b85f8640fa27fe15e2d 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -278,7 +278,7 @@ public: /** * Insert known nodes to the routing table by using the received msgpack */ - void nodeInsertionCallback(std::string& type, msgpack::object&& sbuf, SockAddr& add); + void nodeInsertionCallback(msgpack::object&& sbuf, SockAddr& add); /** * Fill up the callback map for Peerdiscovery @@ -558,8 +558,9 @@ private: /** PeerDiscovery Parameters */ std::unique_ptr<PeerDiscovery> peerDiscovery4_; std::unique_ptr<PeerDiscovery> peerDiscovery6_; + const std::string pack_type_ {"dht"}; NetId current_node_netid_; - std::map<std::string,std::function<void(std::string&, msgpack::object&&, SockAddr&)>> callbackmap_; + std::map<std::string,std::function<void(msgpack::object&&, SockAddr&)>> callbackmap_; }; diff --git a/include/opendht/peer_discovery.h b/include/opendht/peer_discovery.h index dde0953aca37272f276a6b30e763ba06440e2157..bea44eab347298dea621f08f94016399515ddc86 100644 --- a/include/opendht/peer_discovery.h +++ b/include/opendht/peer_discovery.h @@ -32,19 +32,19 @@ class OPENDHT_PUBLIC PeerDiscovery { public: - using PeerDiscoveredPackCallback = std::function<void(std::string&, msgpack::object&&, SockAddr&)>; + using PeerDiscoveredPackCallback = std::function<void(msgpack::object&&, SockAddr&)>; PeerDiscovery(sa_family_t domain, in_port_t port); ~PeerDiscovery(); /** * startDiscovery - Keep Listening data from the sender until node is joinned or stop is called */ - void startDiscovery(PeerDiscoveredPackCallback callback); + void startDiscovery(const std::string &type, PeerDiscoveredPackCallback callback); /** * startPublish - Keeping sending data until node is joinned or stop is called - msgpack */ - void startPublish(const std::string &type, msgpack::sbuffer && pack_buf, const dht::InfoHash &nodeId); + void startPublish(const std::string &type, msgpack::sbuffer && pack_buf); /** * Thread Stopper @@ -106,7 +106,7 @@ private: /** * Listener pack thread loop */ - void listenerpack_thread(PeerDiscoveredPackCallback callback); + void listenerpack_thread(const std::string &type, PeerDiscoveredPackCallback callback); /** * Listener Parameters Setup @@ -116,7 +116,7 @@ private: /** * Sender Parameters Setup */ - void sender_setup(const std::string &type, msgpack::sbuffer && pack_buf, const dht::InfoHash &nodeId); + void sender_setup(const std::string &type, msgpack::sbuffer && pack_buf); }; } diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 3832e92075fad6bf405d7350b257a7a67a4c00bb..8f5293de733dd4c6b6b4eac4c6e5c1e0a2048090 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -160,7 +160,7 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: NodeInsertionPack adc; adc.nid_ = current_node_netid_; adc.node_port_ = getBoundPort(); - adc.nodeid_ = getNodeId(); + adc.nodeid_ = dht_->getNodeId(); msgpack::sbuffer sbuf_node_v4; msgpack::sbuffer sbuf_node_v6; msgpack::pack(sbuf_node_v4, adc); @@ -168,15 +168,15 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: if (config.peer_discovery) { if (peerDiscovery4_) - peerDiscovery4_->startDiscovery(callbackmap_["dht"]); + peerDiscovery4_->startDiscovery(pack_type_,callbackmap_[pack_type_]); if (peerDiscovery6_) - peerDiscovery6_->startDiscovery(callbackmap_["dht"]); + peerDiscovery6_->startDiscovery(pack_type_,callbackmap_[pack_type_]); } if (config.peer_publish) { if (peerDiscovery4_) - peerDiscovery4_->startPublish("dht", std::move(sbuf_node_v4), dht_->getNodeId()); + peerDiscovery4_->startPublish(pack_type_, std::move(sbuf_node_v4)); if (peerDiscovery6_) - peerDiscovery6_->startPublish("dht", std::move(sbuf_node_v6), dht_->getNodeId()); + peerDiscovery6_->startPublish(pack_type_, std::move(sbuf_node_v6)); } } @@ -958,22 +958,20 @@ DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address) } void -DhtRunner::nodeInsertionCallback(std::string& type, msgpack::object&& obj, SockAddr& add) +DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr& add) { - if(type == "dht"){ - auto v = obj.as<NodeInsertionPack>(); - add.setPort(v.node_port_); - if(v.nodeid_ != getNodeId() && current_node_netid_ == v.nid_){ - bootstrap(v.nodeid_, add); - } + auto v = obj.as<NodeInsertionPack>(); + add.setPort(v.node_port_); + if(v.nodeid_ != getNodeId() && current_node_netid_ == v.nid_){ + bootstrap(v.nodeid_, add); } } void DhtRunner::callbackmapFill() { - callbackmap_["dht"] = std::bind(&DhtRunner::nodeInsertionCallback, this, - std::placeholders::_1,std::placeholders::_2,std::placeholders::_3); + callbackmap_[pack_type_] = std::bind(&DhtRunner::nodeInsertionCallback, this, + std::placeholders::_1,std::placeholders::_2); } void diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index 03970b90c5911e8105c1602a1fc69920725848be..b27a8507550d76273563d0c3c32d120638ef908d 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -18,6 +18,7 @@ #include "peer_discovery.h" #include "network_utils.h" +#include "dhtrunner.h" #ifdef _WIN32 #include <Ws2tcpip.h> // needed for ip_mreq definition for multicast @@ -146,10 +147,10 @@ PeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) } void -PeerDiscovery::startDiscovery(PeerDiscoveredPackCallback callback) +PeerDiscovery::startDiscovery(const std::string &type, PeerDiscoveredPackCallback callback) { listener_setup(); - running_listen_ = std::thread(&PeerDiscovery::listenerpack_thread, this, callback); + running_listen_ = std::thread(&PeerDiscovery::listenerpack_thread, this, type, callback); } SockAddr @@ -176,7 +177,7 @@ PeerDiscovery::recvFrom(size_t &buf_size) } void -PeerDiscovery::listenerpack_thread(PeerDiscoveredPackCallback callback) +PeerDiscovery::listenerpack_thread(const std::string &type, PeerDiscoveredPackCallback callback) { int stopfds_pipe[2]; #ifndef _WIN32 @@ -229,7 +230,8 @@ PeerDiscovery::listenerpack_thread(PeerDiscoveredPackCallback callback) continue; auto key = o.key.as<std::string>(); - callback(key,std::move(o.val),from); + if(key == type) + callback(std::move(o.val),from); } } } @@ -242,15 +244,20 @@ PeerDiscovery::listenerpack_thread(PeerDiscoveredPackCallback callback) } void -PeerDiscovery::sender_setup(const std::string &type, msgpack::sbuffer && pack_buf, const dht::InfoHash &nodeId) +PeerDiscovery::sender_setup(const std::string &type, msgpack::sbuffer && pack_buf) { - nodeId_ = nodeId; // Setup sender address sockAddrSend_.setFamily(domain_); sockAddrSend_.setAddress(domain_ == AF_INET ? MULTICAST_ADDRESS_IPV4 : MULTICAST_ADDRESS_IPV6); sockAddrSend_.setPort(port_); //Set up Sending pack + if(type == "dht"){ + msgpack::object_handle oh = msgpack::unpack(pack_buf.data(), pack_buf.size()); + msgpack::object obj = oh.get(); + auto v = obj.as<NodeInsertionPack>(); + nodeId_ = v.nodeid_; + } std::map<std::string, msgpack::sbuffer> messages; messages[type] = std::move(pack_buf); msgpack::packer<msgpack::sbuffer> pk(&sbuf_); @@ -261,9 +268,9 @@ PeerDiscovery::sender_setup(const std::string &type, msgpack::sbuffer && pack_bu } } -void PeerDiscovery::startPublish(const std::string &type, msgpack::sbuffer && pack_buf, const dht::InfoHash &nodeId) +void PeerDiscovery::startPublish(const std::string &type, msgpack::sbuffer && pack_buf) { - sender_setup(type,std::move(pack_buf),nodeId); + sender_setup(type,std::move(pack_buf)); running_send_ = std::thread(&PeerDiscovery::senderpack_thread, this); } diff --git a/tests/peerdiscoverytester.cpp b/tests/peerdiscoverytester.cpp index 8f7ae8fcb1c79bf85c4ed22480db41578220acb8..1a0747e816dd768ab7c3cbd6c4c6787bedb38aa4 100644 --- a/tests/peerdiscoverytester.cpp +++ b/tests/peerdiscoverytester.cpp @@ -29,6 +29,7 @@ void PeerDiscoveryTester::setUp(){} void PeerDiscoveryTester::testTransmission_ipv4(){ // Node for getnode id + const std::string type {"dht"}; dht::InfoHash data_n = dht::InfoHash::get("applepin"); int port = 2222; in_port_t port_n = 50000; @@ -43,13 +44,13 @@ void PeerDiscoveryTester::testTransmission_ipv4(){ dht::PeerDiscovery test_n(AF_INET, port); dht::PeerDiscovery test_s(AF_INET, port); try{ - test_s.startDiscovery([&](std::string& type, msgpack::object&& obj, dht::SockAddr& add){ + test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr& add){ auto v = obj.as<dht::NodeInsertionPack>(); CPPUNIT_ASSERT_EQUAL(v.node_port_, port_n); CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n); }); - test_n.startPublish("dht", std::move(sbuf),data_n); + test_n.startPublish(type, std::move(sbuf)); std::this_thread::sleep_for(std::chrono::seconds(5)); test_n.stop(); @@ -68,6 +69,7 @@ void PeerDiscoveryTester::testTransmission_ipv4(){ void PeerDiscoveryTester::testTransmission_ipv6(){ // Node for getnode id + const std::string type {"dht"}; dht::InfoHash data_n = dht::InfoHash::get("applepin"); int port = 2222; in_port_t port_n = 50000; @@ -82,13 +84,13 @@ void PeerDiscoveryTester::testTransmission_ipv6(){ dht::PeerDiscovery test_n(AF_INET6, port); dht::PeerDiscovery test_s(AF_INET6, port); try{ - test_s.startDiscovery([&](std::string& type, msgpack::object&& obj, dht::SockAddr& add){ + test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr& add){ auto v = obj.as<dht::NodeInsertionPack>(); CPPUNIT_ASSERT_EQUAL(v.node_port_, port_n); CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n); }); - test_n.startPublish("dht", std::move(sbuf),data_n); + test_n.startPublish(type, std::move(sbuf)); std::this_thread::sleep_for(std::chrono::seconds(5)); test_n.stop();