diff --git a/include/opendht/peer_discovery.h b/include/opendht/peer_discovery.h index 14c75b7dc93164a9057e0ca1225eb1abaacad423..efecc50d317239bc63134476504d4adb59564209 100644 --- a/include/opendht/peer_discovery.h +++ b/include/opendht/peer_discovery.h @@ -84,10 +84,13 @@ public: */ private: + //dmtx_ for callbackmap_ and drunning_ (write) std::mutex dmtx_; + //mtx_ for messages_ and lrunning (listen) std::mutex mtx_; std::condition_variable cv_; - bool running_ {false}; + bool lrunning_ {false}; + bool drunning_ {false}; sa_family_t domain_ {AF_UNSPEC}; int port_; int sockfd_ {-1}; diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 6940b2881210ae7718730a86fd79f095ce59bb68..4923df22af9869dbada35cc4805a9f43010b5171 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -967,10 +967,14 @@ DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address) void DhtRunner::nodeInsertionCallback(msgpack::object&& obj, SockAddr&& add) { - auto v = obj.as<NodeInsertionPack>(); - add.setPort(v.node_port_); - if(v.nodeid_ != dht_->getNodeId() && current_node_netid_ == v.nid_){ - bootstrap(v.nodeid_, add); + try { + auto v = obj.as<NodeInsertionPack>(); + add.setPort(v.node_port_); + if(v.nodeid_ != dht_->getNodeId() && current_node_netid_ == v.nid_){ + bootstrap(v.nodeid_, add); + } + } catch(const msgpack::type_error &e){ + std::cerr << "Msgpack Info Invalid: " << e.what() << '\n'; } } diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index f82acfc55b26882440054e65e7c357a94f24d946..7c584108197f8769b5ca75acd6e4f376ca697047 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -147,13 +147,14 @@ PeerDiscovery::socketJoinMulticast(int sockfd, sa_family_t family) void PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) -{ - { - std::unique_lock<std::mutex> lck(dmtx_); - callbackmap_[type] = callback; +{ + std::unique_lock<std::mutex> lck(dmtx_); + callbackmap_[type] = callback; + if (not drunning_){ + drunning_ = true; + listener_setup(); + running_listen_ = std::thread(&PeerDiscovery::listenerpack_thread, this); } - listener_setup(); - running_listen_ = std::thread(&PeerDiscovery::listenerpack_thread, this); } SockAddr @@ -161,11 +162,12 @@ PeerDiscovery::recvFrom(size_t &buf_size) { sockaddr_storage storeage_recv; socklen_t sa_len = sizeof(storeage_recv); + char *recv = new char[1024]; ssize_t nbytes = recvfrom( sockfd_, - rbuf_.data(), - MSGPACK_SBUFFER_INIT_SIZE, + recv, + 1024, 0, (sockaddr*)&storeage_recv, &sa_len @@ -174,8 +176,10 @@ PeerDiscovery::recvFrom(size_t &buf_size) throw std::runtime_error(std::string("Error receiving packet: ") + strerror(errno)); } + rbuf_.write(recv,nbytes); buf_size = nbytes; SockAddr ret {storeage_recv, sa_len}; + delete []recv; return ret; } @@ -202,8 +206,8 @@ PeerDiscovery::listenerpack_thread() int data_coming = select(sockfd_ > stop_readfd ? sockfd_ + 1 : stop_readfd + 1, &readfds, nullptr, nullptr, nullptr); { - std::unique_lock<std::mutex> lck(mtx_); - if (not running_) + std::unique_lock<std::mutex> lck(dmtx_); + if (not drunning_) break; } @@ -232,12 +236,13 @@ PeerDiscovery::listenerpack_thread() if (o.key.type != msgpack::type::STR) continue; auto key = o.key.as<std::string>(); - std::unique_lock<std::mutex> lck(dmtx_); auto callback = callbackmap_.find(key); - if (callback != callbackmap_.end()) + if (callback != callbackmap_.end()){ callback->second(std::move(o.val), std::move(from)); + } } + rbuf_.release(); } } if (stop_readfd != -1) @@ -259,12 +264,12 @@ PeerDiscovery::sender_setup() void PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) { - sender_setup(); //Set up New Sending pack msgpack::sbuffer pack_buf_c; pack_buf_c.write(pack_buf.data(),pack_buf.size()); std::unique_lock<std::mutex> lck(mtx_); + sbuf_.release(); messages_[type] = std::move(pack_buf_c); msgpack::packer<msgpack::sbuffer> pk(&sbuf_); pk.pack_map(messages_.size()); @@ -272,11 +277,12 @@ void PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer pk.pack(m.first); sbuf_.write(m.second.data(), m.second.size()); } - if (not running_) { - running_ = true; + if (not lrunning_) { + lrunning_ = true; + sender_setup(); running_send_ = std::thread([this](){ std::unique_lock<std::mutex> lck(mtx_); - while (running_) { + while (lrunning_) { ssize_t nbytes = sendto( sockfd_, sbuf_.data(), @@ -288,7 +294,7 @@ void PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer if (nbytes < 0) { std::cerr << "Error sending packet: " << strerror(errno) << std::endl; } - if (cv_.wait_for(lck,std::chrono::seconds(3),[&]{ return !running_; })) + if (cv_.wait_for(lck,std::chrono::seconds(3),[&]{ return !lrunning_; })) break; } }); @@ -312,6 +318,7 @@ PeerDiscovery::stopPublish(const std::string &type) { { std::unique_lock<std::mutex> lck(mtx_); + sbuf_.release(); auto it = messages_.find(type); if(it != messages_.end()){ messages_.erase(it); @@ -330,7 +337,11 @@ PeerDiscovery::stop() { { std::unique_lock<std::mutex> lck(mtx_); - running_ = false; + lrunning_ = false; + } + { + std::unique_lock<std::mutex> lck(dmtx_); + drunning_ = false; } cv_.notify_all(); if (stop_writefd_ != -1) { diff --git a/tests/peerdiscoverytester.cpp b/tests/peerdiscoverytester.cpp index 5d6d7b853290d786180c550d6921e4f5863a757d..32ce32a79ccdbf460ef18597239ded9021b147c6 100644 --- a/tests/peerdiscoverytester.cpp +++ b/tests/peerdiscoverytester.cpp @@ -17,8 +17,8 @@ * along with this program. If not, see <https://www.gnu.org/licenses/>. */ -#include "opendht/value.h" #include "peerdiscoverytester.h" +#include "opendht/value.h" namespace test { @@ -30,6 +30,14 @@ public: MSGPACK_DEFINE(nodeid_, node_port_, nid_) }; +class TestPack{ +public: + int num; + char cha; + std::string str; + MSGPACK_DEFINE(num, cha, str) +}; + CPPUNIT_TEST_SUITE_REGISTRATION(PeerDiscoveryTester); void PeerDiscoveryTester::setUp(){} @@ -38,16 +46,25 @@ void PeerDiscoveryTester::testTransmission_ipv4(){ // Node for getnode id const std::string type {"dht"}; + const std::string test_type {"pdd"}; dht::InfoHash data_n = dht::InfoHash::get("applepin"); int port = 2222; in_port_t port_n = 50000; - dht::NetId netid = 10; + msgpack::sbuffer sbuf; NodeInsertion adc; adc.nid_ = 10; adc.node_port_ = port_n; adc.nodeid_ = data_n; msgpack::pack(sbuf,adc); + + msgpack::sbuffer pbuf; + TestPack pdd; + pdd.num = 100; + pdd.cha = 'a'; + pdd.str = "apple"; + msgpack::pack(pbuf,pdd); + try{ dht::PeerDiscovery test_n(AF_INET, port); dht::PeerDiscovery test_s(AF_INET, port); @@ -58,9 +75,19 @@ void PeerDiscoveryTester::testTransmission_ipv4(){ CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n); }); - test_n.startPublish(type, sbuf); + test_s.startDiscovery(test_type,[&](msgpack::object&& obj, dht::SockAddr&& add){ + auto v = obj.as<TestPack>(); + CPPUNIT_ASSERT_EQUAL(v.num, 100); + CPPUNIT_ASSERT_EQUAL(v.cha, 'a'); + }); + test_n.startPublish(type, sbuf); std::this_thread::sleep_for(std::chrono::seconds(5)); + test_n.startPublish(test_type, pbuf); + std::this_thread::sleep_for(std::chrono::seconds(5)); + test_n.stopPublish(test_type); + + std::this_thread::sleep_for(std::chrono::seconds(10)); test_n.stop(); test_s.stop(); test_n.join(); @@ -76,18 +103,27 @@ void PeerDiscoveryTester::testTransmission_ipv4(){ void PeerDiscoveryTester::testTransmission_ipv6(){ - // Node for getnode id + // Node for getnode id const std::string type {"dht"}; + const std::string test_type {"pdd"}; dht::InfoHash data_n = dht::InfoHash::get("applepin"); int port = 2222; in_port_t port_n = 50000; - dht::NetId netid = 10; + msgpack::sbuffer sbuf; NodeInsertion adc; adc.nid_ = 10; adc.node_port_ = port_n; adc.nodeid_ = data_n; msgpack::pack(sbuf,adc); + + msgpack::sbuffer pbuf; + TestPack pdd; + pdd.num = 100; + pdd.cha = 'a'; + pdd.str = "apple"; + msgpack::pack(pbuf,pdd); + try{ dht::PeerDiscovery test_n(AF_INET6, port); dht::PeerDiscovery test_s(AF_INET6, port); @@ -98,9 +134,19 @@ void PeerDiscoveryTester::testTransmission_ipv6(){ CPPUNIT_ASSERT_EQUAL(v.nodeid_, data_n); }); - test_n.startPublish(type, sbuf); + test_s.startDiscovery(test_type,[&](msgpack::object&& obj, dht::SockAddr&& add){ + auto v = obj.as<TestPack>(); + CPPUNIT_ASSERT_EQUAL(v.num, 100); + CPPUNIT_ASSERT_EQUAL(v.cha, 'a'); + }); + test_n.startPublish(type, sbuf); std::this_thread::sleep_for(std::chrono::seconds(5)); + test_n.startPublish(test_type, pbuf); + std::this_thread::sleep_for(std::chrono::seconds(5)); + test_n.stopPublish(test_type); + + std::this_thread::sleep_for(std::chrono::seconds(10)); test_n.stop(); test_s.stop(); test_n.join();