Skip to content
Snippets Groups Projects
Commit 9487bc5e authored by M P's avatar M P Committed by Adrien Béraud
Browse files

implement connectivity change in peer discovery

this makes peer discovery pub/sub on connectivity change, it aims to
make peer discovery more energy efficient.

also, stop echo'ing.

TODO:
in its current implementation however, peer_discovery operates in the
dark; it doesn't know routes available to it (v4 vs v6); nor does it
know of the state of the link(s) (UP vs. DOWN) and therefore it merely
tries to, on connectivity change, (re)pub/sub on previous ip version
routes.

IDEA:
open an interface to kernel so that peer discovery, and perhaps others,
can learn about the routes available to them.

don't leave

add query() and direct publish(endpoint)

check if peer discovery was request (run and compile time)

add try/catch routine to DomainPeerDiscovery's constructor
parent 10d48137
No related branches found
No related tags found
No related merge requests found
......@@ -82,6 +82,8 @@ public:
bool stopPublish(const std::string &type);
bool stopPublish(sa_family_t domain, const std::string &type);
void connectivityChanged();
private:
class DomainPeerDiscovery;
std::unique_ptr<DomainPeerDiscovery> peerDiscovery4_;
......
......@@ -1032,6 +1032,10 @@ DhtRunner::connectivityChanged()
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([=](SecureDht& dht) {
dht.connectivityChanged();
#ifdef OPENDHT_PEER_DISCOVERY
if (peerDiscovery_)
peerDiscovery_->connectivityChanged();
#endif
});
cv.notify_all();
}
......
......@@ -24,8 +24,6 @@
#include <asio.hpp>
using namespace std::chrono_literals;
namespace dht {
// Organization-local Scope multicast
......@@ -46,6 +44,8 @@ public:
bool stopDiscovery(const std::string &type);
bool stopPublish(const std::string &type);
void connectivityChanged();
private:
Sp<Logger> logger_;
//dmtx_ for callbackmap_ and drunning_ (write)
......@@ -55,7 +55,6 @@ private:
std::shared_ptr<asio::io_context> ioContext_;
asio::ip::udp::socket sockFd_;
asio::ip::udp::endpoint sockAddrSend_;
asio::steady_timer publishTimer_;
std::array<char, 64 * 1024> receiveBuf_;
asio::ip::udp::endpoint receiveFrom_;
......@@ -67,11 +66,15 @@ private:
bool drunning_ {false};
void loopListener();
void query(const asio::ip::udp::endpoint peer);
void reloadMessages();
void stopDiscovery();
void stopPublish();
void publish();
void publish(const asio::ip::udp::endpoint peer);
void reDiscover();
};
PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(asio::ip::udp domain, in_port_t port, Sp<asio::io_context> ioContext, Sp<Logger> logger)
......@@ -80,11 +83,16 @@ PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(asio::ip::udp domain, in
, sockFd_(*ioContext_, domain)
, sockAddrSend_(asio::ip::address::from_string(domain.family() == AF_INET ? MULTICAST_ADDRESS_IPV4
: MULTICAST_ADDRESS_IPV6), port)
, publishTimer_(*ioContext)
{
try {
sockFd_.set_option(asio::ip::multicast::join_group(sockAddrSend_.address()));
sockFd_.set_option(asio::ip::udp::socket::reuse_address(true));
sockFd_.bind({domain, port});
} catch (const std::exception& e) {
if (logger_)
logger_->e("Can't start peer discovery for %s: %s",
domain.family() == AF_INET ? "IPv4" : "IPv6", e.what());
}
}
PeerDiscovery::DomainPeerDiscovery::~DomainPeerDiscovery()
......@@ -100,7 +108,10 @@ PeerDiscovery::DomainPeerDiscovery::startDiscovery(const std::string &type, Serv
callbackmap_[type] = callback;
if (not drunning_) {
drunning_ = true;
ioContext_->post(std::bind(&PeerDiscovery::DomainPeerDiscovery::loopListener, this));
ioContext_->post([this] () {
loopListener();
query(sockAddrSend_);
});
}
}
......@@ -121,8 +132,11 @@ PeerDiscovery::DomainPeerDiscovery::loopListener()
auto rcv = msgpack::unpack(receiveBuf_.data(), bytes);
msgpack::object obj = rcv.get();
if (obj.type != msgpack::type::MAP)
throw msgpack::type_error{};
if (obj.type == msgpack::type::STR) {
auto s = obj.as<std::string>();
if (!strcmp(s.c_str(), "q"))
publish(receiveFrom_);
} else if (obj.type == msgpack::type::MAP) {
for (unsigned i = 0; i < obj.via.map.size; i++) {
auto& o = obj.via.map.ptr[i];
if (o.key.type != msgpack::type::STR)
......@@ -141,6 +155,9 @@ PeerDiscovery::DomainPeerDiscovery::loopListener()
if (cb)
cb(std::move(o.val), SockAddr{ receiveFrom_.data(), (socklen_t)receiveFrom_.size() });
}
} else {
throw msgpack::type_error{};
}
} catch (const std::exception& e) {
if (logger_)
logger_->e("Error receiving packet: %s", e.what());
......@@ -150,21 +167,45 @@ PeerDiscovery::DomainPeerDiscovery::loopListener()
}
void
PeerDiscovery::DomainPeerDiscovery::publish()
PeerDiscovery::DomainPeerDiscovery::query(const asio::ip::udp::endpoint peer)
{
std::lock_guard<std::mutex> lck(mtx_);
if (lrunning_) {
sockFd_.async_send_to(asio::buffer(sbuf_.data(), sbuf_.size()), sockAddrSend_, [logger=logger_](const asio::error_code& ec, size_t){
if (ec and ec != asio::error::operation_aborted) {
if (logger)
logger->w("Error sending packet: %s", ec.message().c_str());
if (not lrunning_)
return;
msgpack::sbuffer pbuf_request;
msgpack::pack(pbuf_request, "q");
sockFd_.async_send_to(asio::buffer(pbuf_request.data(), pbuf_request.size()), peer,
[logger=logger_, peer] (const asio::error_code& ec, size_t)
{
if (ec and (ec != asio::error::operation_aborted) and logger)
logger->w("Error sending packet to: %s with err: %s",
peer.address().to_string().c_str(),
ec.message().c_str());
}
});
publishTimer_.expires_after(3s);
publishTimer_.async_wait(std::bind(&PeerDiscovery::DomainPeerDiscovery::publish, this));
);
}
void
PeerDiscovery::DomainPeerDiscovery::publish(const asio::ip::udp::endpoint peer)
{
std::lock_guard<std::mutex> lck(mtx_);
if (not lrunning_)
return;
sockFd_.async_send_to(asio::buffer(sbuf_.data(), sbuf_.size()), peer,
[logger=logger_, peer] (const asio::error_code& ec, size_t)
{
if (ec and (ec != asio::error::operation_aborted) and logger)
logger->w("Error sending packet to: %s with err: %s",
peer.address().to_string().c_str(),
ec.message().c_str());
}
);
}
void
PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const msgpack::sbuffer &pack_buf)
{
......@@ -174,10 +215,8 @@ PeerDiscovery::DomainPeerDiscovery::startPublish(const std::string &type, const
std::lock_guard<std::mutex> lck(mtx_);
messages_[type] = std::move(pack_buf_c);
reloadMessages();
if (not lrunning_) {
lrunning_ = true;
ioContext_->post(std::bind(&PeerDiscovery::DomainPeerDiscovery::publish, this));
}
ioContext_->post([this] () { publish(sockAddrSend_); });
}
bool
......@@ -216,10 +255,7 @@ PeerDiscovery::DomainPeerDiscovery::stopDiscovery()
void
PeerDiscovery::DomainPeerDiscovery::stopPublish()
{
if (lrunning_) {
lrunning_ = false;
publishTimer_.cancel();
}
}
void
......@@ -247,6 +283,25 @@ PeerDiscovery::DomainPeerDiscovery::reloadMessages()
}
}
void
PeerDiscovery::DomainPeerDiscovery::reDiscover()
{
asio::error_code ec;
sockFd_.set_option(asio::ip::multicast::join_group(sockAddrSend_.address()), ec);
if (ec and logger_)
logger_->w("can't multicast on %s: %s",
sockAddrSend_.address().to_string().c_str(),
ec.message().c_str());
}
void
PeerDiscovery::DomainPeerDiscovery::connectivityChanged()
{
reDiscover();
publish(sockAddrSend_);
}
PeerDiscovery::PeerDiscovery(in_port_t port, Sp<asio::io_context> ioContext, Sp<Logger> logger)
{
if (not ioContext) {
......@@ -348,4 +403,13 @@ PeerDiscovery::stopPublish(sa_family_t domain, const std::string& type)
return false;
}
void
PeerDiscovery::connectivityChanged()
{
if (peerDiscovery4_)
peerDiscovery4_->connectivityChanged();
if (peerDiscovery6_)
peerDiscovery6_->connectivityChanged();
}
} /* namespace dht */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment