diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 4324c06b624ac5d6b629f3eac09b22155745a297..41fbd98022ebe22d59af6e1241308de8c411ca45 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -389,7 +389,7 @@ public: * @param threaded: If false, loop() must be called periodically. Otherwise a thread is launched. * @param cb: Optional callback to receive general state information. */ - void run(const SockAddr& local4, const SockAddr& local6, const Config& config, Context&& context = {}); + void run(SockAddr& local4, SockAddr& local6, const Config& config, Context&& context = {}); /** * Same as @run(sockaddr_in, sockaddr_in6, Identity, bool, StatusCallback), but with string IP addresses and service (port). diff --git a/src/dht.cpp b/src/dht.cpp index 9fcfb9a1915fa640e1473831e0dc29b5a0622aa7..f59041b935bb281a6a33b58948fe0add167d60b4 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -1785,13 +1785,13 @@ Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, cons secret = std::uniform_int_distribution<uint64_t>{}(rd); rotateSecrets(); + if (not persistPath.empty()) + loadState(persistPath); + expire(); if (logger_) logger_->d("DHT node initialised with ID %s", myid.toString().c_str()); - - if (not persistPath.empty()) - loadState(persistPath); } bool @@ -2070,13 +2070,13 @@ Dht::importValues(const std::vector<ValuesExport>& import) { const auto& now = scheduler.time(); - for (const auto& node : import) { - if (node.second.empty()) + for (const auto& value : import) { + if (value.second.empty()) continue; try { msgpack::unpacked msg; - msgpack::unpack(msg, (const char*)node.second.data(), node.second.size()); + msgpack::unpack(msg, (const char*)value.second.data(), value.second.size()); auto valarr = msg.get(); if (valarr.type != msgpack::type::ARRAY) throw msgpack::type_error(); @@ -2091,15 +2091,15 @@ Dht::importValues(const std::vector<ValuesExport>& import) tmp_val.msgpack_unpack(valel.via.array.ptr[1]); } catch (const std::exception&) { if (logger_) - logger_->e(node.first, "Error reading value at %s", node.first.toString().c_str()); + logger_->e(value.first, "Error reading value at %s", value.first.toString().c_str()); continue; } val_time = std::min(val_time, now); - storageStore(node.first, std::make_shared<Value>(std::move(tmp_val)), val_time); + storageStore(value.first, std::make_shared<Value>(std::move(tmp_val)), val_time); } } catch (const std::exception&) { if (logger_) - logger_->e(node.first, "Error reading values at %s", node.first.toString().c_str()); + logger_->e(value.first, "Error reading values at %s", value.first.toString().c_str()); continue; } } @@ -2504,12 +2504,24 @@ Dht::onAnnounceDone(const Sp<Node>& node, net::RequestAnswer& answer, Sp<Search> } +struct DhtState { + unsigned v {1}; + InfoHash id; + std::vector<NodeExport> nodes; + std::vector<ValuesExport> values; + + MSGPACK_DEFINE_MAP(v, id, nodes, values) +}; + void Dht::saveState(const std::string& path) const { + DhtState state; + state.id = myid; + state.nodes = exportNodes(); + state.values = exportValues(); std::ofstream file(path); - msgpack::pack(file, exportNodes()); - msgpack::pack(file, exportValues()); + msgpack::pack(file, state); } void @@ -2535,19 +2547,15 @@ Dht::loadState(const std::string& path) // Import nodes msgpack::object_handle oh; if (pac.next(oh)) { - { - auto imported_nodes = oh.get().as<std::vector<NodeExport>>(); - if (logger_) - logger_->d("Importing %zu nodes", imported_nodes.size()); - for (const auto& node : imported_nodes) - insertNode(node); - } - if (pac.next(oh)) { - auto imported_values = oh.get().as<std::vector<ValuesExport>>(); - if (logger_) - logger_->d("Importing %zu values", imported_values.size()); - importValues(imported_values); - } + auto state = oh.get().as<DhtState>(); + if (logger_) + logger_->d("Importing %zu nodes", state.nodes.size()); + myid = state.id; + std::vector<Sp<Node>> tmpNodes; + tmpNodes.reserve(state.nodes.size()); + for (const auto& node : state.nodes) + tmpNodes.emplace_back(network_engine.insertNode(node.id, SockAddr(node.ss, node.sslen))); + importValues(state.values); } } catch (const std::exception& e) { if (logger_) diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index f52f7c18990791275c815644f2e1ee64d1de7588..2ece5e217936000d9d1afcd2304d6a88d6dcb96d 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -32,6 +32,8 @@ #include "dht_proxy_client.h" #endif +#include <fstream> + namespace dht { constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD; @@ -90,16 +92,46 @@ DhtRunner::run(const char* ip4, const char* ip6, const char* service, const Conf { auto res4 = SockAddr::resolve(ip4, service); auto res6 = SockAddr::resolve(ip6, service); - run(res4.empty() ? SockAddr() : res4.front(), - res6.empty() ? SockAddr() : res6.front(), config, std::move(context)); + if (res4.empty()) + res4.emplace_back(); + if (res6.empty()) + res6.emplace_back(); + run(res4.front(), res6.front(), config, std::move(context)); } void -DhtRunner::run(const SockAddr& local4, const SockAddr& local6, const Config& config, Context&& context) +DhtRunner::run(SockAddr& local4, SockAddr& local6, const Config& config, Context&& context) { if (running == State::Idle) { + auto state_path = config.dht_config.node_config.persist_path; + if (not state_path.empty()) { + state_path += "_port.txt"; + std::ifstream inConfig(state_path); + if (inConfig.is_open()) { + in_port_t port; + if (inConfig >> port) { + if (context.logger) + context.logger->d("[runner %p] Using IPv4 port %hu from saved configuration", this, port); + if (local4.getPort() == 0) + local4.setPort(port); + } + if (inConfig >> port) { + if (context.logger) + context.logger->d("[runner %p] Using IPv6 port %hu from saved configuration", this, port); + if (local6.getPort() == 0) + local6.setPort(port); + } + } + } + if (not context.sock) context.sock.reset(new net::UdpSocket(local4, local6, context.logger)); + + if (not state_path.empty()) { + std::ofstream outConfig(state_path); + outConfig << context.sock->getBoundRef(AF_INET).getPort() << std::endl; + outConfig << context.sock->getBoundRef(AF_INET6).getPort() << std::endl; + } run(config, std::move(context)); } }