diff --git a/CMakeLists.txt b/CMakeLists.txt index a2708d7223eb2cae49aaf11fc09867ca04c628e3..9ef2d4afa27cfcf378840ee3a5f4b64323616b42 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -87,6 +87,7 @@ list (APPEND opendht_HEADERS include/opendht/rate_limiter.h include/opendht/securedht.h include/opendht/log.h + include/opendht/log_enable.h include/opendht.h include/opendht/indexation/pht.h ) diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 60fcff277ca549408e4f7c3411084ebb171feb0b..c5ba9c0fa9fea65961676c076cd5dc449a384920 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -26,6 +26,7 @@ #include "scheduler.h" #include "routing_table.h" #include "callbacks.h" +#include "log_enable.h" #include <string> #include <array> @@ -99,6 +100,13 @@ public: */ void setLoggers(LogMethod error = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG); + /** + * Only print logs related to the given InfoHash (if given), or disable filter (if zeroes). + */ + void setLogFilter(const InfoHash& f) { + DHT_LOG.setFilter(f); + } + virtual void registerType(const ValueType& type) { types[type.id] = type; } @@ -290,6 +298,8 @@ public: protected: Logger DHT_LOG; + bool logFilerEnable_ {}; + InfoHash logFiler_ {}; private: diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index b8e0572f380f48716afcc5649e9aae01ced7a6c3..c0fea5905caf99ac22f09bb997ddfd5f543d6d08 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -25,6 +25,7 @@ #include "value.h" #include "callbacks.h" #include "sockaddr.h" +#include "log_enable.h" #include <thread> #include <mutex> @@ -261,6 +262,11 @@ public: void setLoggers(LogMethod err = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG); + /** + * Only print logs related to the given InfoHash (if given), or disable filter (if zeroes). + */ + void setLogFilter(const InfoHash& f = {}); + void registerType(const ValueType& type); void importValues(const std::vector<ValuesExport>& values); diff --git a/include/opendht/log_enable.h b/include/opendht/log_enable.h new file mode 100644 index 0000000000000000000000000000000000000000..5c89c516a27ddd52f795215c13636eb78050e7b1 --- /dev/null +++ b/include/opendht/log_enable.h @@ -0,0 +1,175 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * Author : Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "infohash.h" + +#ifndef OPENDHT_LOG +#define OPENDHT_LOG true +#endif + +namespace dht { + +// Logging related utility functions + +/** + * Dummy function used to disable logging + */ +inline void NOLOG(char const*, va_list) {} + +/** + * Wrapper for logging methods + */ +struct LogMethod { + LogMethod() = default; + + template<typename T> + LogMethod(T&& t) : func(std::forward<T>(t)) {} + + void operator()(char const* format, ...) const { + va_list args; + va_start(args, format); + func(format, args); + va_end(args); + } + void log(char const* format, va_list args) const { + func(format, args); + } + explicit operator bool() const { + return (bool)func; + } + + void logPrintable(const uint8_t *buf, size_t buflen) const { + std::string buf_clean(buflen, '\0'); + for (size_t i=0; i<buflen; i++) + buf_clean[i] = isprint(buf[i]) ? buf[i] : '.'; + (*this)("%s", buf_clean.c_str()); + } +private: + std::function<void(char const*, va_list)> func; +}; + +struct Logger { + LogMethod DEBUG = NOLOG; + LogMethod WARN = NOLOG; + LogMethod ERR = NOLOG; + void setFilter(const InfoHash& f) { + filter_ = f; + filterEnable_ = filter_ != InfoHash{}; + } + inline void log0(const LogMethod& logger, char const* format, va_list args) const { +#if OPENDHT_LOG + if (logger and not filterEnable_) + logger.log(format, args); +#endif + } + inline void log1(const LogMethod& logger, const InfoHash& f, char const* format, va_list args) const { +#if OPENDHT_LOG + if (logger and (not filterEnable_ or f == filter_)) + logger.log(format, args); +#endif + } + inline void log2(const LogMethod& logger, const InfoHash& f1, const InfoHash& f2, char const* format, va_list args) const { +#if OPENDHT_LOG + if (logger and (not filterEnable_ or f1 == filter_ or f2 == filter_)) + logger.log(format, args); +#endif + } + inline void d(char const* format, ...) const { +#if OPENDHT_LOG + va_list args; + va_start(args, format); + log0(DEBUG, format, args); + va_end(args); +#endif + } + inline void d(const InfoHash& f, char const* format, ...) const { +#if OPENDHT_LOG + va_list args; + va_start(args, format); + log1(DEBUG, f, format, args); + va_end(args); +#endif + } + inline void d(const InfoHash& f1, const InfoHash& f2, char const* format, ...) const { +#if OPENDHT_LOG + va_list args; + va_start(args, format); + log2(DEBUG, f1, f2, format, args); + va_end(args); +#endif + } + inline void w(char const* format, ...) const { +#if OPENDHT_LOG + va_list args; + va_start(args, format); + log0(WARN, format, args); + va_end(args); +#endif + } + inline void w(const InfoHash& f, char const* format, ...) const { +#if OPENDHT_LOG + va_list args; + va_start(args, format); + log1(WARN, f, format, args); + va_end(args); +#endif + } + inline void w(const InfoHash& f1, const InfoHash& f2, char const* format, ...) const { +#if OPENDHT_LOG + va_list args; + va_start(args, format); + log2(WARN, f1, f2, format, args); + va_end(args); +#endif + } + inline void e(char const* format, ...) const { +#if OPENDHT_LOG + va_list args; + va_start(args, format); + log0(ERR, format, args); + va_end(args); +#endif + } + inline void e(const InfoHash& f, char const* format, ...) const { +#if OPENDHT_LOG + va_list args; + va_start(args, format); + log1(ERR, f, format, args); + va_end(args); +#endif + } + inline void e(const InfoHash& f1, const InfoHash& f2, char const* format, ...) const { +#if OPENDHT_LOG + va_list args; + va_start(args, format); + log2(ERR, f1, f2, format, args); + va_end(args); +#endif + } +private: + bool filterEnable_ {false}; + InfoHash filter_ {}; +}; + +} diff --git a/include/opendht/scheduler.h b/include/opendht/scheduler.h index 8f5ca6ff2d4f388c3eee4a4f1007e213980a8251..43e6f82ded0b9476010f99d6d92e0018b3e89464 100644 --- a/include/opendht/scheduler.h +++ b/include/opendht/scheduler.h @@ -22,6 +22,7 @@ #pragma once #include "utils.h" +#include "log_enable.h" #include <functional> #include <map> diff --git a/include/opendht/utils.h b/include/opendht/utils.h index 8d5e1c7cb92a71f35cef87e8d5d548fc9f0e9c4e..183202ac3d61ae34e36fdb65c16d69c8aa43602a 100644 --- a/include/opendht/utils.h +++ b/include/opendht/utils.h @@ -89,45 +89,6 @@ public: } }; -// Logging related utility functions - -/** - * Dummy function used to disable logging - */ -inline void NOLOG(char const*, va_list) {} - -/** - * Wrapper for logging methods - */ -struct LogMethod { - LogMethod() = default; - - template<typename T> - LogMethod(T&& t) : func(std::forward<T>(t)) {} - - void operator()(char const* format, ...) const { - va_list args; - va_start(args, format); - func(format, args); - va_end(args); - } - - void logPrintable(const uint8_t *buf, size_t buflen) const { - std::string buf_clean(buflen, '\0'); - for (size_t i=0; i<buflen; i++) - buf_clean[i] = isprint(buf[i]) ? buf[i] : '.'; - (*this)("%s", buf_clean.c_str()); - } -private: - std::function<void(char const*, va_list)> func; -}; - -struct Logger { - LogMethod DEBUG = NOLOG; - LogMethod WARN = NOLOG; - LogMethod ERR = NOLOG; -}; - // Serialization related definitions and utility functions using Blob = std::vector<uint8_t>; diff --git a/src/Makefile.am b/src/Makefile.am index 2dde1aa9a10a13902ad501e1d4cf4c0358c593c1..6a94e5a6fc1ddeae01377a7e132aa23f3bd12fe2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -46,6 +46,7 @@ nobase_include_HEADERS = \ ../include/opendht/dhtrunner.h \ ../include/opendht/default_types.h \ ../include/opendht/log.h \ + ../include/opendht/log_enable.h \ ../include/opendht/rng.h \ ../include/opendht/indexation/pht.h diff --git a/src/dht.cpp b/src/dht.cpp index 09bc3eef7ce58ab166c645d7e2d0305fa68471cd..caf19312e7f2265f4f5d7260b20cb4189e7ece9a 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -21,7 +21,6 @@ #include "dht.h" #include "rng.h" #include "request.h" -#include "log_enable.h" #include <msgpack.hpp> extern "C" { @@ -715,9 +714,9 @@ struct Dht::Search { void Dht::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) { - DHT_LOG_DEBUG = debug; - DHT_LOG_WARN = warn; - DHT_LOG_ERR = error; + DHT_LOG.DEBUG = debug; + DHT_LOG.WARN = warn; + DHT_LOG.ERR = error; } NodeStatus @@ -743,7 +742,7 @@ Dht::shutdown(ShutdownCallback cb) { auto remaining = std::make_shared<int>(0); auto str_donecb = [=](bool, const std::vector<std::shared_ptr<Node>>&) { --*remaining; - DHT_LOG_WARN("Shuting down node: %u ops remaining.", *remaining); + DHT_LOG.w("shuting down node: %u ops remaining", *remaining); if (!*remaining && cb) { cb(); } }; @@ -752,7 +751,7 @@ Dht::shutdown(ShutdownCallback cb) { } if (!*remaining) { - DHT_LOG_WARN("Shuting down node: %u ops remaining.", *remaining); + DHT_LOG.w("shuting down node: %u ops remaining", *remaining); if (cb) cb(); } @@ -792,7 +791,7 @@ Dht::sendCachedPing(Bucket& b) if (!b.cached) return 0; - DHT_LOG_DEBUG("[node %s] Sending ping to cached node.", b.cached->toString().c_str()); + DHT_LOG.d(b.cached->id, "[node %s] sending ping to cached node", b.cached->toString().c_str()); network_engine.sendPing(b.cached, nullptr, nullptr); b.cached = {}; return 0; @@ -893,7 +892,7 @@ Dht::onNewNode(const std::shared_ptr<Node>& node, int confirm) if (not n->isGood(now)) { dubious = true; if (not n->isPendingMessage()) { - DHT_LOG_DEBUG("[node %s] Sending ping to dubious node.", n->toString().c_str()); + DHT_LOG.d(n->id, "[node %s] sending ping to dubious node", n->toString().c_str()); network_engine.sendPing(n, nullptr, nullptr); break; } @@ -901,7 +900,7 @@ Dht::onNewNode(const std::shared_ptr<Node>& node, int confirm) } if ((mybucket || (is_bootstrap and list.depth(b) < 6)) && (!dubious || list.size() == 1)) { - DHT_LOG_DEBUG("Splitting from depth %u", list.depth(b)); + DHT_LOG.d("Splitting from depth %u", list.depth(b)); sendCachedPing(*b); list.split(b); onNewNode(node, 0); @@ -1046,7 +1045,7 @@ Dht::expireSearches() auto& sr = *srp.second; auto b = sr.callbacks.empty() && sr.announce.empty() && sr.listeners.empty() && sr.step_time < t; if (b) { - DHT_LOG_DEBUG("Removing search %s", srp.first.toString().c_str()); + DHT_LOG.d(srp.first, "[search %s] removing search", srp.first.toString().c_str()); sr.clear(); return b; } else { return false; } @@ -1097,59 +1096,56 @@ Dht::searchNodeGetExpired(const Request& status, } void Dht::paginate(std::weak_ptr<Search> ws, std::shared_ptr<Query> query, SearchNode* n) { - if (auto sr = ws.lock()) { - auto select_q = std::make_shared<Query>(Select {}.field(Value::Field::Id), query ? query->where : Where {}); - auto onSelectDone = - [this,ws,query](const Request& status, NetworkEngine::RequestAnswer&& answer) mutable - { - if (auto sr = ws.lock()) { - if (auto sn = sr->getNode(status.node)) { - if (answer.fields.empty()) { - searchNodeGetDone(status, std::move(answer), ws, query); - return; - } else { - for (const auto& fvi : answer.fields) { - try { - auto vid = fvi->index.at(Value::Field::Id).getInt(); - if (vid == Value::INVALID_ID) continue; - auto query_for_vid = std::make_shared<Query>(Select {}, Where {}.id(vid)); - sn->pagination_queries[query].push_back(query_for_vid); - DHT_LOG_WARN("[search %s IPv%c] [node %s] sending %s", - sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', - sn->node->toString().c_str(), query_for_vid->toString().c_str()); - sn->getStatus[query_for_vid] = network_engine.sendGetValues(status.node, - sr->id, - *query_for_vid, - -1, - std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query), - std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query_for_vid) - ); - } catch (std::out_of_range&) { - DHT_LOG_ERR("[search %s IPv%c] [node %s] received non-id field in response to "\ - "'SELECT id' request...", - sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', - sn->node->toString().c_str()); - } + auto sr = ws.lock(); + if (not sr) return; + auto select_q = std::make_shared<Query>(Select {}.field(Value::Field::Id), query ? query->where : Where {}); + auto onSelectDone = + [this,ws,query](const Request& status, NetworkEngine::RequestAnswer&& answer) mutable + { + if (auto sr = ws.lock()) { + if (auto sn = sr->getNode(status.node)) { + if (answer.fields.empty()) { + searchNodeGetDone(status, std::move(answer), ws, query); + return; + } else { + for (const auto& fvi : answer.fields) { + try { + auto vid = fvi->index.at(Value::Field::Id).getInt(); + if (vid == Value::INVALID_ID) continue; + auto query_for_vid = std::make_shared<Query>(Select {}, Where {}.id(vid)); + sn->pagination_queries[query].push_back(query_for_vid); + DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] sending %s", + sr->id.toString().c_str(), sn->node->toString().c_str(), query_for_vid->toString().c_str()); + sn->getStatus[query_for_vid] = network_engine.sendGetValues(status.node, + sr->id, + *query_for_vid, + -1, + std::bind(&Dht::searchNodeGetDone, this, _1, _2, ws, query), + std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, query_for_vid) + ); + } catch (std::out_of_range&) { + DHT_LOG.e(sr->id, sn->node->id, "[search %s] [node %s] received non-id field in response to "\ + "'SELECT id' request...", + sr->id.toString().c_str(), sn->node->toString().c_str()); } } - } + } - }; - /* add pagination query key for tracking ongoing requests. */ - n->pagination_queries[query].push_back(select_q); + } + }; + /* add pagination query key for tracking ongoing requests. */ + n->pagination_queries[query].push_back(select_q); - DHT_LOG_WARN("[search %s IPv%c] [node %s] sending %s", - sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', - n->node->toString().c_str(), select_q->toString().c_str()); - n->getStatus[select_q] = network_engine.sendGetValues(n->node, - sr->id, - *select_q, - -1, - onSelectDone, - std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, select_q) - ); - } + DHT_LOG.w(sr->id, n->node->id, "[search %s] [node %s] sending %s", + sr->id.toString().c_str(), n->node->toString().c_str(), select_q->toString().c_str()); + n->getStatus[select_q] = network_engine.sendGetValues(n->node, + sr->id, + *select_q, + -1, + onSelectDone, + std::bind(&Dht::searchNodeGetExpired, this, _1, _2, ws, select_q) + ); } Dht::SearchNode* @@ -1184,7 +1180,7 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update if (not n) return nullptr; - DHT_LOG_WARN("[search %s] [node %s] sending 'find_node'", + DHT_LOG.w(sr->id, n->node->id, "[search %s] [node %s] sending 'find_node'", sr->id.toString().c_str(), n->node->toString().c_str()); n->getStatus[query] = network_engine.sendFindNode(n->node, sr->id, @@ -1198,7 +1194,7 @@ Dht::searchSendGetValues(std::shared_ptr<Search> sr, SearchNode* pn, bool update if (query and not query->select.getSelection().empty()) { /* The request contains a select. No need to paginate... */ - DHT_LOG_WARN("[search %s] [node %s] sending 'get'", + DHT_LOG.w(sr->id, n->node->id, "[search %s] [node %s] sending 'get'", sr->id.toString().c_str(), n->node->toString().c_str()); n->getStatus[query] = network_engine.sendGetValues(n->node, sr->id, @@ -1272,7 +1268,7 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { auto next_refresh_time = now + getType(a.value->type).expiration; /* only put the value if the node doesn't already have it */ if (not hasValue or seq_no < a.value->seq) { - DHT_LOG_WARN("[search %s] [node %s] sending 'put' (vid: %d)", + DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] sending 'put' (vid: %d)", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); sn->acked[a.value->id] = std::make_pair(network_engine.sendAnnounceValue(sn->node, sr->id, @@ -1282,7 +1278,7 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { onDone, onExpired), next_refresh_time); } else if (hasValue and a.permanent) { - DHT_LOG_WARN("[search %s] [node %s] sending 'refresh' (vid: %d)", + DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] sending 'refresh' (vid: %d)", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); sn->acked[a.value->id] = std::make_pair(network_engine.sendRefreshValue(sn->node, sr->id, @@ -1291,7 +1287,7 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { onDone, onExpired), next_refresh_time); } else { - DHT_LOG.WARN("[search %s] [node %s] already has value (vid: %d). Aborting.", + DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] already has value (vid: %d). Aborting.", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); auto ack_req = std::make_shared<Request>(); ack_req->reply_time = now; @@ -1309,9 +1305,8 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { } } }; - DHT_LOG_WARN("[search %s IPv%c] [node %s] sending %s", - sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', - n.node->toString().c_str(), probe_query->toString().c_str()); + DHT_LOG.w(sr->id, n.node->id, "[search %s] [node %s] sending %s", + sr->id.toString().c_str(), n.node->toString().c_str(), probe_query->toString().c_str()); n.probe_query = probe_query; n.getStatus[probe_query] = network_engine.sendGetValues(n.node, sr->id, @@ -1333,7 +1328,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) const auto& now = scheduler.time(); if (auto req_count = sr->currentlySolicitedNodeCount()) - DHT_LOG_DEBUG("[search %s IPv%c] step (%d requests)", + DHT_LOG.d(sr->id, "[search %s IPv%c] step (%d requests)", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', req_count); sr->step_time = now; @@ -1375,7 +1370,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) for (const auto& l : sr->listeners) { const auto& query = l.second.query; if (n.getListenTime(query) <= now) { - DHT_LOG_WARN("[search %s] [node %s] sending 'listen'", + DHT_LOG.w(sr->id, n.node->id, "[search %s] [node %s] sending 'listen'", sr->id.toString().c_str(), n.node->toString().c_str()); const auto& r = n.listenStatus.find(query); @@ -1432,7 +1427,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) if (sr->getNumberOfConsecutiveBadNodes() >= std::min(sr->nodes.size(), static_cast<size_t>(SEARCH_MAX_BAD_NODES))) { - DHT_LOG_WARN("[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); + DHT_LOG.w(sr->id, "[search %s IPv%c] expired", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6'); sr->expire(); connectivityChanged(sr->af); } @@ -1631,7 +1626,7 @@ unsigned Dht::refill(Dht::Search& sr) { auto cached_nodes = network_engine.getCachedNodes(sr.id, sr.af, SEARCH_NODES); if (cached_nodes.empty()) { - DHT_LOG_ERR("[search %s IPv%c] no nodes from cache while refilling search", + DHT_LOG.e(sr.id, "[search %s IPv%c] no nodes from cache while refilling search", sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6'); return 0; } @@ -1642,7 +1637,7 @@ unsigned Dht::refill(Dht::Search& sr) { if (sr.insertNode(i, now)) ++inserted; } - DHT_LOG_DEBUG("[search %s IPv%c] refilled search with %u nodes from node cache", + DHT_LOG.d(sr.id, "[search %s IPv%c] refilled search with %u nodes from node cache", sr.id.toString().c_str(), (sr.af == AF_INET) ? '4' : '6', inserted); sr.refill_time = now; return inserted; @@ -1654,7 +1649,7 @@ std::shared_ptr<Dht::Search> Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback qcb, DoneCallback dcb, Value::Filter f, Query q) { if (!isRunning(af)) { - DHT_LOG_ERR("[search %s IPv%c] unsupported protocol", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); + DHT_LOG.e(id, "[search %s IPv%c] unsupported protocol", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); if (dcb) dcb(false, {}); return {}; @@ -1691,7 +1686,7 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback gcb, QueryCallback q sr->expired = false; sr->nodes.clear(); sr->nodes.reserve(SEARCH_NODES+1); - DHT_LOG_WARN("[search %s IPv%c] new search", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); + DHT_LOG.w(id, "[search %s IPv%c] new search", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); if (search_id == 0) search_id++; } @@ -1791,7 +1786,7 @@ Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter std::shared_ptr<Search> sr = (srp == srs.end()) ? search(id, af) : srp->second; if (!sr) throw DhtException("Can't create search"); - DHT_LOG_ERR("[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); + DHT_LOG.e(id, "[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); sr->done = false; auto token = ++sr->listener_token; sr->listeners.emplace(token, LocalListener{q, f, cb}); @@ -1855,7 +1850,7 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f, Where&& where auto token4 = Dht::listenTo(id, AF_INET, gcb, filter, query); auto token6 = Dht::listenTo(id, AF_INET6, gcb, filter, query); - DHT_LOG_DEBUG("Added listen : %d -> %d %d %d", token, tokenlocal, token4, token6); + DHT_LOG.d(id, "Added listen : %d -> %d %d %d", token, tokenlocal, token4, token6); listeners.emplace(token, std::make_tuple(tokenlocal, token4, token6)); return token; } @@ -1867,10 +1862,10 @@ Dht::cancelListen(const InfoHash& id, size_t token) auto it = listeners.find(token); if (it == listeners.end()) { - DHT_LOG_WARN("Listen token not found: %d", token); + DHT_LOG.w(id, "Listen token not found: %d", token); return false; } - DHT_LOG_DEBUG("cancelListen %s with token %d", id.toString().c_str(), token); + DHT_LOG.d(id, "cancelListen %s with token %d", id.toString().c_str(), token); auto st = store.find(id); auto tokenlocal = std::get<0>(it->second); if (st != store.end() && tokenlocal) @@ -1920,7 +1915,7 @@ Dht::put(const InfoHash& id, std::shared_ptr<Value> val, DoneCallback callback, val->id = rand_id(rdev); } - DHT_LOG_DEBUG("put: adding %s -> %s", id.toString().c_str(), val->toString().c_str()); + DHT_LOG.d(id, "put: adding %s -> %s", id.toString().c_str(), val->toString().c_str()); auto ok = std::make_shared<bool>(false); auto done = std::make_shared<bool>(false); @@ -1934,13 +1929,13 @@ Dht::put(const InfoHash& id, std::shared_ptr<Value> val, DoneCallback callback, } }; announce(id, AF_INET, val, [=](bool ok4, const std::vector<std::shared_ptr<Node>>& nodes) { - DHT_LOG_DEBUG("Announce done IPv4 %d", ok4); + DHT_LOG.d(id, "Announce done IPv4 %d", ok4); *done4 = true; *ok |= ok4; donecb(nodes); }, created, permanent); announce(id, AF_INET6, val, [=](bool ok6, const std::vector<std::shared_ptr<Node>>& nodes) { - DHT_LOG_DEBUG("Announce done IPv6 %d", ok6); + DHT_LOG.d(id, "Announce done IPv6 %d", ok6); *done6 = true; *ok |= ok6; donecb(nodes); @@ -2167,16 +2162,16 @@ Dht::cancelPut(const InfoHash& id, const Value::Id& vid) void Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v) { - DHT_LOG_DEBUG("[Storage %s] changed.", id.toString().c_str()); + DHT_LOG.d(id, "[store %s] changed", id.toString().c_str()); if (not st.local_listeners.empty()) { - DHT_LOG_DEBUG("[Storage %s] %lu local listeners.", id.toString().c_str(), st.local_listeners.size()); + DHT_LOG.d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size()); std::vector<std::pair<GetCallback, std::vector<std::shared_ptr<Value>>>> cbs; for (const auto& l : st.local_listeners) { std::vector<std::shared_ptr<Value>> vals; if (not l.second.filter or l.second.filter(*v.data)) vals.push_back(v.data); if (not vals.empty()) { - DHT_LOG_DEBUG("[Storage %s] Sending update local listener with token %lu.", + DHT_LOG.d(id, "[store %s] sending update local listener with token %lu", id.toString().c_str(), l.first); cbs.emplace_back(l.second.get_cb, std::move(vals)); @@ -2187,12 +2182,12 @@ Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v) cb.first(cb.second); } - DHT_LOG_DEBUG("[Storage %s] %lu remote listeners.", id.toString().c_str(), st.listeners.size()); + DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size()); for (const auto& l : st.listeners) { auto f = l.second.query.where.getFilter(); if (f and not f(*v.data)) continue; - DHT_LOG_DEBUG("[Storage %s] Sending update to %s.", id.toString().c_str(), l.first->toString().c_str()); + DHT_LOG.w(id, l.first->id, "[store %s] [node %s] sending update", id.toString().c_str(), l.first->toString().c_str()); std::vector<std::shared_ptr<Value>> vals {}; vals.push_back(v.data); Blob ntoken = makeToken((const sockaddr*)&l.first->addr.first, false); @@ -2210,7 +2205,7 @@ Dht::storageStore(const InfoHash& id, const std::shared_ptr<Value>& value, time_ auto expiration = created + getType(value->type).expiration; if (expiration < now) { using namespace std::chrono; - DHT_LOG_WARN("[store %s] won't store already expired value (created %lld s ago, expired %lld s ago)", + DHT_LOG.w(id, "[store %s] won't store already expired value (created %lld s ago, expired %lld s ago)", id.toString().c_str(), duration_cast<seconds>(now - created).count(), duration_cast<seconds>(now - expiration).count()); return false; @@ -2310,7 +2305,7 @@ Dht::expireStorage() for (auto l = i->second.listeners.cbegin(); l != i->second.listeners.cend();){ bool expired = l->second.time + Node::NODE_EXPIRE_TIME < now; if (expired) { - DHT_LOG_DEBUG("Discarding expired listener %s", l->first->id.toString().c_str()); + DHT_LOG.d(i->first, l->first->id, "[store %s] [node %s] discarding expired listener", i->first.toString().c_str(), l->first->toString().c_str()); i->second.listeners.erase(l++); } else ++l; @@ -2321,7 +2316,7 @@ Dht::expireStorage() total_values += stats.second; if (i->second.empty() && i->second.listeners.empty() && i->second.local_listeners.empty()) { - DHT_LOG_DEBUG("Discarding expired value %s", i->first.toString().c_str()); + DHT_LOG.d(i->first, "[store %s] discarding empty storage", i->first.toString().c_str()); i = store.erase(i); } else @@ -2602,7 +2597,7 @@ Dht::dumpTables() const out << getStorageLog() << std::endl; - DHT_LOG_DEBUG("%s", out.str().c_str()); + DHT_LOG.d("%s", out.str().c_str()); } std::string @@ -2625,7 +2620,7 @@ Dht::getStorageLog() const out << " (since " << since.count() << "s, exp in " << expires.count() << "s)" << std::endl; } } - out << "Total " << store.size() << " storages, " << total_values << " values (" << (total_store_size/1024) << " ĶB)" << std::endl; + out << "Total " << store.size() << " storages, " << total_values << " values (" << (total_store_size/1024) << " KB)" << std::endl; return out.str(); } @@ -2695,7 +2690,7 @@ Dht::Dht(int s, int s6, Config config) uniform_duration_distribution<> time_dis {std::chrono::seconds(3), std::chrono::seconds(5)}; auto confirm_nodes_time = scheduler.time() + time_dis(rd); - DHT_LOG_DEBUG("Scheduling %s", myid.toString().c_str()); + DHT_LOG.d(myid, "Scheduling %s", myid.toString().c_str()); nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&Dht::confirmNodes, this)); // Fill old secret @@ -2707,7 +2702,7 @@ Dht::Dht(int s, int s6, Config config) expire(); - DHT_LOG_DEBUG("DHT initialised with node ID %s", myid.toString().c_str()); + DHT_LOG.d("DHT initialised with node ID %s", myid.toString().c_str()); } @@ -2734,7 +2729,7 @@ Dht::neighbourhoodMaintenance(RoutingTable& list) auto n = q->randomNode(); if (n) { - DHT_LOG_DEBUG("[node %s] sending [find %s] for neighborhood maintenance.", + DHT_LOG.d(id, n->id, "[node %s] sending [find %s] for neighborhood maintenance", n->toString().c_str(), id.toString().c_str()); /* Since our node-id is the same in both DHTs, it's probably profitable to query both families. */ @@ -2786,12 +2781,13 @@ Dht::bucketMaintenance(RoutingTable& list) want = WANT4 | WANT6; } - DHT_LOG_DEBUG("[node %s] sending find %s for bucket maintenance.", n->toString().c_str(), id.toString().c_str()); + DHT_LOG.d(id, n->id, "[node %s] sending find %s for bucket maintenance", n->toString().c_str(), id.toString().c_str()); auto start = scheduler.time(); network_engine.sendFindNode(n, id, want, nullptr, [this,start,n](const Request&, bool over) { if (over) { - auto end = scheduler.time(); - DHT_LOG_DEBUG("[node %s] bucket maintenance op expired after %llu", n->toString().c_str(), std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()); + const auto& end = scheduler.time(); + using namespace std::chrono; + DHT_LOG.d(n->id, "[node %s] bucket maintenance op expired after %llu ms", n->toString().c_str(), duration_cast<milliseconds>(end-start).count()); scheduler.edit(nextNodesConfirmation, end + 3 * Node::NODE_EXPIRE_TIME); } }); @@ -2810,14 +2806,14 @@ Dht::dataPersistence() { auto storage_maintenance_time = time_point::max(); for (auto &str : store) { if (now > str.second.maintenance_time) { - DHT_LOG_WARN("[storage %s] maintenance (%u values, %u bytes)", + DHT_LOG.w(str.first, "[storage %s] maintenance (%u values, %u bytes)", str.first.toString().c_str(), str.second.valueCount(), str.second.totalSize()); maintainStorage(str.first); str.second.maintenance_time = now + MAX_STORAGE_MAINTENANCE_EXPIRE_TIME; } storage_maintenance_time = std::min(storage_maintenance_time, str.second.maintenance_time); } - DHT_LOG_WARN("[store] next maintenance in %u minutes", + DHT_LOG.w("[store] next maintenance in %u minutes", std::chrono::duration_cast<std::chrono::minutes>(storage_maintenance_time-now)); nextStorageMaintenance = storage_maintenance_time != time_point::max() ? scheduler.add(storage_maintenance_time, std::bind(&Dht::dataPersistence, this)) : @@ -2864,7 +2860,7 @@ Dht::maintainStorage(InfoHash id, bool force, DoneCallback donecb) { } if (not want4 and not want6) { - DHT_LOG_DEBUG("Discarding storage values %s", id.toString().c_str()); + DHT_LOG.d(id, "Discarding storage values %s", id.toString().c_str()); auto diff = local_storage->second.clear(); total_store_size += diff.first; total_values += diff.second; @@ -2882,7 +2878,7 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const SockAddr& from) try { network_engine.processMessage(buf, buflen, from); } catch (const std::exception& e) { - DHT_LOG_ERR("Can't parse message from %s: %s", from.toString().c_str(), e.what()); + DHT_LOG.e("Can't parse message from %s: %s", from.toString().c_str(), e.what()); //auto code = e.getCode(); //if (code == DhtProtocolException::INVALID_TID_SIZE or code == DhtProtocolException::WRONG_NODE_INFO_BUF_LEN) { /* This is really annoying, as it means that we will @@ -2923,11 +2919,11 @@ Dht::confirmNodes() const auto& now = scheduler.time(); if (searches4.empty() and getStatus(AF_INET) == NodeStatus::Connected) { - DHT_LOG_DEBUG("[confirm nodes] initial IPv4 'get' for my id (%s).", myid.toString().c_str()); + DHT_LOG.d(myid, "[confirm nodes] initial IPv4 'get' for my id (%s)", myid.toString().c_str()); search(myid, AF_INET); } if (searches6.empty() and getStatus(AF_INET6) == NodeStatus::Connected) { - DHT_LOG_DEBUG("[confirm nodes] initial IPv6 'get' for my id (%s).", myid.toString().c_str()); + DHT_LOG.d(myid, "[confirm nodes] initial IPv6 'get' for my id (%s)", myid.toString().c_str()); search(myid, AF_INET6); } @@ -3003,17 +2999,17 @@ Dht::importValues(const std::vector<ValuesExport>& import) val_time = time_point{time_point::duration{valel.via.array.ptr[0].as<time_point::duration::rep>()}}; tmp_val.msgpack_unpack(valel.via.array.ptr[1]); } catch (const std::exception&) { - DHT_LOG_ERR("Error reading value at %s", h.first.toString().c_str()); + DHT_LOG.e(h.first, "Error reading value at %s", h.first.toString().c_str()); continue; } if (val_time + getType(tmp_val.type).expiration < scheduler.time()) { - DHT_LOG_DEBUG("Discarding expired value at %s", h.first.toString().c_str()); + DHT_LOG.d(h.first, "Discarding expired value at %s", h.first.toString().c_str()); continue; } storageStore(h.first, std::make_shared<Value>(std::move(tmp_val)), val_time); } } catch (const std::exception&) { - DHT_LOG_ERR("Error reading values at %s", h.first.toString().c_str()); + DHT_LOG.e(h.first, "Error reading values at %s", h.first.toString().c_str()); continue; } } @@ -3065,7 +3061,7 @@ void Dht::pingNode(const sockaddr* sa, socklen_t salen, DoneCallbackSimple&& cb) { scheduler.syncTime(); - DHT_LOG_DEBUG("Sending ping to %s", print_addr(sa, salen).c_str()); + DHT_LOG.d("Sending ping to %s", print_addr(sa, salen).c_str()); auto& count = sa->sa_family == AF_INET ? pending_pings4 : pending_pings6; count++; network_engine.sendPing(sa, salen, [&count,cb](const Request&, NetworkEngine::RequestAnswer&&) { @@ -3084,7 +3080,7 @@ Dht::pingNode(const sockaddr* sa, socklen_t salen, DoneCallbackSimple&& cb) void Dht::onError(std::shared_ptr<Request> req, DhtProtocolException e) { if (e.getCode() == DhtProtocolException::UNAUTHORIZED) { - DHT_LOG_ERR("[node %s] token flush", req->node->toString().c_str()); + DHT_LOG.e(req->node->id, "[node %s] token flush", req->node->toString().c_str()); req->node->authError(); network_engine.cancelRequest(req); for (auto& srp : req->node->getFamily() == AF_INET ? searches4 : searches6) { @@ -3098,7 +3094,7 @@ Dht::onError(std::shared_ptr<Request> req, DhtProtocolException e) { } } } else if (e.getCode() == DhtProtocolException::NOT_FOUND) { - DHT_LOG.ERR("[node %s] returned error 404: storage not found", req->node->id.toString().c_str()); + DHT_LOG.e(req->node->id, "[node %s] returned error 404: storage not found", req->node->toString().c_str()); network_engine.cancelRequest(req); } } @@ -3135,7 +3131,7 @@ NetworkEngine::RequestAnswer Dht::onGetValues(std::shared_ptr<Node> node, const InfoHash& hash, want_t, const Query& query) { if (hash == zeroes) { - DHT_LOG_WARN("[node %s] Eek! Got get_values with no info_hash.", node->toString().c_str()); + DHT_LOG.w("[node %s] Eek! Got get_values with no info_hash", node->toString().c_str()); throw DhtProtocolException { DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, DhtProtocolException::GET_NO_INFOHASH @@ -3149,9 +3145,9 @@ Dht::onGetValues(std::shared_ptr<Node> node, const InfoHash& hash, want_t, const answer.nodes6 = buckets6.findClosestNodes(hash, now, TARGET_NODES); if (st != store.end() && not st->second.empty()) { answer.values = st->second.get(query.where.getFilter()); - DHT_LOG_DEBUG("[node %s] sending %u values.", node->toString().c_str(), answer.values.size()); + DHT_LOG.d(hash, "[node %s] sending %u values", node->toString().c_str(), answer.values.size()); } else { - DHT_LOG_DEBUG("[node %s] sending nodes.", node->toString().c_str()); + DHT_LOG.d(hash, "[node %s] sending nodes", node->toString().c_str()); } return answer; } @@ -3162,16 +3158,16 @@ void Dht::onGetValuesDone(const Request& status, const std::shared_ptr<Query>& orig_query) { if (not sr) { - DHT_LOG_WARN("[search unknown] got reply to 'get'. Ignoring."); + DHT_LOG.w("[search unknown] got reply to 'get'. Ignoring."); return; } - DHT_LOG_DEBUG("[search %s] [node %s] got reply to 'get' with %u nodes", + DHT_LOG.d(sr->id, "[search %s] [node %s] got reply to 'get' with %u nodes", sr->id.toString().c_str(), status.node->toString().c_str(), a.nodes4.size()); if (not a.ntoken.empty()) { if (not a.values.empty() or not a.fields.empty()) { - DHT_LOG_DEBUG("[search %s IPv%c] found %u values", + DHT_LOG.d(sr->id, "[search %s IPv%c] found %u values", sr->id.toString().c_str(), sr->af == AF_INET ? '4' : '6', a.values.size()); for (auto& getp : sr->callbacks) { /* call all callbacks for this search */ @@ -3216,7 +3212,7 @@ void Dht::onGetValuesDone(const Request& status, l.first(l.second); } } else { - DHT_LOG_WARN("[node %s] no token provided. Ignoring response content.", status.node->toString().c_str()); + DHT_LOG.w(sr->id, "[node %s] no token provided. Ignoring response content.", status.node->toString().c_str()); network_engine.blacklistNode(status.node); } @@ -3232,14 +3228,14 @@ NetworkEngine::RequestAnswer Dht::onListen(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& token, size_t rid, const Query& query) { if (hash == zeroes) { - DHT_LOG_WARN("[node %s] Listen with no info_hash.", node->toString().c_str()); + DHT_LOG.w(node->id, "[node %s] listen with no info_hash", node->toString().c_str()); throw DhtProtocolException { DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, DhtProtocolException::LISTEN_NO_INFOHASH }; } if (!tokenMatch(token, (sockaddr*)&node->addr.first)) { - DHT_LOG_WARN("[node %s] incorrect token %s for 'listen'.", node->toString().c_str(), hash.toString().c_str()); + DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'listen'", node->toString().c_str(), hash.toString().c_str()); throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::LISTEN_WRONG_TOKEN}; } Query q = query; @@ -3253,20 +3249,17 @@ Dht::onListenDone(const Request& status, std::shared_ptr<Search>& sr, const std::shared_ptr<Query>& orig_query) { - DHT_LOG_DEBUG("[search %s] [node %s] Got reply to listen.", sr->id.toString().c_str(), status.node->toString().c_str()); - if (sr) { - if (not answer.values.empty()) { /* got new values from listen request */ - DHT_LOG_DEBUG("[listen %s] Got new values.", sr->id.toString().c_str()); - onGetValuesDone(status, answer, sr, orig_query); - } + DHT_LOG.d(sr->id, status.node->id, "[search %s] [node %s] got reply to listen (%llu values)", + sr->id.toString().c_str(), status.node->toString().c_str(), answer.values.size()); + if (not answer.values.empty()) { /* got new values from listen request */ + onGetValuesDone(status, answer, sr, orig_query); + } - if (not sr->done) { - const auto& now = scheduler.time(); - searchSendGetValues(sr); - scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now)); - } - } else - DHT_LOG_DEBUG("Unknown search or announce!"); + if (not sr->done) { + const auto& now = scheduler.time(); + searchSendGetValues(sr); + scheduler.edit(sr->nextSearchStep, sr->getNextStepTime(now)); + } } NetworkEngine::RequestAnswer @@ -3277,14 +3270,14 @@ Dht::onAnnounce(std::shared_ptr<Node> node, const time_point& created) { if (hash == zeroes) { - DHT_LOG_WARN("Put with no info_hash."); + DHT_LOG.w(node->id, "put with no info_hash"); throw DhtProtocolException { DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, DhtProtocolException::PUT_NO_INFOHASH }; } if (!tokenMatch(token, (sockaddr*)&node->addr.first)) { - DHT_LOG_WARN("[node %s] incorrect token %s for 'put'.", node->toString().c_str(), hash.toString().c_str()); + DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'put'", node->toString().c_str(), hash.toString().c_str()); throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::PUT_WRONG_TOKEN}; } { @@ -3292,14 +3285,14 @@ Dht::onAnnounce(std::shared_ptr<Node> node, // SEARCH_NODES nodes around the target id. auto closest_nodes = buckets(node->getFamily()).findClosestNodes(hash, scheduler.time(), SEARCH_NODES); if (closest_nodes.size() >= TARGET_NODES and hash.xorCmp(closest_nodes.back()->id, myid) < 0) { - DHT_LOG_WARN("[node %s] announce too far from the target. Dropping value.", node->toString().c_str()); + DHT_LOG.w(hash, node->id, "[node %s] announce too far from the target. Dropping value.", node->toString().c_str()); return {}; } } for (const auto& v : values) { if (v->id == Value::INVALID_ID) { - DHT_LOG_WARN("[value %s] incorrect value id", hash.toString().c_str()); + DHT_LOG.w(hash, node->id, "[value %s] incorrect value id", hash.toString().c_str()); throw DhtProtocolException { DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, DhtProtocolException::PUT_INVALID_ID @@ -3309,15 +3302,15 @@ Dht::onAnnounce(std::shared_ptr<Node> node, std::shared_ptr<Value> vc = v; if (lv) { if (*lv == *vc) { - DHT_LOG_WARN("[store %s] nothing to do for %s.", hash.toString().c_str(), lv->toString().c_str()); + DHT_LOG.w(hash, node->id, "[store %s] nothing to do for %s", hash.toString().c_str(), lv->toString().c_str()); } else { const auto& type = getType(lv->type); if (type.editPolicy(hash, lv, vc, node->id, (sockaddr*)&node->addr.first, node->addr.second)) { - DHT_LOG_DEBUG("[store %s] editing %s.", + DHT_LOG.d(hash, node->id, "[store %s] editing %s", hash.toString().c_str(), vc->toString().c_str()); storageStore(hash, vc, created); } else { - DHT_LOG_DEBUG("[store %s] rejecting edition of %s because of storage policy.", + DHT_LOG.d(hash, node->id, "[store %s] rejecting edition of %s because of storage policy", hash.toString().c_str(), vc->toString().c_str()); } } @@ -3325,10 +3318,10 @@ Dht::onAnnounce(std::shared_ptr<Node> node, // Allow the value to be edited by the storage policy const auto& type = getType(vc->type); if (type.storePolicy(hash, vc, node->id, (sockaddr*)&node->addr.first, node->addr.second)) { - DHT_LOG_DEBUG("[store %s] storing %s.", hash.toString().c_str(), vc->toString().c_str()); + DHT_LOG.d(hash, node->id, "[store %s] storing %s", hash.toString().c_str(), vc->toString().c_str()); storageStore(hash, vc, created); } else { - DHT_LOG_DEBUG("[store %s] rejecting storage of %s.", + DHT_LOG.d(hash, node->id, "[store %s] rejecting storage of %s", hash.toString().c_str(), vc->toString().c_str()); } } @@ -3341,17 +3334,16 @@ Dht::onRefresh(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& tok { const auto& now = scheduler.time(); if (not tokenMatch(token, (sockaddr*)&node->addr.first)) { - DHT_LOG.WARN("[node %s] incorrect token %s for 'put'.", node->toString().c_str(), hash.toString().c_str()); + DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'put'", node->toString().c_str(), hash.toString().c_str()); throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::PUT_WRONG_TOKEN}; } auto s = store.find(hash); if (s != store.end() and s->second.refresh(now, vid)) { - DHT_LOG.DEBUG("[store %s] refreshed value %s.", hash.toString().c_str(), std::to_string(vid).c_str()); + DHT_LOG.d(hash, node->id, "[store %s] [node %s] refreshed value %s", hash.toString().c_str(), node->toString().c_str(), std::to_string(vid).c_str()); } else { - DHT_LOG.DEBUG("[node %s] got refresh value for unknown storage (id: %s).", - node->id.toString().c_str(), - hash.toString().c_str()); + DHT_LOG.d(hash, node->id, "[store %s] [node %s] got refresh for unknown value", + hash.toString().c_str(), node->toString().c_str()); throw DhtProtocolException {DhtProtocolException::NOT_FOUND, DhtProtocolException::STORAGE_NOT_FOUND}; } return {}; @@ -3361,7 +3353,7 @@ void Dht::onAnnounceDone(const Request& req, NetworkEngine::RequestAnswer& answer, std::shared_ptr<Search>& sr) { const auto& now = scheduler.time(); - DHT_LOG_DEBUG("[search %s] [node %s] got reply to put!", + DHT_LOG.d(sr->id, req.node->id, "[search %s] [node %s] got reply to put!", sr->id.toString().c_str(), req.node->toString().c_str()); searchSendGetValues(sr); /* if (auto sn = sr->getNode(req->node)) { */ diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index a8a3cf0287fdd33dd27337e1ea26758481e004d9..5bb478f5eb3dec0aad6e51c739f85541d8e6113b 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -214,6 +214,12 @@ DhtRunner::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) { dht_->setLoggers(std::forward<LogMethod>(error), std::forward<LogMethod>(warn), std::forward<LogMethod>(debug)); } +void +DhtRunner::setLogFilter(const InfoHash& f) { + std::lock_guard<std::mutex> lck(dht_mtx); + dht_->setLogFilter(f); +} + void DhtRunner::registerType(const ValueType& type) { std::lock_guard<std::mutex> lck(dht_mtx); @@ -446,10 +452,12 @@ DhtRunner::doRun(const sockaddr_in* sin4, const sockaddr_in6* sin6, SecureDht::C void DhtRunner::get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f, Where w) { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops.emplace([=](SecureDht& dht) mutable { - dht.get(hash, vcb, dcb, std::move(f), std::move(w)); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) mutable { + dht.get(hash, vcb, dcb, std::move(f), std::move(w)); + }); + } cv.notify_all(); } @@ -459,21 +467,25 @@ DhtRunner::get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb, get(InfoHash::get(key), vcb, dcb, f, w); } void DhtRunner::query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb, Query q) { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops.emplace([=](SecureDht& dht) mutable { - dht.query(hash, cb, done_cb, std::move(q)); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) mutable { + dht.query(hash, cb, done_cb, std::move(q)); + }); + } cv.notify_all(); } std::future<size_t> DhtRunner::listen(InfoHash hash, GetCallback vcb, Value::Filter f, Where w) { - std::lock_guard<std::mutex> lck(storage_mtx); auto ret_token = std::make_shared<std::promise<size_t>>(); - pending_ops.emplace([=](SecureDht& dht) mutable { - ret_token->set_value(dht.listen(hash, vcb, std::move(f), std::move(w))); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) mutable { + ret_token->set_value(dht.listen(hash, vcb, std::move(f), std::move(w))); + }); + } cv.notify_all(); return ret_token->get_future(); } @@ -487,42 +499,50 @@ DhtRunner::listen(const std::string& key, GetCallback vcb, Value::Filter f, Wher void DhtRunner::cancelListen(InfoHash h, size_t token) { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops.emplace([=](SecureDht& dht) { - dht.cancelListen(h, token); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) { + dht.cancelListen(h, token); + }); + } cv.notify_all(); } void DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> token) { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops.emplace([=](SecureDht& dht) { - auto tk = token.get(); - dht.cancelListen(h, tk); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) { + auto tk = token.get(); + dht.cancelListen(h, tk); + }); + } cv.notify_all(); } void DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb, time_point created, bool permanent) { - std::lock_guard<std::mutex> lck(storage_mtx); - auto sv = std::make_shared<Value>(std::move(value)); - pending_ops.emplace([=](SecureDht& dht) { - dht.put(hash, sv, cb, created, permanent); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + auto sv = std::make_shared<Value>(std::move(value)); + pending_ops.emplace([=](SecureDht& dht) { + dht.put(hash, sv, cb, created, permanent); + }); + } cv.notify_all(); } void DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, time_point created, bool permanent) { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops.emplace([=](SecureDht& dht) { - dht.put(hash, value, cb, created, permanent); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) { + dht.put(hash, value, cb, created, permanent); + }); + } cv.notify_all(); } @@ -535,20 +555,24 @@ DhtRunner::put(const std::string& key, Value&& value, DoneCallbackSimple cb, tim void DhtRunner::cancelPut(const InfoHash& h , const Value::Id& id) { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops.emplace([=](SecureDht& dht) { - dht.cancelPut(h, id); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) { + dht.cancelPut(h, id); + }); + } cv.notify_all(); } void DhtRunner::putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb) { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops.emplace([=](SecureDht& dht) { - dht.putSigned(hash, value, cb); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) { + dht.putSigned(hash, value, cb); + }); + } cv.notify_all(); } @@ -567,10 +591,12 @@ DhtRunner::putSigned(const std::string& key, Value&& value, DoneCallbackSimple c void DhtRunner::putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb) { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops.emplace([=](SecureDht& dht) { - dht.putEncrypted(hash, to, value, cb); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) { + dht.putEncrypted(hash, to, value, cb); + }); + } cv.notify_all(); } @@ -707,30 +733,36 @@ DhtRunner::bootstrap(const std::vector<std::pair<sockaddr_storage, socklen_t>>& void DhtRunner::bootstrap(const std::vector<NodeExport>& nodes) { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops_prio.emplace([=](SecureDht& dht) { - for (auto& node : nodes) - dht.insertNode(node); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops_prio.emplace([=](SecureDht& dht) { + for (auto& node : nodes) + dht.insertNode(node); + }); + } cv.notify_all(); } void DhtRunner::connectivityChanged() { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops.emplace([=](SecureDht& dht) { - dht.connectivityChanged(); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) { + dht.connectivityChanged(); + }); + } cv.notify_all(); } void DhtRunner::findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>)> cb) { - std::lock_guard<std::mutex> lck(storage_mtx); - pending_ops.emplace([=](SecureDht& dht) { - dht.findCertificate(hash, cb); - }); + { + std::lock_guard<std::mutex> lck(storage_mtx); + pending_ops.emplace([=](SecureDht& dht) { + dht.findCertificate(hash, cb); + }); + } cv.notify_all(); } diff --git a/src/log_enable.h b/src/log_enable.h deleted file mode 100644 index 76b35149c3493cd27138ed17695c81e7a23fd6c1..0000000000000000000000000000000000000000 --- a/src/log_enable.h +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (C) 2016 Savoir-faire Linux Inc. - * Author : Adrien Béraud <adrien.beraud@savoirfairelinux.com> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#ifndef OPENDHT_LOG -#define OPENDHT_LOG true -#endif - -#define DHT_LOG_DEBUG if (OPENDHT_LOG) DHT_LOG.DEBUG -#define DHT_LOG_WARN if (OPENDHT_LOG) DHT_LOG.WARN -#define DHT_LOG_ERR if (OPENDHT_LOG) DHT_LOG.ERR diff --git a/src/network_engine.cpp b/src/network_engine.cpp index f042b45d486188842497f5713cb39c21ff0862ef..6326ba88e67547bb47285aef4b2e2a9844e87a2b 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -166,7 +166,7 @@ NetworkEngine::tellListener(std::shared_ptr<Node> node, uint16_t rid, const Info sendNodesValues(node->addr, TransId {TransPrefix::GET_VALUES, (uint16_t)rid}, nnodes.first, nnodes.second, values, query, ntoken); } catch (const std::overflow_error& e) { - DHT_LOG_ERR("Can't send value: buffer not large enough !"); + DHT_LOG.e("Can't send value: buffer not large enough !"); } } @@ -221,7 +221,7 @@ NetworkEngine::requestStep(std::shared_ptr<Request> sreq) auto now = scheduler.time(); auto& node = *req.node; if (req.isExpired(now)) { - DHT_LOG_ERR("[node %s] expired !", node.toString().c_str()); + DHT_LOG.e(node.id, "[node %s] expired !", node.toString().c_str()); node.setExpired(); requests.erase(req.tid); return; @@ -255,7 +255,7 @@ NetworkEngine::sendRequest(std::shared_ptr<Request>& request) // Try to handle this scenario as well as we can e.first->second->setExpired(); e.first->second = request; - DHT_LOG_ERR("Request already existed (tid: %d)!", request->tid); + DHT_LOG.e(request->node->id, "Request already existed (tid: %d)!", request->tid); } request->node->requested(request); requestStep(request); @@ -345,12 +345,12 @@ void NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const SockAddr& from) { if (isMartian(from)) { - DHT_LOG_WARN("Received packet from martian node %s", from.toString().c_str()); + DHT_LOG.w("Received packet from martian node %s", from.toString().c_str()); return; } if (isNodeBlacklisted(from)) { - DHT_LOG_WARN("Received packet from blacklisted node %s", from.toString().c_str()); + DHT_LOG.w("Received packet from blacklisted node %s", from.toString().c_str()); return; } @@ -359,13 +359,13 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const SockAddr& msgpack::unpacked msg_res = msgpack::unpack((const char*)buf, buflen); msg->msgpack_unpack(msg_res.get()); } catch (const std::exception& e) { - DHT_LOG_WARN("Can't process message of size %lu: %s.", buflen, e.what()); - DHT_LOG_DEBUG.logPrintable(buf, buflen); + DHT_LOG.w("Can't process message of size %lu: %s", buflen, e.what()); + DHT_LOG.DEBUG.logPrintable(buf, buflen); return; } if (msg->network != network) { - DHT_LOG_DEBUG("Received message from other network %u.", msg->network); + DHT_LOG.d("Received message from other network %u", msg->network); return; } @@ -375,12 +375,12 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const SockAddr& if (msg->type == MessageType::ValueData) { auto pmsg_it = partial_messages.find(msg->tid); if (pmsg_it == partial_messages.end()) { - DHT_LOG_DEBUG("Can't find partial message"); + DHT_LOG.d("Can't find partial message"); rateLimit(from); return; } if (!pmsg_it->second.from.equals(from)) { - DHT_LOG_DEBUG("Received partial message data from unexpected IP address"); + DHT_LOG.d("Received partial message data from unexpected IP address"); rateLimit(from); return; } @@ -399,14 +399,14 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const SockAddr& } if (msg->id == myid || msg->id == zeroes) { - DHT_LOG_DEBUG("Received message from self."); + DHT_LOG.d("Received message from self"); return; } if (msg->type > MessageType::Reply) { /* Rate limit requests. */ if (!rateLimit(from)) { - DHT_LOG_WARN("Dropping request due to rate limiting."); + DHT_LOG.w("Dropping request due to rate limiting"); return; } } @@ -425,7 +425,7 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const SockAddr& scheduler.add(now + RX_MAX_PACKET_TIME, std::bind(&NetworkEngine::maintainRxBuffer, this, wmsg.first->first)); scheduler.add(now + RX_TIMEOUT, std::bind(&NetworkEngine::maintainRxBuffer, this, wmsg.first->first)); } else - DHT_LOG_ERR("Partial message with given TID already exists."); + DHT_LOG.e("Partial message with given TID already exists"); } } @@ -451,7 +451,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro // received reply from unexpected node node->received(now, req); onNewNode(node, 2); - DHT_LOG_WARN("[node %s] Message received from unexpected node.", node->toString().c_str()); + DHT_LOG.w(node->id, "[node %s] message received from unexpected node", node->toString().c_str()); return; } } else @@ -462,7 +462,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro onReportedAddr(msg->id, msg->addr); if (req->cancelled() or req->expired() or (req->completed() and not req->persistent)) { - DHT_LOG_WARN("[node %s] response to expired, cancelled or completed request", node->toString().c_str()); + DHT_LOG.w(node->id, "[node %s] response to expired, cancelled or completed request", node->toString().c_str()); requests.erase(reqp); return; } @@ -478,7 +478,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro req->reply_time = TIME_INVALID; onError(req, DhtProtocolException {msg->error_code}); } else { - DHT_LOG_WARN("[node %s %s] received unknown error message %u", + DHT_LOG.w(msg->id, "[node %s %s] received unknown error message %u", msg->id.toString().c_str(), from.toString().c_str(), msg->error_code); } break; @@ -506,12 +506,12 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro switch (msg->type) { case MessageType::Ping: ++in_stats.ping; - DHT_LOG_DEBUG("[node %s] Sending pong.", node->toString().c_str()); + DHT_LOG.d(node->id, "[node %s] sending pong", node->toString().c_str()); onPing(node); sendPong(from, msg->tid); break; case MessageType::FindNode: { - DHT_LOG_DEBUG("[node %s] got 'find' request for %s (%d).", node->toString().c_str(), msg->target.toString().c_str(), msg->want); + DHT_LOG.d(msg->target, node->id, "[node %s] got 'find' request for %s (%d)", node->toString().c_str(), msg->target.toString().c_str(), msg->want); ++in_stats.find; RequestAnswer answer = onFindNode(node, msg->target, msg->want); auto nnodes = bufferNodes(from.getFamily(), msg->target, msg->want, answer.nodes4, answer.nodes6); @@ -519,7 +519,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro break; } case MessageType::GetValues: { - DHT_LOG_DEBUG("[node %s] got 'get' request for %s.", node->toString().c_str(), msg->info_hash.toString().c_str()); + DHT_LOG.d(msg->info_hash, node->id, "[node %s] got 'get' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.get; RequestAnswer answer = onGetValues(node, msg->info_hash, msg->want, msg->query); auto nnodes = bufferNodes(from.getFamily(), msg->info_hash, msg->want, answer.nodes4, answer.nodes6); @@ -527,7 +527,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro break; } case MessageType::AnnounceValue: { - DHT_LOG_DEBUG("[node %s] got 'put' request for %s.", node->toString().c_str(), msg->info_hash.toString().c_str()); + DHT_LOG.d(msg->info_hash, node->id, "[node %s] got 'put' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.put; onAnnounce(node, msg->info_hash, msg->token, msg->values, msg->created); @@ -540,15 +540,13 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro break; } case MessageType::Refresh: - DHT_LOG.DEBUG("[node %s %s] got 'refresh value' request for %s.", - msg->id.toString().c_str(), from.toString().c_str(), - msg->info_hash.toString().c_str()); + DHT_LOG.d(msg->info_hash, node->id, "[node %s] got 'refresh' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); onRefresh(node, msg->info_hash, msg->token, msg->value_id); /* Same note as above in MessageType::AnnounceValue applies. */ sendValueAnnounced(from, msg->tid, msg->value_id); break; case MessageType::Listen: { - DHT_LOG_DEBUG("[node %s] got 'listen' request for %s.", node->toString().c_str(), msg->info_hash.toString().c_str()); + DHT_LOG.d(msg->info_hash, node->id, "[node %s] got 'listen' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.listen; RequestAnswer answer = onListen(node, msg->info_hash, msg->token, msg->tid.getTid(), std::move(msg->query)); sendListenConfirmation(from, msg->tid); @@ -558,7 +556,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro break; } } catch (const std::overflow_error& e) { - DHT_LOG_ERR("Can't send value: buffer not large enough !"); + DHT_LOG.e("Can't send value: buffer not large enough !"); } catch (DhtProtocolException& e) { sendError(from, msg->tid, e.getCode(), e.getMsg().c_str(), true); } @@ -625,7 +623,7 @@ NetworkEngine::sendPing(std::shared_ptr<Node> node, RequestCb on_done, RequestEx Blob b {buffer.data(), buffer.data() + buffer.size()}; std::shared_ptr<Request> req(new Request {tid.getTid(), node, std::move(b), [=](const Request& req_status, ParsedMessage&&) { - DHT_LOG_DEBUG("[node %s] Got pong !", req_status.node->toString().c_str()); + DHT_LOG.d(req_status.node->id, "[node %s] got pong !", req_status.node->toString().c_str()); if (on_done) { on_done(req_status, {}); } @@ -814,7 +812,7 @@ NetworkEngine::packValueHeader(msgpack::sbuffer& buffer, const std::vector<std:: if (svals.size() < 50 && total_size < MAX_PACKET_VALUE_SIZE) { for (const auto& b : svals) buffer.write((const char*)b.data(), b.size()); - DHT_LOG_DEBUG("sending %lu bytes of values", total_size); + DHT_LOG.d("sending %lu bytes of values", total_size); svals.clear(); } else { for (const auto& b : svals) @@ -1063,7 +1061,7 @@ NetworkEngine::sendAnnounceValue(std::shared_ptr<Node> n, std::shared_ptr<Request> req(new Request {tid.getTid(), n, std::move(b), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (msg.value_id == Value::INVALID_ID) { - DHT_LOG_DEBUG("Unknown search or announce!"); + DHT_LOG.d(infohash, "Unknown search or announce!"); } else { if (on_done) { RequestAnswer answer {}; @@ -1117,7 +1115,7 @@ NetworkEngine::sendRefreshValue(std::shared_ptr<Node> n, std::shared_ptr<Request> req(new Request {tid.getTid(), n, std::move(b), [=](const Request& req_status, ParsedMessage&& msg) { /* on done */ if (msg.value_id == Value::INVALID_ID) { - DHT_LOG.DEBUG("Unknown search or announce!"); + DHT_LOG.d(infohash, "Unknown search or announce!"); } else { if (on_done) { RequestAnswer answer {}; @@ -1375,7 +1373,7 @@ NetworkEngine::maintainRxBuffer(const TransId& tid) if (msg != partial_messages.end()) { if (msg->second.start + RX_MAX_PACKET_TIME < now || msg->second.last_part + RX_TIMEOUT < now) { - DHT_LOG_WARN("Dropping expired partial message from %s", msg->second.from.toString().c_str()); + DHT_LOG.w("Dropping expired partial message from %s", msg->second.from.toString().c_str()); partial_messages.erase(msg); } } diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index f169c2e6205728e0381af11acd0fea8f4a0d5cda..8d266e6d445a7437d290c2b26d1c17b600ee8bfe 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -140,11 +140,14 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params) } continue; } else if (op == "log") { - params.log = !params.log; + iss >> idstr; + InfoHash filter(idstr); + params.log = filter == InfoHash{} ? !params.log : true; if (params.log) log::enableLogging(*dht); else log::disableLogging(*dht); + dht->setLogFilter(filter); continue; }