Skip to content
Snippets Groups Projects
Commit 38141322 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

peerdiscovery: stop threads when stopping services

parent 6c5ea125
Branches
Tags
No related merge requests found
...@@ -136,6 +136,9 @@ private: ...@@ -136,6 +136,9 @@ private:
* Sender Parameters Setup * Sender Parameters Setup
*/ */
void messages_reload(); void messages_reload();
void stopDiscovery();
void stopPublish();
}; };
PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(sa_family_t domain, in_port_t port) PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(sa_family_t domain, in_port_t port)
...@@ -146,6 +149,8 @@ PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(sa_family_t domain, in_p ...@@ -146,6 +149,8 @@ PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(sa_family_t domain, in_p
PeerDiscovery::DomainPeerDiscovery::~DomainPeerDiscovery() PeerDiscovery::DomainPeerDiscovery::~DomainPeerDiscovery()
{ {
stop();
join();
if (sockfd_ != -1) if (sockfd_ != -1)
close(sockfd_); close(sockfd_);
...@@ -255,6 +260,8 @@ PeerDiscovery::DomainPeerDiscovery::startDiscovery(const std::string &type, Serv ...@@ -255,6 +260,8 @@ PeerDiscovery::DomainPeerDiscovery::startDiscovery(const std::string &type, Serv
std::unique_lock<std::mutex> lck(dmtx_); std::unique_lock<std::mutex> lck(dmtx_);
callbackmap_[type] = callback; callbackmap_[type] = callback;
if (not drunning_) { if (not drunning_) {
if (running_listen_.joinable())
running_listen_.join();
drunning_ = true; drunning_ = true;
listener_setup(); listener_setup();
running_listen_ = std::thread(&DomainPeerDiscovery::listenerpack_thread, this); running_listen_ = std::thread(&DomainPeerDiscovery::listenerpack_thread, this);
...@@ -331,11 +338,15 @@ PeerDiscovery::DomainPeerDiscovery::listenerpack_thread() ...@@ -331,11 +338,15 @@ PeerDiscovery::DomainPeerDiscovery::listenerpack_thread()
if (o.key.type != msgpack::type::STR) if (o.key.type != msgpack::type::STR)
continue; continue;
auto key = o.key.as<std::string>(); auto key = o.key.as<std::string>();
ServiceDiscoveredCallback cb;
{
std::lock_guard<std::mutex> lck(dmtx_); std::lock_guard<std::mutex> lck(dmtx_);
auto callback = callbackmap_.find(key); auto callback = callbackmap_.find(key);
if (callback != callbackmap_.end()){ if (callback != callbackmap_.end())
callback->second(std::move(o.val), std::move(rcv.first)); cb = callback->second;
} }
if (cb)
cb(std::move(o.val), std::move(rcv.first));
} }
} catch (const std::exception& e) { } catch (const std::exception& e) {
std::cerr << "Error receiving packet: " << e.what() << std::endl; std::cerr << "Error receiving packet: " << e.what() << std::endl;
...@@ -370,6 +381,8 @@ PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const ...@@ -370,6 +381,8 @@ PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const
messages_[type] = std::move(pack_buf_c); messages_[type] = std::move(pack_buf_c);
messages_reload(); messages_reload();
if (not lrunning_) { if (not lrunning_) {
if (running_send_.joinable())
running_send_.join();
lrunning_ = true; lrunning_ = true;
sender_setup(); sender_setup();
running_send_ = std::thread([this](){ running_send_ = std::thread([this](){
...@@ -396,15 +409,23 @@ PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const ...@@ -396,15 +409,23 @@ PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const
bool bool
PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string &type) PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string &type)
{ {
std::unique_lock<std::mutex> lck(dmtx_); std::lock_guard<std::mutex> lck(dmtx_);
return callbackmap_.erase(type) > 0; if (callbackmap_.erase(type) > 0) {
if (callbackmap_.empty())
stopDiscovery();
return true;
}
return false;
} }
bool bool
PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type) PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type)
{ {
std::unique_lock<std::mutex> lck(mtx_); std::lock_guard<std::mutex> lck(mtx_);
if (messages_.erase(type) > 0) { if (messages_.erase(type) > 0) {
if (messages_.empty())
stopPublish();
else
messages_reload(); messages_reload();
return true; return true;
} }
...@@ -412,21 +433,33 @@ PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type) ...@@ -412,21 +433,33 @@ PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type)
} }
void void
PeerDiscovery::DomainPeerDiscovery::stop() PeerDiscovery::DomainPeerDiscovery::stopDiscovery()
{ {
drunning_ = false;
if (stop_writefd_ != -1) {
if (write(stop_writefd_, "\0", 1) == -1) {
perror("write");
}
}
}
void
PeerDiscovery::DomainPeerDiscovery::stopPublish()
{ {
std::unique_lock<std::mutex> lck(mtx_);
lrunning_ = false; lrunning_ = false;
cv_.notify_all();
} }
void
PeerDiscovery::DomainPeerDiscovery::stop()
{
{ {
std::unique_lock<std::mutex> lck(dmtx_); std::unique_lock<std::mutex> lck(dmtx_);
drunning_ = false; stopDiscovery();
}
cv_.notify_all();
if (stop_writefd_ != -1) {
if (write(stop_writefd_, "\0", 1) == -1) {
perror("write");
} }
{
std::unique_lock<std::mutex> lck(mtx_);
stopPublish();
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment