diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index 7c1a41dc913d2240968fb5b50728b1d4148ccb97..5977129a1f94920333d29480f2e22a9121b89f81 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -368,6 +368,7 @@ private: struct PushSessionContext { std::mutex lock; std::string sessionId; + PushSessionContext(const std::string& id) : sessionId(id) {} }; struct PermanentPut { time_point expiration; diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index c31ff1dd73549a4a0cc6e8d41daac32d152be4dc..c3490c89e76c5a44375200b64838a702d4bd43e9 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -188,8 +188,9 @@ DhtProxyServer::PermanentPut::msgpack_unpack(const msgpack::object& o) } if (auto sid = findMapValue(o, "sid")) { if (not sessionCtx) - sessionCtx = std::make_shared<PushSessionContext>(); - sessionCtx->sessionId = sid->as<std::string>(); + sessionCtx = std::make_shared<PushSessionContext>(sid->as<std::string>()); + else + sessionCtx->sessionId = sid->as<std::string>(); } if (auto t = findMapValue(o, "t")) { type = t->as<PushType>(); @@ -211,8 +212,9 @@ DhtProxyServer::Listener::msgpack_unpack(const msgpack::object& o) } if (auto sid = findMapValue(o, "sid")) { if (not sessionCtx) - sessionCtx = std::make_shared<PushSessionContext>(); - sessionCtx->sessionId = sid->as<std::string>(); + sessionCtx = std::make_shared<PushSessionContext>(sid->as<std::string>()); + else + sessionCtx->sessionId = sid->as<std::string>(); } if (auto t = findMapValue(o, "t")) { type = t->as<PushType>(); @@ -779,12 +781,11 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, restinio::router::route_params_t params) { requestNum_++; - - InfoHash infoHash(params["hash"].to_string()); - if (!infoHash) - infoHash = InfoHash::get(params["hash"].to_string()); - try { + InfoHash infoHash(params["hash"].to_string()); + if (!infoHash) + infoHash = InfoHash::get(params["hash"].to_string()); + std::string err; Json::Value r; auto* char_data = reinterpret_cast<const char*>(request->body().data()); @@ -807,97 +808,37 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, if (logger_) logger_->d("[proxy:server] [subscribe %s] [client %s] [session %s]", infoHash.toString().c_str(), clientId.c_str(), sessionId.c_str()); - // ================ Search for existing listener =================== - // start the timer - auto timeout = std::chrono::steady_clock::now() + proxy::OP_TIMEOUT; - std::lock_guard<std::mutex> lock(lockPushListeners_); // Insert new or return existing push listeners of a token + std::lock_guard<std::mutex> lock(lockPushListeners_); auto& pushListener = pushListeners_[pushToken]; auto& pushListeners = pushListener.listeners[infoHash]; - for (auto &listener: pushListeners) { - // Found -> Resubscribe - if (listener.clientId == clientId) { - if (logger_) - logger_->d("[proxy:server] [subscribe] found [client %s]", listener.clientId.c_str()); - // Reset timers - listener.expiration = timeout; - listener.expireTimer->expires_at(timeout); - listener.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN); - listener.type = type; - { - std::lock_guard<std::mutex> l(listener.sessionCtx->lock); - listener.sessionCtx->sessionId = sessionId; - } - // Send response header - auto response = std::make_shared<ResponseByPartsBuilder>( - initHttpResponse(request->create_response<ResponseByParts>())); - response->flush(); - if (!root["refresh"].asBool()) { - // No Refresh - dht_->get(infoHash, [this, response](const Sp<Value>& value){ - auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n"; - response->append_chunk(output); - response->flush(); - return true; - }, - [response] (bool){ - response->done(); - }); - } else { - // Refresh - response->append_chunk("{}\n"); - response->done(); - } - return restinio::request_handling_status_t::accepted; - } + auto listIt = std::find_if(pushListeners.begin(), pushListeners.end(), [&](const Listener& l) { + return l.clientId == clientId; + }); + bool newListener = listIt == pushListeners.end(); + if (newListener) { + pushListeners.emplace_back(Listener{}); + listIt = std::prev(pushListeners.end()); + listIt->clientId = clientId; + listIt->sessionCtx = std::make_shared<PushSessionContext>(sessionId); + } else { + std::lock_guard<std::mutex> l(listIt->sessionCtx->lock); + listIt->sessionCtx->sessionId = sessionId; } - // =========== No existing listener for an infoHash ============ - // Add new listener to list of listeners - pushListeners.emplace_back(Listener{}); - auto &listener = pushListeners.back(); - listener.clientId = clientId; - listener.sessionCtx = std::make_shared<PushSessionContext>(); - listener.sessionCtx->sessionId = sessionId; - listener.type = type; - - // Add listen on dht - listener.internalToken = dht_->listen(infoHash, - [this, infoHash, pushToken, type, clientId, sessionCtx = listener.sessionCtx] - (const std::vector<std::shared_ptr<Value>>& values, bool expired){ - // Build message content - Json::Value json; - json["key"] = infoHash.toString(); - json["to"] = clientId; - json["t"] = Json::Value::Int64(std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now().time_since_epoch()).count()); - { - std::lock_guard<std::mutex> l(sessionCtx->lock); - json["s"] = sessionCtx->sessionId; - } - if (expired and values.size() < 2){ - std::stringstream ss; - for(size_t i = 0; i < values.size(); ++i){ - if(i != 0) ss << ","; - ss << values[i]->id; - } - json["exp"] = ss.str(); - } - auto maxPrio = 1000u; - for (const auto& v : values) - maxPrio = std::min(maxPrio, v->priority); - sendPushNotification(pushToken, std::move(json), type, !expired and maxPrio == 0); - return true; - } - ); - // expire notify - if (!listener.expireNotifyTimer) - listener.expireNotifyTimer = std::make_unique<asio::steady_timer>(io_context(), timeout - proxy::OP_MARGIN); - else + auto& listener = *listIt; + + // Expiration + auto timeout = std::chrono::steady_clock::now() + proxy::OP_TIMEOUT; + listener.expiration = timeout; + if (listener.expireNotifyTimer) listener.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN); - auto jsonProvider = [infoHash, clientId, sessionCtx = listener.sessionCtx](){ + else + listener.expireNotifyTimer = std::make_unique<asio::steady_timer>(io_context(), timeout - proxy::OP_MARGIN); + auto jsonProvider = [h=infoHash.toString(), clientId, sessionCtx = listener.sessionCtx](){ Json::Value json; - json["timeout"] = infoHash.toString(); + json["timeout"] = h; json["to"] = clientId; std::lock_guard<std::mutex> l(sessionCtx->lock); json["s"] = sessionCtx->sessionId; @@ -905,16 +846,70 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, }; listener.expireNotifyTimer->async_wait(std::bind(&DhtProxyServer::handleNotifyPushListenExpire, this, std::placeholders::_1, pushToken, std::move(jsonProvider), listener.type)); - // cancel push listen if (!listener.expireTimer) listener.expireTimer = std::make_unique<asio::steady_timer>(io_context(), timeout); else listener.expireTimer->expires_at(timeout); listener.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPushListen, this, - std::placeholders::_1, pushToken, infoHash, clientId)); - auto response = initHttpResponse(request->create_response()); - response.set_body("{}\n"); - return response.done(); + std::placeholders::_1, pushToken, infoHash, clientId)); + + // Send response + if (not newListener) { + if (logger_) + logger_->d("[proxy:server] [subscribe] found [client %s]", listener.clientId.c_str()); + // Send response header + auto response = std::make_shared<ResponseByPartsBuilder>(initHttpResponse(request->create_response<ResponseByParts>())); + response->flush(); + if (!root["refresh"].asBool()) { + // No Refresh + dht_->get(infoHash, [this, response](const Sp<Value>& value){ + auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n"; + response->append_chunk(output); + response->flush(); + return true; + }, + [response] (bool){ + response->done(); + }); + } else { + // Refresh + response->append_chunk("{}\n"); + return response->done(); + } + } else { + // =========== No existing listener for an infoHash ============ + // Add listen on dht + listener.internalToken = dht_->listen(infoHash, + [this, infoHash, pushToken, type, clientId, sessionCtx = listener.sessionCtx] + (const std::vector<std::shared_ptr<Value>>& values, bool expired){ + // Build message content + Json::Value json; + json["key"] = infoHash.toString(); + json["to"] = clientId; + json["t"] = Json::Value::Int64(std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now().time_since_epoch()).count()); + { + std::lock_guard<std::mutex> l(sessionCtx->lock); + json["s"] = sessionCtx->sessionId; + } + if (expired and values.size() < 2){ + std::stringstream ss; + for(size_t i = 0; i < values.size(); ++i){ + if(i != 0) ss << ","; + ss << values[i]->id; + } + json["exp"] = ss.str(); + } + auto maxPrio = 1000u; + for (const auto& v : values) + maxPrio = std::min(maxPrio, v->priority); + sendPushNotification(pushToken, std::move(json), type, !expired and maxPrio == 0); + return true; + } + ); + auto response = initHttpResponse(request->create_response()); + response.set_body("{}\n"); + return response.done(); + } } catch (...) { return serverError(*request); @@ -1155,9 +1150,11 @@ DhtProxyServer::put(restinio::request_handle_t request, std::placeholders::_1, infoHash, pp.second.value->id)); if (not sessionId.empty()) { if (not pp.second.sessionCtx) - pp.second.sessionCtx = std::make_shared<PushSessionContext>(); - std::lock_guard<std::mutex> l(pp.second.sessionCtx->lock); - pp.second.sessionCtx->sessionId = sessionId; + pp.second.sessionCtx = std::make_shared<PushSessionContext>(sessionId); + else { + std::lock_guard<std::mutex> l(pp.second.sessionCtx->lock); + pp.second.sessionCtx->sessionId = sessionId; + } } auto response = initHttpResponse(request->create_response()); response.append_body(Json::writeString(jsonBuilder_, value->toJson()) + "\n"); @@ -1181,8 +1178,7 @@ DhtProxyServer::put(restinio::request_handle_t request, pput.pushToken = pushToken; pput.clientId = clientId; pput.type = isAndroid ? PushType::Android : PushType::iOS; - pput.sessionCtx = std::make_shared<PushSessionContext>(); - pput.sessionCtx->sessionId = sessionId; + pput.sessionCtx = std::make_shared<PushSessionContext>(sessionId); // notify push listen expire auto jsonProvider = [infoHash, clientId, vid, sessionCtx = pput.sessionCtx](){ Json::Value json; @@ -1206,9 +1202,11 @@ DhtProxyServer::put(restinio::request_handle_t request, } else { if (not sessionId.empty()) { if (not pput.sessionCtx) - pput.sessionCtx = std::make_shared<PushSessionContext>(); - std::lock_guard<std::mutex> l(pput.sessionCtx->lock); - pput.sessionCtx->sessionId = sessionId; + pput.sessionCtx = std::make_shared<PushSessionContext>(sessionId); + else { + std::lock_guard<std::mutex> l(pput.sessionCtx->lock); + pput.sessionCtx->sessionId = sessionId; + } } pput.expireTimer->expires_at(timeout); if (pput.expireNotifyTimer)