diff --git a/include/opendht/dht.h b/include/opendht/dht.h index bd45eefa1dd74ce9fcb5988260a959c5ab74c263..e812a57634be321fb90dd2e290167a213814272c 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -68,7 +68,11 @@ public: * Initialise the Dht with two open sockets (for IPv4 and IP6) * and an ID for the node. */ - Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Logger& l = {}); + Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Sp<Logger>& l = {}); + + Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Logger& l = {}) + : Dht(std::move(sock), config, std::make_shared<Logger>(l)) {} + virtual ~Dht(); /** diff --git a/include/opendht/dht_interface.h b/include/opendht/dht_interface.h index b5b3c33106bce69c945bf876d7e63a7c5c54e72c..71b6fbf97f569c82e56c27dfde9a32b7660cf42d 100644 --- a/include/opendht/dht_interface.h +++ b/include/opendht/dht_interface.h @@ -30,7 +30,8 @@ namespace net { class OPENDHT_PUBLIC DhtInterface { public: DhtInterface() = default; - DhtInterface(const Logger& l) : DHT_LOG(l) {}; + DhtInterface(const Logger& l) : logger_(std::make_shared<Logger>(l)) {}; + DhtInterface(const std::shared_ptr<Logger>& l) : logger_(l) {}; virtual ~DhtInterface() = default; // [[deprecated]] @@ -221,15 +222,23 @@ public: * Enable or disable logging of DHT internal messages */ virtual void setLoggers(LogMethod error = {}, LogMethod warn = {}, LogMethod debug = {}) { - DHT_LOG.DBG = debug; - DHT_LOG.WARN = warn; - DHT_LOG.ERR = error; + if (logger_) { + logger_->DBG = std::move(debug); + logger_->WARN = std::move(warn); + logger_->ERR = std::move(error); + } else + logger_= std::make_shared<Logger>(std::move(error), std::move(warn), std::move(debug)); } virtual void setLogger(const Logger& l) { - DHT_LOG.DBG = l.DBG; - DHT_LOG.WARN = l.WARN; - DHT_LOG.ERR = l.ERR; + if (logger_) + *logger_ = l; + else + logger_= std::make_shared<Logger>(l); + } + + virtual void setLogger(const std::shared_ptr<Logger>& l) { + logger_ = l; } /** @@ -237,7 +246,8 @@ public: */ virtual void setLogFilter(const InfoHash& f) { - DHT_LOG.setFilter(f); + if (logger_) + logger_->setFilter(f); } virtual void setPushNotificationToken(const std::string&) {}; @@ -249,7 +259,7 @@ public: virtual void pushNotificationReceived(const std::map<std::string, std::string>& data) = 0; protected: - Logger DHT_LOG; + std::shared_ptr<Logger> logger_ {}; }; } // namespace dht diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 5630f78b37090c0915b3ff48cb3181191435da10..1f8075daea946d2fd9d08995b37d3f795ae953bc 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -406,7 +406,6 @@ private: Json::StreamWriterBuilder jsonBuilder_; std::unique_ptr<Json::CharReader> jsonReader_; - std::shared_ptr<dht::Logger> logger_; std::shared_ptr<http::Request> buildRequest(const std::string& target = {}); }; diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 551601efc0f8cf956fad23c4c60d3af617c29c47..9a6c3bb59efcfafdd17301581835b752acd501f9 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -327,7 +327,10 @@ public: std::vector<ValuesExport> exportValues() const; - void setLogger(const Logger& logger = {}); + void setLogger(const Sp<Logger>& logger = {}); + void setLogger(const Logger& logger) { + setLogger(std::make_shared<Logger>(logger)); + } void setLoggers(LogMethod err = {}, LogMethod warn = {}, LogMethod debug = {}); /** diff 
--git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index 62d4e5603035a868ccbc70dab8f639a6c70c6ee9..5d2c32150c3276c34cb4b5a387f5ce8309e98032 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -212,12 +212,12 @@ public: using RequestCb = std::function<void(const Request&, RequestAnswer&&)>; using RequestExpiredCb = std::function<void(const Request&, bool)>; - NetworkEngine(Logger& log, Scheduler& scheduler, std::unique_ptr<DatagramSocket>&& sock); + NetworkEngine(const Sp<Logger>& log, Scheduler& scheduler, std::unique_ptr<DatagramSocket>&& sock); NetworkEngine( InfoHash& myid, NetworkConfig config, std::unique_ptr<DatagramSocket>&& sock, - Logger& log, + const Sp<Logger>& log, Scheduler& scheduler, decltype(NetworkEngine::onError)&& onError, decltype(NetworkEngine::onNewNode)&& onNewNode, @@ -525,7 +525,7 @@ private: const InfoHash& myid; const NetworkConfig config {}; const std::unique_ptr<DatagramSocket> dht_socket; - const Logger& DHT_LOG; + Sp<Logger> logger_; NodeCache cache {}; diff --git a/include/opendht/network_utils.h b/include/opendht/network_utils.h index 24209a023c545222602f50f84f1f4e9dcbfe328b..0b3c033ae3debd15325583b846cceec0fd231cec 100644 --- a/include/opendht/network_utils.h +++ b/include/opendht/network_utils.h @@ -97,8 +97,8 @@ private: class OPENDHT_PUBLIC UdpSocket : public DatagramSocket { public: - UdpSocket(in_port_t port, const Logger& l = {}); - UdpSocket(const SockAddr& bind4, const SockAddr& bind6, const Logger& l = {}); + UdpSocket(in_port_t port, const std::shared_ptr<Logger>& l = {}); + UdpSocket(const SockAddr& bind4, const SockAddr& bind6, const std::shared_ptr<Logger>& l = {}); ~UdpSocket(); int sendTo(const SockAddr& dest, const uint8_t* data, size_t size, bool replied) override; @@ -118,7 +118,7 @@ public: void stop() override; private: - Logger logger; + std::shared_ptr<Logger> logger; int s4 {-1}; int s6 {-1}; int stopfd {-1}; diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index 19389215ff322d429c02ed48f682d607742a27be..b53e73000dfb7580718ad7f74d8f32358139d98e 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -316,11 +316,16 @@ public: dht_->setLogger(logger); } + void setLogger(const std::shared_ptr<Logger>& logger) override { + DhtInterface::setLogger(logger); + dht_->setLogger(logger); + } + /** * Only print logs related to the given InfoHash (if given), or disable filter (if zeroes). 
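Taken together, the header changes above move every component (Dht, DhtInterface, DhtRunner, NetworkEngine, UdpSocket, SecureDht) from holding its own Logger value to sharing a std::shared_ptr<Logger>, with the old by-value overloads kept as thin wrappers around std::make_shared. A minimal caller-side sketch of the new DhtRunner::setLogger overloads follows; the run() call, the opendht.h / opendht/log.h headers and the shared_ptr return type of dht::log::getStdLogger() are assumptions based on the log.cpp hunk further down, not something this patch defines:

```cpp
#include <memory>
#include <opendht.h>
#include <opendht/log.h>

int main() {
    dht::DhtRunner node;
    node.run(4222, dht::crypto::generateIdentity(), true);

    // One shared logger instance: DhtRunner forwards the same shared_ptr to
    // SecureDht, which forwards it to the underlying Dht.
    std::shared_ptr<dht::Logger> logger = dht::log::getStdLogger();
    node.setLogger(logger);

    // The legacy by-value overload still compiles; it now wraps the copy in
    // std::make_shared<dht::Logger>() before forwarding.
    node.setLogger(*logger);

    // A default-constructed (null) shared_ptr disables logging, since every
    // call site is now guarded by `if (logger_)`.
    node.setLogger(std::shared_ptr<dht::Logger>{});

    node.join();
    return 0;
}
```

Passing the pointer around means installing, swapping or dropping the log sink at runtime no longer copies three std::function objects through every layer; a component without a logger simply skips its `if (logger_)` branches.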
*/ void setLogFilter(const InfoHash& f) override { - DHT_LOG.setFilter(f); + DhtInterface::setLogFilter(f); dht_->setLogFilter(f); } diff --git a/src/dht.cpp b/src/dht.cpp index f7c3de54c72de0b9966c1675b1109568f3e8bd91..c98e692d1a61971becbafedaac5501ea2a080710 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -69,14 +69,16 @@ Dht::shutdown(ShutdownCallback cb) auto remaining = std::make_shared<int>(0); auto str_donecb = [=](bool, const std::vector<Sp<Node>>&) { --*remaining; - DHT_LOG.w("shuting down node: %u ops remaining", *remaining); + if (logger_) + logger_->w("shuting down node: %u ops remaining", *remaining); if (!*remaining && cb) { cb(); } }; for (auto& str : store) *remaining += maintainStorage(str, true, str_donecb); - DHT_LOG.w("shuting down node: after storage, %u ops", *remaining); + if (logger_) + logger_->w("shuting down node: after storage, %u ops", *remaining); if (!*remaining) { if (cb) cb(); @@ -101,7 +103,8 @@ void Dht::sendCachedPing(Bucket& b) { if (b.cached) - DHT_LOG.d(b.cached->id, "[node %s] sending ping to cached node", b.cached->toString().c_str()); + if (logger_) + logger_->d(b.cached->id, "[node %s] sending ping to cached node", b.cached->toString().c_str()); b.sendCachedPing(network_engine); } @@ -208,7 +211,8 @@ Dht::expireSearches() auto& sr = *srp.second; auto b = sr.callbacks.empty() && sr.announce.empty() && sr.listeners.empty() && sr.step_time < t; if (b) { - DHT_LOG.d(srp.first, "[search %s] removing search", srp.first.toString().c_str()); + if (logger_) + logger_->d(srp.first, "[search %s] removing search", srp.first.toString().c_str()); sr.clear(); return b; } else { return false; } @@ -287,7 +291,8 @@ void Dht::paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n) { if (vid == Value::INVALID_ID) continue; auto query_for_vid = std::make_shared<Query>(Select {}, Where {}.id(vid)); sn->pagination_queries[query].push_back(query_for_vid); - DHT_LOG.d(id, sn->node->id, "[search %s] [node %s] sending %s", + if (logger_) + logger_->d(id, sn->node->id, "[search %s] [node %s] sending %s", id.toString().c_str(), sn->node->toString().c_str(), query_for_vid->toString().c_str()); sn->getStatus[query_for_vid] = network_engine.sendGetValues(status.node, id, @@ -297,7 +302,8 @@ void Dht::paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n) { std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query_for_vid) ); } catch (const std::out_of_range&) { - DHT_LOG.e(id, sn->node->id, "[search %s] [node %s] received non-id field in response to "\ + if (logger_) + logger_->e(id, sn->node->id, "[search %s] [node %s] received non-id field in response to "\ "'SELECT id' request...", id.toString().c_str(), sn->node->toString().c_str()); } @@ -306,7 +312,8 @@ void Dht::paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n) { /* add pagination query key for tracking ongoing requests. 
*/ n->pagination_queries[query].push_back(select_q); - DHT_LOG.d(sr->id, n->node->id, "[search %s] [node %s] sending %s", + if (logger_) + logger_->d(sr->id, n->node->id, "[search %s] [node %s] sending %s", sr->id.toString().c_str(), n->node->toString().c_str(), select_q->toString().c_str()); n->getStatus[select_q] = network_engine.sendGetValues(n->node, sr->id, @@ -350,8 +357,9 @@ Dht::searchSendGetValues(Sp<Search> sr, SearchNode* pn, bool update) if (not n) return nullptr; - /*DHT_LOG.d(sr->id, n->node->id, "[search %s] [node %s] sending 'find_node'", - sr->id.toString().c_str(), n->node->toString().c_str());*/ + /* if (logger_) + logger_->d(sr->id, n->node->id, "[search %s] [node %s] sending 'find_node'", + sr->id.toString().c_str(), n->node->toString().c_str());*/ n->getStatus[query] = network_engine.sendFindNode(n->node, sr->id, -1, @@ -364,8 +372,9 @@ Dht::searchSendGetValues(Sp<Search> sr, SearchNode* pn, bool update) if (query and not query->select.getSelection().empty()) { /* The request contains a select. No need to paginate... */ - /*DHT_LOG.d(sr->id, n->node->id, "[search %s] [node %s] sending 'get'", - sr->id.toString().c_str(), n->node->toString().c_str());*/ + /* if (logger_) + logger_->d(sr->id, n->node->id, "[search %s] [node %s] sending 'get'", + sr->id.toString().c_str(), n->node->toString().c_str());*/ n->getStatus[query] = network_engine.sendGetValues(n->node, sr->id, *query, @@ -440,7 +449,8 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { auto next_refresh_time = now + getType(a.value->type).expiration; /* only put the value if the node doesn't already have it */ if (not hasValue or seq_no < a.value->seq) { - DHT_LOG.d(sr->id, sn->node->id, "[search %s] [node %s] sending 'put' (vid: %d)", + if (logger_) + logger_->d(sr->id, sn->node->id, "[search %s] [node %s] sending 'put' (vid: %d)", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); auto created = a.permanent ? time_point::max() : a.created; sn->acked[a.value->id] = { @@ -448,14 +458,16 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { next_refresh_time }; } else if (hasValue and a.permanent) { - DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] sending 'refresh' (vid: %d)", + if (logger_) + logger_->w(sr->id, sn->node->id, "[search %s] [node %s] sending 'refresh' (vid: %d)", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); sn->acked[a.value->id] = { network_engine.sendRefreshValue(sn->node, sr->id, a.value->id, sn->token, onDone, onExpired), next_refresh_time }; } else { - DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] already has value (vid: %d). Aborting.", + if (logger_) + logger_->w(sr->id, sn->node->id, "[search %s] [node %s] already has value (vid: %d). 
Aborting.", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); auto ack_req = std::make_shared<net::Request>(net::Request::State::COMPLETED); ack_req->reply_time = now; @@ -491,7 +503,8 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { if (a.permanent) { sendQuery = true; } else { - DHT_LOG.w(sr->id, n.node->id, "[search %s] [node %s] sending 'put' (vid: %d)", + if (logger_) + logger_->w(sr->id, n.node->id, "[search %s] [node %s] sending 'put' (vid: %d)", sr->id.toString().c_str(), n.node->toString().c_str(), a.value->id); n.acked[a.value->id] = { network_engine.sendAnnounceValue(n.node, sr->id, a.value, a.created, n.token, onDone, onExpired), @@ -504,7 +517,8 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { if (sendQuery) { if (not probe_query) probe_query = std::make_shared<Query>(Select {}.field(Value::Field::Id).field(Value::Field::SeqNum)); - DHT_LOG.d(sr->id, n.node->id, "[search %s] [node %s] sending %s", + if (logger_) + logger_->d(sr->id, n.node->id, "[search %s] [node %s] sending %s", sr->id.toString().c_str(), n.node->toString().c_str(), probe_query->toString().c_str()); n.probe_query = probe_query; n.getStatus[probe_query] = network_engine.sendGetValues(n.node, @@ -528,7 +542,8 @@ Dht::searchSynchedNodeListen(const Sp<Search>& sr, SearchNode& n) auto list_token = l.first; if (n.getListenTime(query) > scheduler.time()) continue; - // DHT_LOG.d(sr->id, n.node->id, "[search %s] [node %s] sending 'listen'", + // if (logger_) + // logger_->d(sr->id, n.node->id, "[search %s] [node %s] sending 'listen'", // sr->id.toString().c_str(), n.node->toString().c_str()); auto r = n.listenStatus.find(query); @@ -609,7 +624,8 @@ Dht::searchStep(Sp<Search> sr) const auto& now = scheduler.time(); /*if (auto req_count = sr->currentlySolicitedNodeCount()) - DHT_LOG.d(sr->id, "[search %s IPv%c] step (%d requests)", + if (logger_) + logger_->d(sr->id, "[search %s IPv%c] step (%d requests)", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', req_count);*/ sr->step_time = now; @@ -648,7 +664,7 @@ Dht::searchStep(Sp<Search> sr) // true if this node is part of the target nodes cluter. /*bool in = sr->id.xorCmp(myid, sr->nodes.back().node->id) < 0; - DHT_LOG_DBG("[search %s IPv%c] synced%s", + logger__DBG("[search %s IPv%c] synced%s", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', in ? ", in" : "");*/ if (not sr->listeners.empty()) { @@ -674,7 +690,8 @@ Dht::searchStep(Sp<Search> sr) if (sr->getNumberOfConsecutiveBadNodes() >= std::min(sr->nodes.size(), static_cast<size_t>(SEARCH_MAX_BAD_NODES))) { - DHT_LOG.w(sr->id, "[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); + if (logger_) + logger_->w(sr->id, "[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); sr->expire(); connectivityChanged(sr->af); } @@ -689,7 +706,8 @@ unsigned Dht::refill(Dht::Search& sr) { auto cached_nodes = network_engine.getCachedNodes(sr.id, sr.af, SEARCH_NODES); if (cached_nodes.empty()) { - DHT_LOG.e(sr.id, "[search %s IPv%c] no nodes from cache while refilling search", + if (logger_) + logger_->e(sr.id, "[search %s IPv%c] no nodes from cache while refilling search", sr.id.toString().c_str(), (sr.af == AF_INET) ? 
'4' : '6'); return 0; } @@ -700,7 +718,8 @@ unsigned Dht::refill(Dht::Search& sr) { if (sr.insertNode(i, now)) ++inserted; } - DHT_LOG.d(sr.id, "[search %s IPv%c] refilled search with %u nodes from node cache", + if (logger_) + logger_->d(sr.id, "[search %s IPv%c] refilled search with %u nodes from node cache", sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6', inserted); return inserted; } @@ -711,7 +730,8 @@ Sp<Dht::Search> Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback qcb, DoneCallback dcb, Value::Filter f, const Sp<Query>& q) { if (!isRunning(af)) { - DHT_LOG.e(id, "[search %s IPv%c] unsupported protocol", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); + if (logger_) + logger_->e(id, "[search %s IPv%c] unsupported protocol", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); if (dcb) dcb(false, {}); return {}; @@ -738,7 +758,8 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback q } } if (not sr) { - DHT_LOG.e(id, "[search %s IPv%c] maximum number of searches reached !", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); + if (logger_) + logger_->e(id, "[search %s IPv%c] maximum number of searches reached !", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); return {}; } } @@ -751,7 +772,8 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback q sr->nodes.clear(); sr->nodes.reserve(SEARCH_NODES+1); sr->nextSearchStep = scheduler.add(time_point::max(), std::bind(&Dht::searchStep, this, sr)); - DHT_LOG.w(id, "[search %s IPv%c] new search", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); + if (logger_) + logger_->w(id, "[search %s IPv%c] new search", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); if (search_id == 0) search_id++; } @@ -820,15 +842,16 @@ Dht::listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filte { if (!isRunning(af)) return 0; - // DHT_LOG_ERR("[search %s IPv%c] search_time is now in %lfs", sr->id.toString().c_str(), (sr->af == AF_INET) ? '4' : '6', print_dt(tm-clock::now())); + // logger__ERR("[search %s IPv%c] search_time is now in %lfs", sr->id.toString().c_str(), (sr->af == AF_INET) ? '4' : '6', print_dt(tm-clock::now())); - //DHT_LOG_WARN("listenTo %s", id.toString().c_str()); + //logger__WARN("listenTo %s", id.toString().c_str()); auto& srs = searches(af); auto srp = srs.find(id); Sp<Search> sr = (srp == srs.end()) ? search(id, af) : srp->second; if (!sr) throw DhtException("Can't create search"); - DHT_LOG.e(id, "[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); + if (logger_) + logger_->e(id, "[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? 
'4' : '6'); return sr->listen(cb, f, q, scheduler); } @@ -873,10 +896,12 @@ Dht::cancelListen(const InfoHash& id, size_t token) auto it = listeners.find(token); if (it == listeners.end()) { - DHT_LOG.w(id, "Listen token not found: %d", token); + if (logger_) + logger_->w(id, "Listen token not found: %d", token); return false; } - DHT_LOG.d(id, "cancelListen %s with token %d", id.toString().c_str(), token); + if (logger_) + logger_->d(id, "cancelListen %s with token %d", id.toString().c_str(), token); if (auto tokenlocal = std::get<0>(it->second)) { auto st = store.find(id); if (st != store.end()) @@ -929,7 +954,8 @@ Dht::put(const InfoHash& id, Sp<Value> val, DoneCallback callback, time_point cr created = std::min(now, created); storageStore(id, val, created, {}, permanent); - DHT_LOG.d(id, "put: adding %s -> %s", id.toString().c_str(), val->toString().c_str()); + if (logger_) + logger_->d(id, "put: adding %s -> %s", id.toString().c_str(), val->toString().c_str()); auto op = std::make_shared<OpStatus>(); auto donecb = [callback](const std::vector<Sp<Node>>& nodes, OpStatus& op) { @@ -940,13 +966,15 @@ Dht::put(const InfoHash& id, Sp<Value> val, DoneCallback callback, time_point cr } }; announce(id, AF_INET, val, [=](bool ok4, const std::vector<Sp<Node>>& nodes) { - DHT_LOG.d(id, "Announce done IPv4 %d", ok4); + if (logger_) + logger_->d(id, "Announce done IPv4 %d", ok4); auto& o = *op; o.status4 = {true, ok4}; donecb(nodes, o); }, created, permanent); announce(id, AF_INET6, val, [=](bool ok6, const std::vector<Sp<Node>>& nodes) { - DHT_LOG.d(id, "Announce done IPv6 %d", ok6); + if (logger_) + logger_->d(id, "Announce done IPv6 %d", ok6); auto& o = *op; o.status6 = {true, ok6}; donecb(nodes, o); @@ -1011,12 +1039,12 @@ Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filt gcb(getLocal(id, f)); Dht::search(id, AF_INET, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) { - //DHT_LOG_WARN("DHT done IPv4"); + //logger__WARN("DHT done IPv4"); op->status4 = {true, ok}; doneCallbackWrapper(donecb, nodes, *op); }, f, q); Dht::search(id, AF_INET6, gcb, {}, [=](bool ok, const std::vector<Sp<Node>>& nodes) { - //DHT_LOG_WARN("DHT done IPv6"); + //logger__WARN("DHT done IPv6"); op->status6 = {true, ok}; doneCallbackWrapper(donecb, nodes, *op); }, f, q); @@ -1062,12 +1090,12 @@ void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Quer auto sq = std::make_shared<Query>(std::move(q)); Dht::search(id, AF_INET, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) { - //DHT_LOG_WARN("DHT done IPv4"); + //logger__WARN("DHT done IPv4"); op->status4 = {true, ok}; doneCallbackWrapper(done_cb, nodes, *op); }, f, sq); Dht::search(id, AF_INET6, {}, qcb, [=](bool ok, const std::vector<Sp<Node>>& nodes) { - //DHT_LOG_WARN("DHT done IPv6"); + //logger__WARN("DHT done IPv6"); op->status6 = {true, ok}; doneCallbackWrapper(done_cb, nodes, *op); }, f, sq); @@ -1141,7 +1169,8 @@ Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v, bool newVa { if (newValue) { if (not st.local_listeners.empty()) { - DHT_LOG.d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size()); + if (logger_) + logger_->d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size()); std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> cbs; cbs.reserve(st.local_listeners.size()); for (const auto& l : st.local_listeners) { @@ -1149,7 +1178,8 @@ Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& 
v, bool newVa if (not l.second.filter or l.second.filter(*v.data)) vals.push_back(v.data); if (not vals.empty()) { - DHT_LOG.d(id, "[store %s] sending update local listener with token %lu", + if (logger_) + logger_->d(id, "[store %s] sending update local listener with token %lu", id.toString().c_str(), l.first); cbs.emplace_back(l.second.get_cb, std::move(vals)); @@ -1162,13 +1192,15 @@ Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v, bool newVa } if (not st.listeners.empty()) { - DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size()); + if (logger_) + logger_->d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size()); for (const auto& node_listeners : st.listeners) { for (const auto& l : node_listeners.second) { auto f = l.second.query.where.getFilter(); if (f and not f(*v.data)) continue; - DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending update", + if (logger_) + logger_->w(id, node_listeners.first->id, "[store %s] [node %s] sending update", id.toString().c_str(), node_listeners.first->toString().c_str()); std::vector<Sp<Value>> vals {}; @@ -1266,11 +1298,13 @@ Dht::expireStore(decltype(store)::iterator i) total_store_size += stats.first; total_values -= stats.second.size(); if (not stats.second.empty()) { - DHT_LOG.d(id, "[store %s] discarded %ld expired values (%ld bytes)", + if (logger_) + logger_->d(id, "[store %s] discarded %ld expired values (%ld bytes)", id.toString().c_str(), stats.second.size(), -stats.first); if (not st.listeners.empty()) { - DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size()); + if (logger_) + logger_->d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size()); std::vector<Value::Id> ids; ids.reserve(stats.second.size()); @@ -1279,7 +1313,8 @@ Dht::expireStore(decltype(store)::iterator i) for (const auto& node_listeners : st.listeners) { for (const auto& l : node_listeners.second) { - DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending expired", + if (logger_) + logger_->w(id, node_listeners.first->id, "[store %s] [node %s] sending expired", id.toString().c_str(), node_listeners.first->toString().c_str()); Blob ntoken = makeToken(node_listeners.first->getAddr(), false); @@ -1309,7 +1344,8 @@ Dht::expireStore() expireStore(i); if (i->second.empty() && i->second.listeners.empty() && i->second.local_listeners.empty()) { - DHT_LOG.d(i->first, "[store %s] discarding empty storage", i->first.toString().c_str()); + if (logger_) + logger_->d(i->first, "[store %s] discarding empty storage", i->first.toString().c_str()); i = store.erase(i); } else @@ -1320,7 +1356,8 @@ Dht::expireStore() while (total_store_size > max_store_size) { // find IP using the most storage if (store_quota.empty()) { - DHT_LOG.w("No space left: local data consumes all the quota!"); + if (logger_) + logger_->w("No space left: local data consumes all the quota!"); break; } auto largest = store_quota.begin(); @@ -1328,7 +1365,8 @@ Dht::expireStore() if (it->second.size() > largest->second.size()) largest = it; } - DHT_LOG.w("No space left: discarding value of largest consumer %s", largest->first.toString().c_str()); + if (logger_) + logger_->w("No space left: discarding value of largest consumer %s", largest->first.toString().c_str()); while (true) { auto exp_value = largest->second.getOldest(); auto storage = store.find(exp_value.first); @@ -1336,7 +1374,8 @@ Dht::expireStore() auto ret = 
storage->second.remove(exp_value.first, exp_value.second); total_store_size += ret.size_diff; total_values += ret.values_diff; - DHT_LOG.w("Discarded %ld bytes, still %ld used", largest->first.toString().c_str(), total_store_size); + if (logger_) + logger_->w("Discarded %ld bytes, still %ld used", largest->first.toString().c_str(), total_store_size); if (ret.values_diff) break; } @@ -1575,7 +1614,8 @@ Dht::dumpTables() const out << getStorageLog() << std::endl; - DHT_LOG.d("%s", out.str().c_str()); + if (logger_) + logger_->d("%s", out.str().c_str()); } std::string @@ -1701,11 +1741,11 @@ fromDhtConfig(const Config& config) return netConf; } -Dht::Dht() : store(), network_engine(DHT_LOG, scheduler, {}) {} +Dht::Dht() : store(), network_engine(logger_, scheduler, {}) {} -Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Logger& l) +Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, const Sp<Logger>& l) : DhtInterface(l), myid(config.node_id ? config.node_id : InfoHash::getRandom()), store(), store_quota(), - network_engine(myid, fromDhtConfig(config), std::move(sock), DHT_LOG, scheduler, + network_engine(myid, fromDhtConfig(config), std::move(sock), logger_, scheduler, std::bind(&Dht::onError, this, _1, _2), std::bind(&Dht::onNewNode, this, _1, _2), std::bind(&Dht::onReportedAddr, this, _1, _2), @@ -1746,7 +1786,8 @@ Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, cons expire(); - DHT_LOG.d("DHT node initialised with ID %s", myid.toString().c_str()); + if (logger_) + logger_->d("DHT node initialised with ID %s", myid.toString().c_str()); if (not persistPath.empty()) loadState(persistPath); @@ -1755,7 +1796,7 @@ Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, cons bool Dht::neighbourhoodMaintenance(RoutingTable& list) { - //DHT_LOG_DBG("neighbourhoodMaintenance"); + //logger__DBG("neighbourhoodMaintenance"); auto b = list.findBucket(myid); if (b == list.end()) return false; @@ -1780,7 +1821,8 @@ Dht::neighbourhoodMaintenance(RoutingTable& list) auto n = q->randomNode(); if (n) { - DHT_LOG.d(id, n->id, "[node %s] sending [find %s] for neighborhood maintenance", + if (logger_) + logger_->d(id, n->id, "[node %s] sending [find %s] for neighborhood maintenance", n->toString().c_str(), id.toString().c_str()); /* Since our node-id is the same in both DHTs, it's probably profitable to query both families. 
*/ @@ -1833,13 +1875,15 @@ Dht::bucketMaintenance(RoutingTable& list) want = WANT4 | WANT6; } - DHT_LOG.d(id, n->id, "[node %s] sending find %s for bucket maintenance", n->toString().c_str(), id.toString().c_str()); + if (logger_) + logger_->d(id, n->id, "[node %s] sending find %s for bucket maintenance", n->toString().c_str(), id.toString().c_str()); //auto start = scheduler.time(); network_engine.sendFindNode(n, id, want, nullptr, [this,n](const net::Request&, bool over) { if (over) { const auto& end = scheduler.time(); // using namespace std::chrono; - // DHT_LOG.d(n->id, "[node %s] bucket maintenance op expired after %llu ms", n->toString().c_str(), duration_cast<milliseconds>(end-start).count()); + // if (logger_) + // logger_->d(n->id, "[node %s] bucket maintenance op expired after %llu ms", n->toString().c_str(), duration_cast<milliseconds>(end-start).count()); scheduler.edit(nextNodesConfirmation, end + Node::MAX_RESPONSE_TIME); } }); @@ -1856,7 +1900,8 @@ Dht::dataPersistence(InfoHash id) const auto& now = scheduler.time(); auto str = store.find(id); if (str != store.end() and now > str->second.maintenance_time) { - DHT_LOG.d(id, "[storage %s] maintenance (%u values, %u bytes)", + if (logger_) + logger_->d(id, "[storage %s] maintenance (%u values, %u bytes)", id.toString().c_str(), str->second.valueCount(), str->second.totalSize()); maintainStorage(*str); str->second.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; @@ -1903,7 +1948,8 @@ Dht::maintainStorage(decltype(store)::value_type& storage, bool force, const Don } if (not want4 and not want6) { - DHT_LOG.d(storage.first, "Discarding storage values %s", storage.first.toString().c_str()); + if (logger_) + logger_->d(storage.first, "Discarding storage values %s", storage.first.toString().c_str()); auto diff = storage.second.clear(); total_store_size += diff.size_diff; total_values += diff.values_diff; @@ -1920,7 +1966,8 @@ Dht::periodic(const uint8_t *buf, size_t buflen, SockAddr from) try { network_engine.processMessage(buf, buflen, std::move(from)); } catch (const std::exception& e) { - DHT_LOG.w("Can't process message: %s", e.what()); + if (logger_) + logger_->w("Can't process message: %s", e.what()); } } return scheduler.run(); @@ -1947,11 +1994,13 @@ Dht::confirmNodes() const auto& now = scheduler.time(); if (searches4.empty() and getStatus(AF_INET) == NodeStatus::Connected) { - DHT_LOG.d(myid, "[confirm nodes] initial IPv4 'get' for my id (%s)", myid.toString().c_str()); + if (logger_) + logger_->d(myid, "[confirm nodes] initial IPv4 'get' for my id (%s)", myid.toString().c_str()); search(myid, AF_INET); } if (searches6.empty() and getStatus(AF_INET6) == NodeStatus::Connected) { - DHT_LOG.d(myid, "[confirm nodes] initial IPv6 'get' for my id (%s)", myid.toString().c_str()); + if (logger_) + logger_->d(myid, "[confirm nodes] initial IPv6 'get' for my id (%s)", myid.toString().c_str()); search(myid, AF_INET6); } @@ -2026,14 +2075,16 @@ Dht::importValues(const std::vector<ValuesExport>& import) val_time = time_point{time_point::duration{valel.via.array.ptr[0].as<time_point::duration::rep>()}}; tmp_val.msgpack_unpack(valel.via.array.ptr[1]); } catch (const std::exception&) { - DHT_LOG.e(node.first, "Error reading value at %s", node.first.toString().c_str()); + if (logger_) + logger_->e(node.first, "Error reading value at %s", node.first.toString().c_str()); continue; } val_time = std::min(val_time, now); storageStore(node.first, std::make_shared<Value>(std::move(tmp_val)), val_time); } } catch (const 
std::exception&) { - DHT_LOG.e(node.first, "Error reading values at %s", node.first.toString().c_str()); + if (logger_) + logger_->e(node.first, "Error reading values at %s", node.first.toString().c_str()); continue; } } @@ -2085,7 +2136,8 @@ void Dht::pingNode(SockAddr sa, DoneCallbackSimple&& cb) { scheduler.syncTime(); - DHT_LOG.d("Sending ping to %s", sa.toString().c_str()); + if (logger_) + logger_->d("Sending ping to %s", sa.toString().c_str()); auto& count = sa.getFamily() == AF_INET ? pending_pings4 : pending_pings6; count++; network_engine.sendPing(std::move(sa), [&count,cb](const net::Request&, net::RequestAnswer&&) { @@ -2105,7 +2157,8 @@ void Dht::onError(Sp<net::Request> req, net::DhtProtocolException e) { const auto& node = req->node; if (e.getCode() == net::DhtProtocolException::UNAUTHORIZED) { - DHT_LOG.e(node->id, "[node %s] token flush", node->toString().c_str()); + if (logger_) + logger_->e(node->id, "[node %s] token flush", node->toString().c_str()); node->authError(); node->cancelRequest(req); for (auto& srp : searches(node->getFamily())) { @@ -2120,7 +2173,8 @@ Dht::onError(Sp<net::Request> req, net::DhtProtocolException e) { } } } else if (e.getCode() == net::DhtProtocolException::NOT_FOUND) { - DHT_LOG.e(node->id, "[node %s] returned error 404: storage not found", node->toString().c_str()); + if (logger_) + logger_->e(node->id, "[node %s] returned error 404: storage not found", node->toString().c_str()); node->cancelRequest(req); } } @@ -2155,7 +2209,8 @@ net::RequestAnswer Dht::onGetValues(Sp<Node> node, const InfoHash& hash, want_t, const Query& query) { if (not hash) { - DHT_LOG.w("[node %s] Eek! Got get_values with no info_hash", node->toString().c_str()); + if (logger_) + logger_->w("[node %s] Eek! Got get_values with no info_hash", node->toString().c_str()); throw net::DhtProtocolException { net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, net::DhtProtocolException::GET_NO_INFOHASH @@ -2169,7 +2224,8 @@ Dht::onGetValues(Sp<Node> node, const InfoHash& hash, want_t, const Query& query answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES); if (st != store.end() && not st->second.empty()) { answer.values = st->second.get(query.where.getFilter()); - DHT_LOG.d(hash, "[node %s] sending %u values", node->toString().c_str(), answer.values.size()); + if (logger_) + logger_->d(hash, "[node %s] sending %u values", node->toString().c_str(), answer.values.size()); } return answer; } @@ -2180,16 +2236,19 @@ void Dht::onGetValuesDone(const Sp<Node>& node, const Sp<Query>& orig_query) { if (not sr) { - DHT_LOG.w("[search unknown] got reply to 'get'. Ignoring."); + if (logger_) + logger_->w("[search unknown] got reply to 'get'. 
Ignoring."); return; } - /*DHT_LOG.d(sr->id, "[search %s] [node %s] got reply to 'get' with %u nodes", + /* if (logger_) + logger_->d(sr->id, "[search %s] [node %s] got reply to 'get' with %u nodes", sr->id.toString().c_str(), node->toString().c_str(), a.nodes4.size()+a.nodes6.size());*/ if (not a.ntoken.empty()) { if (not a.values.empty() or not a.fields.empty()) { - DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] found %u values", + if (logger_) + logger_->d(sr->id, node->id, "[search %s] [node %s] found %u values", sr->id.toString().c_str(), node->toString().c_str(), a.values.size()); for (auto& getp : sr->callbacks) { /* call all callbacks for this search */ auto& get = getp.second; @@ -2232,11 +2291,13 @@ void Dht::onGetValuesDone(const Sp<Node>& node, for (auto& l : tmp_lists) l.first(l.second, false);*/ } else if (not a.expired_values.empty()) { - DHT_LOG.w(sr->id, node->id, "[search %s] [node %s] %u expired values", + if (logger_) + logger_->w(sr->id, node->id, "[search %s] [node %s] %u expired values", sr->id.toString().c_str(), node->toString().c_str(), a.expired_values.size()); } } else { - DHT_LOG.w(sr->id, "[node %s] no token provided. Ignoring response content.", node->toString().c_str()); + if (logger_) + logger_->w(sr->id, "[node %s] no token provided. Ignoring response content.", node->toString().c_str()); network_engine.blacklistNode(node); } @@ -2252,14 +2313,16 @@ net::RequestAnswer Dht::onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t socket_id, const Query& query) { if (not hash) { - DHT_LOG.w(node->id, "[node %s] listen with no info_hash", node->toString().c_str()); + if (logger_) + logger_->w(node->id, "[node %s] listen with no info_hash", node->toString().c_str()); throw net::DhtProtocolException { net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, net::DhtProtocolException::LISTEN_NO_INFOHASH }; } if (not tokenMatch(token, node->getAddr())) { - DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'listen'", node->toString().c_str(), hash.toString().c_str()); + if (logger_) + logger_->w(hash, node->id, "[node %s] incorrect token %s for 'listen'", node->toString().c_str(), hash.toString().c_str()); throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::LISTEN_WRONG_TOKEN}; } Query q = query; @@ -2270,7 +2333,8 @@ Dht::onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t soc void Dht::onListenDone(const Sp<Node>& /* node */, net::RequestAnswer& /* answer */, Sp<Search>& sr) { - // DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] got listen confirmation", + // if (logger_) + // logger_->d(sr->id, node->id, "[search %s] [node %s] got listen confirmation", // sr->id.toString().c_str(), node->toString().c_str(), answer.values.size()); if (not sr->done) { @@ -2289,14 +2353,16 @@ Dht::onAnnounce(Sp<Node> n, { auto& node = *n; if (not hash) { - DHT_LOG.w(node.id, "put with no info_hash"); + if (logger_) + logger_->w(node.id, "put with no info_hash"); throw net::DhtProtocolException { net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, net::DhtProtocolException::PUT_NO_INFOHASH }; } if (!tokenMatch(token, node.getAddr())) { - DHT_LOG.w(hash, node.id, "[node %s] incorrect token %s for 'put'", node.toString().c_str(), hash.toString().c_str()); + if (logger_) + logger_->w(hash, node.id, "[node %s] incorrect token %s for 'put'", node.toString().c_str(), hash.toString().c_str()); throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, 
net::DhtProtocolException::PUT_WRONG_TOKEN}; } { @@ -2304,7 +2370,8 @@ Dht::onAnnounce(Sp<Node> n, // SEARCH_NODES nodes around the target id. auto closest_nodes = buckets(node.getFamily()).findClosestNodes(hash, scheduler.time(), SEARCH_NODES); if (closest_nodes.size() >= TARGET_NODES and hash.xorCmp(closest_nodes.back()->id, myid) < 0) { - DHT_LOG.w(hash, node.id, "[node %s] announce too far from the target. Dropping value.", node.toString().c_str()); + if (logger_) + logger_->w(hash, node.id, "[node %s] announce too far from the target. Dropping value.", node.toString().c_str()); return {}; } } @@ -2312,7 +2379,8 @@ Dht::onAnnounce(Sp<Node> n, auto created = std::min(creation_date, scheduler.time()); for (const auto& v : values) { if (v->id == Value::INVALID_ID) { - DHT_LOG.w(hash, node.id, "[value %s] incorrect value id", hash.toString().c_str()); + if (logger_) + logger_->w(hash, node.id, "[value %s] incorrect value id", hash.toString().c_str()); throw net::DhtProtocolException { net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, net::DhtProtocolException::PUT_INVALID_ID @@ -2323,15 +2391,18 @@ Dht::onAnnounce(Sp<Node> n, if (lv) { if (*lv == *vc) { storageRefresh(hash, v->id); - DHT_LOG.d(hash, node.id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node.toString().c_str(), std::to_string(v->id).c_str()); + if (logger_) + logger_->d(hash, node.id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node.toString().c_str(), std::to_string(v->id).c_str()); } else { const auto& type = getType(lv->type); if (type.editPolicy(hash, lv, vc, node.id, node.getAddr())) { - DHT_LOG.d(hash, node.id, "[store %s] editing %s", + if (logger_) + logger_->d(hash, node.id, "[store %s] editing %s", hash.toString().c_str(), vc->toString().c_str()); storageStore(hash, vc, created, node.getAddr()); } else { - DHT_LOG.d(hash, node.id, "[store %s] rejecting edition of %s because of storage policy", + if (logger_) + logger_->d(hash, node.id, "[store %s] rejecting edition of %s because of storage policy", hash.toString().c_str(), vc->toString().c_str()); } } @@ -2339,10 +2410,12 @@ Dht::onAnnounce(Sp<Node> n, // Allow the value to be edited by the storage policy const auto& type = getType(vc->type); if (type.storePolicy(hash, vc, node.id, node.getAddr())) { - //DHT_LOG.d(hash, node.id, "[store %s] storing %s", hash.toString().c_str(), std::to_string(vc->id).c_str()); + // if (logger_) + // logger_->d(hash, node.id, "[store %s] storing %s", hash.toString().c_str(), std::to_string(vc->id).c_str()); storageStore(hash, vc, created, node.getAddr()); } else { - DHT_LOG.d(hash, node.id, "[store %s] rejecting storage of %s", + if (logger_) + logger_->d(hash, node.id, "[store %s] rejecting storage of %s", hash.toString().c_str(), vc->toString().c_str()); } } @@ -2356,13 +2429,16 @@ Dht::onRefresh(Sp<Node> node, const InfoHash& hash, const Blob& token, const Val using namespace net; if (not tokenMatch(token, node->getAddr())) { - DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'put'", node->toString().c_str(), hash.toString().c_str()); + if (logger_) + logger_->w(hash, node->id, "[node %s] incorrect token %s for 'put'", node->toString().c_str(), hash.toString().c_str()); throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::PUT_WRONG_TOKEN}; } if (storageRefresh(hash, vid)) { - DHT_LOG.d(hash, node->id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node->toString().c_str(), std::to_string(vid).c_str()); + if 
(logger_) + logger_->d(hash, node->id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node->toString().c_str(), std::to_string(vid).c_str()); } else { - DHT_LOG.d(hash, node->id, "[store %s] [node %s] got refresh for unknown value", + if (logger_) + logger_->d(hash, node->id, "[store %s] [node %s] got refresh for unknown value", hash.toString().c_str(), node->toString().c_str()); throw DhtProtocolException {DhtProtocolException::NOT_FOUND, DhtProtocolException::STORAGE_NOT_FOUND}; } @@ -2379,11 +2455,13 @@ Dht::storageRefresh(const InfoHash& id, Value::Id vid) // need to be refreshed auto& st = s->second; if (not st.listeners.empty()) { - DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size()); + if (logger_) + logger_->d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size()); std::vector<Value::Id> ids = {vid}; for (const auto& node_listeners : st.listeners) { for (const auto& l : node_listeners.second) { - DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending refresh", + if (logger_) + logger_->w(id, node_listeners.first->id, "[store %s] [node %s] sending refresh", id.toString().c_str(), node_listeners.first->toString().c_str()); Blob ntoken = makeToken(node_listeners.first->getAddr(), false); @@ -2403,7 +2481,8 @@ Dht::storageRefresh(const InfoHash& id, Value::Id vid) void Dht::onAnnounceDone(const Sp<Node>& node, net::RequestAnswer& answer, Sp<Search>& sr) { - DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] got reply to put!", + if (logger_) + logger_->d(sr->id, node->id, "[search %s] [node %s] got reply to put!", sr->id.toString().c_str(), node->toString().c_str()); searchSendGetValues(sr); sr->checkAnnounced(answer.vid); @@ -2421,7 +2500,8 @@ Dht::saveState(const std::string& path) const void Dht::loadState(const std::string& path) { - DHT_LOG.d("Importing state from %s", path.c_str()); + if (logger_) + logger_->d("Importing state from %s", path.c_str()); try { // Import nodes from binary file msgpack::unpacker pac; @@ -2442,18 +2522,21 @@ Dht::loadState(const std::string& path) if (pac.next(oh)) { { auto imported_nodes = oh.get().as<std::vector<NodeExport>>(); - DHT_LOG.d("Importing %zu nodes", imported_nodes.size()); + if (logger_) + logger_->d("Importing %zu nodes", imported_nodes.size()); for (const auto& node : imported_nodes) insertNode(node); } if (pac.next(oh)) { auto imported_values = oh.get().as<std::vector<ValuesExport>>(); - DHT_LOG.d("Importing %zu values", imported_values.size()); + if (logger_) + logger_->d("Importing %zu values", imported_values.size()); importValues(imported_values); } } } catch (const std::exception& e) { - DHT_LOG.w("Error importing state from %s: %s", path.c_str(), e.what()); + if (logger_) + logger_->w("Error importing state from %s: %s", path.c_str(), e.what()); } } diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 6793ed8a867876ff303d0272a1f0d8efded45f4e..ace9421dd6da56a12c88b5fd23d287635fb68d9c 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -109,12 +109,12 @@ DhtProxyClient::DhtProxyClient( std::shared_ptr<dht::crypto::Certificate> serverCA, dht::crypto::Identity clientIdentity, std::function<void()> signal, const std::string& serverHost, const std::string& pushClientId, std::shared_ptr<dht::Logger> logger) - : proxyUrl_(serverHost) + : DhtInterface(logger) + , proxyUrl_(serverHost) , clientIdentity_(clientIdentity), serverCertificate_(serverCA) , pushClientId_(pushClientId), 
pushSessionId_(getRandomSessionId()) , loopSignal_(signal) , jsonReader_(Json::CharReaderBuilder{}.newCharReader()) - , logger_(logger) { jsonBuilder_["commentStyle"] = "None"; jsonBuilder_["indentation"] = ""; @@ -461,7 +461,7 @@ DhtProxyClient::buildRequest(const std::string& target) auto resolver = resolver_; if (not resolver) resolver = std::make_shared<http::Resolver>(httpContext_, proxyUrl_, logger_); - auto request = target.empty() + auto request = target.empty() ? std::make_shared<http::Request>(httpContext_, resolver) : std::make_shared<http::Request>(httpContext_, resolver, target); if (serverCertificate_) @@ -609,7 +609,7 @@ DhtProxyClient::queryProxyInfo(std::shared_ptr<InfoState> infoState, sa_family_t try { auto request = std::make_shared<http::Request>(httpContext_, resolver, family); auto reqid = request->id(); - request->set_method(restinio::http_method_get()); + request->set_method(restinio::http_method_get()); setHeaderFields(*request); request->add_on_done_callback([this, reqid, family, infoState] (const http::Response& response){ if (infoState->cancel.load()) diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 15633679848b130294f926d94a321f7b078c4842..dd6ed1708ad632ae573d14612cf0b3e46e2814ea 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -107,7 +107,7 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const Config& con { if (running == State::Idle) { if (not context.sock) - context.sock.reset(new net::UdpSocket(local4, local6, context.logger ? *context.logger : Logger{})); + context.sock.reset(new net::UdpSocket(local4, local6, context.logger)); run(config, std::move(context)); } } @@ -138,7 +138,7 @@ DhtRunner::run(const Config& config, Context&& context) cv.notify_all(); }); - auto dht = std::unique_ptr<DhtInterface>(new Dht(std::move(context.sock), SecureDht::getConfig(config.dht_config), context.logger ? 
*context.logger : Logger{})); + auto dht = std::unique_ptr<DhtInterface>(new Dht(std::move(context.sock), SecureDht::getConfig(config.dht_config), context.logger)); dht_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht), config.dht_config)); #ifdef OPENDHT_PROXY_CLIENT @@ -146,7 +146,7 @@ DhtRunner::run(const Config& config, Context&& context) #endif enableProxy(not config.proxy_server.empty()); if (context.logger and dht_via_proxy_) { - dht_via_proxy_->setLogger(*context.logger); + dht_via_proxy_->setLogger(context.logger); } if (context.statusChangedCallback) { statusCb = std::move(context.statusChangedCallback); @@ -410,7 +410,7 @@ DhtRunner::exportValues() const { } void -DhtRunner::setLogger(const Logger& logger) { +DhtRunner::setLogger(const Sp<Logger>& logger) { std::lock_guard<std::mutex> lck(dht_mtx); if (dht_) dht_->setLogger(logger); @@ -429,7 +429,6 @@ DhtRunner::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) { void DhtRunner::setLogFilter(const InfoHash& f) { std::lock_guard<std::mutex> lck(dht_mtx); - activeDht()->setLogFilter(f); if (dht_) dht_->setLogFilter(f); #ifdef OPENDHT_PROXY_CLIENT diff --git a/src/log.cpp b/src/log.cpp index 5f4d266f9a06b52d5773f256d71da9979b46467c..5e164275ecb1d034e67cc21062195b3caecc214c 100644 --- a/src/log.cpp +++ b/src/log.cpp @@ -115,22 +115,22 @@ getSyslogLogger(const char* name) { void enableLogging(dht::DhtRunner &dht) { - dht.setLogger(*getStdLogger()); + dht.setLogger(getStdLogger()); } void enableFileLogging(dht::DhtRunner &dht, const std::string &path) { - dht.setLogger(*getFileLogger(path)); + dht.setLogger(getFileLogger(path)); } OPENDHT_PUBLIC void enableSyslog(dht::DhtRunner &dht, const char* name) { - dht.setLogger(*getSyslogLogger(name)); + dht.setLogger(getSyslogLogger(name)); } void disableLogging(dht::DhtRunner &dht) { - dht.setLogger({}); + dht.setLogger(); } } diff --git a/src/network_engine.cpp b/src/network_engine.cpp index d15a3e5ffca2ba424bdee123b3cad089b7777e1c..727923fa61ae08a566ea9331aa97c48afed2584c 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -98,11 +98,11 @@ RequestAnswer::RequestAnswer(ParsedMessage&& msg) nodes6(std::move(msg.nodes6)) {} -NetworkEngine::NetworkEngine(Logger& log, Scheduler& scheduler, std::unique_ptr<DatagramSocket>&& sock) - : myid(zeroes), dht_socket(std::move(sock)), DHT_LOG(log), rate_limiter((size_t)-1), scheduler(scheduler) +NetworkEngine::NetworkEngine(const Sp<Logger>& log, Scheduler& scheduler, std::unique_ptr<DatagramSocket>&& sock) + : myid(zeroes), dht_socket(std::move(sock)), logger_(log), rate_limiter((size_t)-1), scheduler(scheduler) {} -NetworkEngine::NetworkEngine(InfoHash& myid, NetworkConfig c, std::unique_ptr<DatagramSocket>&& sock, Logger& log, Scheduler& scheduler, +NetworkEngine::NetworkEngine(InfoHash& myid, NetworkConfig c, std::unique_ptr<DatagramSocket>&& sock, const Sp<Logger>& log, Scheduler& scheduler, decltype(NetworkEngine::onError)&& onError, decltype(NetworkEngine::onNewNode)&& onNewNode, decltype(NetworkEngine::onReportedAddr)&& onReportedAddr, @@ -121,7 +121,7 @@ NetworkEngine::NetworkEngine(InfoHash& myid, NetworkConfig c, std::unique_ptr<Da onListen(std::move(onListen)), onAnnounce(std::move(onAnnounce)), onRefresh(std::move(onRefresh)), - myid(myid), config(c), dht_socket(std::move(sock)), DHT_LOG(log), + myid(myid), config(c), dht_socket(std::move(sock)), logger_(log), rate_limiter(config.max_req_per_sec), scheduler(scheduler) {} @@ -140,7 +140,8 @@ NetworkEngine::tellListener(Sp<Node> node, Tid socket_id, const 
InfoHash& hash, try { sendNodesValues(node->getAddr(), socket_id, nnodes.first, nnodes.second, values, query, ntoken); } catch (const std::overflow_error& e) { - DHT_LOG.e("Can't send value: buffer not large enough !"); + if (logger_) + logger_->e("Can't send value: buffer not large enough !"); } } @@ -160,7 +161,8 @@ NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash&, if (not values.empty()) { pk.pack(KEY_REQ_REFRESHED); pk.pack(values); - DHT_LOG.d(n->id, "[node %s] sending %zu refreshed values", n->toString().c_str(), values.size()); + if (logger_) + logger_->d(n->id, "[node %s] sending %zu refreshed values", n->toString().c_str(), values.size()); } pk.pack(KEY_TID); pk.pack(socket_id); @@ -190,7 +192,8 @@ NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash&, c if (not values.empty()) { pk.pack(KEY_REQ_EXPIRED); pk.pack(values); - DHT_LOG.d(n->id, "[node %s] sending %zu expired values", n->toString().c_str(), values.size()); + if (logger_) + logger_->d(n->id, "[node %s] sending %zu expired values", n->toString().c_str(), values.size()); } pk.pack(KEY_TID); pk.pack(socket_id); @@ -246,7 +249,8 @@ NetworkEngine::requestStep(Sp<Request> sreq) auto now = scheduler.time(); auto& node = *req.node; if (req.isExpired(now)) { - // DHT_LOG.d(node.id, "[node %s] expired !", node.toString().c_str()); + // if (logger_) + // logger_->d(node.id, "[node %s] expired !", node.toString().c_str()); node.setExpired(); if (not node.id) requests.erase(req.tid); @@ -366,12 +370,14 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, SockAddr f) { auto from = f.getMappedIPv4(); if (isMartian(from)) { - DHT_LOG.w("Received packet from martian node %s", from.toString().c_str()); + if (logger_) + logger_->w("Received packet from martian node %s", from.toString().c_str()); return; } if (isNodeBlacklisted(from)) { - DHT_LOG.w("Received packet from blacklisted node %s", from.toString().c_str()); + if (logger_) + logger_->w("Received packet from blacklisted node %s", from.toString().c_str()); return; } @@ -380,13 +386,16 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, SockAddr f) msgpack::unpacked msg_res = msgpack::unpack((const char*)buf, buflen); msg->msgpack_unpack(msg_res.get()); } catch (const std::exception& e) { - DHT_LOG.w("Can't parse message of size %lu: %s", buflen, e.what()); - //DHT_LOG.DBG.logPrintable(buf, buflen); + if (logger_) + logger_->w("Can't parse message of size %lu: %s", buflen, e.what()); + // if (logger_) + // logger_->DBG.logPrintable(buf, buflen); return; } if (msg->network != config.network) { - DHT_LOG.d("Received message from other config.network %u", msg->network); + if (logger_) + logger_->d("Received message from other config.network %u", msg->network); return; } @@ -397,12 +406,14 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, SockAddr f) auto pmsg_it = partial_messages.find(msg->tid); if (pmsg_it == partial_messages.end()) { if (logIncoming_) - DHT_LOG.d("Can't find partial message"); + if (logger_) + logger_->d("Can't find partial message"); rateLimit(from); return; } if (!pmsg_it->second.from.equals(from)) { - DHT_LOG.d("Received partial message data from unexpected IP address"); + if (logger_) + logger_->d("Received partial message data from unexpected IP address"); rateLimit(from); return; } @@ -421,14 +432,16 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, SockAddr f) } if (msg->id == myid or not msg->id) { - DHT_LOG.d("Received message from 
self"); + if (logger_) + logger_->d("Received message from self"); return; } if (msg->type > MessageType::Reply) { /* Rate limit requests. */ if (!rateLimit(from)) { - DHT_LOG.w("Dropping request due to rate limiting"); + if (logger_) + logger_->w("Dropping request due to rate limiting"); return; } } @@ -447,7 +460,8 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, SockAddr f) scheduler.add(now + RX_MAX_PACKET_TIME, std::bind(&NetworkEngine::maintainRxBuffer, this, k)); scheduler.add(now + RX_TIMEOUT, std::bind(&NetworkEngine::maintainRxBuffer, this, k)); } else - DHT_LOG.e("Partial message with given TID already exists"); + if (logger_) + logger_->e("Partial message with given TID already exists"); } } @@ -481,7 +495,8 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro node->received(now, req); if (not node->isClient()) onNewNode(node, 1); - DHT_LOG.d(node->id, "[node %s] can't find transaction with id %u", node->toString().c_str(), msg->tid); + if (logger_) + logger_->d(node->id, "[node %s] can't find transaction with id %u", node->toString().c_str(), msg->tid); return; } } @@ -493,7 +508,8 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro onReportedAddr(msg->id, msg->addr); if (req and (req->cancelled() or req->expired() or req->completed())) { - DHT_LOG.w(node->id, "[node %s] response to expired, cancelled or completed request", node->toString().c_str()); + if (logger_) + logger_->w(node->id, "[node %s] response to expired, cancelled or completed request", node->toString().c_str()); return; } @@ -509,7 +525,8 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro onError(req, DhtProtocolException {msg->error_code}); } else { if (logIncoming_) - DHT_LOG.w(msg->id, "[node %s %s] received unknown error message %u", + if (logger_) + logger_->w(msg->id, "[node %s %s] received unknown error message %u", msg->id.toString().c_str(), from.toString().c_str(), msg->error_code); } break; @@ -541,12 +558,14 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro case MessageType::Ping: ++in_stats.ping; if (logIncoming_) - DHT_LOG.d(node->id, "[node %s] sending pong", node->toString().c_str()); + if (logger_) + logger_->d(node->id, "[node %s] sending pong", node->toString().c_str()); onPing(node); sendPong(from, msg->tid); break; case MessageType::FindNode: { - //DHT_LOG.d(msg->target, node->id, "[node %s] got 'find' request for %s (%d)", node->toString().c_str(), msg->target.toString().c_str(), msg->want); + // if (logger_) + // logger_->d(msg->target, node->id, "[node %s] got 'find' request for %s (%d)", node->toString().c_str(), msg->target.toString().c_str(), msg->want); ++in_stats.find; RequestAnswer answer = onFindNode(node, msg->target, msg->want); auto nnodes = bufferNodes(from.getFamily(), msg->target, msg->want, answer.nodes4, answer.nodes6); @@ -554,7 +573,8 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro break; } case MessageType::GetValues: { - //DHT_LOG.d(msg->info_hash, node->id, "[node %s] got 'get' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); + // if (logger_) + // logger_->d(msg->info_hash, node->id, "[node %s] got 'get' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.get; RequestAnswer answer = onGetValues(node, msg->info_hash, msg->want, msg->query); auto nnodes = bufferNodes(from.getFamily(), msg->info_hash, msg->want, 
answer.nodes4, answer.nodes6); @@ -562,8 +582,8 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro break; } case MessageType::AnnounceValue: { - if (logIncoming_) - DHT_LOG.d(msg->info_hash, node->id, "[node %s] got 'put' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); + if (logIncoming_ and logger_) + logger_->d(msg->info_hash, node->id, "[node %s] got 'put' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.put; onAnnounce(node, msg->info_hash, msg->token, msg->values, msg->created); @@ -576,15 +596,15 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro break; } case MessageType::Refresh: - if (logIncoming_) - DHT_LOG.d(msg->info_hash, node->id, "[node %s] got 'refresh' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); + if (logIncoming_ and logger_) + logger_->d(msg->info_hash, node->id, "[node %s] got 'refresh' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); onRefresh(node, msg->info_hash, msg->token, msg->value_id); /* Same note as above in MessageType::AnnounceValue applies. */ sendValueAnnounced(from, msg->tid, msg->value_id); break; case MessageType::Listen: { - if (logIncoming_) - DHT_LOG.d(msg->info_hash, node->id, "[node %s] got 'listen' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); + if (logIncoming_ and logger_) + logger_->d(msg->info_hash, node->id, "[node %s] got 'listen' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.listen; RequestAnswer answer = onListen(node, msg->info_hash, msg->token, msg->socket_id, std::move(msg->query)); auto nnodes = bufferNodes(from.getFamily(), msg->info_hash, msg->want, answer.nodes4, answer.nodes6); @@ -595,7 +615,8 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro break; } } catch (const std::overflow_error& e) { - DHT_LOG.e("Can't send value: buffer not large enough !"); + if (logger_) + logger_->e("Can't send value: buffer not large enough !"); } catch (DhtProtocolException& e) { sendError(from, msg->tid, e.getCode(), e.getMsg().c_str(), true); } @@ -642,7 +663,8 @@ NetworkEngine::sendPing(Sp<Node> node, RequestCb&& on_done, RequestExpiredCb&& o auto req = std::make_shared<Request>(MessageType::Ping, tid.toInt(), node, Blob(buffer.data(), buffer.data() + buffer.size()), [=](const Request& req_status, ParsedMessage&&) { - DHT_LOG.d(req_status.node->id, "[node %s] got pong !", req_status.node->toString().c_str()); + if (logger_) + logger_->d(req_status.node->id, "[node %s] got pong !", req_status.node->toString().c_str()); if (on_done) { on_done(req_status, {}); } @@ -847,7 +869,8 @@ NetworkEngine::packValueHeader(msgpack::sbuffer& buffer, const std::vector<Sp<Va if (svals.size() < 50 && total_size < MAX_PACKET_VALUE_SIZE) { for (const auto& b : svals) buffer.write((const char*)b.data(), b.size()); - // DHT_LOG.d("sending %lu bytes of values", total_size); + // if (logger_) + // logger_->d("sending %lu bytes of values", total_size); svals.clear(); } else { for (const auto& b : svals) @@ -1011,12 +1034,14 @@ NetworkEngine::sendListen(Sp<Node> n, socket = previous->getSocket(); } else { if (previous) - DHT_LOG.e(hash, "[node %s] trying refresh listen contract with wrong node", previous->node->toString().c_str()); + if (logger_) + logger_->e(hash, "[node %s] trying refresh listen contract with wrong node", previous->node->toString().c_str()); 
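```cpp
// Aside, not part of the patch: the conversion pattern applied throughout
// dht.cpp and network_engine.cpp above. Each former `DHT_LOG.x(...)` call on a
// Logger member becomes a null-guarded call through the shared pointer, so a
// component with no logger installed just skips the branch. Reduced sketch,
// assuming dht::Logger comes from <opendht/log_enable.h>:
#include <memory>
#include <opendht/log_enable.h>

struct Component {
    std::shared_ptr<dht::Logger> logger_ {};   // empty by default: logging disabled

    void setLogger(const std::shared_ptr<dht::Logger>& l) { logger_ = l; }

    void onPacket(const char* from) {
        if (logger_)                           // guard replaces the old DHT_LOG.d(...)
            logger_->d("received packet from %s", from);
    }
};
```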
socket = n->openSocket(std::move(socket_cb)); } if (not socket) { - DHT_LOG.e(hash, "[node %s] unable to get a valid socket for listen. Aborting listen", n->toString().c_str()); + if (logger_) + logger_->e(hash, "[node %s] unable to get a valid socket for listen. Aborting listen", n->toString().c_str()); return {}; } TransId sid(socket); @@ -1121,7 +1146,8 @@ NetworkEngine::sendAnnounceValue(Sp<Node> n, Blob(buffer.data(), buffer.data() + buffer.size()), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (msg.value_id == Value::INVALID_ID) { - DHT_LOG.d(infohash, "Unknown search or announce!"); + if (logger_) + logger_->d(infohash, "Unknown search or announce!"); } else { if (on_done) { RequestAnswer answer {}; @@ -1175,7 +1201,8 @@ NetworkEngine::sendRefreshValue(Sp<Node> n, Blob(buffer.data(), buffer.data() + buffer.size()), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (msg.value_id == Value::INVALID_ID) { - DHT_LOG.d(infohash, "Unknown search or announce!"); + if (logger_) + logger_->d(infohash, "Unknown search or announce!"); } else { if (on_done) { RequestAnswer answer {}; @@ -1258,7 +1285,8 @@ NetworkEngine::maintainRxBuffer(Tid tid) const auto& now = scheduler.time(); if (msg->second.start + RX_MAX_PACKET_TIME < now || msg->second.last_part + RX_TIMEOUT < now) { - DHT_LOG.w("Dropping expired partial message from %s", msg->second.from.toString().c_str()); + if (logger_) + logger_->w("Dropping expired partial message from %s", msg->second.from.toString().c_str()); partial_messages.erase(msg); } } diff --git a/src/network_utils.cpp b/src/network_utils.cpp index 66626d0e6002163ea734806c975a135b9064d5ed..b7955e2451988620a801245329e6314c6d6ed3d8 100644 --- a/src/network_utils.cpp +++ b/src/network_utils.cpp @@ -106,7 +106,7 @@ void udpPipe(int fds[2]) } #endif -UdpSocket::UdpSocket(in_port_t port, const Logger& l) : logger(l) { +UdpSocket::UdpSocket(in_port_t port, const std::shared_ptr<Logger>& l) : logger(l) { SockAddr bind4; bind4.setFamily(AF_INET); bind4.setPort(port); @@ -117,7 +117,7 @@ UdpSocket::UdpSocket(in_port_t port, const Logger& l) : logger(l) { openSockets(bind4, bind6); } -UdpSocket::UdpSocket(const SockAddr& bind4, const SockAddr& bind6, const Logger& l) : logger(l) +UdpSocket::UdpSocket(const SockAddr& bind4, const SockAddr& bind6, const std::shared_ptr<Logger>& l) : logger(l) { std::lock_guard<std::mutex> lk(lock); openSockets(bind4, bind6); @@ -155,7 +155,8 @@ UdpSocket::sendTo(const SockAddr& dest, const uint8_t* data, size_t size, bool r if (sendto(s, (const char*)data, size, flags, dest.get(), dest.getLength()) == -1) { int err = errno; - logger.d("Can't send message to %s: %s", dest.toString().c_str(), strerror(err)); + if (logger) + logger->d("Can't send message to %s: %s", dest.toString().c_str(), strerror(err)); if (err == EPIPE || err == ENOTCONN || err == ECONNRESET) { std::lock_guard<std::mutex> lk(lock); auto bind4 = std::move(bound4), bind6 = std::move(bound6); @@ -194,7 +195,8 @@ UdpSocket::openSockets(const SockAddr& bind4, const SockAddr& bind6) try { s4 = bindSocket(bind4, bound4); } catch (const DhtException& e) { - logger.e("Can't bind inet socket: %s", e.what()); + if (logger) + logger->e("Can't bind inet socket: %s", e.what()); } } @@ -209,7 +211,8 @@ UdpSocket::openSockets(const SockAddr& bind4, const SockAddr& bind6) try { s6 = bindSocket(b6, bound6); } catch (const DhtException& e) { - logger.e("Can't bind inet6 socket: %s", e.what()); + if (logger) + logger->e("Can't bind inet6 socket: %s", 
e.what()); } } } @@ -217,7 +220,8 @@ UdpSocket::openSockets(const SockAddr& bind4, const SockAddr& bind6) try { s6 = bindSocket(bind6, bound6); } catch (const DhtException& e) { - logger.e("Can't bind inet6 socket: %s", e.what()); + if (logger) + logger->e("Can't bind inet6 socket: %s", e.what()); } } } @@ -244,7 +248,8 @@ UdpSocket::openSockets(const SockAddr& bind4, const SockAddr& bind6) int rc = select(selectFd, &readfds, nullptr, nullptr, nullptr); if (rc < 0) { if (errno != EINTR) { - logger.e("Select error: %s", strerror(errno)); + if (logger) + logger->e("Select error: %s", strerror(errno)); std::this_thread::sleep_for(std::chrono::seconds(1)); } } @@ -259,7 +264,8 @@ UdpSocket::openSockets(const SockAddr& bind4, const SockAddr& bind6) if (FD_ISSET(stop_readfd, &readfds)) { if (recv(stop_readfd, (char*)buf.data(), buf.size(), 0) < 0) { - logger.e("Got stop packet error: %s", strerror(errno)); + if (logger) + logger->e("Got stop packet error: %s", strerror(errno)); break; } } @@ -277,7 +283,8 @@ UdpSocket::openSockets(const SockAddr& bind4, const SockAddr& bind6) pkt->received = clock::now(); onReceived(std::move(pkt)); } else if (rc == -1) { - logger.e("Error receiving packet: %s", strerror(errno)); + if (logger) + logger->e("Error receiving packet: %s", strerror(errno)); int err = errno; if (err == EPIPE || err == ENOTCONN || err == ECONNRESET) { std::lock_guard<std::mutex> lk(lock); @@ -286,7 +293,8 @@ UdpSocket::openSockets(const SockAddr& bind4, const SockAddr& bind6) try { ls4 = bindSocket(bound4, bound4); } catch (const DhtException& e) { - logger.e("Can't bind inet socket: %s", e.what()); + if (logger) + logger->e("Can't bind inet socket: %s", e.what()); } } if (ls6 >= 0) { @@ -294,7 +302,8 @@ UdpSocket::openSockets(const SockAddr& bind4, const SockAddr& bind6) try { ls6 = bindSocket(bound6, bound6); } catch (const DhtException& e) { - logger.e("Can't bind inet6 socket: %s", e.what()); + if (logger) + logger->e("Can't bind inet6 socket: %s", e.what()); } } if (ls4 < 0 && ls6 < 0) @@ -307,7 +316,8 @@ UdpSocket::openSockets(const SockAddr& bind4, const SockAddr& bind6) } } } catch (const std::exception& e) { - logger.e("Error in UdpSocket rx thread: %s", e.what()); + if (logger) + logger->e("Error in UdpSocket rx thread: %s", e.what()); } if (ls4 >= 0) close(ls4); @@ -332,7 +342,8 @@ UdpSocket::stop() if (running.exchange(false)) { auto sfd = stopfd; if (sfd != -1 && write(sfd, "\0", 1) == -1) { - logger.e("Can't write to stop fd"); + if (logger) + logger->e("Can't write to stop fd"); } } } diff --git a/src/securedht.cpp b/src/securedht.cpp index 18dd951e0982d60d576f306887e7c1d75ac74a07..a6bf1bacfbcab45811139af4a0fd6c1e473862f8 100644 --- a/src/securedht.cpp +++ b/src/securedht.cpp @@ -59,7 +59,8 @@ SecureDht::SecureDht(std::unique_ptr<DhtInterface> dht, SecureDht::Config conf) 1 }, [this, certId](bool ok) { if (ok) - DHT_LOG.d(certId, "SecureDht: public key announced successfully"); + if (logger_) + logger_->d(certId, "SecureDht: public key announced successfully"); }, {}, true); } } @@ -76,7 +77,8 @@ SecureDht::secureType(ValueType&& type) v->signatureValid = v->owner and v->owner->checkSignature(v->getToSign(), v->signature); } if (!v->signatureValid) { - DHT_LOG.w("Signature verification failed"); + if (logger_) + logger_->w("Signature verification failed"); return false; } } @@ -86,7 +88,8 @@ SecureDht::secureType(ValueType&& type) if (!o->isSigned()) return type.editPolicy(id, o, n, nid, a); if (o->owner != n->owner) { - DHT_LOG.w("Edition forbidden: owner changed."); + 
if (logger_) + logger_->w("Edition forbidden: owner changed."); return false; } if (!n->signatureChecked) { @@ -94,14 +97,16 @@ SecureDht::secureType(ValueType&& type) n->signatureValid = o->owner and o->owner->checkSignature(n->getToSign(), n->signature); } if (!n->signatureValid) { - DHT_LOG.w("Edition forbidden: signature verification failed."); + if (logger_) + logger_->w("Edition forbidden: signature verification failed."); return false; } if (o->seq == n->seq) { // If the data is exactly the same, // it can be reannounced, possibly by someone else. if (o->getToSign() != n->getToSign()) { - DHT_LOG.w("Edition forbidden: sequence number must be increasing."); + if (logger_) + logger_->w("Edition forbidden: sequence number must be increasing."); return false; } } @@ -147,7 +152,8 @@ SecureDht::registerCertificate(const InfoHash& node, const Blob& data) } InfoHash h = crt->getPublicKey().getId(); if (node == h) { - DHT_LOG.d("Registering certificate for %s", h.toString().c_str()); + if (logger_) + logger_->d("Registering certificate for %s", h.toString().c_str()); auto it = nodesCertificates_.find(h); if (it == nodesCertificates_.end()) std::tie(it, std::ignore) = nodesCertificates_.emplace(h, std::move(crt)); @@ -155,7 +161,8 @@ SecureDht::registerCertificate(const InfoHash& node, const Blob& data) it->second = std::move(crt); return it->second; } else { - DHT_LOG.w("Certificate %s for node %s does not match node id !", h.toString().c_str(), node.toString().c_str()); + if (logger_) + logger_->w("Certificate %s for node %s does not match node id !", h.toString().c_str(), node.toString().c_str()); return nullptr; } } @@ -172,7 +179,8 @@ SecureDht::findCertificate(const InfoHash& node, const std::function<void(const { Sp<crypto::Certificate> b = getCertificate(node); if (b && *b) { - DHT_LOG.d("Using certificate from cache for %s", node.toString().c_str()); + if (logger_) + logger_->d("Using certificate from cache for %s", node.toString().c_str()); if (cb) cb(b); return; @@ -180,7 +188,8 @@ SecureDht::findCertificate(const InfoHash& node, const std::function<void(const if (localQueryMethod_) { auto res = localQueryMethod_(node); if (not res.empty()) { - DHT_LOG.d("Registering certificate from local store for %s", node.toString().c_str()); + if (logger_) + logger_->d("Registering certificate from local store for %s", node.toString().c_str()); nodesCertificates_.emplace(node, res.front()); if (cb) cb(res.front()); @@ -195,7 +204,8 @@ SecureDht::findCertificate(const InfoHash& node, const std::function<void(const for (const auto& v : vals) { if (auto cert = registerCertificate(node, v->data)) { *found = true; - DHT_LOG.d("Found certificate for %s", node.toString().c_str()); + if (logger_) + logger_->d("Found certificate for %s", node.toString().c_str()); if (cb) cb(cert); return false; @@ -213,7 +223,8 @@ SecureDht::findPublicKey(const InfoHash& node, const std::function<void(const Sp { auto pk = getPublicKey(node); if (pk && *pk) { - DHT_LOG.d("Found public key from cache for %s", node.toString().c_str()); + if (logger_) + logger_->d("Found public key from cache for %s", node.toString().c_str()); if (cb) cb(pk); return; @@ -257,7 +268,8 @@ SecureDht::checkValue(const Sp<Value>& v) } // Ignore values belonging to other people } catch (const std::exception& e) { - DHT_LOG.w("Could not decrypt value %s : %s", v->toString().c_str(), e.what()); + if (logger_) + logger_->w("Could not decrypt value %s : %s", v->toString().c_str(), e.what()); } } // Check signed values @@ -271,8 +283,8 @@ 
SecureDht::checkValue(const Sp<Value>& v) nodesPubKeys_[v->owner->getId()] = v->owner; return v; } - else - DHT_LOG.w("Signature verification failed for %s", v->toString().c_str()); + else if (logger_) + logger_->w("Signature verification failed for %s", v->toString().c_str()); } // Forward normal values else { @@ -355,13 +367,16 @@ SecureDht::putSigned(const InfoHash& hash, Sp<Value> val, DoneCallback callback, // Check if data already exists on the dht get(hash, [val,this] (const std::vector<Sp<Value>>& vals) { - DHT_LOG.d("Found online previous value being announced."); + if (logger_) + logger_->d("Found online previous value being announced."); for (const auto& v : vals) { - if (!v->isSigned()) - DHT_LOG.e("Existing non-signed value seems to exists at this location."); - else if (not v->owner or v->owner->getId() != getId()) - DHT_LOG.e("Existing signed value belonging to someone else seems to exists at this location."); - else if (val->seq <= v->seq) + if (!v->isSigned()) { + if (logger_) + logger_->e("Existing non-signed value seems to exists at this location."); + } else if (not v->owner or v->owner->getId() != getId()) { + if (logger_) + logger_->e("Existing signed value belonging to someone else seems to exists at this location."); + } else if (val->seq <= v->seq) val->seq = v->seq + 1; } return true; @@ -384,11 +399,13 @@ SecureDht::putEncrypted(const InfoHash& hash, const InfoHash& to, Sp<Value> val, callback(false, {}); return; } - DHT_LOG.w("Encrypting data for PK: %s", pk->getId().toString().c_str()); + if (logger_) + logger_->w("Encrypting data for PK: %s", pk->getId().toString().c_str()); try { dht_->put(hash, encrypt(*val, *pk), callback, time_point::max(), permanent); } catch (const std::exception& e) { - DHT_LOG.e("Error putting encrypted data: %s", e.what()); + if (logger_) + logger_->e("Error putting encrypted data: %s", e.what()); if (callback) callback(false, {}); }