diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index dcfe95e070a56c5a8e9829b975442209a0dc1150..ceba870be5da5bd7306e6a9f534f8b7fed77a7e4 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -271,12 +271,7 @@ public: /** * Insert known nodes to the routing table by using the received msgpack */ - void nodeInsertionCallback(msgpack::object&& sbuf, SockAddr& add); - - /** - * Fill up the callback map for Peerdiscovery - */ - void callbackmapFill(); + void nodeInsertionCallback(msgpack::object&& sbuf, SockAddr&& add); /** * Clear the list of bootstrap added using bootstrap(const std::string&, const std::string&). diff --git a/include/opendht/peer_discovery.h b/include/opendht/peer_discovery.h index 9dabc026a8cea4be1a52ce81250168a70cb0ae0c..14c75b7dc93164a9057e0ca1225eb1abaacad423 100644 --- a/include/opendht/peer_discovery.h +++ b/include/opendht/peer_discovery.h @@ -37,7 +37,7 @@ class OPENDHT_PUBLIC PeerDiscovery { public: - using ServiceDiscoveredCallback = std::function<void(msgpack::object&&, SockAddr&)>; + using ServiceDiscoveredCallback = std::function<void(msgpack::object&&, SockAddr&&)>; PeerDiscovery(sa_family_t domain, in_port_t port); ~PeerDiscovery(); @@ -49,13 +49,23 @@ public: /** * startPublish - Keeping sending data until node is joinned or stop is called - msgpack */ - void startPublish(const std::string &type, msgpack::sbuffer &pack_buf); + void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf); /** * Thread Stopper */ void stop(); + /** + * Remove possible callBack to discovery + */ + void stopDiscovery(const std::string &type); + + /** + * Remove different serivce message to send + */ + void stopPublish(const std::string &type); + /** * Configure the sockopt to be able to listen multicast group */ @@ -72,10 +82,9 @@ public: /** * Pack Types/Callback Map */ - static std::map<PackType,std::string> pack_type_; - static std::map<std::string,std::function<void(msgpack::object&&, SockAddr&)>> callbackmap_; private: + std::mutex dmtx_; std::mutex mtx_; std::condition_variable cv_; bool running_ {false}; @@ -93,6 +102,7 @@ private: msgpack::sbuffer sbuf_; msgpack::sbuffer rbuf_; std::map<std::string, msgpack::sbuffer> messages_; + std::map<std::string, ServiceDiscoveredCallback> callbackmap_; /** * Multicast Socket Initialization, accept IPV4, IPV6 @@ -107,7 +117,7 @@ private: /** * Listener pack thread loop */ - void listenerpack_thread(const std::string &type, ServiceDiscoveredCallback callback); + void listenerpack_thread(); /** * Listener Parameters Setup @@ -118,11 +128,6 @@ private: * Sender Parameters Setup */ void sender_setup(); - - /** - * Fill in pack type map - */ - void fillPackMap(); }; } diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 4a0ddfacca7d0d5669fa1ec0b50ed245be465457..6940b2881210ae7718730a86fd79f095ce59bb68 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -38,8 +38,7 @@ namespace dht { constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD; static constexpr size_t RX_QUEUE_MAX_SIZE = 1024 * 16; static constexpr in_port_t PEER_DISCOVERY_PORT = 8888; -std::map<PackType,std::string> PeerDiscovery::pack_type_; -std::map<std::string,std::function<void(msgpack::object&&, SockAddr&)>> PeerDiscovery::callbackmap_; +static const std::string PEER_DISCOVERY_DHT_SERVICE = "dht"; struct DhtRunner::Listener { size_t tokenClassicDht {0}; @@ -152,7 +151,6 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: } }); - callbackmapFill(); if (config.peer_discovery or config.peer_publish) { try { peerDiscovery4_.reset(new PeerDiscovery(AF_INET, PEER_DISCOVERY_PORT)); @@ -168,11 +166,11 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: if (config.peer_discovery) { if (peerDiscovery4_) - peerDiscovery4_->startDiscovery(PeerDiscovery::pack_type_[PackType::NodeInsertion], - PeerDiscovery::callbackmap_[PeerDiscovery::pack_type_[PackType::NodeInsertion]]); + peerDiscovery4_->startDiscovery(PEER_DISCOVERY_DHT_SERVICE, + std::bind(&DhtRunner::nodeInsertionCallback, this, std::placeholders::_1,std::placeholders::_2)); if (peerDiscovery6_) - peerDiscovery6_->startDiscovery(PeerDiscovery::pack_type_[PackType::NodeInsertion], - PeerDiscovery::callbackmap_[PeerDiscovery::pack_type_[PackType::NodeInsertion]]); + peerDiscovery6_->startDiscovery(PEER_DISCOVERY_DHT_SERVICE, + std::bind(&DhtRunner::nodeInsertionCallback, this, std::placeholders::_1,std::placeholders::_2)); } if (config.peer_publish) { current_node_netid_ = config.dht_config.node_config.network; @@ -183,9 +181,9 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: msgpack::sbuffer sbuf_node; msgpack::pack(sbuf_node, adc); if (peerDiscovery4_) - peerDiscovery4_->startPublish(PeerDiscovery::pack_type_[PackType::NodeInsertion], sbuf_node); + peerDiscovery4_->startPublish(PEER_DISCOVERY_DHT_SERVICE, sbuf_node); if (peerDiscovery6_) - peerDiscovery6_->startPublish(PeerDiscovery::pack_type_[PackType::NodeInsertion], sbuf_node); + peerDiscovery6_->startPublish(PEER_DISCOVERY_DHT_SERVICE, sbuf_node); } } @@ -967,7 +965,7 @@ DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address) } void -DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr& add) +DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr&& add) { auto v = obj.as<NodeInsertionPack>(); add.setPort(v.node_port_); @@ -976,13 +974,6 @@ DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr& add) } } -void -DhtRunner::callbackmapFill() -{ - PeerDiscovery::callbackmap_[PeerDiscovery::pack_type_[PackType::NodeInsertion]] = - std::bind(&DhtRunner::nodeInsertionCallback, this, std::placeholders::_1,std::placeholders::_2); -} - void DhtRunner::bootstrap(const std::vector<NodeExport>& nodes) { diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index ec3eb0d162684076ef0c4b6511113714a170aead..f82acfc55b26882440054e65e7c357a94f24d946 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -18,7 +18,6 @@ #include "peer_discovery.h" #include "network_utils.h" -#include "dhtrunner.h" #ifdef _WIN32 #include <Ws2tcpip.h> // needed for ip_mreq definition for multicast @@ -40,7 +39,6 @@ constexpr char MULTICAST_ADDRESS_IPV6[8] = "ff05::2"; // Site-local multicast PeerDiscovery::PeerDiscovery(sa_family_t domain, in_port_t port) : domain_(domain), port_(port), sockfd_(initialize_socket(domain)) { - fillPackMap(); socketJoinMulticast(sockfd_, domain); } @@ -54,12 +52,6 @@ PeerDiscovery::~PeerDiscovery() #endif } -void -PeerDiscovery::fillPackMap() -{ - pack_type_[PackType::NodeInsertion] = "dht"; -} - int PeerDiscovery::initialize_socket(sa_family_t domain) { @@ -155,9 +147,13 @@ PeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) void PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) -{ +{ + { + std::unique_lock<std::mutex> lck(dmtx_); + callbackmap_[type] = callback; + } listener_setup(); - running_listen_ = std::thread(&PeerDiscovery::listenerpack_thread, this, type, callback); + running_listen_ = std::thread(&PeerDiscovery::listenerpack_thread, this); } SockAddr @@ -184,7 +180,7 @@ PeerDiscovery::recvFrom(size_t &buf_size) } void -PeerDiscovery::listenerpack_thread(const std::string &type, ServiceDiscoveredCallback callback) +PeerDiscovery::listenerpack_thread() { int stopfds_pipe[2]; #ifndef _WIN32 @@ -236,8 +232,11 @@ PeerDiscovery::listenerpack_thread(const std::string &type, ServiceDiscoveredCal if (o.key.type != msgpack::type::STR) continue; auto key = o.key.as<std::string>(); - if(key == type) - callback(std::move(o.val),from); + + std::unique_lock<std::mutex> lck(dmtx_); + auto callback = callbackmap_.find(key); + if (callback != callbackmap_.end()) + callback->second(std::move(o.val), std::move(from)); } } } @@ -258,7 +257,7 @@ PeerDiscovery::sender_setup() sockAddrSend_.setPort(port_); } -void PeerDiscovery::startPublish(const std::string &type, msgpack::sbuffer &pack_buf) +void PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) { sender_setup(); //Set up New Sending pack @@ -296,6 +295,36 @@ void PeerDiscovery::startPublish(const std::string &type, msgpack::sbuffer &pack } } +void +PeerDiscovery::stopDiscovery(const std::string &type) +{ + { + std::unique_lock<std::mutex> lck(dmtx_); + auto it = callbackmap_.find(type); + if(it != callbackmap_.end()){ + callbackmap_.erase(it); + } + } +} + +void +PeerDiscovery::stopPublish(const std::string &type) +{ + { + std::unique_lock<std::mutex> lck(mtx_); + auto it = messages_.find(type); + if(it != messages_.end()){ + messages_.erase(it); + } + msgpack::packer<msgpack::sbuffer> pk(&sbuf_); + pk.pack_map(messages_.size()); + for (const auto& m : messages_) { + pk.pack(m.first); + sbuf_.write(m.second.data(), m.second.size()); + } + } +} + void PeerDiscovery::stop() { diff --git a/tests/peerdiscoverytester.cpp b/tests/peerdiscoverytester.cpp index 6efe8b929494634b4cce7aebe9c014fdfddd9ffd..5d6d7b853290d786180c550d6921e4f5863a757d 100644 --- a/tests/peerdiscoverytester.cpp +++ b/tests/peerdiscoverytester.cpp @@ -17,8 +17,8 @@ * along with this program. If not, see <https://www.gnu.org/licenses/>. */ +#include "opendht/value.h" #include "peerdiscoverytester.h" -#include "opendht/dhtrunner.h" namespace test { @@ -52,7 +52,7 @@ void PeerDiscoveryTester::testTransmission_ipv4(){ dht::PeerDiscovery test_n(AF_INET, port); dht::PeerDiscovery test_s(AF_INET, port); try{ - test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr& add){ + test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr&& add){ auto v = obj.as<NodeInsertion>(); CPPUNIT_ASSERT_EQUAL(v.node_port_, port_n); CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n); @@ -92,7 +92,7 @@ void PeerDiscoveryTester::testTransmission_ipv6(){ dht::PeerDiscovery test_n(AF_INET6, port); dht::PeerDiscovery test_s(AF_INET6, port); try{ - test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr& add){ + test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr&& add){ auto v = obj.as<NodeInsertion>(); CPPUNIT_ASSERT_EQUAL(v.node_port_, port_n); CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n);