diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index c593732b81a440513f8d7b85f8640fa27fe15e2d..dcfe95e070a56c5a8e9829b975442209a0dc1150 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -42,13 +42,6 @@ struct Node; class SecureDht; class PeerDiscovery; struct SecureDhtConfig; -class OPENDHT_PUBLIC NodeInsertionPack{ -public: - dht::InfoHash nodeid_; - in_port_t node_port_; - dht::NetId nid_; - MSGPACK_DEFINE(nodeid_, node_port_, nid_) -}; /** * Provides a thread-safe interface to run the (secure) DHT. @@ -558,9 +551,7 @@ private: /** PeerDiscovery Parameters */ std::unique_ptr<PeerDiscovery> peerDiscovery4_; std::unique_ptr<PeerDiscovery> peerDiscovery6_; - const std::string pack_type_ {"dht"}; NetId current_node_netid_; - std::map<std::string,std::function<void(msgpack::object&&, SockAddr&)>> callbackmap_; }; diff --git a/include/opendht/peer_discovery.h b/include/opendht/peer_discovery.h index bea44eab347298dea621f08f94016399515ddc86..9dabc026a8cea4be1a52ce81250168a70cb0ae0c 100644 --- a/include/opendht/peer_discovery.h +++ b/include/opendht/peer_discovery.h @@ -28,23 +28,28 @@ namespace dht { +enum class PackType +{ + NodeInsertion = 0 +}; + class OPENDHT_PUBLIC PeerDiscovery { public: - using PeerDiscoveredPackCallback = std::function<void(msgpack::object&&, SockAddr&)>; + using ServiceDiscoveredCallback = std::function<void(msgpack::object&&, SockAddr&)>; PeerDiscovery(sa_family_t domain, in_port_t port); ~PeerDiscovery(); /** * startDiscovery - Keep Listening data from the sender until node is joinned or stop is called */ - void startDiscovery(const std::string &type, PeerDiscoveredPackCallback callback); + void startDiscovery(const std::string &type, ServiceDiscoveredCallback callback); /** * startPublish - Keeping sending data until node is joinned or stop is called - msgpack */ - void startPublish(const std::string &type, msgpack::sbuffer && pack_buf); + void startPublish(const std::string &type, msgpack::sbuffer &pack_buf); /** * Thread Stopper @@ -64,10 +69,16 @@ public: if(running_send_.joinable()) running_send_.join(); } + /** + * Pack Types/Callback Map + */ + static std::map<PackType,std::string> pack_type_; + static std::map<std::string,std::function<void(msgpack::object&&, SockAddr&)>> callbackmap_; + private: std::mutex mtx_; std::condition_variable cv_; - bool running_ {true}; + bool running_ {false}; sa_family_t domain_ {AF_UNSPEC}; int port_; int sockfd_ {-1}; @@ -78,35 +89,25 @@ private: //Thread export to be joined std::thread running_listen_; std::thread running_send_; - dht::InfoHash nodeId_; msgpack::sbuffer sbuf_; msgpack::sbuffer rbuf_; + std::map<std::string, msgpack::sbuffer> messages_; /** * Multicast Socket Initialization, accept IPV4, IPV6 */ static int initialize_socket(sa_family_t domain); - /** - * Send pack messages - */ - void sendTo(); - /** * Receive messages */ SockAddr recvFrom(size_t &buf_size); - - /** - * Send pack thread loop - */ - void senderpack_thread(); - + /** * Listener pack thread loop */ - void listenerpack_thread(const std::string &type, PeerDiscoveredPackCallback callback); + void listenerpack_thread(const std::string &type, ServiceDiscoveredCallback callback); /** * Listener Parameters Setup @@ -116,7 +117,12 @@ private: /** * Sender Parameters Setup */ - void sender_setup(const std::string &type, msgpack::sbuffer && pack_buf); + void sender_setup(); + + /** + * Fill in pack type map + */ + void fillPackMap(); }; } diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 8f5293de733dd4c6b6b4eac4c6e5c1e0a2048090..4a0ddfacca7d0d5669fa1ec0b50ed245be465457 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -38,6 +38,8 @@ namespace dht { constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD; static constexpr size_t RX_QUEUE_MAX_SIZE = 1024 * 16; static constexpr in_port_t PEER_DISCOVERY_PORT = 8888; +std::map<PackType,std::string> PeerDiscovery::pack_type_; +std::map<std::string,std::function<void(msgpack::object&&, SockAddr&)>> PeerDiscovery::callbackmap_; struct DhtRunner::Listener { size_t tokenClassicDht {0}; @@ -48,12 +50,19 @@ struct DhtRunner::Listener { Where w; }; +class OPENDHT_PUBLIC NodeInsertionPack{ +public: + dht::InfoHash nodeid_; + in_port_t node_port_; + dht::NetId nid_; + MSGPACK_DEFINE(nodeid_, node_port_, nid_) +}; + DhtRunner::DhtRunner() : dht_() #ifdef OPENDHT_PROXY_CLIENT , dht_via_proxy_() #endif //OPENDHT_PROXY_CLIENT { - callbackmapFill(); #ifdef _WIN32 WSADATA wsd; if (WSAStartup(MAKEWORD(2,2), &wsd) != 0) @@ -143,6 +152,7 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: } }); + callbackmapFill(); if (config.peer_discovery or config.peer_publish) { try { peerDiscovery4_.reset(new PeerDiscovery(AF_INET, PEER_DISCOVERY_PORT)); @@ -155,28 +165,27 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: std::cerr << "Can't start peer discovery (IPv6): " << e.what() << std::endl; } } - - current_node_netid_ = config.dht_config.node_config.network; - NodeInsertionPack adc; - adc.nid_ = current_node_netid_; - adc.node_port_ = getBoundPort(); - adc.nodeid_ = dht_->getNodeId(); - msgpack::sbuffer sbuf_node_v4; - msgpack::sbuffer sbuf_node_v6; - msgpack::pack(sbuf_node_v4, adc); - msgpack::pack(sbuf_node_v6, adc); - + if (config.peer_discovery) { if (peerDiscovery4_) - peerDiscovery4_->startDiscovery(pack_type_,callbackmap_[pack_type_]); + peerDiscovery4_->startDiscovery(PeerDiscovery::pack_type_[PackType::NodeInsertion], + PeerDiscovery::callbackmap_[PeerDiscovery::pack_type_[PackType::NodeInsertion]]); if (peerDiscovery6_) - peerDiscovery6_->startDiscovery(pack_type_,callbackmap_[pack_type_]); + peerDiscovery6_->startDiscovery(PeerDiscovery::pack_type_[PackType::NodeInsertion], + PeerDiscovery::callbackmap_[PeerDiscovery::pack_type_[PackType::NodeInsertion]]); } if (config.peer_publish) { + current_node_netid_ = config.dht_config.node_config.network; + NodeInsertionPack adc; + adc.nid_ = current_node_netid_; + adc.node_port_ = getBoundPort(); + adc.nodeid_ = dht_->getNodeId(); + msgpack::sbuffer sbuf_node; + msgpack::pack(sbuf_node, adc); if (peerDiscovery4_) - peerDiscovery4_->startPublish(pack_type_, std::move(sbuf_node_v4)); + peerDiscovery4_->startPublish(PeerDiscovery::pack_type_[PackType::NodeInsertion], sbuf_node); if (peerDiscovery6_) - peerDiscovery6_->startPublish(pack_type_, std::move(sbuf_node_v6)); + peerDiscovery6_->startPublish(PeerDiscovery::pack_type_[PackType::NodeInsertion], sbuf_node); } } @@ -962,7 +971,7 @@ DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr& add) { auto v = obj.as<NodeInsertionPack>(); add.setPort(v.node_port_); - if(v.nodeid_ != getNodeId() && current_node_netid_ == v.nid_){ + if(v.nodeid_ != dht_->getNodeId() && current_node_netid_ == v.nid_){ bootstrap(v.nodeid_, add); } } @@ -970,8 +979,8 @@ DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr& add) void DhtRunner::callbackmapFill() { - callbackmap_[pack_type_] = std::bind(&DhtRunner::nodeInsertionCallback, this, - std::placeholders::_1,std::placeholders::_2); + PeerDiscovery::callbackmap_[PeerDiscovery::pack_type_[PackType::NodeInsertion]] = + std::bind(&DhtRunner::nodeInsertionCallback, this, std::placeholders::_1,std::placeholders::_2); } void diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index b27a8507550d76273563d0c3c32d120638ef908d..ec3eb0d162684076ef0c4b6511113714a170aead 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -40,6 +40,7 @@ constexpr char MULTICAST_ADDRESS_IPV6[8] = "ff05::2"; // Site-local multicast PeerDiscovery::PeerDiscovery(sa_family_t domain, in_port_t port) : domain_(domain), port_(port), sockfd_(initialize_socket(domain)) { + fillPackMap(); socketJoinMulticast(sockfd_, domain); } @@ -53,6 +54,12 @@ PeerDiscovery::~PeerDiscovery() #endif } +void +PeerDiscovery::fillPackMap() +{ + pack_type_[PackType::NodeInsertion] = "dht"; +} + int PeerDiscovery::initialize_socket(sa_family_t domain) { @@ -147,7 +154,7 @@ PeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) } void -PeerDiscovery::startDiscovery(const std::string &type, PeerDiscoveredPackCallback callback) +PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) { listener_setup(); running_listen_ = std::thread(&PeerDiscovery::listenerpack_thread, this, type, callback); @@ -177,7 +184,7 @@ PeerDiscovery::recvFrom(size_t &buf_size) } void -PeerDiscovery::listenerpack_thread(const std::string &type, PeerDiscoveredPackCallback callback) +PeerDiscovery::listenerpack_thread(const std::string &type, ServiceDiscoveredCallback callback) { int stopfds_pipe[2]; #ifndef _WIN32 @@ -229,7 +236,6 @@ PeerDiscovery::listenerpack_thread(const std::string &type, PeerDiscoveredPackCa if (o.key.type != msgpack::type::STR) continue; auto key = o.key.as<std::string>(); - if(key == type) callback(std::move(o.val),from); } @@ -244,65 +250,49 @@ PeerDiscovery::listenerpack_thread(const std::string &type, PeerDiscoveredPackCa } void -PeerDiscovery::sender_setup(const std::string &type, msgpack::sbuffer && pack_buf) +PeerDiscovery::sender_setup() { // Setup sender address sockAddrSend_.setFamily(domain_); sockAddrSend_.setAddress(domain_ == AF_INET ? MULTICAST_ADDRESS_IPV4 : MULTICAST_ADDRESS_IPV6); sockAddrSend_.setPort(port_); +} - //Set up Sending pack - if(type == "dht"){ - msgpack::object_handle oh = msgpack::unpack(pack_buf.data(), pack_buf.size()); - msgpack::object obj = oh.get(); - auto v = obj.as<NodeInsertionPack>(); - nodeId_ = v.nodeid_; - } - std::map<std::string, msgpack::sbuffer> messages; - messages[type] = std::move(pack_buf); +void PeerDiscovery::startPublish(const std::string &type, msgpack::sbuffer &pack_buf) +{ + sender_setup(); + //Set up New Sending pack + msgpack::sbuffer pack_buf_c; + pack_buf_c.write(pack_buf.data(),pack_buf.size()); + + std::unique_lock<std::mutex> lck(mtx_); + messages_[type] = std::move(pack_buf_c); msgpack::packer<msgpack::sbuffer> pk(&sbuf_); - pk.pack_map(messages.size()); - for (const auto& m : messages) { + pk.pack_map(messages_.size()); + for (const auto& m : messages_) { pk.pack(m.first); sbuf_.write(m.second.data(), m.second.size()); } -} - -void PeerDiscovery::startPublish(const std::string &type, msgpack::sbuffer && pack_buf) -{ - sender_setup(type,std::move(pack_buf)); - running_send_ = std::thread(&PeerDiscovery::senderpack_thread, this); -} - -void -PeerDiscovery::senderpack_thread() -{ - while(true) { - try { - sendTo(); - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - { + if (not running_) { + running_ = true; + running_send_ = std::thread([this](){ std::unique_lock<std::mutex> lck(mtx_); - if (cv_.wait_for(lck,std::chrono::seconds(3),[&]{ return !running_; })) - break; - } - } -} - -void PeerDiscovery::sendTo() -{ - ssize_t nbytes = sendto( - sockfd_, - sbuf_.data(), - sbuf_.size(), - 0, - sockAddrSend_.get(), - sockAddrSend_.getLength() - ); - if (nbytes < 0) { - throw std::runtime_error(std::string("Error sending packet: ") + strerror(errno)); + while (running_) { + ssize_t nbytes = sendto( + sockfd_, + sbuf_.data(), + sbuf_.size(), + 0, + sockAddrSend_.get(), + sockAddrSend_.getLength() + ); + if (nbytes < 0) { + std::cerr << "Error sending packet: " << strerror(errno) << std::endl; + } + if (cv_.wait_for(lck,std::chrono::seconds(3),[&]{ return !running_; })) + break; + } + }); } } diff --git a/tests/peerdiscoverytester.cpp b/tests/peerdiscoverytester.cpp index 1a0747e816dd768ab7c3cbd6c4c6787bedb38aa4..6efe8b929494634b4cce7aebe9c014fdfddd9ffd 100644 --- a/tests/peerdiscoverytester.cpp +++ b/tests/peerdiscoverytester.cpp @@ -22,6 +22,14 @@ namespace test { +class NodeInsertion{ +public: + dht::InfoHash nodeid_; + in_port_t node_port_; + dht::NetId nid_; + MSGPACK_DEFINE(nodeid_, node_port_, nid_) +}; + CPPUNIT_TEST_SUITE_REGISTRATION(PeerDiscoveryTester); void PeerDiscoveryTester::setUp(){} @@ -35,7 +43,7 @@ void PeerDiscoveryTester::testTransmission_ipv4(){ in_port_t port_n = 50000; dht::NetId netid = 10; msgpack::sbuffer sbuf; - dht::NodeInsertionPack adc; + NodeInsertion adc; adc.nid_ = 10; adc.node_port_ = port_n; adc.nodeid_ = data_n; @@ -45,12 +53,12 @@ void PeerDiscoveryTester::testTransmission_ipv4(){ dht::PeerDiscovery test_s(AF_INET, port); try{ test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr& add){ - auto v = obj.as<dht::NodeInsertionPack>(); + auto v = obj.as<NodeInsertion>(); CPPUNIT_ASSERT_EQUAL(v.node_port_, port_n); CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n); }); - test_n.startPublish(type, std::move(sbuf)); + test_n.startPublish(type, sbuf); std::this_thread::sleep_for(std::chrono::seconds(5)); test_n.stop(); @@ -75,7 +83,7 @@ void PeerDiscoveryTester::testTransmission_ipv6(){ in_port_t port_n = 50000; dht::NetId netid = 10; msgpack::sbuffer sbuf; - dht::NodeInsertionPack adc; + NodeInsertion adc; adc.nid_ = 10; adc.node_port_ = port_n; adc.nodeid_ = data_n; @@ -85,12 +93,12 @@ void PeerDiscoveryTester::testTransmission_ipv6(){ dht::PeerDiscovery test_s(AF_INET6, port); try{ test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr& add){ - auto v = obj.as<dht::NodeInsertionPack>(); + auto v = obj.as<NodeInsertion>(); CPPUNIT_ASSERT_EQUAL(v.node_port_, port_n); CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n); }); - test_n.startPublish(type, std::move(sbuf)); + test_n.startPublish(type, sbuf); std::this_thread::sleep_for(std::chrono::seconds(5)); test_n.stop();