diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 43910c2e1cf0417058dedad79e650aeba9ce6a92..e79d868babc4f8509a7f1ef386d239ead907ba8b 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -41,7 +41,9 @@ namespace dht { +namespace net { struct Request; +} /* namespace net */ /** * Main Dht class. @@ -402,7 +404,7 @@ private: std::shared_ptr<Scheduler::Job> nextStorageMaintenance {}; time_point mybucket_grow_time {time_point::min()}, mybucket6_grow_time {time_point::min()}; - NetworkEngine network_engine; + net::NetworkEngine network_engine; unsigned pending_pings4 {0}; unsigned pending_pings6 {0}; @@ -497,8 +499,8 @@ private: * @param ws A weak pointer to the search concerned by the request. * @param query The query sent to the node. */ - void searchNodeGetDone(const Request& status, - NetworkEngine::RequestAnswer&& answer, + void searchNodeGetDone(const net::Request& status, + net::NetworkEngine::RequestAnswer&& answer, std::weak_ptr<Search> ws, std::shared_ptr<Query> query); @@ -511,7 +513,7 @@ private: * @param ws A weak pointer to the search concerned by the request. * @param query The query sent to the node. */ - void searchNodeGetExpired(const Request& status, bool over, std::weak_ptr<Search> ws, std::shared_ptr<Query> query); + void searchNodeGetExpired(const net::Request& status, bool over, std::weak_ptr<Search> ws, std::shared_ptr<Query> query); /** * This method recovers sends individual request for values per id. @@ -549,44 +551,44 @@ private: void processMessage(const uint8_t *buf, size_t buflen, const SockAddr&); - void onError(std::shared_ptr<Request> node, DhtProtocolException e); + void onError(std::shared_ptr<net::Request> node, net::DhtProtocolException e); /* when our address is reported by a distant peer. */ void onReportedAddr(const InfoHash& id, const SockAddr&); /* when we receive a ping request */ - NetworkEngine::RequestAnswer onPing(std::shared_ptr<Node> node); + net::NetworkEngine::RequestAnswer onPing(std::shared_ptr<Node> node); /* when we receive a "find node" request */ - NetworkEngine::RequestAnswer onFindNode(std::shared_ptr<Node> node, const InfoHash& hash, want_t want); - void onFindNodeDone(const Request& status, NetworkEngine::RequestAnswer& a, std::shared_ptr<Search> sr); + net::NetworkEngine::RequestAnswer onFindNode(std::shared_ptr<Node> node, const InfoHash& hash, want_t want); + void onFindNodeDone(const Request& status, net::NetworkEngine::RequestAnswer& a, std::shared_ptr<Search> sr); /* when we receive a "get values" request */ - NetworkEngine::RequestAnswer onGetValues(std::shared_ptr<Node> node, + net::NetworkEngine::RequestAnswer onGetValues(std::shared_ptr<Node> node, const InfoHash& hash, want_t want, const Query& q); void onGetValuesDone(const Request& status, - NetworkEngine::RequestAnswer& a, + net::NetworkEngine::RequestAnswer& a, std::shared_ptr<Search>& sr, const std::shared_ptr<Query>& orig_query); /* when we receive a listen request */ - NetworkEngine::RequestAnswer onListen(std::shared_ptr<Node> node, + net::NetworkEngine::RequestAnswer onListen(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& token, size_t rid, const Query& query); void onListenDone(const Request& status, - NetworkEngine::RequestAnswer& a, + net::NetworkEngine::RequestAnswer& a, std::shared_ptr<Search>& sr, const std::shared_ptr<Query>& orig_query); /* when we receive an announce request */ - NetworkEngine::RequestAnswer onAnnounce(std::shared_ptr<Node> node, + net::NetworkEngine::RequestAnswer onAnnounce(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& token, const std::vector<std::shared_ptr<Value>>& v, const time_point& created); - NetworkEngine::RequestAnswer onRefresh(std::shared_ptr<Node> node, + net::NetworkEngine::RequestAnswer onRefresh(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& token, const Value::Id& vid); - void onAnnounceDone(const Request& status, NetworkEngine::RequestAnswer& a, std::shared_ptr<Search>& sr); + void onAnnounceDone(const Request& status, net::NetworkEngine::RequestAnswer& a, std::shared_ptr<Search>& sr); }; } diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index 3d8a4157b8f5b805185769e7c5b0e2078bb3168c..8b61cbd01d207c1c0a92e36e647192b3f9b5d025 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -27,6 +27,7 @@ #include "utils.h" #include "rng.h" #include "rate_limiter.h" +#include "net.h" #include <vector> #include <string> @@ -37,6 +38,9 @@ #include <queue> namespace dht { +namespace net { + +struct Request; #ifndef MSG_CONFIRM #define MSG_CONFIRM 0 @@ -96,56 +100,7 @@ struct ParsedMessage; */ class NetworkEngine final { - struct TransPrefix : public std::array<uint8_t, 2> { - TransPrefix(const std::string& str) : std::array<uint8_t, 2>({{(uint8_t)str[0], (uint8_t)str[1]}}) {} - static const TransPrefix PING; - static const TransPrefix FIND_NODE; - static const TransPrefix GET_VALUES; - static const TransPrefix ANNOUNCE_VALUES; - static const TransPrefix REFRESH; - static const TransPrefix LISTEN; - }; public: - - /* Transaction-ids are 4-bytes long, with the first two bytes identifying - * the kind of request, and the remaining two a sequence number in - * host order. - */ - struct TransId final : public std::array<uint8_t, 4> { - static const constexpr uint16_t INVALID {0}; - - TransId() {} - TransId(const std::array<char, 4>& o) { std::copy(o.begin(), o.end(), begin()); } - TransId(const TransPrefix prefix, uint16_t seqno = 0) { - std::copy_n(prefix.begin(), prefix.size(), begin()); - *reinterpret_cast<uint16_t*>(data()+prefix.size()) = seqno; - } - - TransId(const char* q, size_t l) : array<uint8_t, 4>() { - if (l > 4) { - length = 0; - } else { - std::copy_n(q, l, begin()); - length = l; - } - } - - uint16_t getTid() const { - return *reinterpret_cast<const uint16_t*>(&(*this)[2]); - } - - bool matches(const TransPrefix prefix, uint16_t* tid = nullptr) const { - if (std::equal(begin(), begin()+2, prefix.begin())) { - if (tid) - *tid = getTid(); - return true; - } else - return false; - } - - unsigned length {4}; - }; - /*! * @class RequestAnswer * @brief Answer for a request. @@ -440,11 +395,11 @@ private: } struct MessageStats { - unsigned ping {0}; - unsigned find {0}; - unsigned get {0}; - unsigned put {0}; - unsigned listen {0}; + unsigned ping {0}; + unsigned find {0}; + unsigned get {0}; + unsigned put {0}; + unsigned listen {0}; unsigned refresh {0}; }; @@ -542,4 +497,5 @@ private: Scheduler& scheduler; }; -} +} /* namespace net */ +} /* namespace dht */ diff --git a/include/opendht/node.h b/include/opendht/node.h index eac7dc850b150b57cc09858b7a8e0f8e4330d689..6cbfa9c83f05b5807ccb7799a03c32c7895a658c 100644 --- a/include/opendht/node.h +++ b/include/opendht/node.h @@ -28,7 +28,9 @@ namespace dht { +namespace net { struct Request; +} /* namespace net */ struct Node { InfoHash id; @@ -72,8 +74,8 @@ struct Node { void update(const SockAddr&); - void requested(std::shared_ptr<Request>& req); - void received(time_point now, std::shared_ptr<Request> req); + void requested(std::shared_ptr<net::Request>& req); + void received(time_point now, std::shared_ptr<net::Request> req); void setExpired(); @@ -98,12 +100,12 @@ private: /* Number of times we accept authentication errors from this node. */ static const constexpr unsigned MAX_AUTH_ERRORS {3}; - std::list<std::weak_ptr<Request>> requests_ {}; + std::list<std::weak_ptr<net::Request>> requests_ {}; unsigned auth_errors {0}; bool expired_ {false}; void clearPendingQueue() { - requests_.remove_if([](std::weak_ptr<Request>& w) { + requests_.remove_if([](std::weak_ptr<net::Request>& w) { return w.expired(); }); } diff --git a/include/opendht/request.h b/include/opendht/request.h index 8c45199c6f79a1942ba8d6447eec72d71a556e3a..d6c139737cf852b576efb36d96536de22dc218a5 100644 --- a/include/opendht/request.h +++ b/include/opendht/request.h @@ -18,7 +18,12 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ +#pragma once + +#include "net.h" + namespace dht { +namespace net { class NetworkEngine; struct ParsedMessage; @@ -32,7 +37,8 @@ struct ParsedMessage; * request is done. */ struct Request { - friend class dht::NetworkEngine; + friend class dht::net::NetworkEngine; + std::shared_ptr<Node> node {}; /* the node to whom the request is destined. */ time_point reply_time {time_point::min()}; /* time when we received the response to the request. */ @@ -110,4 +116,5 @@ private: const bool persistent {false}; /* the request is not erased upon completion. */ }; +} /* namespace net */ } diff --git a/net.h b/net.h new file mode 100644 index 0000000000000000000000000000000000000000..eecbed8968f4a54702c1c9bc67c3afe594e73d0b --- /dev/null +++ b/net.h @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2014-2016 Savoir-faire Linux Inc. + * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * Simon Désaulniers <sim.desaulniers@gmail.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <array> +#include <cstdint> + +namespace dht { +namespace net { + +struct TransPrefix : public std::array<uint8_t, 2> { + TransPrefix(const std::string& str) : std::array<uint8_t, 2>({{(uint8_t)str[0], (uint8_t)str[1]}}) {} + static const TransPrefix PING; + static const TransPrefix FIND_NODE; + static const TransPrefix GET_VALUES; + static const TransPrefix ANNOUNCE_VALUES; + static const TransPrefix REFRESH; + static const TransPrefix LISTEN; +}; + +/* Transaction-ids are 4-bytes long, with the first two bytes identifying + * the kind of request, and the remaining two a sequence number in + * host order. + */ +struct TransId final : public std::array<uint8_t, 4> { + static const constexpr uint16_t INVALID {0}; + + TransId() { std::fill_n(begin(), 4, 0); } + TransId(const std::array<char, 4>& o) { std::copy(o.begin(), o.end(), begin()); } + TransId(const TransPrefix prefix, uint16_t seqno = 0) { + std::copy_n(prefix.begin(), prefix.size(), begin()); + *reinterpret_cast<uint16_t*>(data()+prefix.size()) = seqno; + } + + TransId(const char* q, size_t l) : array<uint8_t, 4>() { + if (l > 4) { + length = 0; + } else { + std::copy_n(q, l, begin()); + length = l; + } + } + + uint16_t getTid() const { + return *reinterpret_cast<const uint16_t*>(&(*this)[2]); + } + + uint32_t toInt() const { + return *reinterpret_cast<const uint32_t*>(&(*this)[0]); + } + + bool matches(const TransPrefix prefix, uint16_t* tid = nullptr) const { + if (std::equal(begin(), begin()+2, prefix.begin())) { + if (tid) + *tid = getTid(); + return true; + } else + return false; + } + + unsigned length {4}; +}; + +} /* namespace net */ +} /* dht */ diff --git a/src/dht.cpp b/src/dht.cpp index e8808b4683e29b8b934a0b965bff0f63110b906d..66fd79f75e3d02ddfa15a5e42cb298b6ef267974 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -242,16 +242,16 @@ struct Dht::LocalListener { struct Dht::SearchNode { /** - * Foreach value id, we keep track of a pair (Request, time_point) where the + * Foreach value id, we keep track of a pair (net::Request, time_point) where the * request is the request returned by the network engine and the time_point * is the last time at which the value has been refreshed. */ - using AnnounceStatus = std::map<Value::Id, std::pair<std::shared_ptr<Request>, time_point>>; + using AnnounceStatus = std::map<Value::Id, std::pair<std::shared_ptr<net::Request>, time_point>>; /** * Foreach Query, we keep track of the request returned by the network * engine when we sent the "get". */ - using SyncStatus = std::map<std::shared_ptr<Query>, std::shared_ptr<Request>>; + using SyncStatus = std::map<std::shared_ptr<Query>, std::shared_ptr<net::Request>>; std::shared_ptr<Node> node {}; /* the node info */ @@ -1073,32 +1073,32 @@ Dht::expireSearches() } void -Dht::searchNodeGetDone(const Request& status, - NetworkEngine::RequestAnswer&& answer, +Dht::searchNodeGetDone(const net::Request& req, + net::NetworkEngine::RequestAnswer&& answer, std::weak_ptr<Search> ws, std::shared_ptr<Query> query) { const auto& now = scheduler.time(); if (auto sr = ws.lock()) { - if (auto srn = sr->getNode(status.node)) { + if (auto srn = sr->getNode(req.node)) { /* all other get requests which are satisfied by this answer should not be sent anymore */ for (auto& g : sr->callbacks) { auto& q = g.second.query; if (q->isSatisfiedBy(*query) and q != query) { - auto req = std::make_shared<Request>(); - req->cancel(); - srn->getStatus[q] = std::move(req); + auto dummy_req = std::make_shared<net::Request>(); + dummy_req->cancel(); + srn->getStatus[q] = std::move(dummy_req); } } } - sr->insertNode(status.node, now, answer.ntoken); - onGetValuesDone(status, answer, sr, query); + sr->insertNode(req.node, now, answer.ntoken); + onGetValuesDone(req.node, answer, sr, query); } } void -Dht::searchNodeGetExpired(const Request& status, +Dht::searchNodeGetExpired(const net::Request& status, bool over, std::weak_ptr<Search> ws, std::shared_ptr<Query> query) @@ -1118,7 +1118,7 @@ void Dht::paginate(std::weak_ptr<Search> ws, std::shared_ptr<Query> query, Searc if (not sr) return; auto select_q = std::make_shared<Query>(Select {}.field(Value::Field::Id), query ? query->where : Where {}); auto onSelectDone = - [this,ws,query](const Request& status, NetworkEngine::RequestAnswer&& answer) mutable + [this,ws,query](const net::Request& status, net::NetworkEngine::RequestAnswer&& answer) mutable { if (auto sr = ws.lock()) { if (auto sn = sr->getNode(status.node)) { @@ -1248,25 +1248,25 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { if (not something_to_announce) continue; - auto onDone = [this,ws](const Request& status, NetworkEngine::RequestAnswer&& answer) + auto onDone = [this,ws](const net::Request& req, net::NetworkEngine::RequestAnswer&& answer) { /* when put done */ if (auto sr = ws.lock()) { - onAnnounceDone(status, answer, sr); + onAnnounceDone(req.node, answer, sr); searchStep(sr); } }; - auto onExpired = [this,ws](const Request&, bool over) + auto onExpired = [this,ws](const net::Request&, bool over) { /* when put expired */ if (over) if (auto sr = ws.lock()) scheduler.edit(sr->nextSearchStep, scheduler.time()); }; auto onSelectDone = - [this,ws,onDone,onExpired](const Request& status, NetworkEngine::RequestAnswer&& answer) mutable + [this,ws,onDone,onExpired](const net::Request& req, net::NetworkEngine::RequestAnswer&& answer) mutable { /* on probing done */ const auto& now = scheduler.time(); if (auto sr = ws.lock()) { - if (auto sn = sr->getNode(status.node)) { + if (auto sn = sr->getNode(req.node)) { for (auto ait = sr->announce.begin(); ait != sr->announce.end();) { auto& a = *ait; if (sn->isSynced(now) and sn->getAnnounceTime(a.value->id) <= now) { @@ -1307,7 +1307,7 @@ void Dht::searchSendAnnounceValue(const std::shared_ptr<Search>& sr) { } else { DHT_LOG.w(sr->id, sn->node->id, "[search %s] [node %s] already has value (vid: %d). Aborting.", sr->id.toString().c_str(), sn->node->toString().c_str(), a.value->id); - auto ack_req = std::make_shared<Request>(); + auto ack_req = std::make_shared<net::Request>(); ack_req->reply_time = now; sn->acked[a.value->id] = std::make_pair(std::move(ack_req), next_refresh_time); @@ -1396,7 +1396,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) std::weak_ptr<Search> ws = sr; n.listenStatus[query] = network_engine.sendListen(n.node, sr->id, *query, n.token, - [this,ws,last_req,query](const Request& req, NetworkEngine::RequestAnswer&& answer) mutable + [this,ws,last_req,query](const net::Request& req, net::NetworkEngine::RequestAnswer&& answer) mutable { /* on done */ network_engine.cancelRequest(last_req); if (auto sr = ws.lock()) { @@ -1404,7 +1404,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) scheduler.edit(sr->nextSearchStep, scheduler.time()); } }, - [this,ws,last_req,query](const Request& req, bool over) mutable + [this,ws,last_req,query](const net::Request& req, bool over) mutable { /* on expired */ network_engine.cancelRequest(last_req); if (auto sr = ws.lock()) { @@ -2844,7 +2844,7 @@ Dht::bucketMaintenance(RoutingTable& list) DHT_LOG.d(id, n->id, "[node %s] sending find %s for bucket maintenance", n->toString().c_str(), id.toString().c_str()); auto start = scheduler.time(); - network_engine.sendFindNode(n, id, want, nullptr, [this,start,n](const Request&, bool over) { + network_engine.sendFindNode(n, id, want, nullptr, [this,start,n](const net::Request&, bool over) { if (over) { const auto& end = scheduler.time(); using namespace std::chrono; @@ -2934,7 +2934,7 @@ Dht::processMessage(const uint8_t *buf, size_t buflen, const SockAddr& from) } catch (const std::exception& e) { DHT_LOG.e("Can't parse message from %s: %s", from.toString().c_str(), e.what()); //auto code = e.getCode(); - //if (code == DhtProtocolException::INVALID_TID_SIZE or code == DhtProtocolException::WRONG_NODE_INFO_BUF_LEN) { + //if (code == net::DhtProtocolException::INVALID_TID_SIZE or code == net::DhtProtocolException::WRONG_NODE_INFO_BUF_LEN) { /* This is really annoying, as it means that we will time-out all our searches that go through this node. Kill it. */ @@ -3113,11 +3113,11 @@ Dht::pingNode(const sockaddr* sa, socklen_t salen, DoneCallbackSimple&& cb) DHT_LOG.d("Sending ping to %s", print_addr(sa, salen).c_str()); auto& count = sa->sa_family == AF_INET ? pending_pings4 : pending_pings6; count++; - network_engine.sendPing(sa, salen, [&count,cb](const Request&, NetworkEngine::RequestAnswer&&) { + network_engine.sendPing(sa, salen, [&count,cb](const net::Request&, net::NetworkEngine::RequestAnswer&&) { count--; if (cb) cb(true); - }, [&count,cb](const Request&, bool last){ + }, [&count,cb](const net::Request&, bool last){ if (last) { count--; if (cb) @@ -3127,8 +3127,8 @@ Dht::pingNode(const sockaddr* sa, socklen_t salen, DoneCallbackSimple&& cb) } void -Dht::onError(std::shared_ptr<Request> req, DhtProtocolException e) { - if (e.getCode() == DhtProtocolException::UNAUTHORIZED) { +Dht::onError(std::shared_ptr<net::Request> req, net::DhtProtocolException e) { + if (e.getCode() == net::DhtProtocolException::UNAUTHORIZED) { DHT_LOG.e(req->node->id, "[node %s] token flush", req->node->toString().c_str()); req->node->authError(); network_engine.cancelRequest(req); @@ -3142,7 +3142,7 @@ Dht::onError(std::shared_ptr<Request> req, DhtProtocolException e) { break; } } - } else if (e.getCode() == DhtProtocolException::NOT_FOUND) { + } else if (e.getCode() == net::DhtProtocolException::NOT_FOUND) { DHT_LOG.e(req->node->id, "[node %s] returned error 404: storage not found", req->node->toString().c_str()); network_engine.cancelRequest(req); } @@ -3157,17 +3157,17 @@ Dht::onReportedAddr(const InfoHash& id, const SockAddr& addr) reportedAddr(addr); } -NetworkEngine::RequestAnswer +net::NetworkEngine::RequestAnswer Dht::onPing(std::shared_ptr<Node>) { return {}; } -NetworkEngine::RequestAnswer +net::NetworkEngine::RequestAnswer Dht::onFindNode(std::shared_ptr<Node> node, const InfoHash& target, want_t want) { const auto& now = scheduler.time(); - NetworkEngine::RequestAnswer answer; + net::NetworkEngine::RequestAnswer answer; answer.ntoken = makeToken((sockaddr*)&node->addr.first, false); if (want & WANT4) answer.nodes4 = buckets4.findClosestNodes(target, now, TARGET_NODES); @@ -3176,18 +3176,18 @@ Dht::onFindNode(std::shared_ptr<Node> node, const InfoHash& target, want_t want) return answer; } -NetworkEngine::RequestAnswer +net::NetworkEngine::RequestAnswer Dht::onGetValues(std::shared_ptr<Node> node, const InfoHash& hash, want_t, const Query& query) { if (hash == zeroes) { DHT_LOG.w("[node %s] Eek! Got get_values with no info_hash", node->toString().c_str()); - throw DhtProtocolException { - DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, - DhtProtocolException::GET_NO_INFOHASH + throw net::DhtProtocolException { + net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, + net::DhtProtocolException::GET_NO_INFOHASH }; } const auto& now = scheduler.time(); - NetworkEngine::RequestAnswer answer {}; + net::NetworkEngine::RequestAnswer answer {}; auto st = store.find(hash); answer.ntoken = makeToken((sockaddr*)&node->addr.first, false); answer.nodes4 = buckets4.findClosestNodes(hash, now, TARGET_NODES); @@ -3202,7 +3202,7 @@ Dht::onGetValues(std::shared_ptr<Node> node, const InfoHash& hash, want_t, const } void Dht::onGetValuesDone(const Request& status, - NetworkEngine::RequestAnswer& a, + net::NetworkEngine::RequestAnswer& a, std::shared_ptr<Search>& sr, const std::shared_ptr<Query>& orig_query) { @@ -3273,19 +3273,19 @@ void Dht::onGetValuesDone(const Request& status, } } -NetworkEngine::RequestAnswer +net::NetworkEngine::RequestAnswer Dht::onListen(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& token, size_t rid, const Query& query) { if (hash == zeroes) { DHT_LOG.w(node->id, "[node %s] listen with no info_hash", node->toString().c_str()); - throw DhtProtocolException { - DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, - DhtProtocolException::LISTEN_NO_INFOHASH + throw net::DhtProtocolException { + net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, + net::DhtProtocolException::LISTEN_NO_INFOHASH }; } if (!tokenMatch(token, (sockaddr*)&node->addr.first)) { DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'listen'", node->toString().c_str(), hash.toString().c_str()); - throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::LISTEN_WRONG_TOKEN}; + throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::LISTEN_WRONG_TOKEN}; } Query q = query; storageAddListener(hash, node, rid, std::move(q)); @@ -3294,7 +3294,7 @@ Dht::onListen(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& toke void Dht::onListenDone(const Request& status, - NetworkEngine::RequestAnswer& answer, + net::NetworkEngine::RequestAnswer& answer, std::shared_ptr<Search>& sr, const std::shared_ptr<Query>& orig_query) { @@ -3311,7 +3311,7 @@ Dht::onListenDone(const Request& status, } } -NetworkEngine::RequestAnswer +net::NetworkEngine::RequestAnswer Dht::onAnnounce(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& token, @@ -3320,14 +3320,14 @@ Dht::onAnnounce(std::shared_ptr<Node> node, { if (hash == zeroes) { DHT_LOG.w(node->id, "put with no info_hash"); - throw DhtProtocolException { - DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, - DhtProtocolException::PUT_NO_INFOHASH + throw net::DhtProtocolException { + net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, + net::DhtProtocolException::PUT_NO_INFOHASH }; } if (!tokenMatch(token, (sockaddr*)&node->addr.first)) { DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'put'", node->toString().c_str(), hash.toString().c_str()); - throw DhtProtocolException {DhtProtocolException::UNAUTHORIZED, DhtProtocolException::PUT_WRONG_TOKEN}; + throw net::DhtProtocolException {net::DhtProtocolException::UNAUTHORIZED, net::DhtProtocolException::PUT_WRONG_TOKEN}; } { // We store a value only if we think we're part of the @@ -3343,9 +3343,9 @@ Dht::onAnnounce(std::shared_ptr<Node> node, for (const auto& v : values) { if (v->id == Value::INVALID_ID) { DHT_LOG.w(hash, node->id, "[value %s] incorrect value id", hash.toString().c_str()); - throw DhtProtocolException { - DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, - DhtProtocolException::PUT_INVALID_ID + throw net::DhtProtocolException { + net::DhtProtocolException::NON_AUTHORITATIVE_INFORMATION, + net::DhtProtocolException::PUT_INVALID_ID }; } auto lv = getLocalById(hash, v->id); @@ -3379,9 +3379,11 @@ Dht::onAnnounce(std::shared_ptr<Node> node, return {}; } -NetworkEngine::RequestAnswer +net::NetworkEngine::RequestAnswer Dht::onRefresh(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& token, const Value::Id& vid) { + using namespace net; + const auto& now = scheduler.time(); if (not tokenMatch(token, (sockaddr*)&node->addr.first)) { DHT_LOG.w(hash, node->id, "[node %s] incorrect token %s for 'put'", node->toString().c_str(), hash.toString().c_str()); @@ -3400,7 +3402,7 @@ Dht::onRefresh(std::shared_ptr<Node> node, const InfoHash& hash, const Blob& tok } void -Dht::onAnnounceDone(const Request& req, NetworkEngine::RequestAnswer& answer, std::shared_ptr<Search>& sr) +Dht::onAnnounceDone(const Request& req, net::NetworkEngine::RequestAnswer& answer, std::shared_ptr<Search>& sr) { const auto& now = scheduler.time(); DHT_LOG.d(sr->id, req.node->id, "[search %s] [node %s] got reply to put!", diff --git a/src/network_engine.cpp b/src/network_engine.cpp index 59c9355416497428a8c903686929e094cd322404..01fa009255a3e479520bb776dfa7e053bd4c1528 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -26,6 +26,7 @@ #include <msgpack.hpp> namespace dht { +namespace net { const std::string DhtProtocolException::GET_NO_INFOHASH {"Get_values with no info_hash"}; const std::string DhtProtocolException::LISTEN_NO_INFOHASH {"Listen with no info_hash"}; @@ -40,15 +41,15 @@ constexpr std::chrono::seconds NetworkEngine::RX_MAX_PACKET_TIME; constexpr std::chrono::seconds NetworkEngine::RX_TIMEOUT; const std::string NetworkEngine::my_v {"RNG1"}; -const constexpr uint16_t NetworkEngine::TransId::INVALID; +const constexpr uint16_t TransId::INVALID; std::mt19937 NetworkEngine::rd_device {dht::crypto::random_device{}()}; -const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::PING = {"pn"}; -const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::FIND_NODE = {"fn"}; -const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::GET_VALUES = {"gt"}; -const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::ANNOUNCE_VALUES = {"pt"}; -const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::REFRESH = {"rf"}; -const NetworkEngine::TransPrefix NetworkEngine::TransPrefix::LISTEN = {"lt"}; +const TransPrefix TransPrefix::PING = {"pn"}; +const TransPrefix TransPrefix::FIND_NODE = {"fn"}; +const TransPrefix TransPrefix::GET_VALUES = {"gt"}; +const TransPrefix TransPrefix::ANNOUNCE_VALUES = {"pt"}; +const TransPrefix TransPrefix::REFRESH = {"rf"}; +const TransPrefix TransPrefix::LISTEN = {"lt"}; constexpr long unsigned NetworkEngine::MAX_REQUESTS_PER_SEC; static const uint8_t v4prefix[16] = { @@ -1419,4 +1420,5 @@ ParsedMessage::complete() } -} +} /* namespace net */ +} /* namespace dht */ diff --git a/src/node.cpp b/src/node.cpp index 3476b8c66d1371c7423ee4f8c6e588cf3c0e5da2..7e68134d32c09dd952d929bcb0c2ab3cd218ad4e 100644 --- a/src/node.cpp +++ b/src/node.cpp @@ -72,7 +72,7 @@ Node::update(const SockAddr& new_addr) /** To be called when a message was sent to the node */ void -Node::requested(std::shared_ptr<Request>& req) +Node::requested(std::shared_ptr<net::Request>& req) { requests_.emplace_back(req); } @@ -80,7 +80,7 @@ Node::requested(std::shared_ptr<Request>& req) /** To be called when a message was received from the node. Req should be true if the message was an aswer to a request we made*/ void -Node::received(time_point now, std::shared_ptr<Request> req) +Node::received(time_point now, std::shared_ptr<net::Request> req) { time = now; if (req) {