diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 2f3be5c1e7ecc2ccb0bec8f9dea94d7bc1267c9d..a7fa51b9e9b1f2a2f4cd08745e5544697dbde675 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -268,11 +268,6 @@ public: */ void bootstrap(const InfoHash& id, const SockAddr& address); - /** - * Insert known nodes to the routing table by using the received msgpack - */ - void nodeInsertionCallback(msgpack::object&& sbuf, SockAddr&& add); - /** * Clear the list of bootstrap added using bootstrap(const std::string&, const std::string&). */ @@ -545,8 +540,6 @@ private: /** PeerDiscovery Parameters */ std::unique_ptr<PeerDiscovery> peerDiscovery_; - NetId current_node_netid_; - }; } diff --git a/include/opendht/peer_discovery.h b/include/opendht/peer_discovery.h index 304554a1acacc9281fbef1942133e353cea4e41d..73146a67ecb617331ae0eb4e01992575e6d4e007 100644 --- a/include/opendht/peer_discovery.h +++ b/include/opendht/peer_discovery.h @@ -45,6 +45,7 @@ public: * startPublish - Keeping sending data until node is joinned or stop is called - msgpack */ void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf); + void startPublish(sa_family_t domain, const std::string &type, const msgpack::sbuffer &pack_buf); /** * Thread Stopper diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 1a34bcc00e98756f2f7e6100c455ea35e54d1bd4..11492b340d4306d8afcc98fb4ccf7b2e58373337 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -154,22 +154,37 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: if (config.peer_discovery or config.peer_publish) { peerDiscovery_.reset(new PeerDiscovery(PEER_DISCOVERY_PORT)); } - + + auto netId = config.dht_config.node_config.network; if (config.peer_discovery) { - if (peerDiscovery_) - peerDiscovery_->startDiscovery(PEER_DISCOVERY_DHT_SERVICE, - std::bind(&DhtRunner::nodeInsertionCallback, this, std::placeholders::_1,std::placeholders::_2)); + peerDiscovery_->startDiscovery(PEER_DISCOVERY_DHT_SERVICE, [this, netId](msgpack::object&& obj, SockAddr&& add){ + try { + auto v = obj.as<NodeInsertionPack>(); + add.setPort(v.node_port_); + if(v.nodeid_ != dht_->getNodeId() && netId == v.nid_){ + bootstrap(v.nodeid_, add); + } + } catch(const msgpack::type_error &e){ + std::cerr << "Msgpack Info Invalid: " << e.what() << '\n'; + } + }); } if (config.peer_publish) { - current_node_netid_ = config.dht_config.node_config.network; + msgpack::sbuffer sbuf_node; + // IPv4 NodeInsertionPack adc; - adc.nid_ = current_node_netid_; - adc.node_port_ = getBoundPort(); + adc.nid_ = netId; + adc.node_port_ = getBoundPort(AF_INET); adc.nodeid_ = dht_->getNodeId(); - msgpack::sbuffer sbuf_node; msgpack::pack(sbuf_node, adc); - if (peerDiscovery_) - peerDiscovery_->startPublish(PEER_DISCOVERY_DHT_SERVICE, sbuf_node); + peerDiscovery_->startPublish(AF_INET, PEER_DISCOVERY_DHT_SERVICE, sbuf_node); + // IPv6 + adc.nid_ = netId; + adc.node_port_ = getBoundPort(AF_INET6); + adc.nodeid_ = dht_->getNodeId(); + sbuf_node.clear(); + msgpack::pack(sbuf_node, adc); + peerDiscovery_->startPublish(AF_INET6, PEER_DISCOVERY_DHT_SERVICE, sbuf_node); } } @@ -604,6 +619,8 @@ DhtRunner::startNetwork(const SockAddr sin4, const SockAddr sin6) rcv.emplace(ReceivedPacket {Blob {buf.begin(), buf.begin()+rc+1}, SockAddr(from, from_len), clock::now()}); } cv.notify_all(); + } else if (rc == -1) { + std::cerr << "Error receiving packet: " << strerror(errno) << std::endl; } } } @@ -948,20 +965,6 @@ DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address) cv.notify_all(); } -void -DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr&& add) -{ - try { - auto v = obj.as<NodeInsertionPack>(); - add.setPort(v.node_port_); - if(v.nodeid_ != dht_->getNodeId() && current_node_netid_ == v.nid_){ - bootstrap(v.nodeid_, add); - } - } catch(const msgpack::type_error &e){ - std::cerr << "Msgpack Info Invalid: " << e.what() << '\n'; - } -} - void DhtRunner::bootstrap(const std::vector<NodeExport>& nodes) { diff --git a/src/network_engine.cpp b/src/network_engine.cpp index 8a1197db51ad29d2fd9e484d4fd689011efc9514..60bfec9aa2b9d1dec05c2a9449cc5e3fef453cb0 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -43,9 +43,7 @@ constexpr std::chrono::seconds NetworkEngine::RX_TIMEOUT; const std::string NetworkEngine::my_v {"RNG1"}; constexpr size_t NetworkEngine::MAX_REQUESTS_PER_SEC; -static constexpr uint8_t v4prefix[16] = { - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0, 0, 0, 0 -}; +static constexpr uint8_t v4prefix[16] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0, 0, 0, 0}; constexpr unsigned SEND_NODES {8}; diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index 60fcb7b6757d34ccc711abfe7f4f016540d32996..5b45e2a3b19dd18fef6e2484520953e155066894 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -105,7 +105,6 @@ private: std::thread running_send_; msgpack::sbuffer sbuf_; - msgpack::sbuffer rbuf_; std::map<std::string, msgpack::sbuffer> messages_; std::map<std::string, ServiceDiscoveredCallback> callbackmap_; @@ -118,7 +117,7 @@ private: * Receive messages */ std::pair<SockAddr, Blob> recvFrom(); - + /** * Listener pack thread loop */ @@ -280,12 +279,12 @@ PeerDiscovery::DomainPeerDiscovery::recvFrom() if (nbytes < 0) { throw std::runtime_error(std::string("Error receiving packet: ") + strerror(errno)); } - + SockAddr ret {storeage_recv, sa_len}; - return {ret, Blob(recv.begin(), recv.end())}; + return {ret, Blob(recv.begin(), recv.begin()+nbytes+1)}; } -void +void PeerDiscovery::DomainPeerDiscovery::listenerpack_thread() { int stopfds_pipe[2]; @@ -353,7 +352,7 @@ PeerDiscovery::DomainPeerDiscovery::listenerpack_thread() } } -void +void PeerDiscovery::DomainPeerDiscovery::sender_setup() { // Setup sender address @@ -362,7 +361,7 @@ PeerDiscovery::DomainPeerDiscovery::sender_setup() sockAddrSend_.setPort(port_); } -void +void PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) { //Set up New Sending pack @@ -396,10 +395,10 @@ PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const } } -void +void PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string &type) -{ - { +{ + { std::unique_lock<std::mutex> lck(dmtx_); auto it = callbackmap_.find(type); if(it != callbackmap_.end()){ @@ -408,7 +407,7 @@ PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string &type) } } -void +void PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type) { std::unique_lock<std::mutex> lck(mtx_); @@ -450,18 +449,16 @@ PeerDiscovery::DomainPeerDiscovery::messages_reload() } } -PeerDiscovery::PeerDiscovery(in_port_t port) +PeerDiscovery::PeerDiscovery(in_port_t port) { try { peerDiscovery4_.reset(new DomainPeerDiscovery(AF_INET, port)); } catch(const std::exception& e){ - peerDiscovery4_.reset(nullptr); std::cerr << "Can't start peer discovery (IPv4): " << e.what() << std::endl; } try { peerDiscovery6_.reset(new DomainPeerDiscovery(AF_INET6, port)); } catch(const std::exception& e) { - peerDiscovery6_.reset(nullptr); std::cerr << "Can't start peer discovery (IPv6): " << e.what() << std::endl; } @@ -472,7 +469,7 @@ PeerDiscovery::~PeerDiscovery(){} /** * startDiscovery - Keep Listening data from the sender until node is joinned or stop is called */ -void +void PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) { if(peerDiscovery4_) peerDiscovery4_->startDiscovery(type, callback); @@ -482,17 +479,26 @@ PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback /** * startPublish - Keeping sending data until node is joinned or stop is called - msgpack */ -void +void PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) { if(peerDiscovery4_) peerDiscovery4_->startPublish(type, pack_buf); if(peerDiscovery6_) peerDiscovery6_->startPublish(type, pack_buf); } +void +PeerDiscovery::startPublish(sa_family_t domain, const std::string &type, const msgpack::sbuffer &pack_buf) +{ + if (domain == AF_INET && peerDiscovery4_) + peerDiscovery4_->startPublish(type, pack_buf); + else if (domain == AF_INET6 && peerDiscovery6_) + peerDiscovery6_->startPublish(type, pack_buf); +} + /** * Thread Stopper */ -void +void PeerDiscovery::stop() { if(peerDiscovery4_) peerDiscovery4_->stop(); @@ -502,7 +508,7 @@ PeerDiscovery::stop() /** * Remove possible callBack to discovery */ -void +void PeerDiscovery::stopDiscovery(const std::string &type) { if(peerDiscovery4_) peerDiscovery4_->stopDiscovery(type); @@ -512,7 +518,7 @@ PeerDiscovery::stopDiscovery(const std::string &type) /** * Remove different serivce message to send */ -void +void PeerDiscovery::stopPublish(const std::string &type) { if(peerDiscovery4_) peerDiscovery4_->stopPublish(type); @@ -522,7 +528,7 @@ PeerDiscovery::stopPublish(const std::string &type) /** * Join the threads */ -void +void PeerDiscovery::join() { if(peerDiscovery4_) peerDiscovery4_->join(); if(peerDiscovery6_) peerDiscovery6_->join();