diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index a3720d1d0cb8de9d9e4080492fca912505d5d73c..7c1a41dc913d2240968fb5b50728b1d4148ccb97 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -36,6 +36,15 @@ #include <memory> #include <mutex> +namespace dht { +enum class PushType { + None = 0, + Android, + iOS +}; +} +MSGPACK_ADD_ENUM(dht::PushType) + namespace http { class Request; struct ListenerSession; @@ -52,6 +61,13 @@ class DhtRunner; using RestRouter = restinio::router::express_router_t<>; using RequestStatus = restinio::request_handling_status_t; +struct ProxyServerConfig { + in_port_t port {8000}; + std::string pushServer {}; + std::string persistStatePath {}; + dht::crypto::Identity identity {}; +}; + /** * Describes the REST API */ @@ -66,10 +82,9 @@ public: * @note if the server fails to start (if port is already used or reserved), * it will fails silently */ - DhtProxyServer( - dht::crypto::Identity identity, - std::shared_ptr<DhtRunner> dht, in_port_t port = 8000, const std::string& pushServer = "", - std::shared_ptr<dht::Logger> logger = {}); + DhtProxyServer(const std::shared_ptr<DhtRunner>& dht, + const ProxyServerConfig& config = {}, + const std::shared_ptr<dht::Logger>& logger = {}); virtual ~DhtProxyServer(); @@ -282,17 +297,17 @@ private: * @param key of the device * @param json, the content to send */ - void sendPushNotification(const std::string& key, Json::Value&& json, bool isAndroid, bool highPriority); + void sendPushNotification(const std::string& key, Json::Value&& json, PushType type, bool highPriority); /** * Send push notification with an expire timeout. * @param ec * @param pushToken * @param json - * @param isAndroid + * @param type */ void handleNotifyPushListenExpire(const asio::error_code &ec, const std::string pushToken, - std::function<Json::Value()> json, const bool isAndroid); + std::function<Json::Value()> json, PushType type); /** * Remove a push listener between a client and a hash @@ -309,6 +324,12 @@ private: void handlePrintStats(const asio::error_code &ec); void updateStats(); + template <typename Os> + void saveState(Os& stream); + + template <typename Is> + void loadState(Is& is, size_t size); + using clock = std::chrono::steady_clock; using time_point = clock::time_point; @@ -318,6 +339,8 @@ private: Json::CharReaderBuilder jsonReaderBuilder_; std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()}; + std::string persistPath_; + // http server std::thread serverThread_; std::unique_ptr<restinio::http_server_t<RestRouterTraitsTls>> httpsServer_; @@ -354,9 +377,32 @@ private: std::unique_ptr<asio::steady_timer> expireTimer; std::unique_ptr<asio::steady_timer> expireNotifyTimer; Sp<Value> value; + PushType type; + + template <typename Packer> + void msgpack_pack(Packer& p) const + { + p.pack_map(2 + (sessionCtx ? 1 : 0) + (clientId.empty() ? 0 : 1) + (type == PushType::None ? 0 : 2)); + p.pack("value"); p.pack(value); + p.pack("exp"); p.pack(to_time_t(expiration)); + if (not clientId.empty()) { + p.pack("cid"); p.pack(clientId); + } + if (sessionCtx) { + std::lock_guard<std::mutex> l(sessionCtx->lock); + p.pack("sid"); p.pack(sessionCtx->sessionId); + } + if (type != PushType::None) { + p.pack("t"); p.pack(type); + p.pack("token"); p.pack(pushToken); + } + } + + void msgpack_unpack(const msgpack::object& o); }; struct SearchPuts { std::map<dht::Value::Id, PermanentPut> puts; + MSGPACK_DEFINE_ARRAY(puts) }; std::mutex lockSearchPuts_; std::map<InfoHash, SearchPuts> puts_; @@ -368,14 +414,32 @@ private: #ifdef OPENDHT_PUSH_NOTIFICATIONS struct Listener { + time_point expiration; std::string clientId; std::shared_ptr<PushSessionContext> sessionCtx; std::future<size_t> internalToken; std::unique_ptr<asio::steady_timer> expireTimer; std::unique_ptr<asio::steady_timer> expireNotifyTimer; + PushType type; + + template <typename Packer> + void msgpack_pack(Packer& p) const + { + p.pack_map(sessionCtx ? 4 : 3); + p.pack("cid"); p.pack(clientId); + p.pack("exp"); p.pack(to_time_t(expiration)); + if (sessionCtx) { + std::lock_guard<std::mutex> l(sessionCtx->lock); + p.pack("sid"); p.pack(sessionCtx->sessionId); + } + p.pack("t"); p.pack(type); + } + + void msgpack_unpack(const msgpack::object& o); }; struct PushListener { std::map<InfoHash, std::vector<Listener>> listeners; + MSGPACK_DEFINE_ARRAY(listeners) }; std::mutex lockPushListeners_; std::map<std::string, PushListener> pushListeners_; diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index ebc6b6fc7e1b495729bcbedc9235aedb95740563..c31ff1dd73549a4a0cc6e8d41daac32d152be4dc 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -34,6 +34,7 @@ #include <functional> #include <limits> #include <iostream> +#include <fstream> using namespace std::placeholders; using namespace std::chrono_literals; @@ -173,26 +174,71 @@ struct DhtProxyServer::RestRouterTraits : public restinio::default_traits_t using connection_state_listener_t = ConnectionListener; }; -DhtProxyServer::DhtProxyServer( - crypto::Identity identity, - std::shared_ptr<DhtRunner> dht, in_port_t port, const std::string& pushServer, - std::shared_ptr<Logger> logger +void +DhtProxyServer::PermanentPut::msgpack_unpack(const msgpack::object& o) +{ + if (auto cid = findMapValue(o, "cid")) { + clientId = cid->as<std::string>(); + } + if (auto exp = findMapValue(o, "exp")) { + expiration = from_time_t(exp->as<time_t>()); + } + if (auto token = findMapValue(o, "token")) { + pushToken = token->as<std::string>(); + } + if (auto sid = findMapValue(o, "sid")) { + if (not sessionCtx) + sessionCtx = std::make_shared<PushSessionContext>(); + sessionCtx->sessionId = sid->as<std::string>(); + } + if (auto t = findMapValue(o, "t")) { + type = t->as<PushType>(); + } + if (auto val = findMapValue(o, "value")) { + value = std::make_shared<dht::Value>(*val); + } +} + +#ifdef OPENDHT_PUSH_NOTIFICATIONS +void +DhtProxyServer::Listener::msgpack_unpack(const msgpack::object& o) +{ + if (auto cid = findMapValue(o, "cid")) { + clientId = cid->as<std::string>(); + } + if (auto exp = findMapValue(o, "exp")) { + expiration = from_time_t(exp->as<time_t>()); + } + if (auto sid = findMapValue(o, "sid")) { + if (not sessionCtx) + sessionCtx = std::make_shared<PushSessionContext>(); + sessionCtx->sessionId = sid->as<std::string>(); + } + if (auto t = findMapValue(o, "t")) { + type = t->as<PushType>(); + } +} +#endif + +DhtProxyServer::DhtProxyServer(const std::shared_ptr<DhtRunner>& dht, + const ProxyServerConfig& config, + const std::shared_ptr<dht::Logger>& logger ) : ioContext_(std::make_shared<asio::io_context>()), - dht_(dht), logger_(logger), + dht_(dht), persistPath_(config.persistStatePath), logger_(logger), printStatsTimer_(std::make_unique<asio::steady_timer>(*ioContext_, 3s)), connListener_(std::make_shared<ConnectionListener>(std::bind(&DhtProxyServer::onConnectionClosed, this, std::placeholders::_1))), - pushServer_(pushServer) + pushServer_(config.pushServer) { if (not dht_) throw std::invalid_argument("A DHT instance must be provided"); if (logger_) - logger_->d("[proxy:server] [init] running on %i", port); - if (not pushServer.empty()){ + logger_->d("[proxy:server] [init] running on %i", config.port); + if (not pushServer_.empty()){ #ifdef OPENDHT_PUSH_NOTIFICATIONS if (logger_) - logger_->d("[proxy:server] [init] using push server %s", pushServer.c_str()); + logger_->d("[proxy:server] [init] using push server %s", pushServer_.c_str()); #else if (logger_) logger_->e("[proxy:server] [init] opendht built without push notification support"); @@ -202,9 +248,9 @@ DhtProxyServer::DhtProxyServer( jsonBuilder_["commentStyle"] = "None"; jsonBuilder_["indentation"] = ""; - if (!pushServer.empty()){ + if (!pushServer_.empty()){ // no host delim, assume port only - if (pushServer.find(":") == std::string::npos) + if (pushServer_.find(":") == std::string::npos) pushServer_ = "localhost:" + pushServer_; // define http request destination for push notifications pushHostPort_ = splitPort(pushServer_); @@ -212,7 +258,7 @@ DhtProxyServer::DhtProxyServer( logger_->d("Using push server for notifications: %s:%s", pushHostPort_.first.c_str(), pushHostPort_.second.c_str()); } - if (identity.first and identity.second) { + if (config.identity.first and config.identity.second) { asio::error_code ec; // define tls context asio::ssl::context tls_context { asio::ssl::context::sslv23 }; @@ -226,13 +272,13 @@ DhtProxyServer::DhtProxyServer( SSL_CTX_set_options(tls_context.native_handle(), SSL_OP_NO_RENEGOTIATION); // CVE-2009-3555 #endif // node private key - auto key = identity.first->serialize(); + auto key = config.identity.first->serialize(); tls_context.use_private_key(asio::const_buffer{key.data(), key.size()}, asio::ssl::context::file_format::pem, ec); if (ec) throw std::runtime_error("Error setting node's private key: " + ec.message()); // certificate chain - auto certchain = identity.second->toString(true/*chain*/); + auto certchain = config.identity.second->toString(true/*chain*/); tls_context.use_certificate_chain(asio::const_buffer{certchain.data(), certchain.size()}, ec); if (ec) throw std::runtime_error("Error setting certificate chain: " + ec.message()); @@ -241,7 +287,7 @@ DhtProxyServer::DhtProxyServer( // build http server auto settings = restinio::run_on_this_thread_settings_t<RestRouterTraitsTls>(); addServerSettings(settings); - settings.port(port); + settings.port(config.port); settings.tls_context(std::move(tls_context)); httpsServer_ = std::make_unique<restinio::http_server_t<RestRouterTraitsTls>>( ioContext_, @@ -258,7 +304,7 @@ DhtProxyServer::DhtProxyServer( else { auto settings = restinio::run_on_this_thread_settings_t<RestRouterTraits>(); addServerSettings(settings); - settings.port(port); + settings.port(config.port); httpServer_ = std::make_unique<restinio::http_server_t<RestRouterTraits>>( ioContext_, std::forward<restinio::run_on_this_thread_settings_t<RestRouterTraits>>(std::move(settings)) @@ -274,8 +320,150 @@ DhtProxyServer::DhtProxyServer( dht->forwardAllMessages(true); updateStats(); printStatsTimer_->async_wait(std::bind(&DhtProxyServer::handlePrintStats, this, std::placeholders::_1)); + + if (not persistPath_.empty()) { + try { + std::ifstream stateFile(persistPath_, std::ios::binary | std::ios::ate); + if (stateFile) { + std::streamsize size = stateFile.tellg(); + stateFile.seekg(0, std::ios::beg); + if (logger_) + logger_->d("Loading proxy state from %.*s (%td bytes)", (int)persistPath_.size(), persistPath_.c_str(), size); + loadState(stateFile, size); + } + } catch (const std::exception& e) { + if (logger_) + logger_->e("Error loading state from file: %s", e.what()); + } + } +} + +template <typename Os> +void +DhtProxyServer::saveState(Os& stream) { + msgpack::packer<Os> pk(&stream); + pk.pack_map(2); + { + std::lock_guard<std::mutex> lock(lockSearchPuts_); + pk.pack("puts"); + pk.pack(puts_); + } + { + std::lock_guard<std::mutex> lock(lockListener_); + pk.pack("pushListeners"); + pk.pack(pushListeners_); + } } +template <typename Is> +void +DhtProxyServer::loadState(Is& is, size_t size) { + msgpack::unpacker pac; + pac.reserve_buffer(size); + if (is.read(pac.buffer(), size)) { + pac.buffer_consumed(size); + + msgpack::object_handle oh; + while (pac.next(oh)) { + if (oh.get().type != msgpack::type::MAP) + continue; + if (auto puts = findMapValue(oh.get(), "puts")) { + std::lock_guard<std::mutex> lock(lockSearchPuts_); + puts_ = puts->as<decltype(puts_)>(); + if (logger_) + logger_->d("Loading %zu persistent puts", puts_.size()); + for (auto& put : puts_) { + for (auto& pput : put.second.puts) { + pput.second.expireTimer = std::make_unique<asio::steady_timer>(io_context(), pput.second.expiration); + pput.second.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this, + std::placeholders::_1, put.first, pput.first)); + auto jsonProvider = [infoHash=put.first.toString(), clientId=pput.second.clientId, vid = pput.first, sessionCtx = pput.second.sessionCtx](){ + Json::Value json; + json["timeout"] = infoHash; + json["to"] = clientId; + json["vid"] = std::to_string(vid); + std::lock_guard<std::mutex> l(sessionCtx->lock); + json["s"] = sessionCtx->sessionId; + return json; + }; + pput.second.expireNotifyTimer = std::make_unique<asio::steady_timer>(io_context(), pput.second.expiration - proxy::OP_MARGIN); + pput.second.expireNotifyTimer->async_wait(std::bind( + &DhtProxyServer::handleNotifyPushListenExpire, this, + std::placeholders::_1, pput.second.pushToken, std::move(jsonProvider), pput.second.type)); + dht_->put(put.first, pput.second.value, DoneCallbackSimple{}, time_point::max(), true); + } + } + } else { + if (logger_) + logger_->d("No persistent puts in state"); + } +#ifdef OPENDHT_PUSH_NOTIFICATIONS + if (auto listeners = findMapValue(oh.get(), "pushListeners")) { + std::lock_guard<std::mutex> lock(lockListener_); + pushListeners_ = listeners->as<decltype(pushListeners_)>(); + if (logger_) + logger_->d("Loading %zu push listeners", pushListeners_.size()); + for (auto& pushListener : pushListeners_) { + for (auto& listeners : pushListener.second.listeners) { + for (auto& listener : listeners.second) { + listener.internalToken = dht_->listen(listeners.first, + [this, infoHash=listeners.first, pushToken=pushListener.first, type=listener.type, clientId=listener.clientId, sessionCtx = listener.sessionCtx] + (const std::vector<std::shared_ptr<Value>>& values, bool expired) { + // Build message content + Json::Value json; + json["key"] = infoHash.toString(); + json["to"] = clientId; + json["t"] = Json::Value::Int64(std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now().time_since_epoch()).count()); + { + std::lock_guard<std::mutex> l(sessionCtx->lock); + json["s"] = sessionCtx->sessionId; + } + if (expired and values.size() < 2){ + std::stringstream ss; + for(size_t i = 0; i < values.size(); ++i){ + if(i != 0) ss << ","; + ss << values[i]->id; + } + json["exp"] = ss.str(); + } + auto maxPrio = 1000u; + for (const auto& v : values) + maxPrio = std::min(maxPrio, v->priority); + sendPushNotification(pushToken, std::move(json), type, !expired and maxPrio == 0); + return true; + } + ); + // expire notify + listener.expireNotifyTimer = std::make_unique<asio::steady_timer>(io_context(), listener.expiration - proxy::OP_MARGIN); + auto jsonProvider = [infoHash = listeners.first.toString(), clientId = listener.clientId, sessionCtx = listener.sessionCtx](){ + Json::Value json; + json["timeout"] = infoHash; + json["to"] = clientId; + std::lock_guard<std::mutex> l(sessionCtx->lock); + json["s"] = sessionCtx->sessionId; + return json; + }; + listener.expireNotifyTimer->async_wait(std::bind(&DhtProxyServer::handleNotifyPushListenExpire, this, + std::placeholders::_1, pushListener.first, std::move(jsonProvider), listener.type)); + // cancel push listen + listener.expireTimer = std::make_unique<asio::steady_timer>(io_context(), listener.expiration); + listener.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPushListen, this, + std::placeholders::_1, pushListener.first, listeners.first, listener.clientId)); + } + } + } + } else { + if (logger_) + logger_->d("No push listeners in state"); + } +#endif + } + if (logger_) + logger_->d("loading ended"); + } +} + + asio::io_context& DhtProxyServer::io_context() const { @@ -284,6 +472,12 @@ DhtProxyServer::io_context() const DhtProxyServer::~DhtProxyServer() { + if (not persistPath_.empty()) { + if (logger_) + logger_->d("Saving proxy state to %.*s", (int)persistPath_.size(), persistPath_.c_str()); + std::ofstream stateFile(persistPath_, std::ios::binary); + saveState(stateFile); + } if (dht_) { std::lock_guard<std::mutex> lock(lockListener_); for (auto& l : listeners_) { @@ -607,7 +801,7 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, response.set_body(RESP_MSG_NO_TOKEN); return response.done(); } - auto isAndroid = root["platform"].asString() == "android"; + auto type = root["platform"].asString() == "android" ? PushType::Android : PushType::iOS; auto clientId = root["client_id"].asString(); auto sessionId = root["session_id"].asString(); @@ -628,8 +822,10 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, if (logger_) logger_->d("[proxy:server] [subscribe] found [client %s]", listener.clientId.c_str()); // Reset timers + listener.expiration = timeout; listener.expireTimer->expires_at(timeout); listener.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN); + listener.type = type; { std::lock_guard<std::mutex> l(listener.sessionCtx->lock); listener.sessionCtx->sessionId = sessionId; @@ -664,10 +860,11 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, listener.clientId = clientId; listener.sessionCtx = std::make_shared<PushSessionContext>(); listener.sessionCtx->sessionId = sessionId; + listener.type = type; // Add listen on dht listener.internalToken = dht_->listen(infoHash, - [this, infoHash, pushToken, isAndroid, clientId, sessionCtx = listener.sessionCtx] + [this, infoHash, pushToken, type, clientId, sessionCtx = listener.sessionCtx] (const std::vector<std::shared_ptr<Value>>& values, bool expired){ // Build message content Json::Value json; @@ -689,7 +886,7 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, auto maxPrio = 1000u; for (const auto& v : values) maxPrio = std::min(maxPrio, v->priority); - sendPushNotification(pushToken, std::move(json), isAndroid, !expired and maxPrio == 0); + sendPushNotification(pushToken, std::move(json), type, !expired and maxPrio == 0); return true; } ); @@ -707,7 +904,7 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, return json; }; listener.expireNotifyTimer->async_wait(std::bind(&DhtProxyServer::handleNotifyPushListenExpire, this, - std::placeholders::_1, pushToken, std::move(jsonProvider), isAndroid)); + std::placeholders::_1, pushToken, std::move(jsonProvider), listener.type)); // cancel push listen if (!listener.expireTimer) listener.expireTimer = std::make_unique<asio::steady_timer>(io_context(), timeout); @@ -766,7 +963,7 @@ DhtProxyServer::unsubscribe(restinio::request_handle_t request, void DhtProxyServer::handleNotifyPushListenExpire(const asio::error_code &ec, const std::string pushToken, - std::function<Json::Value()> jsonProvider, const bool isAndroid) + std::function<Json::Value()> jsonProvider, PushType type) { if (ec == asio::error::operation_aborted) return; @@ -776,7 +973,7 @@ DhtProxyServer::handleNotifyPushListenExpire(const asio::error_code &ec, const s } if (logger_) logger_->d("[proxy:server] [subscribe] sending put refresh to %s token", pushToken.c_str()); - sendPushNotification(pushToken, jsonProvider(), isAndroid, false); + sendPushNotification(pushToken, jsonProvider(), type, false); } void @@ -818,7 +1015,7 @@ DhtProxyServer::handleCancelPushListen(const asio::error_code &ec, const std::st } void -DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& json, bool isAndroid, bool highPriority) +DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& json, PushType type, bool highPriority) { if (pushServer_.empty()) return; @@ -840,7 +1037,7 @@ DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& jso Json::Value tokens(Json::arrayValue); tokens[0] = token; notification["tokens"] = std::move(tokens); - notification["platform"] = isAndroid ? 2 : 1; + notification["platform"] = type == PushType::Android ? 2 : 1; notification["data"] = std::move(json); notification["priority"] = highPriority ? "high" : "normal"; notification["time_to_live"] = 600; @@ -889,6 +1086,7 @@ DhtProxyServer::handleCancelPermamentPut(const asio::error_code &ec, const InfoH } if (logger_) logger_->d("[proxy:server] [put %s] cancel permament put %i", key.toString().c_str(), vid); + std::lock_guard<std::mutex> lock(lockSearchPuts_); auto sPuts = puts_.find(key); if (sPuts == puts_.end()) return; @@ -898,6 +1096,8 @@ DhtProxyServer::handleCancelPermamentPut(const asio::error_code &ec, const InfoH return; if (dht_) dht_->cancelPut(key, vid); + if (put->second.expireTimer) + put->second.expireTimer->cancel(); if (put->second.expireNotifyTimer) put->second.expireNotifyTimer->cancel(); sPutsMap.erase(put); @@ -941,7 +1141,7 @@ DhtProxyServer::put(restinio::request_handle_t request, platform = pVal["platform"].asString(); sessionId = pVal["session_id"].asString(); } - std::unique_lock<std::mutex> lock(lockSearchPuts_); + std::lock_guard<std::mutex> lock(lockSearchPuts_); auto timeout = std::chrono::steady_clock::now() + proxy::OP_TIMEOUT; auto& sPuts = puts_[infoHash]; if (value->id == Value::INVALID_ID) { @@ -970,18 +1170,20 @@ DhtProxyServer::put(restinio::request_handle_t request, auto vid = value->id; auto& pput = sPuts.puts[vid]; pput.value = value; + pput.expiration = timeout; if (not pput.expireTimer) { auto &ctx = io_context(); // cancel permanent put pput.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout); #ifdef OPENDHT_PUSH_NOTIFICATIONS if (not pushToken.empty()){ + bool isAndroid = platform == "android"; pput.pushToken = pushToken; pput.clientId = clientId; + pput.type = isAndroid ? PushType::Android : PushType::iOS; pput.sessionCtx = std::make_shared<PushSessionContext>(); pput.sessionCtx->sessionId = sessionId; // notify push listen expire - bool isAndroid = platform == "android"; auto jsonProvider = [infoHash, clientId, vid, sessionCtx = pput.sessionCtx](){ Json::Value json; json["timeout"] = infoHash.toString(); @@ -998,7 +1200,7 @@ DhtProxyServer::put(restinio::request_handle_t request, pput.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN); pput.expireNotifyTimer->async_wait(std::bind( &DhtProxyServer::handleNotifyPushListenExpire, this, - std::placeholders::_1, pushToken, std::move(jsonProvider), isAndroid)); + std::placeholders::_1, pushToken, std::move(jsonProvider), pput.type)); } #endif } else { diff --git a/tests/dhtproxytester.cpp b/tests/dhtproxytester.cpp index 0fea6aef7f1847dfc596909508fc24226d76b05c..400d56bdd4d5e304413d0c36c52e4c984162ca33 100644 --- a/tests/dhtproxytester.cpp +++ b/tests/dhtproxytester.cpp @@ -44,12 +44,12 @@ DhtProxyTester::setUp() { nodeProxy->bootstrap(nodePeer.getBound()); auto serverCAIdentity = dht::crypto::generateEcIdentity("DHT Node CA"); - auto serverIdentity = dht::crypto::generateIdentity("DHT Node", serverCAIdentity); - serverProxy = std::make_unique<dht::DhtProxyServer>( - //dht::crypto::Identity{}, // http - serverIdentity, // https - nodeProxy, 8080, /*pushServer*/"127.0.0.1:8090"); + dht::ProxyServerConfig serverConfig; + serverConfig.identity = dht::crypto::generateIdentity("DHT Node", serverCAIdentity); + serverConfig.port = 8080; + serverConfig.pushServer = "127.0.0.1:8090"; + serverProxy = std::make_unique<dht::DhtProxyServer>(nodeProxy, serverConfig); clientConfig.server_ca = serverCAIdentity.second; clientConfig.client_identity = dht::crypto::generateIdentity("DhtProxyTester"); diff --git a/tests/httptester.cpp b/tests/httptester.cpp index 9fe01f26edb4fcfffe9bc01f55fa6e389d90fa0a..a657519a256a27ebb69c6ce3ecb57974b02b69e7 100644 --- a/tests/httptester.cpp +++ b/tests/httptester.cpp @@ -39,9 +39,10 @@ HttpTester::setUp() { nodeProxy->run(0, /*identity*/{}, /*threaded*/true); nodeProxy->bootstrap(nodePeer->getBound()); - serverProxy = std::unique_ptr<dht::DhtProxyServer>( - new dht::DhtProxyServer( - /*http*/dht::crypto::Identity{}, nodeProxy, 8080, /*pushServer*/"127.0.0.1:8090")); + dht::ProxyServerConfig config; + config.port = 8080; + config.pushServer = "127.0.0.1:8090"; + serverProxy = std::make_unique<dht::DhtProxyServer>(nodeProxy, config); } void diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index a3d7c4d2ad3a89301cc18263e2c1640f7723a896..9367f6accf125defb490fd690bec7cd7928ce179 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -218,14 +218,11 @@ void cmd_loop(std::shared_ptr<DhtRunner>& node, dht_params& params iss >> idstr; #endif // OPENDHT_PUSH_NOTIFICATIONS try { - unsigned int port = std::stoi(idstr); - proxies.emplace(port, std::unique_ptr<DhtProxyServer>( - new DhtProxyServer( - dht::crypto::Identity{}, node, port -#ifdef OPENDHT_PUSH_NOTIFICATIONS - ,pushServer -#endif - ))); + in_port_t port = std::stoi(idstr); + ProxyServerConfig serverConfig; + serverConfig.port = port; + serverConfig.pushServer = pushServer; + proxies.emplace(port, std::make_unique<DhtProxyServer>(node, serverConfig)); } catch (...) { } continue; } else if (op == "psx") { @@ -236,13 +233,12 @@ void cmd_loop(std::shared_ptr<DhtRunner>& node, dht_params& params #endif // OPENDHT_PUSH_NOTIFICATIONS try { if (params.proxy_id.first and params.proxy_id.second){ - unsigned int port = std::stoi(idstr); - proxies.emplace(port, std::unique_ptr<DhtProxyServer>( - new DhtProxyServer(params.proxy_id, node, port -#ifdef OPENDHT_PUSH_NOTIFICATIONS - ,pushServer -#endif - ))); + in_port_t port = std::stoi(idstr); + ProxyServerConfig serverConfig; + serverConfig.identity = params.proxy_id; + serverConfig.port = port; + serverConfig.pushServer = pushServer; + proxies.emplace(port, std::make_unique<DhtProxyServer>(node, serverConfig)); } else { std::cerr << "Missing Identity private key or certificate" << std::endl; @@ -542,17 +538,24 @@ main(int argc, char **argv) #ifdef OPENDHT_PROXY_SERVER std::map<in_port_t, std::unique_ptr<DhtProxyServer>> proxies; #endif - if (params.proxyserverssl and params.proxy_id.first and params.proxy_id.second){ + if (params.proxyserver or params.proxyserverssl) { #ifdef OPENDHT_PROXY_SERVER - proxies.emplace(params.proxyserverssl, std::unique_ptr<DhtProxyServer>( - new DhtProxyServer(params.proxy_id, - node, params.proxyserverssl, params.pushserver, dhtConf.second.logger))); - } - if (params.proxyserver) { - proxies.emplace(params.proxyserver, std::unique_ptr<DhtProxyServer>( - new DhtProxyServer( - dht::crypto::Identity{}, - node, params.proxyserver, params.pushserver, dhtConf.second.logger))); + ProxyServerConfig serverConfig; + serverConfig.pushServer = params.pushserver; + if (params.proxyserverssl and params.proxy_id.first and params.proxy_id.second){ + serverConfig.identity = params.proxy_id; + serverConfig.port = params.proxyserverssl; + if (not params.persist_path.empty()) + serverConfig.persistStatePath = params.persist_path + '_' + std::to_string(serverConfig.port); + proxies.emplace(params.proxyserverssl, std::make_unique<DhtProxyServer>(node, serverConfig, dhtConf.second.logger)); + } + if (params.proxyserver) { + serverConfig.identity = {}; + serverConfig.port = params.proxyserver; + if (not params.persist_path.empty()) + serverConfig.persistStatePath = params.persist_path + '_' + std::to_string(serverConfig.port); + proxies.emplace(params.proxyserver, std::make_unique<DhtProxyServer>(node, serverConfig, dhtConf.second.logger)); + } #else std::cerr << "DHT proxy server requested but OpenDHT built without proxy server support." << std::endl; exit(EXIT_FAILURE);