diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index fef091414052b0bc75306f8411f65e0f509f3890..0f9d246864133874cc61f11be85f1ba10b59f2ab 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -136,6 +136,9 @@ private: * Sender Parameters Setup */ void messages_reload(); + + void stopDiscovery(); + void stopPublish(); }; PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(sa_family_t domain, in_port_t port) @@ -146,6 +149,8 @@ PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(sa_family_t domain, in_p PeerDiscovery::DomainPeerDiscovery::~DomainPeerDiscovery() { + stop(); + join(); if (sockfd_ != -1) close(sockfd_); @@ -254,7 +259,9 @@ PeerDiscovery::DomainPeerDiscovery::startDiscovery(const std::string &type, Serv { std::unique_lock<std::mutex> lck(dmtx_); callbackmap_[type] = callback; - if (not drunning_){ + if (not drunning_) { + if (running_listen_.joinable()) + running_listen_.join(); drunning_ = true; listener_setup(); running_listen_ = std::thread(&DomainPeerDiscovery::listenerpack_thread, this); @@ -331,11 +338,15 @@ PeerDiscovery::DomainPeerDiscovery::listenerpack_thread() if (o.key.type != msgpack::type::STR) continue; auto key = o.key.as<std::string>(); - std::lock_guard<std::mutex> lck(dmtx_); - auto callback = callbackmap_.find(key); - if (callback != callbackmap_.end()){ - callback->second(std::move(o.val), std::move(rcv.first)); + ServiceDiscoveredCallback cb; + { + std::lock_guard<std::mutex> lck(dmtx_); + auto callback = callbackmap_.find(key); + if (callback != callbackmap_.end()) + cb = callback->second; } + if (cb) + cb(std::move(o.val), std::move(rcv.first)); } } catch (const std::exception& e) { std::cerr << "Error receiving packet: " << e.what() << std::endl; @@ -370,6 +381,8 @@ PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const messages_[type] = std::move(pack_buf_c); messages_reload(); if (not lrunning_) { + if (running_send_.joinable()) + running_send_.join(); lrunning_ = true; sender_setup(); running_send_ = std::thread([this](){ @@ -396,33 +409,33 @@ PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const bool PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string &type) { - std::unique_lock<std::mutex> lck(dmtx_); - return callbackmap_.erase(type) > 0; + std::lock_guard<std::mutex> lck(dmtx_); + if (callbackmap_.erase(type) > 0) { + if (callbackmap_.empty()) + stopDiscovery(); + return true; + } + return false; } bool PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type) { - std::unique_lock<std::mutex> lck(mtx_); + std::lock_guard<std::mutex> lck(mtx_); if (messages_.erase(type) > 0) { - messages_reload(); + if (messages_.empty()) + stopPublish(); + else + messages_reload(); return true; } return false; } void -PeerDiscovery::DomainPeerDiscovery::stop() +PeerDiscovery::DomainPeerDiscovery::stopDiscovery() { - { - std::unique_lock<std::mutex> lck(mtx_); - lrunning_ = false; - } - { - std::unique_lock<std::mutex> lck(dmtx_); - drunning_ = false; - } - cv_.notify_all(); + drunning_ = false; if (stop_writefd_ != -1) { if (write(stop_writefd_, "\0", 1) == -1) { perror("write"); @@ -430,6 +443,26 @@ PeerDiscovery::DomainPeerDiscovery::stop() } } +void +PeerDiscovery::DomainPeerDiscovery::stopPublish() +{ + lrunning_ = false; + cv_.notify_all(); +} + +void +PeerDiscovery::DomainPeerDiscovery::stop() +{ + { + std::unique_lock<std::mutex> lck(dmtx_); + stopDiscovery(); + } + { + std::unique_lock<std::mutex> lck(mtx_); + stopPublish(); + } +} + void PeerDiscovery::DomainPeerDiscovery::messages_reload() {