From b09e3df2e2e1096b88e4fe3941a7f30130f080c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com> Date: Tue, 21 Jun 2016 02:28:14 -0400 Subject: [PATCH] add network id to distinguish different networks --- include/opendht/callbacks.h | 23 ++++++++++- include/opendht/dhtrunner.h | 5 ++- include/opendht/network_engine.h | 5 ++- include/opendht/routing_table.h | 2 + include/opendht/utils.h | 3 +- src/dht.cpp | 2 +- src/network_engine.cpp | 67 +++++++++++++++++++++++++------- tools/dhtchat.cpp | 2 +- tools/dhtnode.cpp | 2 +- tools/dhtscanner.cpp | 2 +- tools/tools_common.h | 7 +++- 11 files changed, 93 insertions(+), 27 deletions(-) diff --git a/include/opendht/callbacks.h b/include/opendht/callbacks.h index 7f09eb33..5318c08c 100644 --- a/include/opendht/callbacks.h +++ b/include/opendht/callbacks.h @@ -30,23 +30,44 @@ namespace dht { struct Node; +/** + * Current status of a DHT node. + */ enum class NodeStatus { Disconnected, // 0 nodes Connecting, // 1+ nodes Connected // 1+ good nodes }; +/** + * Dht configuration. + */ struct Config { + /** DHT node ID */ InfoHash node_id; + + /** + * DHT network ID. A node will only talk with other nodes having + * the same network ID. + * Network ID 0 (default) represents the main public network. + */ + NetId network; + + /** For testing purposes only, enables bootstrap mode */ bool is_bootstrap; }; +/** + * SecureDht configuration. + */ struct SecureDhtConfig { Config node_config; crypto::Identity id; }; +static constexpr size_t DEFAULT_STORAGE_LIMIT {1024 * 1024 * 64}; + using ValuesExport = std::pair<InfoHash, Blob>; using GetCallback = std::function<bool(const std::vector<std::shared_ptr<Value>>& values)>; @@ -57,8 +78,6 @@ using CertificateStoreQuery = std::function<std::vector<std::shared_ptr<crypto:: typedef bool (*GetCallbackRaw)(std::shared_ptr<Value>, void *user_data); -static constexpr size_t DEFAULT_STORAGE_LIMIT {1024 * 1024 * 64}; - GetCallbackSimple bindGetCb(GetCallbackRaw raw_cb, void* user_data); GetCallback bindGetCb(GetCallbackSimple cb); diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index dc342bf9..6ee06ed0 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -268,12 +268,13 @@ public: * @param threaded: If false, ::loop() must be called periodically. Otherwise a thread is launched. * @param cb: Optional callback to receive general state information. */ - void run(in_port_t port, const crypto::Identity identity, bool threaded = false, bool is_bootstrap = false) { + void run(in_port_t port, const crypto::Identity identity, bool threaded = false, NetId network = 0) { run(port, { /*.dht_config = */{ /*.node_config = */{ /*.node_id = */{}, - /*.is_bootstrap = */is_bootstrap + /*.network = */network, + /*.is_bootstrap = */false }, /*.id = */identity }, diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index 3a37354e..3d336c35 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -256,7 +256,7 @@ public: using RequestExpiredCb = std::function<void(const Request&, bool)>; NetworkEngine(Logger& log, Scheduler& scheduler) : myid(zeroes), DHT_LOG(log), scheduler(scheduler) {} - NetworkEngine(InfoHash& myid, int s, int s6, Logger& log, Scheduler& scheduler, + NetworkEngine(InfoHash& myid, NetId net, int s, int s6, Logger& log, Scheduler& scheduler, decltype(NetworkEngine::onError) onError, decltype(NetworkEngine::onNewNode) onNewNode, decltype(NetworkEngine::onReportedAddr) onReportedAddr, @@ -266,7 +266,7 @@ public: decltype(NetworkEngine::onListen) onListen, decltype(NetworkEngine::onAnnounce) onAnnounce) : onError(onError), onNewNode(onNewNode), onReportedAddr(onReportedAddr), onPing(onPing), onFindNode(onFindNode), - onGetValues(onGetValues), onListen(onListen), onAnnounce(onAnnounce), myid(myid), + onGetValues(onGetValues), onListen(onListen), onAnnounce(onAnnounce), myid(myid), network(net), dht_socket(s), dht_socket6(s6), DHT_LOG(log), scheduler(scheduler) { transaction_id = std::uniform_int_distribution<decltype(transaction_id)>{1}(rd_device); @@ -376,6 +376,7 @@ private: /* DHT info */ const InfoHash& myid; + const NetId network {0}; const int dht_socket {-1}; const int dht_socket6 {-1}; const Logger& DHT_LOG; diff --git a/include/opendht/routing_table.h b/include/opendht/routing_table.h index 5823999f..573d8680 100644 --- a/include/opendht/routing_table.h +++ b/include/opendht/routing_table.h @@ -23,6 +23,8 @@ namespace dht { +static constexpr unsigned TARGET_NODES {8}; + struct Bucket { Bucket() : cached() {} Bucket(sa_family_t af, const InfoHash& f = {}, time_point t = time_point::min()) diff --git a/include/opendht/utils.h b/include/opendht/utils.h index 2a0ca686..a5f676c0 100644 --- a/include/opendht/utils.h +++ b/include/opendht/utils.h @@ -40,9 +40,8 @@ namespace dht { -static constexpr unsigned TARGET_NODES {8}; - using Address = std::pair<sockaddr_storage, socklen_t>; +using NetId = uint32_t; using want_t = int_fast8_t; std::string print_addr(const sockaddr* sa, socklen_t slen); diff --git a/src/dht.cpp b/src/dht.cpp index f38b365e..590bea80 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -2102,7 +2102,7 @@ Dht::Dht() : store(), network_engine(DHT_LOG, scheduler) {} Dht::Dht(int s, int s6, Config config) : myid(config.node_id), is_bootstrap(config.is_bootstrap), store(), - network_engine(myid, s, s6, DHT_LOG, scheduler, + network_engine(myid, config.network, s, s6, DHT_LOG, scheduler, std::bind(&Dht::onError, this, _1, _2), std::bind(&Dht::onNewNode, this, _1, _2), std::bind(&Dht::onReportedAddr, this, _1, _2, _3), diff --git a/src/network_engine.cpp b/src/network_engine.cpp index e24a1ddf..15f2431c 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -49,6 +49,7 @@ static const uint8_t v4prefix[16] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xFF, 0xFF, 0, 0, 0, 0 }; +constexpr unsigned SEND_NODES {8}; enum class MessageType { Error = 0, @@ -63,6 +64,7 @@ enum class MessageType { struct ParsedMessage { MessageType type; InfoHash id; /* the id of the sender */ + NetId network {0}; /* network id */ InfoHash info_hash; /* hash for which values are requested */ InfoHash target; /* target id around which to find nodes */ NetworkEngine::TransId tid; /* transaction id */ @@ -293,6 +295,11 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr* return; } + if (msg.network != network) { + DHT_LOG.DEBUG("Received message from other network %u.", msg.network); + return; + } + if (msg.id == myid || msg.id == zeroes) { DHT_LOG.DEBUG("Received message from self."); return; @@ -451,9 +458,10 @@ packToken(msgpack::packer<msgpack::sbuffer>& pk, Blob token) } void -insertAddr(msgpack::packer<msgpack::sbuffer>& pk, const sockaddr *sa, socklen_t) +insertAddr(msgpack::packer<msgpack::sbuffer>& pk, const sockaddr *sa, socklen_t sa_len) { - size_t addr_len = (sa->sa_family == AF_INET) ? sizeof(in_addr) : sizeof(in6_addr); + size_t addr_len = std::min<size_t>(sa_len, + (sa->sa_family == AF_INET) ? sizeof(in_addr) : sizeof(in6_addr)); void* addr_ptr = (sa->sa_family == AF_INET) ? (void*)&((sockaddr_in*)sa)->sin_addr : (void*)&((sockaddr_in6*)sa)->sin6_addr; pk.pack("sa"); @@ -485,7 +493,7 @@ NetworkEngine::sendPing(std::shared_ptr<Node> node, RequestCb on_done, RequestEx auto tid = TransId {TransPrefix::PING, getNewTid()}; msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(5); + pk.pack_map(5+(network?1:0)); pk.pack(std::string("a")); pk.pack_map(1); pk.pack(std::string("id")); pk.pack(myid); @@ -495,6 +503,9 @@ NetworkEngine::sendPing(std::shared_ptr<Node> node, RequestCb on_done, RequestEx pk.pack_bin_body((const char*)tid.data(), tid.size()); pk.pack(std::string("y")); pk.pack(std::string("q")); pk.pack(std::string("v")); pk.pack(my_v); + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } Blob b {buffer.data(), buffer.data() + buffer.size()}; std::shared_ptr<Request> req(new Request {tid.getTid(), node, std::move(b), @@ -519,7 +530,7 @@ void NetworkEngine::sendPong(const sockaddr* sa, socklen_t salen, TransId tid) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(4); + pk.pack_map(4+(network?1:0)); pk.pack(std::string("r")); pk.pack_map(2); pk.pack(std::string("id")); pk.pack(myid); @@ -529,6 +540,9 @@ NetworkEngine::sendPong(const sockaddr* sa, socklen_t salen, TransId tid) { pk.pack_bin_body((const char*)tid.data(), tid.size()); pk.pack(std::string("y")); pk.pack(std::string("r")); pk.pack(std::string("v")); pk.pack(my_v); + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } send(buffer.data(), buffer.size(), 0, sa, salen); } @@ -539,7 +553,7 @@ NetworkEngine::sendFindNode(std::shared_ptr<Node> n, const InfoHash& target, wan auto tid = TransId {TransPrefix::FIND_NODE, getNewTid()}; msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(5); + pk.pack_map(5+(network?1:0)); pk.pack(std::string("a")); pk.pack_map(2 + (want>0?1:0)); pk.pack(std::string("id")); pk.pack(myid); @@ -556,7 +570,9 @@ NetworkEngine::sendFindNode(std::shared_ptr<Node> n, const InfoHash& target, wan pk.pack_bin_body((const char*)tid.data(), tid.size()); pk.pack(std::string("y")); pk.pack(std::string("q")); pk.pack(std::string("v")); pk.pack(my_v); - + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } Blob b {buffer.data(), buffer.data() + buffer.size()}; std::shared_ptr<Request> req(new Request {tid.getTid(), n, std::move(b), @@ -583,7 +599,7 @@ NetworkEngine::sendGetValues(std::shared_ptr<Node> n, const InfoHash& info_hash, auto tid = TransId {TransPrefix::GET_VALUES, getNewTid()}; msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(5); + pk.pack_map(5+(network?1:0)); pk.pack(std::string("a")); pk.pack_map(2 + (want>0?1:0)); pk.pack(std::string("id")); pk.pack(myid); @@ -600,6 +616,9 @@ NetworkEngine::sendGetValues(std::shared_ptr<Node> n, const InfoHash& info_hash, pk.pack_bin_body((const char*)tid.data(), tid.size()); pk.pack(std::string("y")); pk.pack(std::string("q")); pk.pack(std::string("v")); pk.pack(my_v); + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } Blob b {buffer.data(), buffer.data() + buffer.size()}; std::shared_ptr<Request> req(new Request {tid.getTid(), n, std::move(b), @@ -664,7 +683,7 @@ NetworkEngine::sendNodesValues(const sockaddr* sa, socklen_t salen, TransId tid, const std::vector<std::shared_ptr<Value>>& st, const Blob& token) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(4); + pk.pack_map(4+(network?1:0)); pk.pack(std::string("r")); pk.pack_map(2 + (not st.empty()?1:0) + (nodes.size()>0?1:0) + (nodes6.size()>0?1:0) + (not token.empty()?1:0)); @@ -715,6 +734,9 @@ NetworkEngine::sendNodesValues(const sockaddr* sa, socklen_t salen, TransId tid, pk.pack_bin_body((const char*)tid.data(), tid.size()); pk.pack(std::string("y")); pk.pack(std::string("r")); pk.pack(std::string("v")); pk.pack(my_v); + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } send(buffer.data(), buffer.size(), 0, sa, salen); } @@ -725,7 +747,7 @@ NetworkEngine::bufferNodes(sa_family_t af, const InfoHash& id, std::vector<std:: std::sort(nodes.begin(), nodes.end(), [&](const std::shared_ptr<Node>& a, const std::shared_ptr<Node>& b){ return id.xorCmp(a->id, b->id) < 0; }); - size_t nnode = std::min<size_t>(TARGET_NODES, nodes.size()); + size_t nnode = std::min<size_t>(SEND_NODES, nodes.size()); Blob bnodes; if (af == AF_INET) { bnodes.resize(NODE4_INFO_BUF_LEN * nnode); @@ -777,7 +799,7 @@ NetworkEngine::sendListen(std::shared_ptr<Node> n, const InfoHash& infohash, con auto tid = TransId {TransPrefix::LISTEN, getNewTid()}; msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(5); + pk.pack_map(5+(network?1:0)); pk.pack(std::string("a")); pk.pack_map(3); pk.pack(std::string("id")); pk.pack(myid); @@ -789,7 +811,9 @@ NetworkEngine::sendListen(std::shared_ptr<Node> n, const InfoHash& infohash, con pk.pack_bin_body((const char*)tid.data(), tid.size()); pk.pack(std::string("y")); pk.pack(std::string("q")); pk.pack(std::string("v")); pk.pack(my_v); - + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } Blob b {buffer.data(), buffer.data() + buffer.size()}; std::shared_ptr<Request> req(new Request {tid.getTid(), n, std::move(b), @@ -812,7 +836,7 @@ void NetworkEngine::sendListenConfirmation(const sockaddr* sa, socklen_t salen, TransId tid) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(4); + pk.pack_map(4+(network?1:0)); pk.pack(std::string("r")); pk.pack_map(2); pk.pack(std::string("id")); pk.pack(myid); @@ -822,6 +846,9 @@ NetworkEngine::sendListenConfirmation(const sockaddr* sa, socklen_t salen, Trans pk.pack_bin_body((const char*)tid.data(), tid.size()); pk.pack(std::string("y")); pk.pack(std::string("r")); pk.pack(std::string("v")); pk.pack(my_v); + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } send(buffer.data(), buffer.size(), 0, sa, salen); } @@ -832,7 +859,7 @@ NetworkEngine::sendAnnounceValue(std::shared_ptr<Node> n, const InfoHash& infoha auto tid = TransId {TransPrefix::ANNOUNCE_VALUES, getNewTid()}; msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(5); + pk.pack_map(5+(network?1:0)); pk.pack(std::string("a")); pk.pack_map((created < scheduler.time() ? 5 : 4)); pk.pack(std::string("id")); pk.pack(myid); @@ -849,6 +876,9 @@ NetworkEngine::sendAnnounceValue(std::shared_ptr<Node> n, const InfoHash& infoha pk.pack_bin_body((const char*)tid.data(), tid.size()); pk.pack(std::string("y")); pk.pack(std::string("q")); pk.pack(std::string("v")); pk.pack(my_v); + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } Blob b {buffer.data(), buffer.data() + buffer.size()}; std::shared_ptr<Request> req(new Request {tid.getTid(), n, std::move(b), @@ -878,7 +908,7 @@ void NetworkEngine::sendValueAnnounced(const sockaddr* sa, socklen_t salen, TransId tid, Value::Id vid) { msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack_map(4); + pk.pack_map(4+(network?1:0)); pk.pack(std::string("r")); pk.pack_map(3); pk.pack(std::string("id")); pk.pack(myid); @@ -889,6 +919,9 @@ NetworkEngine::sendValueAnnounced(const sockaddr* sa, socklen_t salen, TransId t pk.pack_bin_body((const char*)tid.data(), tid.size()); pk.pack(std::string("y")); pk.pack(std::string("r")); pk.pack(std::string("v")); pk.pack(my_v); + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } send(buffer.data(), buffer.size(), 0, sa, salen); } @@ -917,6 +950,9 @@ NetworkEngine::sendError(const sockaddr* sa, pk.pack_bin_body((const char*)tid.data(), tid.size()); pk.pack(std::string("y")); pk.pack(std::string("e")); pk.pack(std::string("v")); pk.pack(my_v); + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } send(buffer.data(), buffer.size(), 0, sa, salen); } @@ -960,6 +996,9 @@ ParsedMessage::msgpack_unpack(msgpack::object msg) error_code = e->via.array.ptr[0].as<uint16_t>(); } + if (auto netid = findMapValue(msg, "n")) + network = netid->as<NetId>(); + if (auto rid = findMapValue(req, "id")) id = {*rid}; diff --git a/tools/dhtchat.cpp b/tools/dhtchat.cpp index 359c0630..981b551d 100644 --- a/tools/dhtchat.cpp +++ b/tools/dhtchat.cpp @@ -53,7 +53,7 @@ main(int argc, char **argv) throw std::runtime_error(std::string("Error initializing GnuTLS: ")+gnutls_strerror(rc)); DhtRunner dht; - dht.run(params.port, dht::crypto::generateIdentity("DHT Chat Node"), true); + dht.run(params.port, dht::crypto::generateIdentity("DHT Chat Node"), true, params.network); if (not params.bootstrap.first.empty()) dht.bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index f7acaa4a..49ddef7e 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -256,7 +256,7 @@ main(int argc, char **argv) crt = dht::crypto::generateIdentity("DHT Node", ca_tmp); } - dht.run(params.port, crt, true, params.is_bootstrap_node); + dht.run(params.port, crt, true, params.network); if (params.log) { if (not params.logfile.empty()) diff --git a/tools/dhtscanner.cpp b/tools/dhtscanner.cpp index eb323781..f93c31e7 100644 --- a/tools/dhtscanner.cpp +++ b/tools/dhtscanner.cpp @@ -80,7 +80,7 @@ main(int argc, char **argv) auto crt_tmp = dht::crypto::generateIdentity("Scanner node", ca_tmp); DhtRunner dht; - dht.run(params.port, crt_tmp, true, [](dht::NodeStatus /* ipv4 */, dht::NodeStatus /* ipv6 */) {}); + dht.run(params.port, crt_tmp, true, params.network); if (not params.bootstrap.first.empty()) dht.bootstrap(params.bootstrap.first.c_str(), params.bootstrap.second.c_str()); diff --git a/tools/tools_common.h b/tools/tools_common.h index 144bd521..84347082 100644 --- a/tools/tools_common.h +++ b/tools/tools_common.h @@ -142,6 +142,7 @@ struct dht_params { bool log {false}; std::string logfile {}; in_port_t port {0}; + dht::NetId network {0}; bool is_bootstrap_node {false}; bool generate_identity {false}; bool daemonize {false}; @@ -151,6 +152,7 @@ struct dht_params { static const constexpr struct option long_options[] = { {"help", no_argument, nullptr, 'h'}, {"port", required_argument, nullptr, 'p'}, + {"net", required_argument, nullptr, 'n'}, {"bootstrap", optional_argument, nullptr, 'b'}, {"identity", no_argument , nullptr, 'i'}, {"verbose", no_argument , nullptr, 'v'}, @@ -162,7 +164,7 @@ dht_params parseArgs(int argc, char **argv) { dht_params params; int opt; - while ((opt = getopt_long(argc, argv, ":hidv:p:b:", long_options, nullptr)) != -1) { + while ((opt = getopt_long(argc, argv, ":hidv:p:n:b:", long_options, nullptr)) != -1) { switch (opt) { case 'p': { int port_arg = atoi(optarg); @@ -172,6 +174,9 @@ parseArgs(int argc, char **argv) { std::cout << "Invalid port: " << port_arg << std::endl; } break; + case 'n': + params.network = strtoul(optarg, nullptr, 0); + break; case 'b': if (optarg) { params.bootstrap = splitPort((optarg[0] == '=') ? optarg+1 : optarg); -- GitLab