diff --git a/include/opendht/peer_discovery.h b/include/opendht/peer_discovery.h index 73146a67ecb617331ae0eb4e01992575e6d4e007..7f17acdb9006bca558d37d5924c18a52db0448b1 100644 --- a/include/opendht/peer_discovery.h +++ b/include/opendht/peer_discovery.h @@ -41,12 +41,26 @@ public: */ void startDiscovery(const std::string &type, ServiceDiscoveredCallback callback); + template<typename T> + void startDiscovery(const std::string &type, std::function<void(T&&, SockAddr&&)> cb) { + startDiscovery(type, [cb](msgpack::object&& ob, SockAddr&& addr) { + cb(ob.as<T>(), std::move(addr)); + }); + } + /** * startPublish - Keeping sending data until node is joinned or stop is called - msgpack */ void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf); void startPublish(sa_family_t domain, const std::string &type, const msgpack::sbuffer &pack_buf); + template<typename T> + void startPublish(const std::string &type, const T& object) { + msgpack::sbuffer buf; + msgpack::pack(buf, object); + startPublish(type, buf); + } + /** * Thread Stopper */ @@ -55,12 +69,13 @@ public: /** * Remove possible callBack to discovery */ - void stopDiscovery(const std::string &type); + bool stopDiscovery(const std::string &type); /** * Remove different serivce message to send */ - void stopPublish(const std::string &type); + bool stopPublish(const std::string &type); + bool stopPublish(sa_family_t domain, const std::string &type); /** * Configure the sockopt to be able to listen multicast group diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index 56bd8534aad50e47b2d03abd4ecf111d1d29e8eb..fef091414052b0bc75306f8411f65e0f509f3890 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -66,12 +66,12 @@ public: /** * Remove possible callBack to discovery */ - void stopDiscovery(const std::string &type); + bool stopDiscovery(const std::string &type); /** * Remove different serivce message to send */ - void stopPublish(const std::string &type); + bool stopPublish(const std::string &type); /** * Configure the sockopt to be able to listen multicast group @@ -393,27 +393,22 @@ PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const } } -void +bool PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string &type) { - { - std::unique_lock<std::mutex> lck(dmtx_); - auto it = callbackmap_.find(type); - if(it != callbackmap_.end()){ - callbackmap_.erase(it); - } - } + std::unique_lock<std::mutex> lck(dmtx_); + return callbackmap_.erase(type) > 0; } -void +bool PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type) { std::unique_lock<std::mutex> lck(mtx_); - auto it = messages_.find(type); - if(it != messages_.end()){ - messages_.erase(it); + if (messages_.erase(type) > 0) { + messages_reload(); + return true; } - messages_reload(); + return false; } void @@ -464,9 +459,6 @@ PeerDiscovery::PeerDiscovery(in_port_t port) PeerDiscovery::~PeerDiscovery(){} -/** - * startDiscovery - Keep Listening data from the sender until node is joinned or stop is called -*/ void PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) { @@ -474,9 +466,6 @@ PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback if(peerDiscovery6_) peerDiscovery6_->startDiscovery(type, callback); } -/** - * startPublish - Keeping sending data until node is joinned or stop is called - msgpack -*/ void PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) { @@ -487,15 +476,13 @@ PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pac void PeerDiscovery::startPublish(sa_family_t domain, const std::string &type, const msgpack::sbuffer &pack_buf) { - if (domain == AF_INET && peerDiscovery4_) - peerDiscovery4_->startPublish(type, pack_buf); - else if (domain == AF_INET6 && peerDiscovery6_) - peerDiscovery6_->startPublish(type, pack_buf); + if (domain == AF_INET) { + if (peerDiscovery4_) peerDiscovery4_->startPublish(type, pack_buf); + } else if (domain == AF_INET6) { + if (peerDiscovery6_) peerDiscovery6_->startPublish(type, pack_buf); + } } -/** - * Thread Stopper -*/ void PeerDiscovery::stop() { @@ -503,29 +490,32 @@ PeerDiscovery::stop() if(peerDiscovery6_) peerDiscovery6_->stop(); } -/** - * Remove possible callBack to discovery -*/ -void +bool PeerDiscovery::stopDiscovery(const std::string &type) { - if(peerDiscovery4_) peerDiscovery4_->stopDiscovery(type); - if(peerDiscovery6_) peerDiscovery6_->stopDiscovery(type); + bool stopped4 = peerDiscovery4_ and peerDiscovery4_->stopDiscovery(type); + bool stopped6 = peerDiscovery6_ and peerDiscovery6_->stopDiscovery(type); + return stopped4 or stopped6; } -/** - * Remove different serivce message to send -*/ -void +bool PeerDiscovery::stopPublish(const std::string &type) { - if(peerDiscovery4_) peerDiscovery4_->stopPublish(type); - if(peerDiscovery6_) peerDiscovery6_->stopPublish(type); + bool stopped4 = peerDiscovery4_ and peerDiscovery4_->stopPublish(type); + bool stopped6 = peerDiscovery6_ and peerDiscovery6_->stopPublish(type); + return stopped4 or stopped6; +} + +bool +PeerDiscovery::stopPublish(sa_family_t domain, const std::string &type) +{ + if (domain == AF_INET) { + return peerDiscovery4_ and peerDiscovery4_->stopPublish(type); + } else if (domain == AF_INET6) { + return peerDiscovery6_ and peerDiscovery6_->stopPublish(type); + } } -/** - * Join the threads -*/ void PeerDiscovery::join() { if(peerDiscovery4_) peerDiscovery4_->join();