diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index 448a34b3ce4eb51480c071852809fa7680c429f7..31c46916fcfa1698b37d46fc8e766f568ed16e7f 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -276,6 +276,12 @@ private: restinio::router::route_params_t params); #ifdef OPENDHT_PUSH_NOTIFICATIONS + struct PushSessionContext { + std::mutex lock; + std::string sessionId; + PushSessionContext(const std::string& id) : sessionId(id) {} + }; + PushType getTypeFromString(const std::string& type); std::string getDefaultTopic(PushType type); @@ -331,6 +337,24 @@ private: void handleCancelPushListen(const asio::error_code &ec, const std::string pushToken, const InfoHash key, const std::string clientId); + /** + * Handles a push listen request. + * + * @param infoHash The information hash associated with the push listen request. + * @param pushToken The push token associated with the push listen request. + * @param type The type of the push listen request. + * @param clientId The client ID associated with the push listen request. + * @param sessionCtx The shared pointer to the push session context associated with the push listen request. + * @param topic The topic associated with the push listen request. + * @param values The vector of shared pointers to values associated with the push listen request. + * @param expired A boolean indicating whether the push listen request has expired. + * @return true. + */ + bool handlePushListen(const InfoHash infoHash, const std::string pushToken, + PushType type, const std::string clientId, + const std::shared_ptr<DhtProxyServer::PushSessionContext> sessionCtx, const std::string topic, + const std::vector<std::shared_ptr<Value>>& values, bool expired); + #endif //OPENDHT_PUSH_NOTIFICATIONS void handlePrintStats(const asio::error_code &ec); @@ -376,12 +400,6 @@ private: std::map<restinio::connection_id_t, http::ListenerSession> listeners_; // Connection Listener observing conn state changes. std::shared_ptr<ConnectionListener> connListener_; - - struct PushSessionContext { - std::mutex lock; - std::string sessionId; - PushSessionContext(const std::string& id) : sessionId(id) {} - }; struct PermanentPut { time_point expiration; std::string pushToken; diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 3a90d36c542941613618e0c6032b9f3af11a5927..9fb728020cea9e8a5c220405887a0180ad28541d 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -402,33 +402,12 @@ DhtProxyServer::loadState(Is& is, size_t size) { for (auto& pushListener : pushListeners_) { for (auto& listeners : pushListener.second.listeners) { for (auto& listener : listeners.second) { + // start listening listener.internalToken = dht_->listen(listeners.first, - [this, infoHash=listeners.first, pushToken=pushListener.first, type=listener.type, clientId=listener.clientId, sessionCtx = listener.sessionCtx, topic=listener.topic] - (const std::vector<std::shared_ptr<Value>>& values, bool expired) { - // Build message content - Json::Value json; - json["key"] = infoHash.toString(); - json["to"] = clientId; - json["t"] = Json::Value::Int64(std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now().time_since_epoch()).count()); - { - std::lock_guard<std::mutex> l(sessionCtx->lock); - json["s"] = sessionCtx->sessionId; - } - if (expired and values.size() < 2){ - std::ostringstream ss; - for(size_t i = 0; i < values.size(); ++i){ - if(i != 0) ss << ","; - ss << values[i]->id; - } - json["exp"] = ss.str(); - } - auto maxPrio = 1000u; - for (const auto& v : values) - maxPrio = std::min(maxPrio, v->priority); - sendPushNotification(pushToken, std::move(json), type, !expired and maxPrio == 0, topic); - return true; - } - ); + std::bind(&DhtProxyServer::handlePushListen, this, + listeners.first, pushListener.first, listener.type, + listener.clientId, listener.sessionCtx, listener.topic, + std::placeholders::_1, std::placeholders::_2)); // expire notify listener.expireNotifyTimer = std::make_unique<asio::steady_timer>(io_context(), listener.expiration - proxy::OP_MARGIN); auto jsonProvider = [infoHash = listeners.first.toString(), clientId = listener.clientId, sessionCtx = listener.sessionCtx](){ @@ -897,13 +876,13 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, // Send response if (not newListener) { if (logger_) - logger_->d("[proxy:server] [subscribe] found [client %s]", listener.clientId.c_str()); + logger_->d("[proxy:server] [subscribe %s] found [client %s]", infoHash.toString().c_str(), listener.clientId.c_str()); // Send response header auto response = std::make_shared<ResponseByPartsBuilder>(initHttpResponse(request->create_response<ResponseByParts>())); response->flush(); if (!root["refresh"].asBool()) { // No Refresh - dht_->get(infoHash, [this, response](const Sp<Value>& value){ + dht_->get(infoHash, [this, response](const Sp<Value>& value) { auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n"; response->append_chunk(output); response->flush(); @@ -920,33 +899,13 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, } else { // =========== No existing listener for an infoHash ============ // Add listen on dht - listener.internalToken = dht_->listen(infoHash, - [this, infoHash, pushToken, type, clientId, sessionCtx = listener.sessionCtx, topic] - (const std::vector<std::shared_ptr<Value>>& values, bool expired){ - // Build message content - Json::Value json; - json["key"] = infoHash.toString(); - json["to"] = clientId; - json["t"] = Json::Value::Int64(std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now().time_since_epoch()).count()); - { - std::lock_guard<std::mutex> l(sessionCtx->lock); - json["s"] = sessionCtx->sessionId; - } - if (expired and values.size() < 2){ - std::ostringstream ss; - for(size_t i = 0; i < values.size(); ++i) { - if(i != 0) ss << ","; - ss << values[i]->id; - } - json["exp"] = ss.str(); - } - auto maxPrio = 1000u; - for (const auto& v : values) - maxPrio = std::min(maxPrio, v->priority); - sendPushNotification(pushToken, std::move(json), type, !expired and maxPrio == 0, topic); - return true; - } - ); + if (logger_) + logger_->d("[proxy:server] [subscribe %s] new", infoHash.toString().c_str()); + listener.internalToken = dht_->listen(infoHash, std::bind(&DhtProxyServer::handlePushListen, this, + infoHash, pushToken, type, clientId, + listener.sessionCtx, topic, + std::placeholders::_1, std::placeholders::_2)); + // Send response header auto response = initHttpResponse(request->create_response()); response.set_body("{}\n"); return response.done(); @@ -1050,6 +1009,50 @@ DhtProxyServer::handleCancelPushListen(const asio::error_code &ec, const std::st pushListeners_.erase(pushListener); } +bool +DhtProxyServer::handlePushListen(const InfoHash infoHash, const std::string pushToken, PushType type, const std::string clientId, + const std::shared_ptr<DhtProxyServer::PushSessionContext> sessionCtx, const std::string topic, + const std::vector<std::shared_ptr<Value>>& values, bool expired) +{ + Json::Value json; + json["key"] = infoHash.toString(); + json["to"] = clientId; + using namespace std::chrono; + json["t"] = Json::Value::Int64(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()); + + { + std::lock_guard<std::mutex> l(sessionCtx->lock); + json["s"] = sessionCtx->sessionId; + } + + // Build a comma-separated list of ids from the values. This is intended to be used when + // streaming from the iOS notification extension in order to filter out unwanted values. + std::string ids; + for(const auto& value : values) { + if (!ids.empty()) ids += ","; + ids += std::to_string(value->id); + } + json["ids"] = ids; + + // If message is expired copy the value ID to the `exp` field. + // This is presumably used by the Android notification system. + if (expired && values.size() < 2) { + json["exp"] = json["ids"]; + } + + auto minPriority = 1000u; + for (const auto& v : values) + minPriority = std::min(minPriority, v->priority); + + if (logger_) + logger_->d("[proxy:server] [listen %s] [client %s] [session %s] [expired %i] [priority %i] [values %zu]", + infoHash.toString().c_str(), clientId.c_str(), sessionCtx->sessionId.c_str(), expired, minPriority, values.size()); + + sendPushNotification(pushToken, std::move(json), type, !expired and minPriority == 0, topic); + + return true; +} + void DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& json, PushType type, bool highPriority, const std::string& topic) {