diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 67d4c9b954f207e04821ef583bd33d2acfde7631..09bb0f56bb1c4f26246acb1b692e774ae943d56a 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -58,7 +58,7 @@ struct LocalListener; * Must be given open UDP sockets and ::periodic must be * called regularly. */ -class OPENDHT_PUBLIC Dht : public DhtInterface { +class OPENDHT_PUBLIC Dht final : public DhtInterface { public: Dht(); @@ -71,7 +71,9 @@ public: virtual ~Dht(); - virtual void start(const std::string& ) {}; +#if OPENDHT_PROXY_CLIENT + void startProxy(const std::string&) {}; +#endif /** * Get the ID of the node. diff --git a/include/opendht/dht_interface.h b/include/opendht/dht_interface.h index b0b683b5932b0ecd6f2eb980f70ccb80e2b2c7b4..d981745326c6fa51f5861fb5132cf27fe9ca0a4f 100644 --- a/include/opendht/dht_interface.h +++ b/include/opendht/dht_interface.h @@ -28,7 +28,9 @@ public: DhtInterface() = default; virtual ~DhtInterface() = default; - virtual void start(const std::string& host) = 0; +#if OPENDHT_PROXY_CLIENT + virtual void startProxy(const std::string& host) = 0; +#endif // [[deprecated]] using Status = NodeStatus; diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 79f63026ecf53b41fad2591e3e7b8a8265d9b76a..119aecd5c642487c96a48162b73530e01fe6f735 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -16,11 +16,13 @@ * along with this program. If not, see <https://www.gnu.org/licenses/>. */ -#if OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_CLIENT #pragma once +#include <functional> #include <thread> +#include <mutex> #include "callbacks.h" #include "def.h" @@ -34,7 +36,7 @@ namespace restbed namespace dht { -class OPENDHT_PUBLIC DhtProxyClient : public DhtInterface { +class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface { public: DhtProxyClient() : scheduler(DHT_LOG) {} @@ -44,7 +46,12 @@ public: * and an ID for the node. */ explicit DhtProxyClient(const std::string& serverHost); - void start(const std::string& serverHost); + /** + * Start the connection with a server. + * @param serverHost the server address + */ + void startProxy(const std::string& serverHost); + virtual ~DhtProxyClient(); /** @@ -156,6 +163,10 @@ public: return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w)); } + time_point periodic(const uint8_t*, size_t, const SockAddr&); + time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) { + return periodic(buf, buflen, SockAddr(from, fromlen)); + } /** * TODO @@ -201,78 +212,27 @@ public: * NOTE: The following methods will not be implemented because the * DhtProxyClient doesn't have any storage nor synchronization process */ - - /** - * Insert a node in the main routing table. - * The node is not pinged, so this should be - * used to bootstrap efficiently from previously known nodes. - */ void insertNode(const InfoHash&, const SockAddr&) { } void insertNode(const InfoHash&, const sockaddr*, socklen_t) { } void insertNode(const NodeExport&) { } - - /** - * Returns the total memory usage of stored values and the number - * of stored values. - */ std::pair<size_t, size_t> getStoreSize() const { return {}; } - virtual void registerType(const ValueType&) { } const ValueType& getType(ValueType::Id) const { } - - /** - * Get locally stored data for the given hash. - */ std::vector<Sp<Value>> getLocal(const InfoHash&, Value::Filter) const { return {}; } - - /** - * Get locally stored data for the given key and value id. - */ Sp<Value> getLocalById(const InfoHash&, Value::Id) const { return {}; } - - /** - * Get the list of good nodes for local storage saving purposes - * The list is ordered to minimize the back-to-work delay. - */ std::vector<NodeExport> exportNodes() { return {}; } - std::vector<ValuesExport> exportValues() const { return {}; } void importValues(const std::vector<ValuesExport>&) {} - std::string getStorageLog() const { return {}; } std::string getStorageLog(const InfoHash&) const { return {}; } - std::string getRoutingTablesLog(sa_family_t) const { return {}; } std::string getSearchesLog(sa_family_t) const { return {}; } std::string getSearchLog(const InfoHash&, sa_family_t) const { return {}; } - void dumpTables() const {} std::vector<unsigned> getNodeMessageStats(bool) { return {}; } - - /** - * Set the in-memory storage limit in bytes - */ void setStorageLimit(size_t) {} - - /** - * Inform the DHT of lower-layer connectivity changes. - * This will cause the DHT to assume a public IP address change. - * The DHT will recontact neighbor nodes, re-register for listen ops etc. - */ void connectivityChanged(sa_family_t) {} - void connectivityChanged() { - connectivityChanged(AF_INET); - connectivityChanged(AF_INET6); - } - - time_point periodic(const uint8_t*, size_t, const SockAddr&) { - // The DhtProxyClient doesn't use NetworkEngine, so here, we have nothing to do for now. - scheduler.syncTime(); - return scheduler.run(); - } - time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) { - return periodic(buf, buflen, SockAddr(from, fromlen)); - } + void connectivityChanged() { } private: /** @@ -308,10 +268,12 @@ private: std::string key; GetCallback cb; Value::Filter filterChain; - std::unique_ptr<std::thread> thread; + std::thread thread; }; std::vector<Listener> listeners_; size_t listener_token_ {0}; + std::mutex lockListener_; + /** * Store current put and get requests. */ @@ -321,6 +283,12 @@ private: std::thread thread; }; std::vector<Operation> operations_; + /** + * Callbacks should be executed in the main thread. + */ + std::vector<std::function<void()>> callbacks_; + std::mutex lockCallbacks; + std::unique_ptr<std::mutex> lockCurrentProxyInfos_; Scheduler scheduler; /** @@ -338,7 +306,10 @@ private: */ void restartListeners(); - std::shared_ptr<Json::Value> currentProxyInfos_; + /** + * Store the current proxy status + */ + std::unique_ptr<Json::Value> currentProxyInfos_; }; } diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index d2eb6a3896072c03096c846bfd12380eb65c83f7..ade9b02a36824ed4b79461bdfca0ca50986a1212 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -26,6 +26,7 @@ #include <thread> #include <memory> +#include <mutex> #include <restbed> namespace dht { @@ -160,6 +161,7 @@ private: std::future<size_t> token; }; mutable std::vector<SessionToHashToken> currentListeners_; + mutable std::mutex lockListener_; std::atomic_bool stopListeners {false}; }; diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index 8678874bfcff60c441b43a7c853fb96c418c3040..cd67bff81293a7291e81cc08b65bb2f9d44220d9 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -29,7 +29,7 @@ namespace dht { -class OPENDHT_PUBLIC SecureDht : public DhtInterface { +class OPENDHT_PUBLIC SecureDht final : public DhtInterface { public: typedef std::function<void(bool)> SignatureCheckCallback; @@ -174,12 +174,6 @@ public: std::vector<ValuesExport> exportValues() const { return dht_->exportValues(); } - void setLoggers(LogMethod error = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG) { - dht_->setLoggers(error, warn, debug); - } - void setLogFilter(const InfoHash& f) { - dht_->setLogFilter(f); - } void importValues(const std::vector<ValuesExport>& v) { dht_->importValues(v); } @@ -298,13 +292,15 @@ public: dht_->connectivityChanged(); } - void start(const std::string& host) { - dht_->start(host); +#if OPENDHT_PROXY_CLIENT + void startProxy(const std::string& host) { + dht_->startProxy(host); } +#endif #if OPENDHT_PROXY_SERVER void forwardAllMessages(bool forward) { - force_forward_ = forward; + forward_all_ = forward; } #endif //OPENDHT_PROXY_SERVER @@ -329,7 +325,7 @@ private: std::uniform_int_distribution<Value::Id> rand_id {}; #if OPENDHT_PROXY_SERVER - std::atomic_bool force_forward_ {false}; + std::atomic_bool forward_all_ {false}; #endif //OPENDHT_PROXY_SERVER }; diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index d60b21cfd2a39426263ba25e8b82d836f31b13ff..459fb49d8a49941daf08f36130c2f1a865835a96 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -16,7 +16,7 @@ * along with this program. If not, see <https://www.gnu.org/licenses/>. */ -#if OPENDHT_PROXY_SERVER +#if OPENDHT_PROXY_CLIENT #include "dht_proxy_client.h" @@ -24,19 +24,21 @@ #include <json/json.h> #include <restbed> #include <vector> -#include <signal.h> #include "dhtrunner.h" +#include <iostream> + constexpr const char* const HTTP_PROTO {"http://"}; namespace dht { DhtProxyClient::DhtProxyClient(const std::string& serverHost) -: serverHost_(serverHost), scheduler(DHT_LOG), currentProxyInfos_(new Json::Value()) +: serverHost_(serverHost), lockCurrentProxyInfos_(new std::mutex()), + scheduler(DHT_LOG), currentProxyInfos_(new Json::Value()) { if (!serverHost_.empty()) - start(serverHost_); + startProxy(serverHost_); } void @@ -56,7 +58,7 @@ DhtProxyClient::confirmProxy() } void -DhtProxyClient::start(const std::string& serverHost) +DhtProxyClient::startProxy(const std::string& serverHost) { serverHost_ = serverHost; if (serverHost_.empty()) return; @@ -97,14 +99,16 @@ DhtProxyClient::cancelAllOperations() void DhtProxyClient::cancelAllListeners() { + lockListener_.lock(); for (auto& listener: listeners_) { - if (listener.thread && listener.thread->joinable()) { + if (listener.thread.joinable()) { // Close connection to stop listener? if (listener.req) restbed::Http::close(listener.req); - listener.thread->join(); + listener.thread.join(); } } + lockListener_.unlock(); } void @@ -144,6 +148,20 @@ DhtProxyClient::isRunning(sa_family_t af) const } } +time_point +DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&) +{ + // Exec all currently stored callbacks + scheduler.syncTime(); + if (!callbacks_.empty()) { + lockCallbacks.lock(); + for (auto& callback : callbacks_) + callback(); + callbacks_.clear(); + lockCallbacks.unlock(); + } + return scheduler.run(); +} void DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, @@ -177,8 +195,13 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Json::Reader reader; if (reader.parse(body, json)) { auto value = std::make_shared<Value>(json); - if ((not filterChain or filterChain(*value)) && cb) - cb({value}); + if ((not filterChain or filterChain(*value)) && cb) { + lockCallbacks.lock(); + callbacks_.emplace_back([=](){ + cb({value}); + }); + lockCallbacks.unlock(); + } } else { *ok = false; } @@ -188,8 +211,13 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, *ok = false; } }).wait(); - if (donecb) - donecb(*ok, {}); + if (donecb) { + lockCallbacks.lock(); + callbacks_.emplace_back([=](){ + donecb(*ok, {}); + }); + lockCallbacks.unlock(); + } if (!ok) { // Connection failed, update connectivity getConnectivityStatus(); @@ -217,7 +245,7 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po o.thread = std::move(std::thread([=](){ auto ok = std::make_shared<bool>(true); restbed::Http::async(req, - [this, val, ok](const std::shared_ptr<restbed::Request>& /*req*/, + [this, ok](const std::shared_ptr<restbed::Request>& /*req*/, const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); @@ -229,17 +257,23 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po Json::Value json; Json::Reader reader; - if (reader.parse(body, json)) { - auto value = std::make_shared<Value>(json); - } else { + try { + if (!reader.parse(body, json)) + *ok = false; + } catch (...) { *ok = false; } } else { *ok = false; } }).wait(); - if (cb) - cb(*ok, {}); + if (cb) { + lockCallbacks.lock(); + callbacks_.emplace_back([=](){ + cb(*ok, {}); + }); + lockCallbacks.unlock(); + } if (!ok) { // Connection failed, update connectivity getConnectivityStatus(); @@ -251,7 +285,9 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po NodeStats DhtProxyClient::getNodesStats(sa_family_t af) const { + lockCurrentProxyInfos_->lock(); auto proxyInfos = *currentProxyInfos_; + lockCurrentProxyInfos_->unlock(); NodeStats stats {}; auto identifier = af == AF_INET6 ? "ipv6" : "ipv4"; try { @@ -279,33 +315,35 @@ DhtProxyClient::getProxyInfos() const reply->get_body(body); Json::Reader reader; + lockCurrentProxyInfos_->lock(); try { reader.parse(body, *currentProxyInfos_); } catch (...) { *currentProxyInfos_ = Json::Value(); } + lockCurrentProxyInfos_->unlock(); } else { + lockCurrentProxyInfos_->lock(); *currentProxyInfos_ = Json::Value(); + lockCurrentProxyInfos_->unlock(); } }).wait(); - return *currentProxyInfos_; + lockCurrentProxyInfos_->lock(); + auto result = *currentProxyInfos_; + lockCurrentProxyInfos_->unlock(); + return result; } std::vector<SockAddr> DhtProxyClient::getPublicAddress(sa_family_t family) { + lockCurrentProxyInfos_->lock(); auto proxyInfos = *currentProxyInfos_; + lockCurrentProxyInfos_->unlock(); // json["public_ip"] contains [ipv6:ipv4]:port or ipv4:port - if (!proxyInfos.isMember("public_ip")) { - return {}; - } auto public_ip = proxyInfos["public_ip"].asString(); - if (public_ip.length() < 2) { - return {}; - } - std::string ipv4Address = ""; - std::string ipv6Address = ""; - std::string port = ""; + if (!proxyInfos.isMember("public_ip") || (public_ip.length() < 2)) return {}; + std::string ipv4Address = "", ipv6Address = "", port = ""; if (public_ip[0] == '[') { // ipv6 complient auto endIp = public_ip.find(']'); @@ -349,7 +387,7 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt l.req = req; l.cb = cb; l.filterChain = std::move(filterChain); - l.thread = std::move(std::unique_ptr<std::thread>(new std::thread([=]() + l.thread = std::move(std::thread([=]() { auto settings = std::make_shared<restbed::Settings>(); std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); @@ -372,8 +410,13 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt Json::Reader reader; if (reader.parse(body, json)) { auto value = std::make_shared<Value>(json); - if ((not filterChain or filterChain(*value)) && cb) - cb({value}); + if ((not filterChain or filterChain(*value)) && cb) { + lockCallbacks.lock(); + callbacks_.emplace_back([=](){ + cb({value}); + }); + lockCallbacks.unlock(); + } } } } catch (std::runtime_error&) { @@ -387,28 +430,32 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filt }, settings).get(); getConnectivityStatus(); }) - )); + ); + lockListener_.lock(); listeners_.emplace_back(std::move(l)); + lockListener_.unlock(); return listener_token_; } bool DhtProxyClient::cancelListen(const InfoHash&, size_t token) { + lockListener_.lock(); for (auto it = listeners_.begin(); it != listeners_.end(); ++it) { auto& listener = *it; if (listener.token == token) { - if (listener.thread->joinable()) { + if (listener.thread.joinable()) { // Close connection to stop listener? if (listener.req) restbed::Http::close(listener.req); - if (listener.thread->joinable()) - listener.thread->join(); + listener.thread.join(); listeners_.erase(it); + lockListener_.unlock(); return true; } } } + lockListener_.unlock(); return false; } @@ -420,18 +467,12 @@ DhtProxyClient::getConnectivityStatus() try { auto goodIpv4 = static_cast<long>(proxyInfos["ipv4"]["good"].asLargestUInt()); auto dubiousIpv4 = static_cast<long>(proxyInfos["ipv4"]["dubious"].asLargestUInt()); - if (goodIpv4 + dubiousIpv4 > 0) { - statusIpv4_ = NodeStatus::Connected; - } else { - statusIpv4_ = NodeStatus::Disconnected; - } + statusIpv4_ = (goodIpv4 + dubiousIpv4 > 0) ? NodeStatus::Connected : NodeStatus::Disconnected; + auto goodIpv6 = static_cast<long>(proxyInfos["ipv6"]["good"].asLargestUInt()); auto dubiousIpv6 = static_cast<long>(proxyInfos["ipv6"]["dubious"].asLargestUInt()); - if (goodIpv6 + dubiousIpv6 > 0) { - statusIpv6_ = NodeStatus::Connected; - } else { - statusIpv6_ = NodeStatus::Disconnected; - } + statusIpv6_ = (goodIpv6 + dubiousIpv6 > 0) ? NodeStatus::Connected : NodeStatus::Disconnected; + myid = InfoHash(proxyInfos["node_id"].asString()); if (statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected) { const auto& now = scheduler.time(); @@ -448,9 +489,10 @@ DhtProxyClient::getConnectivityStatus() void DhtProxyClient::restartListeners() { + lockListener_.lock(); for (auto& listener: listeners_) { - if (listener.thread && listener.thread->joinable()) - listener.thread->join(); + if (listener.thread.joinable()) + listener.thread.join(); // Redo listen auto filterChain = listener.filterChain; auto cb = listener.cb; @@ -458,7 +500,7 @@ DhtProxyClient::restartListeners() auto req = std::make_shared<restbed::Request>(uri); req->set_method("LISTEN"); listener.req = req; - listener.thread = std::move(std::unique_ptr<std::thread>(new std::thread([this, filterChain, cb, req]() + listener.thread = std::move(std::thread([this, filterChain, cb, req]() { auto settings = std::make_shared<restbed::Settings>(); std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); @@ -481,8 +523,13 @@ DhtProxyClient::restartListeners() Json::Reader reader; if (reader.parse(body, json)) { auto value = std::make_shared<Value>(json); - if ((not filterChain or filterChain(*value)) && cb) - cb({value}); + if ((not filterChain or filterChain(*value)) && cb) { + lockCallbacks.lock(); + callbacks_.emplace_back([=](){ + cb({value}); + }); + lockCallbacks.unlock(); + } } } } catch (std::runtime_error&) { @@ -496,8 +543,9 @@ DhtProxyClient::restartListeners() }, settings).get(); getConnectivityStatus(); }) - )); + ); } + lockListener_.unlock(); } diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 058f25d9375ffe523616458a4f99f92c152895b9..580c8821b7e5f8fcef659753a72b1b76e116056b 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -82,6 +82,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port) std::this_thread::sleep_for(std::chrono::seconds(1)); } while (service_->is_up() && !stopListeners) { + lockListener_.lock(); auto listener = currentListeners_.begin(); while (listener != currentListeners_.end()) { if (dht_ && listener->session->is_closed()) { @@ -92,19 +93,22 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port) ++listener; } } + lockListener_.unlock(); std::this_thread::sleep_for(std::chrono::seconds(1)); } // Remove last listeners + lockListener_.lock(); auto listener = currentListeners_.begin(); while (listener != currentListeners_.end()) { if (dht_) { - dht_->cancelListen(listener->hash, std::move(listener->token)); + dht_->cancelListen(listener->hash, std::move(listener->token.get())); // Remove listener if unused listener = currentListeners_.erase(listener); } else { ++listener; } } + lockListener_.unlock(); }); dht->forwardAllMessages(true); @@ -119,11 +123,13 @@ void DhtProxyServer::stop() { service_->stop(); + lockListener_.lock(); auto listener = currentListeners_.begin(); while (listener != currentListeners_.end()) { listener->session->close(); ++ listener; } + lockListener_.unlock(); stopListeners = true; // listenThreads_ will stop because there is no more sessions if (listenThread_.joinable()) @@ -173,7 +179,11 @@ DhtProxyServer::get(const std::shared_ptr<restbed::Session>& session) const infoHash = InfoHash::get(hash); } s->yield(restbed::OK, "", [=]( const std::shared_ptr< restbed::Session > s) { - dht_->get(infoHash, [s](std::shared_ptr<Value> value) { + auto cacheSession = std::weak_ptr<restbed::Session>(s); + + dht_->get(infoHash, [cacheSession](std::shared_ptr<Value> value) { + auto s = cacheSession.lock(); + if (!s) return false; // Send values as soon as we get them Json::FastWriter writer; s->yield(writer.write(value->toJson()), [](const std::shared_ptr<restbed::Session> /*session*/){ }); @@ -229,7 +239,9 @@ DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) const } return !s->is_closed(); })); + lockListener_.lock(); currentListeners_.emplace_back(std::move(listener)); + lockListener_.unlock(); } else { session->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } @@ -268,9 +280,11 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) const dht_->put(infoHash, value, [s, value](bool ok) { if (ok) { Json::FastWriter writer; - s->close(restbed::OK, writer.write(value->toJson())); + if (s->is_open()) + s->close(restbed::OK, writer.write(value->toJson())); } else { - s->close(restbed::BAD_GATEWAY, "{\"err\":\"put failed\"}"); + if (s->is_open()) + s->close(restbed::BAD_GATEWAY, "{\"err\":\"put failed\"}"); } }, time_point::max(), permanent); } else { diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 7c5f879b732e5b193e8db9b7b44e4864efbf4ac9..1f34d9a67cc025bac0e4f35449815f1260110970 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -164,7 +164,7 @@ void DhtRunner::dumpTables() const { std::lock_guard<std::mutex> lck(dht_mtx); - activeDht()->dumpTables(); // NOTE: NOT USED by RingAccount + activeDht()->dumpTables(); } InfoHash @@ -180,7 +180,7 @@ DhtRunner::getNodeId() const { if (!activeDht()) return {}; - return activeDht()->getNodeId(); // NOTE: This is OK, return the SecureDht id + return activeDht()->getNodeId(); } @@ -189,7 +189,7 @@ DhtRunner::getStoreSize() const { std::lock_guard<std::mutex> lck(dht_mtx); if (!activeDht()) return {}; - return activeDht()->getStoreSize(); // NOTE: NOT USED by RingAccount + return activeDht()->getStoreSize(); } void @@ -197,7 +197,7 @@ DhtRunner::setStorageLimit(size_t limit) { std::lock_guard<std::mutex> lck(dht_mtx); if (!activeDht()) throw std::runtime_error("dht is not running"); - return activeDht()->setStorageLimit(limit); // NOTE: NOT USED by RingAccount + return activeDht()->setStorageLimit(limit); } std::vector<NodeExport> @@ -205,7 +205,7 @@ DhtRunner::exportNodes() const { std::lock_guard<std::mutex> lck(dht_mtx); if (!dht_) return {}; - return activeDht()->exportNodes(); // NOTE: TBD Should be OK + return activeDht()->exportNodes(); } std::vector<ValuesExport> @@ -213,38 +213,38 @@ DhtRunner::exportValues() const { std::lock_guard<std::mutex> lck(dht_mtx); if (!activeDht()) return {}; - return activeDht()->exportValues(); // NOTE: TBD Should be OK + return activeDht()->exportValues(); } void DhtRunner::setLoggers(LogMethod error, LogMethod warn, LogMethod debug) { std::lock_guard<std::mutex> lck(dht_mtx); - activeDht()->setLoggers(std::forward<LogMethod>(error), std::forward<LogMethod>(warn), std::forward<LogMethod>(debug)); // NOTE: TBD Should be OK + activeDht()->setLoggers(std::forward<LogMethod>(error), std::forward<LogMethod>(warn), std::forward<LogMethod>(debug)); } void DhtRunner::setLogFilter(const InfoHash& f) { std::lock_guard<std::mutex> lck(dht_mtx); - activeDht()->setLogFilter(f); // NOTE: NOT USED by RingAccount + activeDht()->setLogFilter(f); } void DhtRunner::registerType(const ValueType& type) { std::lock_guard<std::mutex> lck(dht_mtx); - activeDht()->registerType(type); // NOTE: NOT USED by RingAccount + activeDht()->registerType(type); } void DhtRunner::importValues(const std::vector<ValuesExport>& values) { std::lock_guard<std::mutex> lck(dht_mtx); - activeDht()->importValues(values); // NOTE: TBD Should be OK + activeDht()->importValues(values); } unsigned DhtRunner::getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const { std::lock_guard<std::mutex> lck(dht_mtx); - const auto stats = activeDht()->getNodesStats(af); // NOTE: TBD Should be OK + const auto stats = activeDht()->getNodesStats(af); if (good_return) *good_return = stats.good_nodes; if (dubious_return) @@ -260,51 +260,51 @@ NodeStats DhtRunner::getNodesStats(sa_family_t af) const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getNodesStats(af); // NOTE: TBD Should be OK + return activeDht()->getNodesStats(af); } std::vector<unsigned> DhtRunner::getNodeMessageStats(bool in) const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getNodeMessageStats(in); // NOTE: NOT USED by RingAccount + return activeDht()->getNodeMessageStats(in); } std::string DhtRunner::getStorageLog() const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getStorageLog(); // NOTE: NOT USED by RingAccount + return activeDht()->getStorageLog(); } std::string DhtRunner::getStorageLog(const InfoHash& f) const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getStorageLog(f); // NOTE: NOT USED by RingAccount + return activeDht()->getStorageLog(f); } std::string DhtRunner::getRoutingTablesLog(sa_family_t af) const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getRoutingTablesLog(af); // NOTE: NOT USED by RingAccount + return activeDht()->getRoutingTablesLog(af); } std::string DhtRunner::getSearchesLog(sa_family_t af) const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getSearchesLog(af); // NOTE: NOT USED by RingAccount + return activeDht()->getSearchesLog(af); } std::string DhtRunner::getSearchLog(const InfoHash& f, sa_family_t af) const { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getSearchLog(f, af); // NOTE: NOT USED by RingAccount + return activeDht()->getSearchLog(f, af); } std::vector<SockAddr> DhtRunner::getPublicAddress(sa_family_t af) { std::lock_guard<std::mutex> lck(dht_mtx); - return activeDht()->getPublicAddress(af); // NOTE: TBD Should be OK + return activeDht()->getPublicAddress(af); } std::vector<std::string> DhtRunner::getPublicAddressStr(sa_family_t af) @@ -318,12 +318,14 @@ DhtRunner::getPublicAddressStr(sa_family_t af) void DhtRunner::registerCertificate(std::shared_ptr<crypto::Certificate> cert) { std::lock_guard<std::mutex> lck(dht_mtx); - activeDht()->registerCertificate(cert); // NOTE: NOT USED by RingAccount + activeDht()->registerCertificate(cert); } void DhtRunner::setLocalCertificateStore(CertificateStoreQuery&& query_method) { std::lock_guard<std::mutex> lck(dht_mtx); +#if OPENDHT_PROXY_CLIENT dht_via_proxy_->setLocalCertificateStore(std::forward<CertificateStoreQuery>(query_method)); +#endif dht_->setLocalCertificateStore(std::forward<CertificateStoreQuery>(query_method)); } @@ -891,7 +893,7 @@ DhtRunner::enableProxy(bool proxify) { } if (proxify) { // Init the proxy client - dht_via_proxy_->start(config_.proxy_server); + dht_via_proxy_->startProxy(config_.proxy_server); // add current listeners for (auto& listener: listeners_) { auto tokenProxy = dht_via_proxy_->listen(listener->hash, listener->gcb, std::move(listener->f), std::move(listener->w)); diff --git a/src/securedht.cpp b/src/securedht.cpp index 9dd8b75b4642a245406a9233a05db90627f9adc3..99c7557e789139a63350a1d7b430ff268134f5e0 100644 --- a/src/securedht.cpp +++ b/src/securedht.cpp @@ -229,8 +229,10 @@ SecureDht::getCallbackFilter(GetCallback cb, Value::Filter&& filter) // Decrypt encrypted values if (v->isEncrypted()) { if (not key_) { - if (force_forward_) +#if OPENDHT_PROXY_SERVER + if (forward_all_) // We are currently a proxy, send messages to clients. tmpvals.push_back(v); +#endif continue; } try {