Skip to content
Snippets Groups Projects
Commit a2bbca79 authored by Ming Rui Zhang's avatar Ming Rui Zhang
Browse files

peerdiscovery: add dmtx_ to distinguish send & listen shared resources, add...

peerdiscovery: add dmtx_ to distinguish send & listen shared resources, add stopDiscovery, stoplisten to stop listen or discover certain type of service
parent 0f51115f
No related branches found
No related tags found
No related merge requests found
...@@ -271,12 +271,7 @@ public: ...@@ -271,12 +271,7 @@ public:
/** /**
* Insert known nodes to the routing table by using the received msgpack * Insert known nodes to the routing table by using the received msgpack
*/ */
void nodeInsertionCallback(msgpack::object&& sbuf, SockAddr& add); void nodeInsertionCallback(msgpack::object&& sbuf, SockAddr&& add);
/**
* Fill up the callback map for Peerdiscovery
*/
void callbackmapFill();
/** /**
* Clear the list of bootstrap added using bootstrap(const std::string&, const std::string&). * Clear the list of bootstrap added using bootstrap(const std::string&, const std::string&).
......
...@@ -37,7 +37,7 @@ class OPENDHT_PUBLIC PeerDiscovery ...@@ -37,7 +37,7 @@ class OPENDHT_PUBLIC PeerDiscovery
{ {
public: public:
using ServiceDiscoveredCallback = std::function<void(msgpack::object&&, SockAddr&)>; using ServiceDiscoveredCallback = std::function<void(msgpack::object&&, SockAddr&&)>;
PeerDiscovery(sa_family_t domain, in_port_t port); PeerDiscovery(sa_family_t domain, in_port_t port);
~PeerDiscovery(); ~PeerDiscovery();
...@@ -49,13 +49,23 @@ public: ...@@ -49,13 +49,23 @@ public:
/** /**
* startPublish - Keeping sending data until node is joinned or stop is called - msgpack * startPublish - Keeping sending data until node is joinned or stop is called - msgpack
*/ */
void startPublish(const std::string &type, msgpack::sbuffer &pack_buf); void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf);
/** /**
* Thread Stopper * Thread Stopper
*/ */
void stop(); void stop();
/**
* Remove possible callBack to discovery
*/
void stopDiscovery(const std::string &type);
/**
* Remove different serivce message to send
*/
void stopPublish(const std::string &type);
/** /**
* Configure the sockopt to be able to listen multicast group * Configure the sockopt to be able to listen multicast group
*/ */
...@@ -72,10 +82,9 @@ public: ...@@ -72,10 +82,9 @@ public:
/** /**
* Pack Types/Callback Map * Pack Types/Callback Map
*/ */
static std::map<PackType,std::string> pack_type_;
static std::map<std::string,std::function<void(msgpack::object&&, SockAddr&)>> callbackmap_;
private: private:
std::mutex dmtx_;
std::mutex mtx_; std::mutex mtx_;
std::condition_variable cv_; std::condition_variable cv_;
bool running_ {false}; bool running_ {false};
...@@ -93,6 +102,7 @@ private: ...@@ -93,6 +102,7 @@ private:
msgpack::sbuffer sbuf_; msgpack::sbuffer sbuf_;
msgpack::sbuffer rbuf_; msgpack::sbuffer rbuf_;
std::map<std::string, msgpack::sbuffer> messages_; std::map<std::string, msgpack::sbuffer> messages_;
std::map<std::string, ServiceDiscoveredCallback> callbackmap_;
/** /**
* Multicast Socket Initialization, accept IPV4, IPV6 * Multicast Socket Initialization, accept IPV4, IPV6
...@@ -107,7 +117,7 @@ private: ...@@ -107,7 +117,7 @@ private:
/** /**
* Listener pack thread loop * Listener pack thread loop
*/ */
void listenerpack_thread(const std::string &type, ServiceDiscoveredCallback callback); void listenerpack_thread();
/** /**
* Listener Parameters Setup * Listener Parameters Setup
...@@ -118,11 +128,6 @@ private: ...@@ -118,11 +128,6 @@ private:
* Sender Parameters Setup * Sender Parameters Setup
*/ */
void sender_setup(); void sender_setup();
/**
* Fill in pack type map
*/
void fillPackMap();
}; };
} }
...@@ -38,8 +38,7 @@ namespace dht { ...@@ -38,8 +38,7 @@ namespace dht {
constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD; constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD;
static constexpr size_t RX_QUEUE_MAX_SIZE = 1024 * 16; static constexpr size_t RX_QUEUE_MAX_SIZE = 1024 * 16;
static constexpr in_port_t PEER_DISCOVERY_PORT = 8888; static constexpr in_port_t PEER_DISCOVERY_PORT = 8888;
std::map<PackType,std::string> PeerDiscovery::pack_type_; static const std::string PEER_DISCOVERY_DHT_SERVICE = "dht";
std::map<std::string,std::function<void(msgpack::object&&, SockAddr&)>> PeerDiscovery::callbackmap_;
struct DhtRunner::Listener { struct DhtRunner::Listener {
size_t tokenClassicDht {0}; size_t tokenClassicDht {0};
...@@ -152,7 +151,6 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: ...@@ -152,7 +151,6 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner::
} }
}); });
callbackmapFill();
if (config.peer_discovery or config.peer_publish) { if (config.peer_discovery or config.peer_publish) {
try { try {
peerDiscovery4_.reset(new PeerDiscovery(AF_INET, PEER_DISCOVERY_PORT)); peerDiscovery4_.reset(new PeerDiscovery(AF_INET, PEER_DISCOVERY_PORT));
...@@ -168,11 +166,11 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: ...@@ -168,11 +166,11 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner::
if (config.peer_discovery) { if (config.peer_discovery) {
if (peerDiscovery4_) if (peerDiscovery4_)
peerDiscovery4_->startDiscovery(PeerDiscovery::pack_type_[PackType::NodeInsertion], peerDiscovery4_->startDiscovery(PEER_DISCOVERY_DHT_SERVICE,
PeerDiscovery::callbackmap_[PeerDiscovery::pack_type_[PackType::NodeInsertion]]); std::bind(&DhtRunner::nodeInsertionCallback, this, std::placeholders::_1,std::placeholders::_2));
if (peerDiscovery6_) if (peerDiscovery6_)
peerDiscovery6_->startDiscovery(PeerDiscovery::pack_type_[PackType::NodeInsertion], peerDiscovery6_->startDiscovery(PEER_DISCOVERY_DHT_SERVICE,
PeerDiscovery::callbackmap_[PeerDiscovery::pack_type_[PackType::NodeInsertion]]); std::bind(&DhtRunner::nodeInsertionCallback, this, std::placeholders::_1,std::placeholders::_2));
} }
if (config.peer_publish) { if (config.peer_publish) {
current_node_netid_ = config.dht_config.node_config.network; current_node_netid_ = config.dht_config.node_config.network;
...@@ -183,9 +181,9 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: ...@@ -183,9 +181,9 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner::
msgpack::sbuffer sbuf_node; msgpack::sbuffer sbuf_node;
msgpack::pack(sbuf_node, adc); msgpack::pack(sbuf_node, adc);
if (peerDiscovery4_) if (peerDiscovery4_)
peerDiscovery4_->startPublish(PeerDiscovery::pack_type_[PackType::NodeInsertion], sbuf_node); peerDiscovery4_->startPublish(PEER_DISCOVERY_DHT_SERVICE, sbuf_node);
if (peerDiscovery6_) if (peerDiscovery6_)
peerDiscovery6_->startPublish(PeerDiscovery::pack_type_[PackType::NodeInsertion], sbuf_node); peerDiscovery6_->startPublish(PEER_DISCOVERY_DHT_SERVICE, sbuf_node);
} }
} }
...@@ -967,7 +965,7 @@ DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address) ...@@ -967,7 +965,7 @@ DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address)
} }
void void
DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr& add) DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr&& add)
{ {
auto v = obj.as<NodeInsertionPack>(); auto v = obj.as<NodeInsertionPack>();
add.setPort(v.node_port_); add.setPort(v.node_port_);
...@@ -976,13 +974,6 @@ DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr& add) ...@@ -976,13 +974,6 @@ DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr& add)
} }
} }
void
DhtRunner::callbackmapFill()
{
PeerDiscovery::callbackmap_[PeerDiscovery::pack_type_[PackType::NodeInsertion]] =
std::bind(&DhtRunner::nodeInsertionCallback, this, std::placeholders::_1,std::placeholders::_2);
}
void void
DhtRunner::bootstrap(const std::vector<NodeExport>& nodes) DhtRunner::bootstrap(const std::vector<NodeExport>& nodes)
{ {
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include "peer_discovery.h" #include "peer_discovery.h"
#include "network_utils.h" #include "network_utils.h"
#include "dhtrunner.h"
#ifdef _WIN32 #ifdef _WIN32
#include <Ws2tcpip.h> // needed for ip_mreq definition for multicast #include <Ws2tcpip.h> // needed for ip_mreq definition for multicast
...@@ -40,7 +39,6 @@ constexpr char MULTICAST_ADDRESS_IPV6[8] = "ff05::2"; // Site-local multicast ...@@ -40,7 +39,6 @@ constexpr char MULTICAST_ADDRESS_IPV6[8] = "ff05::2"; // Site-local multicast
PeerDiscovery::PeerDiscovery(sa_family_t domain, in_port_t port) PeerDiscovery::PeerDiscovery(sa_family_t domain, in_port_t port)
: domain_(domain), port_(port), sockfd_(initialize_socket(domain)) : domain_(domain), port_(port), sockfd_(initialize_socket(domain))
{ {
fillPackMap();
socketJoinMulticast(sockfd_, domain); socketJoinMulticast(sockfd_, domain);
} }
...@@ -54,12 +52,6 @@ PeerDiscovery::~PeerDiscovery() ...@@ -54,12 +52,6 @@ PeerDiscovery::~PeerDiscovery()
#endif #endif
} }
void
PeerDiscovery::fillPackMap()
{
pack_type_[PackType::NodeInsertion] = "dht";
}
int int
PeerDiscovery::initialize_socket(sa_family_t domain) PeerDiscovery::initialize_socket(sa_family_t domain)
{ {
...@@ -156,8 +148,12 @@ PeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) ...@@ -156,8 +148,12 @@ PeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family)
void void
PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback)
{ {
{
std::unique_lock<std::mutex> lck(dmtx_);
callbackmap_[type] = callback;
}
listener_setup(); listener_setup();
running_listen_ = std::thread(&PeerDiscovery::listenerpack_thread, this, type, callback); running_listen_ = std::thread(&PeerDiscovery::listenerpack_thread, this);
} }
SockAddr SockAddr
...@@ -184,7 +180,7 @@ PeerDiscovery::recvFrom(size_t &buf_size) ...@@ -184,7 +180,7 @@ PeerDiscovery::recvFrom(size_t &buf_size)
} }
void void
PeerDiscovery::listenerpack_thread(const std::string &type, ServiceDiscoveredCallback callback) PeerDiscovery::listenerpack_thread()
{ {
int stopfds_pipe[2]; int stopfds_pipe[2];
#ifndef _WIN32 #ifndef _WIN32
...@@ -236,8 +232,11 @@ PeerDiscovery::listenerpack_thread(const std::string &type, ServiceDiscoveredCal ...@@ -236,8 +232,11 @@ PeerDiscovery::listenerpack_thread(const std::string &type, ServiceDiscoveredCal
if (o.key.type != msgpack::type::STR) if (o.key.type != msgpack::type::STR)
continue; continue;
auto key = o.key.as<std::string>(); auto key = o.key.as<std::string>();
if(key == type)
callback(std::move(o.val),from); std::unique_lock<std::mutex> lck(dmtx_);
auto callback = callbackmap_.find(key);
if (callback != callbackmap_.end())
callback->second(std::move(o.val), std::move(from));
} }
} }
} }
...@@ -258,7 +257,7 @@ PeerDiscovery::sender_setup() ...@@ -258,7 +257,7 @@ PeerDiscovery::sender_setup()
sockAddrSend_.setPort(port_); sockAddrSend_.setPort(port_);
} }
void PeerDiscovery::startPublish(const std::string &type, msgpack::sbuffer &pack_buf) void PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf)
{ {
sender_setup(); sender_setup();
//Set up New Sending pack //Set up New Sending pack
...@@ -296,6 +295,36 @@ void PeerDiscovery::startPublish(const std::string &type, msgpack::sbuffer &pack ...@@ -296,6 +295,36 @@ void PeerDiscovery::startPublish(const std::string &type, msgpack::sbuffer &pack
} }
} }
void
PeerDiscovery::stopDiscovery(const std::string &type)
{
{
std::unique_lock<std::mutex> lck(dmtx_);
auto it = callbackmap_.find(type);
if(it != callbackmap_.end()){
callbackmap_.erase(it);
}
}
}
void
PeerDiscovery::stopPublish(const std::string &type)
{
{
std::unique_lock<std::mutex> lck(mtx_);
auto it = messages_.find(type);
if(it != messages_.end()){
messages_.erase(it);
}
msgpack::packer<msgpack::sbuffer> pk(&sbuf_);
pk.pack_map(messages_.size());
for (const auto& m : messages_) {
pk.pack(m.first);
sbuf_.write(m.second.data(), m.second.size());
}
}
}
void void
PeerDiscovery::stop() PeerDiscovery::stop()
{ {
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
#include "opendht/value.h"
#include "peerdiscoverytester.h" #include "peerdiscoverytester.h"
#include "opendht/dhtrunner.h"
namespace test { namespace test {
...@@ -52,7 +52,7 @@ void PeerDiscoveryTester::testTransmission_ipv4(){ ...@@ -52,7 +52,7 @@ void PeerDiscoveryTester::testTransmission_ipv4(){
dht::PeerDiscovery test_n(AF_INET, port); dht::PeerDiscovery test_n(AF_INET, port);
dht::PeerDiscovery test_s(AF_INET, port); dht::PeerDiscovery test_s(AF_INET, port);
try{ try{
test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr& add){ test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr&& add){
auto v = obj.as<NodeInsertion>(); auto v = obj.as<NodeInsertion>();
CPPUNIT_ASSERT_EQUAL(v.node_port_, port_n); CPPUNIT_ASSERT_EQUAL(v.node_port_, port_n);
CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n); CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n);
...@@ -92,7 +92,7 @@ void PeerDiscoveryTester::testTransmission_ipv6(){ ...@@ -92,7 +92,7 @@ void PeerDiscoveryTester::testTransmission_ipv6(){
dht::PeerDiscovery test_n(AF_INET6, port); dht::PeerDiscovery test_n(AF_INET6, port);
dht::PeerDiscovery test_s(AF_INET6, port); dht::PeerDiscovery test_s(AF_INET6, port);
try{ try{
test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr& add){ test_s.startDiscovery(type,[&](msgpack::object&& obj, dht::SockAddr&& add){
auto v = obj.as<NodeInsertion>(); auto v = obj.as<NodeInsertion>();
CPPUNIT_ASSERT_EQUAL(v.node_port_, port_n); CPPUNIT_ASSERT_EQUAL(v.node_port_, port_n);
CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n); CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment