Skip to content
Snippets Groups Projects
Commit 6c5ea125 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

peerdiscovery: add automatic template serialization

parent d0362e58
No related branches found
No related tags found
No related merge requests found
...@@ -41,12 +41,26 @@ public: ...@@ -41,12 +41,26 @@ public:
*/ */
void startDiscovery(const std::string &type, ServiceDiscoveredCallback callback); void startDiscovery(const std::string &type, ServiceDiscoveredCallback callback);
template<typename T>
void startDiscovery(const std::string &type, std::function<void(T&&, SockAddr&&)> cb) {
startDiscovery(type, [cb](msgpack::object&& ob, SockAddr&& addr) {
cb(ob.as<T>(), std::move(addr));
});
}
/** /**
* startPublish - Keeping sending data until node is joinned or stop is called - msgpack * startPublish - Keeping sending data until node is joinned or stop is called - msgpack
*/ */
void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf); void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf);
void startPublish(sa_family_t domain, const std::string &type, const msgpack::sbuffer &pack_buf); void startPublish(sa_family_t domain, const std::string &type, const msgpack::sbuffer &pack_buf);
template<typename T>
void startPublish(const std::string &type, const T& object) {
msgpack::sbuffer buf;
msgpack::pack(buf, object);
startPublish(type, buf);
}
/** /**
* Thread Stopper * Thread Stopper
*/ */
...@@ -55,12 +69,13 @@ public: ...@@ -55,12 +69,13 @@ public:
/** /**
* Remove possible callBack to discovery * Remove possible callBack to discovery
*/ */
void stopDiscovery(const std::string &type); bool stopDiscovery(const std::string &type);
/** /**
* Remove different serivce message to send * Remove different serivce message to send
*/ */
void stopPublish(const std::string &type); bool stopPublish(const std::string &type);
bool stopPublish(sa_family_t domain, const std::string &type);
/** /**
* Configure the sockopt to be able to listen multicast group * Configure the sockopt to be able to listen multicast group
......
...@@ -66,12 +66,12 @@ public: ...@@ -66,12 +66,12 @@ public:
/** /**
* Remove possible callBack to discovery * Remove possible callBack to discovery
*/ */
void stopDiscovery(const std::string &type); bool stopDiscovery(const std::string &type);
/** /**
* Remove different serivce message to send * Remove different serivce message to send
*/ */
void stopPublish(const std::string &type); bool stopPublish(const std::string &type);
/** /**
* Configure the sockopt to be able to listen multicast group * Configure the sockopt to be able to listen multicast group
...@@ -393,27 +393,22 @@ PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const ...@@ -393,27 +393,22 @@ PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const
} }
} }
void bool
PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string &type) PeerDiscovery::DomainPeerDiscovery::stopDiscovery(const std::string &type)
{
{ {
std::unique_lock<std::mutex> lck(dmtx_); std::unique_lock<std::mutex> lck(dmtx_);
auto it = callbackmap_.find(type); return callbackmap_.erase(type) > 0;
if(it != callbackmap_.end()){
callbackmap_.erase(it);
}
}
} }
void bool
PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type) PeerDiscovery::DomainPeerDiscovery::stopPublish(const std::string &type)
{ {
std::unique_lock<std::mutex> lck(mtx_); std::unique_lock<std::mutex> lck(mtx_);
auto it = messages_.find(type); if (messages_.erase(type) > 0) {
if(it != messages_.end()){
messages_.erase(it);
}
messages_reload(); messages_reload();
return true;
}
return false;
} }
void void
...@@ -464,9 +459,6 @@ PeerDiscovery::PeerDiscovery(in_port_t port) ...@@ -464,9 +459,6 @@ PeerDiscovery::PeerDiscovery(in_port_t port)
PeerDiscovery::~PeerDiscovery(){} PeerDiscovery::~PeerDiscovery(){}
/**
* startDiscovery - Keep Listening data from the sender until node is joinned or stop is called
*/
void void
PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback) PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback callback)
{ {
...@@ -474,9 +466,6 @@ PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback ...@@ -474,9 +466,6 @@ PeerDiscovery::startDiscovery(const std::string &type, ServiceDiscoveredCallback
if(peerDiscovery6_) peerDiscovery6_->startDiscovery(type, callback); if(peerDiscovery6_) peerDiscovery6_->startDiscovery(type, callback);
} }
/**
* startPublish - Keeping sending data until node is joinned or stop is called - msgpack
*/
void void
PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf) PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf)
{ {
...@@ -487,15 +476,13 @@ PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pac ...@@ -487,15 +476,13 @@ PeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pac
void void
PeerDiscovery::startPublish(sa_family_t domain, const std::string &type, const msgpack::sbuffer &pack_buf) PeerDiscovery::startPublish(sa_family_t domain, const std::string &type, const msgpack::sbuffer &pack_buf)
{ {
if (domain == AF_INET && peerDiscovery4_) if (domain == AF_INET) {
peerDiscovery4_->startPublish(type, pack_buf); if (peerDiscovery4_) peerDiscovery4_->startPublish(type, pack_buf);
else if (domain == AF_INET6 && peerDiscovery6_) } else if (domain == AF_INET6) {
peerDiscovery6_->startPublish(type, pack_buf); if (peerDiscovery6_) peerDiscovery6_->startPublish(type, pack_buf);
}
} }
/**
* Thread Stopper
*/
void void
PeerDiscovery::stop() PeerDiscovery::stop()
{ {
...@@ -503,29 +490,32 @@ PeerDiscovery::stop() ...@@ -503,29 +490,32 @@ PeerDiscovery::stop()
if(peerDiscovery6_) peerDiscovery6_->stop(); if(peerDiscovery6_) peerDiscovery6_->stop();
} }
/** bool
* Remove possible callBack to discovery
*/
void
PeerDiscovery::stopDiscovery(const std::string &type) PeerDiscovery::stopDiscovery(const std::string &type)
{ {
if(peerDiscovery4_) peerDiscovery4_->stopDiscovery(type); bool stopped4 = peerDiscovery4_ and peerDiscovery4_->stopDiscovery(type);
if(peerDiscovery6_) peerDiscovery6_->stopDiscovery(type); bool stopped6 = peerDiscovery6_ and peerDiscovery6_->stopDiscovery(type);
return stopped4 or stopped6;
} }
/** bool
* Remove different serivce message to send
*/
void
PeerDiscovery::stopPublish(const std::string &type) PeerDiscovery::stopPublish(const std::string &type)
{ {
if(peerDiscovery4_) peerDiscovery4_->stopPublish(type); bool stopped4 = peerDiscovery4_ and peerDiscovery4_->stopPublish(type);
if(peerDiscovery6_) peerDiscovery6_->stopPublish(type); bool stopped6 = peerDiscovery6_ and peerDiscovery6_->stopPublish(type);
return stopped4 or stopped6;
}
bool
PeerDiscovery::stopPublish(sa_family_t domain, const std::string &type)
{
if (domain == AF_INET) {
return peerDiscovery4_ and peerDiscovery4_->stopPublish(type);
} else if (domain == AF_INET6) {
return peerDiscovery6_ and peerDiscovery6_->stopPublish(type);
}
} }
/**
* Join the threads
*/
void void
PeerDiscovery::join() { PeerDiscovery::join() {
if(peerDiscovery4_) peerDiscovery4_->join(); if(peerDiscovery4_) peerDiscovery4_->join();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment