diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index ecf016802320628e54bb08fd72912ba223775cd4..34677c16fa56fd15552a32c92e8e20e7107ceba5 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -42,6 +42,13 @@ struct Node; class SecureDht; class PeerDiscovery; struct SecureDhtConfig; +class OPENDHT_PUBLIC NodeInsertionPack{ +public: + dht::InfoHash nodeid_; + in_port_t node_port_; + dht::NetId nid_; + MSGPACK_DEFINE(nodeid_, node_port_, nid_) +}; /** * Provides a thread-safe interface to run the (secure) DHT. @@ -268,6 +275,16 @@ public: */ void bootstrap(const InfoHash& id, const SockAddr& address); + /** + * Insert known nodes to the routing table by using the received msgpack + */ + void nodeInsertionCallback(std::string& type, msgpack::object&& sbuf, SockAddr& add); + + /** + * Fill up the callback map for Peerdiscovery + */ + void callbackmapFill(); + /** * Clear the list of bootstrap added using bootstrap(const std::string&, const std::string&). */ @@ -541,6 +558,9 @@ private: /** PeerDiscovery Parameters */ std::unique_ptr<PeerDiscovery> peerDiscovery4_; std::unique_ptr<PeerDiscovery> peerDiscovery6_; + NetId current_node_netid_; + std::map<std::string,std::function<void(std::string&, msgpack::object&&, SockAddr&)>> callbackmap_; + }; } diff --git a/include/opendht/peer_discovery.h b/include/opendht/peer_discovery.h index 98bb349d80286f111e644b9c176eabed499ccc7d..dde0953aca37272f276a6b30e763ba06440e2157 100644 --- a/include/opendht/peer_discovery.h +++ b/include/opendht/peer_discovery.h @@ -32,20 +32,19 @@ class OPENDHT_PUBLIC PeerDiscovery { public: - using PeerDiscoveredCallback = std::function<void(const InfoHash&, const SockAddr&)>; - + using PeerDiscoveredPackCallback = std::function<void(std::string&, msgpack::object&&, SockAddr&)>; PeerDiscovery(sa_family_t domain, in_port_t port); ~PeerDiscovery(); /** * startDiscovery - Keep Listening data from the sender until node is joinned or stop is called */ - void startDiscovery(PeerDiscoveredCallback callback); + void startDiscovery(PeerDiscoveredPackCallback callback); /** - * startPublish - Keeping sending data until node is joinned or stop is called + * startPublish - Keeping sending data until node is joinned or stop is called - msgpack */ - void startPublish(const dht::InfoHash &nodeId, in_port_t port_to_send); + void startPublish(const std::string &type, msgpack::sbuffer && pack_buf, const dht::InfoHash &nodeId); /** * Thread Stopper @@ -61,8 +60,8 @@ public: * Join the threads */ void join() { - if(running_listen.joinable()) running_listen.join(); - if(running_send.joinable()) running_send.join(); + if(running_listen_.joinable()) running_listen_.join(); + if(running_send_.joinable()) running_send_.join(); } private: @@ -75,37 +74,39 @@ private: int stop_writefd_ {-1}; SockAddr sockAddrSend_; - std::array<uint8_t,dht::InfoHash::size() + sizeof(in_port_t)> data_send_; //Thread export to be joined - std::thread running_listen; - std::thread running_send; + std::thread running_listen_; + std::thread running_send_; dht::InfoHash nodeId_; + msgpack::sbuffer sbuf_; + msgpack::sbuffer rbuf_; + /** * Multicast Socket Initialization, accept IPV4, IPV6 */ static int initialize_socket(sa_family_t domain); /** - * Send messages + * Send pack messages */ - void sendTo(uint8_t *buf,size_t buf_size); + void sendTo(); /** * Receive messages */ - SockAddr recvFrom(uint8_t *buf, size_t &buf_size); + SockAddr recvFrom(size_t &buf_size); /** - * Send thread loop + * Send pack thread loop */ - void sender_thread(); + void senderpack_thread(); /** - * Listener thread loop + * Listener pack thread loop */ - void listener_thread(PeerDiscoveredCallback callback); + void listenerpack_thread(PeerDiscoveredPackCallback callback); /** * Listener Parameters Setup @@ -115,7 +116,7 @@ private: /** * Sender Parameters Setup */ - void sender_setup(const dht::InfoHash& nodeId, in_port_t port_to_send); + void sender_setup(const std::string &type, msgpack::sbuffer && pack_buf, const dht::InfoHash &nodeId); }; } diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 23e2782ccad816ea30edf1ee3fd736f6ed9466fd..3832e92075fad6bf405d7350b257a7a67a4c00bb 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -53,6 +53,7 @@ DhtRunner::DhtRunner() : dht_() , dht_via_proxy_() #endif //OPENDHT_PROXY_CLIENT { + callbackmapFill(); #ifdef _WIN32 WSADATA wsd; if (WSAStartup(MAKEWORD(2,2), &wsd) != 0) @@ -154,21 +155,28 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const DhtRunner:: std::cerr << "Can't start peer discovery (IPv6): " << e.what() << std::endl; } } + + current_node_netid_ = config.dht_config.node_config.network; + NodeInsertionPack adc; + adc.nid_ = current_node_netid_; + adc.node_port_ = getBoundPort(); + adc.nodeid_ = getNodeId(); + msgpack::sbuffer sbuf_node_v4; + msgpack::sbuffer sbuf_node_v6; + msgpack::pack(sbuf_node_v4, adc); + msgpack::pack(sbuf_node_v6, adc); + if (config.peer_discovery) { - using sig = void (DhtRunner::*)(const InfoHash&, const SockAddr&); if (peerDiscovery4_) - peerDiscovery4_->startDiscovery(std::bind(static_cast<sig>(&DhtRunner::bootstrap), this, - std::placeholders::_1,std::placeholders::_2)); - + peerDiscovery4_->startDiscovery(callbackmap_["dht"]); if (peerDiscovery6_) - peerDiscovery6_->startDiscovery(std::bind(static_cast<sig>(&DhtRunner::bootstrap), this, - std::placeholders::_1,std::placeholders::_2)); + peerDiscovery6_->startDiscovery(callbackmap_["dht"]); } if (config.peer_publish) { if (peerDiscovery4_) - peerDiscovery4_->startPublish(dht_->getNodeId(), getBoundPort(AF_INET)); + peerDiscovery4_->startPublish("dht", std::move(sbuf_node_v4), dht_->getNodeId()); if (peerDiscovery6_) - peerDiscovery6_->startPublish(dht_->getNodeId(), getBoundPort(AF_INET6)); + peerDiscovery6_->startPublish("dht", std::move(sbuf_node_v6), dht_->getNodeId()); } } @@ -949,6 +957,25 @@ DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address) cv.notify_all(); } +void +DhtRunner::nodeInsertionCallback(std::string& type, msgpack::object&& obj, SockAddr& add) +{ + if(type == "dht"){ + auto v = obj.as<NodeInsertionPack>(); + add.setPort(v.node_port_); + if(v.nodeid_ != getNodeId() && current_node_netid_ == v.nid_){ + bootstrap(v.nodeid_, add); + } + } +} + +void +DhtRunner::callbackmapFill() +{ + callbackmap_["dht"] = std::bind(&DhtRunner::nodeInsertionCallback, this, + std::placeholders::_1,std::placeholders::_2,std::placeholders::_3); +} + void DhtRunner::bootstrap(const std::vector<NodeExport>& nodes) { diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index f59375b8057b9122ebd0b193ca081d0813c2b3c7..03970b90c5911e8105c1602a1fc69920725848be 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -146,31 +146,22 @@ PeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) } void -PeerDiscovery::sendTo(uint8_t *buf, size_t buf_size) +PeerDiscovery::startDiscovery(PeerDiscoveredPackCallback callback) { - ssize_t nbytes = sendto( - sockfd_, - buf, - buf_size, - 0, - sockAddrSend_.get(), - sockAddrSend_.getLength() - ); - if (nbytes < 0) { - throw std::runtime_error(std::string("Error sending packet: ") + strerror(errno)); - } + listener_setup(); + running_listen_ = std::thread(&PeerDiscovery::listenerpack_thread, this, callback); } SockAddr -PeerDiscovery::recvFrom(uint8_t *buf, size_t& buf_size) +PeerDiscovery::recvFrom(size_t &buf_size) { sockaddr_storage storeage_recv; socklen_t sa_len = sizeof(storeage_recv); ssize_t nbytes = recvfrom( sockfd_, - buf, - buf_size, + rbuf_.data(), + MSGPACK_SBUFFER_INIT_SIZE, 0, (sockaddr*)&storeage_recv, &sa_len @@ -179,45 +170,13 @@ PeerDiscovery::recvFrom(uint8_t *buf, size_t& buf_size) throw std::runtime_error(std::string("Error receiving packet: ") + strerror(errno)); } - buf_size = nbytes; + buf_size = nbytes; SockAddr ret {storeage_recv, sa_len}; return ret; } -void -PeerDiscovery::sender_setup(const dht::InfoHash& nodeId, in_port_t port_to_send) -{ - nodeId_ = nodeId; - // Setup sender address - sockAddrSend_.setFamily(domain_); - sockAddrSend_.setAddress(domain_ == AF_INET ? MULTICAST_ADDRESS_IPV4 : MULTICAST_ADDRESS_IPV6); - sockAddrSend_.setPort(port_); - - // Setup sent data - std::copy_n(nodeId.cbegin(), nodeId.size(), data_send_.begin()); - auto portAddr = reinterpret_cast<in_port_t*>(data_send_.data() + dht::InfoHash::size()); - *portAddr = htons(port_to_send); -} - -void -PeerDiscovery::sender_thread() -{ - while(true) { - try { - sendTo(data_send_.data(), data_send_.size()); - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - { - std::unique_lock<std::mutex> lck(mtx_); - if (cv_.wait_for(lck,std::chrono::seconds(3),[&]{ return !running_; })) - break; - } - } -} - -void -PeerDiscovery::listener_thread(PeerDiscoveredCallback callback) +void +PeerDiscovery::listenerpack_thread(PeerDiscoveredPackCallback callback) { int stopfds_pipe[2]; #ifndef _WIN32 @@ -257,23 +216,20 @@ PeerDiscovery::listener_thread(PeerDiscoveredCallback callback) recv(stop_readfd, buf.data(), buf.size(), 0); } - std::array<uint8_t,dht::InfoHash::size() + sizeof(in_port_t)> data_receive; - size_t data_receive_size = data_receive.size(); - auto from = recvFrom(data_receive.data(), data_receive_size); + size_t recv_buf_size {0}; + auto from = recvFrom(recv_buf_size); + msgpack::object_handle oh = msgpack::unpack(rbuf_.data(), recv_buf_size); + msgpack::object obj = oh.get(); - // Data_receive_size as a value-result member will hlep to filter packs - if(data_receive_size != data_receive.size()){ - // std::cerr << "Received invalid peer discovery packet" << std::endl; + if (obj.type != msgpack::type::MAP) continue; - } + for (unsigned i = 0; i < obj.via.map.size; i++) { + auto& o = obj.via.map.ptr[i]; + if (o.key.type != msgpack::type::STR) + continue; + auto key = o.key.as<std::string>(); - dht::InfoHash nodeId; - std::copy_n(data_receive.begin(), dht::InfoHash::size(), nodeId.begin()); - auto portAddr = reinterpret_cast<in_port_t*>(data_receive.data() + dht::InfoHash::size()); - auto port = ntohs(*portAddr); - if (nodeId != nodeId_){ - from.setPort(port); - callback(nodeId, from); + callback(key,std::move(o.val),from); } } } @@ -285,18 +241,62 @@ PeerDiscovery::listener_thread(PeerDiscoveredCallback callback) } } -void -PeerDiscovery::startDiscovery(PeerDiscoveredCallback callback) +void +PeerDiscovery::sender_setup(const std::string &type, msgpack::sbuffer && pack_buf, const dht::InfoHash &nodeId) { - listener_setup(); - running_listen = std::thread(&PeerDiscovery::listener_thread, this, callback); + nodeId_ = nodeId; + // Setup sender address + sockAddrSend_.setFamily(domain_); + sockAddrSend_.setAddress(domain_ == AF_INET ? MULTICAST_ADDRESS_IPV4 : MULTICAST_ADDRESS_IPV6); + sockAddrSend_.setPort(port_); + + //Set up Sending pack + std::map<std::string, msgpack::sbuffer> messages; + messages[type] = std::move(pack_buf); + msgpack::packer<msgpack::sbuffer> pk(&sbuf_); + pk.pack_map(messages.size()); + for (const auto& m : messages) { + pk.pack(m.first); + sbuf_.write(m.second.data(), m.second.size()); + } } -void -PeerDiscovery::startPublish(const dht::InfoHash& nodeId, in_port_t port_to_send) +void PeerDiscovery::startPublish(const std::string &type, msgpack::sbuffer && pack_buf, const dht::InfoHash &nodeId) +{ + sender_setup(type,std::move(pack_buf),nodeId); + running_send_ = std::thread(&PeerDiscovery::senderpack_thread, this); +} + +void +PeerDiscovery::senderpack_thread() +{ + while(true) { + try { + sendTo(); + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + { + std::unique_lock<std::mutex> lck(mtx_); + if (cv_.wait_for(lck,std::chrono::seconds(3),[&]{ return !running_; })) + break; + } + } +} + +void PeerDiscovery::sendTo() { - sender_setup(nodeId, port_to_send); - running_send = std::thread(&PeerDiscovery::sender_thread, this); + ssize_t nbytes = sendto( + sockfd_, + sbuf_.data(), + sbuf_.size(), + 0, + sockAddrSend_.get(), + sockAddrSend_.getLength() + ); + if (nbytes < 0) { + throw std::runtime_error(std::string("Error sending packet: ") + strerror(errno)); + } } void