/*
* Copyright (C) 2014-2020 Savoir-faire Linux Inc.
* Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
* Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
* Sébastien Blin <sebastien.blin@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "dhtrunner.h"
#include "securedht.h"
#include "network_utils.h"
#ifdef OPENDHT_PEER_DISCOVERY
#include "peer_discovery.h"
#endif
#ifdef OPENDHT_PROXY_CLIENT
#include "dht_proxy_client.h"
#endif
#include <fstream>
namespace dht {
constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD;
static const std::string PEER_DISCOVERY_DHT_SERVICE = "dht";
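/**
* Bookkeeping for a value listener registered through the runner.
* When the proxy client is compiled in, the same listener may be attached to
* the classic DHT and/or the proxy client; the hash, filter and Where are kept
* so the listener can be re-registered when enableProxy() switches transports.
*/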
struct DhtRunner::Listener {
size_t tokenClassicDht {0};
size_t tokenProxyDht {0};
ValueCallback gcb;
InfoHash hash {};
Value::Filter f;
Where w;
};
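/**
* msgpack-serialized announcement exchanged over local peer discovery:
* the node id, its DHT port and its network id.
*/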
struct NodeInsertionPack {
dht::InfoHash nodeId;
in_port_t port;
dht::NetId net;
MSGPACK_DEFINE(nodeId, port, net)
};
DhtRunner::DhtRunner() : dht_()
#ifdef OPENDHT_PROXY_CLIENT
, dht_via_proxy_()
#endif //OPENDHT_PROXY_CLIENT
{
#ifdef _WIN32
WSADATA wsd;
if (WSAStartup(MAKEWORD(2,2), &wsd) != 0)
throw DhtException("Can't initialize Winsock2");
#endif
}
DhtRunner::~DhtRunner()
{
join();
#ifdef _WIN32
WSACleanup();
#endif
}
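/*
* Minimal usage sketch (illustrative only: the exact Config/Context defaults
* and run() overloads are declared in dhtrunner.h, not in this file):
*
*   dht::DhtRunner node;
*   dht::DhtRunner::Config cfg {};
*   cfg.threaded = true;            // let the runner manage its own worker thread
*   node.run(4222, cfg, {});        // bind IPv4/IPv6 on port 4222
*   node.bootstrap("bootstrap.jami.net", "4222");
*   node.get("some_key", [](const std::vector<std::shared_ptr<dht::Value>>& values) {
*       return true;                // keep receiving values
*   });
*   // ...
*   node.join();
*/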
void
DhtRunner::run(in_port_t port, Config& config, Context&& context)
{
config.bind4.setFamily(AF_INET);
config.bind4.setPort(port);
config.bind6.setFamily(AF_INET6);
config.bind6.setPort(port);
run(config, std::move(context));
}
void
DhtRunner::run(const char* ip4, const char* ip6, const char* service, Config& config, Context&& context)
{
auto res4 = SockAddr::resolve(ip4, service);
auto res6 = SockAddr::resolve(ip6, service);
if (res4.empty())
res4.emplace_back();
if (res6.empty())
res6.emplace_back();
config.bind4 = std::move(res4.front());
config.bind6 = std::move(res6.front());
run(config, std::move(context));
}
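/**
* Starts the node: binds the UDP socket(s) (restoring previously used ports
* from "<persist_path>_port.txt" when the configuration requests port 0),
* creates the SecureDht instance (and the proxy client when a proxy server is
* configured), then launches the worker thread and local peer discovery.
*/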
void
DhtRunner::run(const Config& config, Context&& context)
{
std::lock_guard<std::mutex> lck(dht_mtx);
auto expected = State::Idle;
if (not running.compare_exchange_strong(expected, State::Running)) {
if (context.logger)
context.logger->w("[runner %p] Node is already running. Call join() first before calling run() again.");
return;
}
auto local4 = config.bind4;
auto local6 = config.bind6;
if (not local4 and not local6) {
if (context.logger)
context.logger->w("[runner %p] No address to bind specified in the configuration, using default addresses");
local4.setFamily(AF_INET);
local6.setFamily(AF_INET6);
}
auto state_path = config.dht_config.node_config.persist_path;
if (not state_path.empty() && (local4.getPort() == 0 || local6.getPort() == 0)) {
state_path += "_port.txt";
std::ifstream inConfig(state_path);
if (inConfig.is_open()) {
in_port_t port;
if (inConfig >> port) {
if (local4.getPort() == 0) {
if (context.logger)
context.logger->d("[runner %p] Using IPv4 port %hu from saved configuration", this, port);
local4.setPort(port);
}
}
if (inConfig >> port) {
if (local6.getPort() == 0) {
if (context.logger)
context.logger->d("[runner %p] Using IPv6 port %hu from saved configuration", this, port);
local6.setPort(port);
}
}
}
}
if (not context.sock)
context.sock.reset(new net::UdpSocket(local4, local6, context.logger));
if (not state_path.empty()) {
std::ofstream outConfig(state_path);
outConfig << context.sock->getBoundRef(AF_INET).getPort() << std::endl;
outConfig << context.sock->getBoundRef(AF_INET6).getPort() << std::endl;
}
if (context.logger) {
logger_ = context.logger;
logger_->d("[runner %p] state changed to Running", this);
}
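// Packets received on the socket thread are appended to rcv (dropping the
// oldest ones if the queue exceeds RX_QUEUE_MAX_SIZE), the worker thread is
// woken up, and already-processed buffers (rcv_free) are handed back to the
// socket for reuse.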
context.sock->setOnReceive([&] (net::PacketList&& pkts) {
net::PacketList ret;
{
std::lock_guard<std::mutex> lck(sock_mtx);
auto maxSize = net::RX_QUEUE_MAX_SIZE - pkts.size();
while (rcv.size() > maxSize) {
if (logger_)
logger_->e("Dropping packet: queue is full!");
rcv.pop_front();
}
rcv.splice(rcv.end(), std::move(pkts));
ret = std::move(rcv_free);
}
cv.notify_all();
return ret;
});
#ifdef OPENDHT_PROXY_CLIENT
config_ = config;
identityAnnouncedCb_ = context.identityAnnouncedCb;
#endif
auto dht = std::unique_ptr<DhtInterface>(new Dht(std::move(context.sock), SecureDht::getConfig(config.dht_config), context.logger));
dht_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht), config.dht_config, std::move(context.identityAnnouncedCb), context.logger));
enableProxy(not config.proxy_server.empty());
if (context.logger and dht_via_proxy_) {
dht_via_proxy_->setLogger(context.logger);
}
if (context.statusChangedCallback) {
statusCb = std::move(context.statusChangedCallback);
}
if (context.certificateStore) {
// give the proxy client its own copy before moving the query into the main DHT
if (dht_via_proxy_)
dht_via_proxy_->setLocalCertificateStore(CertificateStoreQuery(context.certificateStore));
dht_->setLocalCertificateStore(std::move(context.certificateStore));
}
if (not config.threaded)
return;
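// Worker thread: run one loop_() iteration, then sleep until the next
// scheduled job, an incoming packet or a queued operation (or until join()
// moves the state back to Idle).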
dht_thread = std::thread([this]() {
while (running != State::Idle) {
std::unique_lock<std::mutex> lk(dht_mtx);
time_point wakeup = loop_();
auto hasJobToDo = [this]() {
if (running == State::Idle)
return true;
{
std::lock_guard<std::mutex> lck(sock_mtx);
if (not rcv.empty())
return true;
}
{
std::lock_guard<std::mutex> lck(storage_mtx);
if (not pending_ops_prio.empty())
return true;
auto s = getStatus();
if (not pending_ops.empty() and (s == NodeStatus::Connected or s == NodeStatus::Disconnected))
return true;
}
return false;
};
if (wakeup == time_point::max())
cv.wait(lk, hasJobToDo);
else
cv.wait_until(lk, wakeup, hasJobToDo);
}
});
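// Local peer discovery: listen for NodeInsertionPack announcements from nodes
// of the same network id and bootstrap from them, and/or publish our own node
// id and bound port(s) on the LAN.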
if (config.peer_discovery or config.peer_publish) {
#ifdef OPENDHT_PEER_DISCOVERY
peerDiscovery_ = context.peerDiscovery ?
std::move(context.peerDiscovery) :
std::make_shared<PeerDiscovery>();
#else
std::cerr << "Peer discovery requested but OpenDHT built without peer discovery support." << std::endl;
#endif
}
#ifdef OPENDHT_PEER_DISCOVERY
auto netId = config.dht_config.node_config.network;
if (config.peer_discovery) {
peerDiscovery_->startDiscovery<NodeInsertionPack>(PEER_DISCOVERY_DHT_SERVICE, [this, netId](NodeInsertionPack&& v, SockAddr&& addr){
addr.setPort(v.port);
if (v.nodeId != dht_->getNodeId() && netId == v.net){
bootstrap(v.nodeId, addr);
}
});
}
if (config.peer_publish) {
msgpack::sbuffer sbuf_node;
NodeInsertionPack adc;
adc.net = netId;
adc.nodeId = dht_->getNodeId();
// IPv4
if (const auto& bound4 = dht_->getSocket()->getBoundRef(AF_INET)) {
adc.port = bound4.getPort();
msgpack::pack(sbuf_node, adc);
peerDiscovery_->startPublish(AF_INET, PEER_DISCOVERY_DHT_SERVICE, sbuf_node);
}
// IPv6
if (const auto& bound6 = dht_->getSocket()->getBoundRef(AF_INET6)) {
adc.port = bound6.getPort();
sbuf_node.clear();
msgpack::pack(sbuf_node, adc);
peerDiscovery_->startPublish(AF_INET6, PEER_DISCOVERY_DHT_SERVICE, sbuf_node);
}
}
#endif
}
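/**
* Initiates a graceful shutdown: moves the state to Stopping, queues a
* priority operation asking the DHT (and the proxy client, if any) to announce
* its departure, and fires the registered shutdown callbacks once all ongoing
* operations have completed (see opEnded()/checkShutdown()).
*/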
void
DhtRunner::shutdown(ShutdownCallback cb) {
auto expected = State::Running;
if (not running.compare_exchange_strong(expected, State::Stopping)) {
if (expected == State::Stopping and ongoing_ops) {
std::lock_guard<std::mutex> lck(storage_mtx);
shutdownCallbacks_.emplace_back(std::move(cb));
}
else if (cb) cb();
return;
}
if (logger_)
logger_->d("[runner %p] state changed to Stopping, %zu ongoing ops", this, ongoing_ops.load());
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
shutdownCallbacks_.emplace_back(std::move(cb));
pending_ops_prio.emplace([=](SecureDht&) mutable {
auto onShutdown = [this]{ opEnded(); };
#ifdef OPENDHT_PROXY_CLIENT
if (dht_via_proxy_)
dht_via_proxy_->shutdown(onShutdown);
#endif
if (dht_)
dht_->shutdown(onShutdown);
});
cv.notify_all();
}
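/**
* Operation accounting: each queued operation increments ongoing_ops;
* bindOpDoneCallback() wraps the user callback so opEnded() runs when the
* operation completes, and checkShutdown() fires the shutdown callbacks once
* the counter reaches zero while in the Stopping state.
*/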
void
DhtRunner::opEnded() {
if (--ongoing_ops == 0)
checkShutdown();
}
DoneCallback
DhtRunner::bindOpDoneCallback(DoneCallback&& cb) {
return [this, cb = std::move(cb)](bool ok, const std::vector<std::shared_ptr<Node>>& nodes){
if (cb) cb(ok, nodes);
opEnded();
};
}
DoneCallbackSimple
DhtRunner::bindOpDoneCallback(DoneCallbackSimple&& cb) {
return [this, cb = std::move(cb)](bool ok){
if (cb) cb(ok);
opEnded();
};
}
bool
DhtRunner::checkShutdown() {
if (running != State::Stopping or ongoing_ops)
return false;
decltype(shutdownCallbacks_) cbs;
{
std::lock_guard<std::mutex> lck(storage_mtx);
cbs = std::move(shutdownCallbacks_);
}
for (auto& cb : cbs)
if (cb) cb();
return true;
}
void
DhtRunner::join()
{
{
std::lock_guard<std::mutex> lck(dht_mtx);
if (running.exchange(State::Idle) == State::Idle)
return;
cv.notify_all();
#ifdef OPENDHT_PEER_DISCOVERY
if (peerDiscovery_)
peerDiscovery_->stop();
#endif
if (dht_)
if (auto sock = dht_->getSocket())
sock->stop();
if (logger_)
logger_->d("[runner %p] state changed to Idle", this);
}
if (dht_thread.joinable())
dht_thread.join();
{
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops = decltype(pending_ops)();
pending_ops_prio = decltype(pending_ops_prio)();
ongoing_ops = 0;
}
{
std::lock_guard<std::mutex> lck(dht_mtx);
resetDht();
status4 = NodeStatus::Disconnected;
status6 = NodeStatus::Disconnected;
}
}
SockAddr
DhtRunner::getBound(sa_family_t af) const {
std::lock_guard<std::mutex> lck(dht_mtx);
if (dht_)
if (auto sock = dht_->getSocket())
return sock->getBound(af);
return SockAddr{};
}
in_port_t
DhtRunner::getBoundPort(sa_family_t af) const {
std::lock_guard<std::mutex> lck(dht_mtx);
if (dht_)
if (auto sock = dht_->getSocket())
return sock->getPort(af);
return 0;
}
void
DhtRunner::dumpTables() const
{
std::lock_guard<std::mutex> lck(dht_mtx);
activeDht()->dumpTables();
}
InfoHash
DhtRunner::getId() const
{
if (auto dht = activeDht())
return dht->getId();
return {};
}
InfoHash
DhtRunner::getNodeId() const
{
if (auto dht = activeDht())
return dht->getNodeId();
return {};
}
std::pair<size_t, size_t>
DhtRunner::getStoreSize() const {
std::lock_guard<std::mutex> lck(dht_mtx);
if (!dht_)
return {};
return dht_->getStoreSize();
}
void
DhtRunner::setStorageLimit(size_t limit) {
std::lock_guard<std::mutex> lck(dht_mtx);
if (!dht_)
throw std::runtime_error("dht is not running");
return dht_->setStorageLimit(limit);
}
std::vector<NodeExport>
DhtRunner::exportNodes() const {
std::lock_guard<std::mutex> lck(dht_mtx);
if (!dht_)
return {};
return dht_->exportNodes();
}
std::vector<ValuesExport>
DhtRunner::exportValues() const {
std::lock_guard<std::mutex> lck(dht_mtx);
if (!dht_)
return {};
return dht_->exportValues();
}
void
DhtRunner::setLogger(const Sp<Logger>& logger) {
std::lock_guard<std::mutex> lck(dht_mtx);
logger_ = logger;
if (dht_)
dht_->setLogger(logger);
#ifdef OPENDHT_PROXY_CLIENT
if (dht_via_proxy_)
dht_via_proxy_->setLogger(logger);
#endif
}
void
DhtRunner::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) {
setLogger(std::make_shared<Logger>(std::move(error), std::move(warn), std::move(debug)));
}
void
DhtRunner::setLogFilter(const InfoHash& f) {
std::lock_guard<std::mutex> lck(dht_mtx);
if (dht_)
dht_->setLogFilter(f);
#ifdef OPENDHT_PROXY_CLIENT
if (dht_via_proxy_)
dht_via_proxy_->setLogFilter(f);
#endif
}
void
DhtRunner::registerType(const ValueType& type) {
std::lock_guard<std::mutex> lck(dht_mtx);
activeDht()->registerType(type);
}
void
DhtRunner::importValues(const std::vector<ValuesExport>& values) {
std::lock_guard<std::mutex> lck(dht_mtx);
dht_->importValues(values);
}
unsigned
DhtRunner::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const
{
std::lock_guard<std::mutex> lck(dht_mtx);
const auto stats = activeDht()->getNodesStats(af);
if (good_return)
*good_return = stats.good_nodes;
if (dubious_return)
*dubious_return = stats.dubious_nodes;
if (cached_return)
*cached_return = stats.cached_nodes;
if (incoming_return)
*incoming_return = stats.incoming_nodes;
return stats.good_nodes + stats.dubious_nodes;
}
NodeStats
DhtRunner::getNodesStats(sa_family_t af) const
{
std::lock_guard<std::mutex> lck(dht_mtx);
return activeDht()->getNodesStats(af);
}
NodeInfo
DhtRunner::getNodeInfo() const {
std::lock_guard<std::mutex> lck(dht_mtx);
NodeInfo info {};
if (auto dht = activeDht()) {
info.id = dht->getId();
info.node_id = dht->getNodeId();
info.ipv4 = dht->getNodesStats(AF_INET);
info.ipv6 = dht->getNodesStats(AF_INET6);
if (auto sock = dht->getSocket()) {
info.bound4 = sock->getBoundRef(AF_INET).getPort();
info.bound6 = sock->getBoundRef(AF_INET6).getPort();
}
}
info.ongoing_ops = ongoing_ops;
return info;
}
void
DhtRunner::getNodeInfo(std::function<void(std::shared_ptr<NodeInfo>)> cb)
{
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops_prio.emplace([cb = std::move(cb), this](SecureDht& dht){
auto sinfo = std::make_shared<NodeInfo>();
auto& info = *sinfo;
info.id = dht.getId();
info.node_id = dht.getNodeId();
info.ipv4 = dht.getNodesStats(AF_INET);
info.ipv6 = dht.getNodesStats(AF_INET6);
if (auto sock = dht.getSocket()) {
info.bound4 = sock->getBoundRef(AF_INET).getPort();
info.bound6 = sock->getBoundRef(AF_INET6).getPort();
}
info.ongoing_ops = ongoing_ops;
cb(std::move(sinfo));
opEnded();
});
cv.notify_all();
}
std::vector<unsigned>
DhtRunner::getNodeMessageStats(bool in) const
{
std::lock_guard<std::mutex> lck(dht_mtx);
return activeDht()->getNodeMessageStats(in);
}
std::string
DhtRunner::getStorageLog() const
{
std::lock_guard<std::mutex> lck(dht_mtx);
return activeDht()->getStorageLog();
}
std::string
DhtRunner::getStorageLog(const InfoHash& f) const
{
std::lock_guard<std::mutex> lck(dht_mtx);
return activeDht()->getStorageLog(f);
}
std::string
DhtRunner::getRoutingTablesLog(sa_family_t af) const
{
std::lock_guard<std::mutex> lck(dht_mtx);
return activeDht()->getRoutingTablesLog(af);
}
std::string
DhtRunner::getSearchesLog(sa_family_t af) const
{
std::lock_guard<std::mutex> lck(dht_mtx);
return activeDht()->getSearchesLog(af);
}
std::string
DhtRunner::getSearchLog(const InfoHash& f, sa_family_t af) const
{
std::lock_guard<std::mutex> lck(dht_mtx);
return activeDht()->getSearchLog(f, af);
}
std::vector<SockAddr>
DhtRunner::getPublicAddress(sa_family_t af)
{
std::lock_guard<std::mutex> lck(dht_mtx);
if (auto dht = activeDht())
return dht->getPublicAddress(af);
return {};
}
std::vector<std::string>
DhtRunner::getPublicAddressStr(sa_family_t af)
{
auto addrs = getPublicAddress(af);
std::vector<std::string> ret(addrs.size());
std::transform(addrs.begin(), addrs.end(), ret.begin(), [](const SockAddr& a) { return a.toString(); });
return ret;
}
void
DhtRunner::getPublicAddress(std::function<void(std::vector<SockAddr>&&)> cb, sa_family_t af)
{
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops_prio.emplace([cb = std::move(cb), this, af](SecureDht& dht){
cb(dht.getPublicAddress(af));
opEnded();
});
cv.notify_all();
}
void
DhtRunner::registerCertificate(std::shared_ptr<crypto::Certificate> cert) {
std::lock_guard<std::mutex> lck(dht_mtx);
activeDht()->registerCertificate(cert);
}
void
DhtRunner::setLocalCertificateStore(CertificateStoreQuery&& query_method) {
std::lock_guard<std::mutex> lck(dht_mtx);
#ifdef OPENDHT_PROXY_CLIENT
if (dht_via_proxy_)
dht_via_proxy_->setLocalCertificateStore(CertificateStoreQuery(query_method));
#endif
if (dht_)
dht_->setLocalCertificateStore(std::move(query_method));
}
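/**
* One iteration of the worker loop: executes queued operations (priority
* operations always, regular ones only once the node is Connected or
* Disconnected), feeds received packets to the DHT while discarding those
* older than RX_QUEUE_MAX_DELAY, runs the periodic maintenance, recycles
* packet buffers and reports node status changes. Returns the next scheduled
* wake-up time.
*/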
time_point
DhtRunner::loop_()
{
auto dht = activeDht();
if (not dht)
return {};
decltype(pending_ops) ops {};
{
std::lock_guard<std::mutex> lck(storage_mtx);
auto s = getStatus();
ops = (pending_ops_prio.empty() && (s == NodeStatus::Connected or s == NodeStatus::Disconnected)) ?
std::move(pending_ops) : std::move(pending_ops_prio);
}
while (not ops.empty()) {
ops.front()(*dht);
ops.pop();
}
time_point wakeup {};
decltype(rcv) received {};
decltype(rcv) received_treated {};
{
std::lock_guard<std::mutex> lck(sock_mtx);
// move to stack
received = std::move(rcv);
}
// Discard old packets
size_t dropped {0};
if (not received.empty()) {
auto limit = clock::now() - net::RX_QUEUE_MAX_DELAY;
auto it = received.begin();
while (it != received.end() and it->received < limit) {
it->data.clear();
++it;
dropped++;
}
received_treated.splice(received_treated.end(), received, received.begin(), it);
}
// Handle packets
if (not received.empty()) {
for (auto& pkt : received) {
auto now = clock::now();
if (now - pkt.received > net::RX_QUEUE_MAX_DELAY)
dropped++;
else
wakeup = dht->periodic(pkt.data.data(), pkt.data.size(), std::move(pkt.from), now);
pkt.data.clear();
}
received_treated.splice(received_treated.end(), std::move(received));
} else {
// Or just run the scheduler
wakeup = dht->periodic(nullptr, 0, nullptr, 0, clock::now());
}
if (not received_treated.empty()) {
std::lock_guard<std::mutex> lck(sock_mtx);
if (rcv_free.size() < net::RX_QUEUE_MAX_SIZE)
rcv_free.splice(rcv_free.end(), std::move(received_treated));
}
if (dropped) {
if (logger_)
logger_->w("[runner %p] Dropped %zu packets with high delay", this, dropped);
else
std::cerr << "Dropped " << dropped << " packets with high delay" << std::endl;
}
NodeStatus nstatus4 = dht->updateStatus(AF_INET);
NodeStatus nstatus6 = dht->updateStatus(AF_INET6);
if (nstatus4 != status4 || nstatus6 != status6) {
status4 = nstatus4;
status6 = nstatus6;
if (statusCb)
statusCb(status4, status6);
}
return wakeup;
}
void
DhtRunner::get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f, Where w)
{
if (running != State::Running) {
if (dcb) dcb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=](SecureDht& dht) mutable {
dht.get(hash, std::move(vcb), bindOpDoneCallback(std::move(dcb)), std::move(f), std::move(w));
});
cv.notify_all();
}
void
DhtRunner::get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb, Value::Filter f, Where w)
{
get(InfoHash::get(key), std::move(vcb), std::move(dcb), std::move(f), std::move(w));
}
void
DhtRunner::query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb, Query q) {
if (running != State::Running) {
if (done_cb) done_cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=](SecureDht& dht) mutable {
dht.query(hash, std::move(cb), bindOpDoneCallback(std::move(done_cb)), std::move(q));
});
cv.notify_all();
}
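/**
* Registers a value listener. With the proxy client compiled in, the listener
* is stored in listeners_ under a runner-global token so it can be attached to
* the classic DHT or the proxy client (and re-attached when enableProxy()
* switches between them); the returned future then resolves to that global
* token. Otherwise the future resolves to the token returned by the DHT itself.
*/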
std::future<size_t>
DhtRunner::listen(InfoHash hash, ValueCallback vcb, Value::Filter f, Where w)
{
auto ret_token = std::make_shared<std::promise<size_t>>();
if (running != State::Running) {
ret_token->set_value(0);
return ret_token->get_future();
}
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops.emplace([=](SecureDht& dht) mutable {
#ifdef OPENDHT_PROXY_CLIENT
auto tokenbGlobal = listener_token_++;
auto& listener = listeners_[tokenbGlobal];
listener.hash = hash;
listener.f = std::move(f);
listener.w = std::move(w);
listener.gcb = [hash,vcb,tokenbGlobal,this](const std::vector<Sp<Value>>& vals, bool expired) {
if (not vcb(vals, expired)) {
cancelListen(hash, tokenbGlobal);
return false;
}
return true;
};
if (auto token = dht.listen(hash, listener.gcb, listener.f, listener.w)) {
if (use_proxy) listener.tokenProxyDht = token;
else listener.tokenClassicDht = token;
}
ret_token->set_value(tokenbGlobal);
#else
ret_token->set_value(dht.listen(hash, std::move(vcb), std::move(f), std::move(w)));
#endif
});
cv.notify_all();
return ret_token->get_future();
}
std::future<size_t>
DhtRunner::listen(const std::string& key, GetCallback vcb, Value::Filter f, Where w)
{
return listen(InfoHash::get(key), std::move(vcb), std::move(f), std::move(w));
}
void
DhtRunner::cancelListen(InfoHash h, size_t token)
{
std::lock_guard<std::mutex> lck(storage_mtx);
#ifdef OPENDHT_PROXY_CLIENT
pending_ops.emplace([=](SecureDht&) {
auto it = listeners_.find(token);
if (it == listeners_.end()) return;
if (it->second.tokenClassicDht)
dht_->cancelListen(h, it->second.tokenClassicDht);
if (it->second.tokenProxyDht and dht_via_proxy_)
dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht);
listeners_.erase(it);
});
#else
pending_ops.emplace([=](SecureDht& dht) {
dht.cancelListen(h, token);
});
#endif // OPENDHT_PROXY_CLIENT
cv.notify_all();
}
void
DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> ftoken)
{
std::lock_guard<std::mutex> lck(storage_mtx);
#ifdef OPENDHT_PROXY_CLIENT
pending_ops.emplace([=](SecureDht&) {
auto it = listeners_.find(ftoken.get());
if (it == listeners_.end()) return;
if (it->second.tokenClassicDht)
dht_->cancelListen(h, it->second.tokenClassicDht);
if (it->second.tokenProxyDht and dht_via_proxy_)
dht_via_proxy_->cancelListen(h, it->second.tokenProxyDht);
listeners_.erase(it);
});
#else
pending_ops.emplace([=](SecureDht& dht) {
dht.cancelListen(h, ftoken.get());
});
#endif // OPENDHT_PROXY_CLIENT
cv.notify_all();
}
void
DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb, time_point created, bool permanent)
{
if (running != State::Running) {
if (cb) cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=,
cb = std::move(cb),
sv = std::make_shared<Value>(std::move(value))
] (SecureDht& dht) mutable {
dht.put(hash, sv, bindOpDoneCallback(std::move(cb)), created, permanent);
});
cv.notify_all();
}
void
DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, time_point created, bool permanent)
{
if (running != State::Running) {
if (cb) cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=, cb = std::move(cb)](SecureDht& dht) mutable {
dht.put(hash, value, bindOpDoneCallback(std::move(cb)), created, permanent);
});
cv.notify_all();
}
void
DhtRunner::put(const std::string& key, Value&& value, DoneCallbackSimple cb, time_point created, bool permanent)
{
put(InfoHash::get(key), std::forward<Value>(value), std::move(cb), created, permanent);
}
void
DhtRunner::cancelPut(const InfoHash& h, Value::Id id)
{
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops.emplace([=](SecureDht& dht) {
dht.cancelPut(h, id);
});
cv.notify_all();
}
void
DhtRunner::cancelPut(const InfoHash& h, const std::shared_ptr<Value>& value)
{
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops.emplace([=](SecureDht& dht) {
dht.cancelPut(h, value->id);
});
cv.notify_all();
}
void
DhtRunner::putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, bool permanent)
{
if (running != State::Running) {
if (cb) cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=,
cb = std::move(cb),
value = std::move(value)
](SecureDht& dht) mutable {
dht.putSigned(hash, value, bindOpDoneCallback(std::move(cb)), permanent);
});
cv.notify_all();
}
void
DhtRunner::putSigned(InfoHash hash, Value&& value, DoneCallback cb, bool permanent)
{
putSigned(hash, std::make_shared<Value>(std::move(value)), std::move(cb), permanent);
}
void
DhtRunner::putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb, bool permanent)
{
putSigned(InfoHash::get(key), std::forward<Value>(value), std::move(cb), permanent);
}
void
DhtRunner::putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb, bool permanent)
{
if (running != State::Running) {
if (cb) cb(false, {});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([=,
cb = std::move(cb),
value = std::move(value)
] (SecureDht& dht) mutable {
dht.putEncrypted(hash, to, value, bindOpDoneCallback(std::move(cb)), permanent);
});
cv.notify_all();
}
void
DhtRunner::putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb, bool permanent)
{
putEncrypted(hash, to, std::make_shared<Value>(std::move(value)), std::move(cb), permanent);
}
void
DhtRunner::putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb, bool permanent)
{
putEncrypted(InfoHash::get(key), to, std::forward<Value>(value), std::move(cb), permanent);
}
void
DhtRunner::bootstrap(const std::string& host, const std::string& service)
{
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([host, service] (SecureDht& dht) mutable {
dht.addBootstrap(host, service);
});
cv.notify_all();
}
void
DhtRunner::bootstrap(const std::string& hostService)
{
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([host_service = splitPort(hostService)] (SecureDht& dht) mutable {
dht.addBootstrap(host_service.first, host_service.second);
});
cv.notify_all();
}
void
DhtRunner::clearBootstrap()
{
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([] (SecureDht& dht) mutable {
dht.clearBootstrap();
});
cv.notify_all();
}
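/**
* Pings each given address (falling back to the default DHT port when none is
* set) and reports success to the callback if at least one ping succeeded.
*/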
void
DhtRunner::bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple&& cb)
{
if (running != State::Running) {
if (cb) cb(false);
return;
}
if (nodes.empty()) {
if (cb) cb(false);
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops_prio.emplace([
cb = bindOpDoneCallback(std::move(cb)),
nodes = std::move(nodes)
] (SecureDht& dht) mutable {
auto rem = cb ? std::make_shared<std::pair<size_t, bool>>(nodes.size(), false) : nullptr;
for (auto& node : nodes) {
if (node.getPort() == 0)
node.setPort(net::DHT_DEFAULT_PORT);
dht.pingNode(std::move(node), [rem,cb](bool ok) {
auto& r = *rem;
r.first--;
r.second |= ok;
if (r.first == 0) {
cb(r.second);
}
});
}
});
cv.notify_all();
}
void
DhtRunner::bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb)
{
if (running != State::Running) {
if (cb) cb(false);
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops_prio.emplace([addr, cb = bindOpDoneCallback(std::move(cb))](SecureDht& dht) mutable {
dht.pingNode(std::move(addr), std::move(cb));
});
cv.notify_all();
}
void
DhtRunner::bootstrap(const InfoHash& id, const SockAddr& address)
{
if (running != State::Running)
return;
std::unique_lock<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([id, address](SecureDht& dht) mutable {
dht.insertNode(id, address);
});
cv.notify_all();
}
void
DhtRunner::bootstrap(const std::vector<NodeExport>& nodes)
{
if (running != State::Running)
return;
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([=](SecureDht& dht) {
for (auto& node : nodes)
dht.insertNode(node);
});
cv.notify_all();
}
void
DhtRunner::connectivityChanged()
{
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([=](SecureDht& dht) {
dht.connectivityChanged();
#ifdef OPENDHT_PEER_DISCOVERY
if (peerDiscovery_)
peerDiscovery_->connectivityChanged();
#endif
});
cv.notify_all();
}
void
DhtRunner::findCertificate(InfoHash hash, std::function<void(const Sp<crypto::Certificate>&)> cb) {
if (running != State::Running) {
if (cb) cb({});
return;
}
std::lock_guard<std::mutex> lck(storage_mtx);
ongoing_ops++;
pending_ops.emplace([this, hash, cb = std::move(cb)] (SecureDht& dht) {
dht.findCertificate(hash, [this, cb = std::move(cb)](const Sp<crypto::Certificate>& crt){
cb(crt);
opEnded();
});
});
cv.notify_all();
}
void
DhtRunner::resetDht()
{
peerDiscovery_.reset();
#ifdef OPENDHT_PROXY_CLIENT
listeners_.clear();
dht_via_proxy_.reset();
#endif // OPENDHT_PROXY_CLIENT
dht_.reset();
}
SecureDht*
DhtRunner::activeDht() const
{
#ifdef OPENDHT_PROXY_CLIENT
return use_proxy? dht_via_proxy_.get() : dht_.get();
#else
return dht_.get();
#endif // OPENDHT_PROXY_CLIENT
}
void
DhtRunner::setProxyServer(const std::string& proxy, const std::string& pushNodeId)
{
#ifdef OPENDHT_PROXY_CLIENT
std::lock_guard<std::mutex> lck(dht_mtx);
if (config_.proxy_server == proxy and config_.push_node_id == pushNodeId)
return;
config_.proxy_server = proxy;
config_.push_node_id = pushNodeId;
enableProxy(use_proxy and not config_.proxy_server.empty());
#else
if (not proxy.empty())
std::cerr << "DHT proxy requested but OpenDHT built without proxy support." << std::endl;
#endif
}
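/**
* Switches between the proxy client and the classic DHT. Any existing proxy
* client is shut down first. When enabling, a DhtProxyClient is created (its
* loop-wakeup callback queues a no-op priority operation in threaded mode) and
* the current listeners are registered on it; when disabling, listeners that
* have no classic-DHT token yet are re-registered on dht_.
*/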
void
DhtRunner::enableProxy(bool proxify)
{
#ifdef OPENDHT_PROXY_CLIENT
if (dht_via_proxy_) {
dht_via_proxy_->shutdown({});
}
if (proxify) {
// Init the proxy client
auto dht_via_proxy = std::unique_ptr<DhtInterface>(
new DhtProxyClient(
config_.server_ca,
config_.client_identity,
[this]{
if (config_.threaded) {
{
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([=](SecureDht&) mutable {});
}
cv.notify_all();
}
},
config_.proxy_server, config_.push_node_id, logger_)
);
#ifdef OPENDHT_PUSH_NOTIFICATIONS
if (not config_.push_token.empty())
dht_via_proxy->setPushNotificationToken(config_.push_token);
#endif
dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config, identityAnnouncedCb_, logger_));
// add current listeners
for (auto& l: listeners_)
l.second.tokenProxyDht = dht_via_proxy_->listen(l.second.hash, l.second.gcb, l.second.f, l.second.w);
// and use it
use_proxy = proxify;
} else {
use_proxy = proxify;
std::lock_guard<std::mutex> lck(storage_mtx);
if (not listeners_.empty()) {
pending_ops.emplace([this](SecureDht& /*dht*/) mutable {
if (not dht_)
return;
for (auto& l : listeners_) {
if (not l.second.tokenClassicDht) {
l.second.tokenClassicDht = dht_->listen(l.second.hash, l.second.gcb, l.second.f, l.second.w);
}
}
});
}
}
#else
if (proxify)
std::cerr << "DHT proxy requested but OpenDHT built without proxy support." << std::endl;
#endif
}
void
DhtRunner::forwardAllMessages(bool forward)
{
std::lock_guard<std::mutex> lck(dht_mtx);
#ifdef OPENDHT_PROXY_SERVER
#ifdef OPENDHT_PROXY_CLIENT
if (dht_via_proxy_)
dht_via_proxy_->forwardAllMessages(forward);
#endif // OPENDHT_PROXY_CLIENT
if (dht_)
dht_->forwardAllMessages(forward);
#else
(void) forward;
#endif // OPENDHT_PROXY_SERVER
}
/**
* Updates the push notification device token
*/
void
DhtRunner::setPushNotificationToken(const std::string& token) {
std::lock_guard<std::mutex> lck(dht_mtx);
#if defined(OPENDHT_PROXY_CLIENT) && defined(OPENDHT_PUSH_NOTIFICATIONS)
config_.push_token = token;
if (dht_via_proxy_)
dht_via_proxy_->setPushNotificationToken(token);
#else
(void) token;
#endif
}
void
DhtRunner::pushNotificationReceived(const std::map<std::string, std::string>& data)
{
#if defined(OPENDHT_PROXY_CLIENT) && defined(OPENDHT_PUSH_NOTIFICATIONS)
{
std::lock_guard<std::mutex> lck(storage_mtx);
pending_ops_prio.emplace([=](SecureDht&) {
if (dht_via_proxy_)
dht_via_proxy_->pushNotificationReceived(data);
});
}
cv.notify_all();
#else
(void) data;
#endif
}
}