Skip to content
Snippets Groups Projects
Commit baf43d96 authored by Seva's avatar Seva
Browse files

dhtproxy: encapsulate server expire handlers

parent 0f9df9a6
No related branches found
No related tags found
No related merge requests found
...@@ -217,7 +217,7 @@ private: ...@@ -217,7 +217,7 @@ private:
RequestStatus put(restinio::request_handle_t request, RequestStatus put(restinio::request_handle_t request,
restinio::router::route_params_t params); restinio::router::route_params_t params);
void cancelPut(const InfoHash& key, Value::Id vid); void handleCancelPermamentPut(const asio::error_code &ec, const InfoHash& key, Value::Id vid);
#ifdef OPENDHT_PROXY_SERVER_IDENTITY #ifdef OPENDHT_PROXY_SERVER_IDENTITY
/** /**
...@@ -299,17 +299,29 @@ private: ...@@ -299,17 +299,29 @@ private:
*/ */
void sendPushNotification(const std::string& key, Json::Value&& json, bool isAndroid) const; void sendPushNotification(const std::string& key, Json::Value&& json, bool isAndroid) const;
/**
* Send push notification with an expire timeout.
* @param ec
* @param pushToken
* @param json
* @param isAndroid
*/
void handleNotifyPushListenExpire(const asio::error_code &ec, const std::string pushToken,
Json::Value json, const bool isAndroid);
/** /**
* Remove a push listener between a client and a hash * Remove a push listener between a client and a hash
* @param ec
* @param pushToken * @param pushToken
* @param key * @param key
* @param clientId * @param clientId
*/ */
void cancelPushListen(const std::string& pushToken, const InfoHash& key, const std::string& clientId); void handleCancelPushListen(const asio::error_code &ec, const std::string pushToken,
const InfoHash key, const std::string clientId);
#endif //OPENDHT_PUSH_NOTIFICATIONS #endif //OPENDHT_PUSH_NOTIFICATIONS
void asyncPrintStats(); void handlePrintStats(const asio::error_code &ec);
using clock = std::chrono::steady_clock; using clock = std::chrono::steady_clock;
using time_point = clock::time_point; using time_point = clock::time_point;
......
...@@ -102,7 +102,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port, ...@@ -102,7 +102,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port,
printStatsTimer_ = std::make_unique<asio::steady_timer>( printStatsTimer_ = std::make_unique<asio::steady_timer>(
httpServer_->io_context(), PRINT_STATS_PERIOD); httpServer_->io_context(), PRINT_STATS_PERIOD);
printStatsTimer_->async_wait(std::bind(&DhtProxyServer::asyncPrintStats, this)); printStatsTimer_->async_wait(std::bind(&DhtProxyServer::handlePrintStats, this, std::placeholders::_1));
} }
DhtProxyServer::~DhtProxyServer() DhtProxyServer::~DhtProxyServer()
...@@ -172,8 +172,14 @@ DhtProxyServer::updateStats() const ...@@ -172,8 +172,14 @@ DhtProxyServer::updateStats() const
} }
void void
DhtProxyServer::asyncPrintStats() DhtProxyServer::handlePrintStats(const asio::error_code &ec)
{ {
if (ec == asio::error::operation_aborted)
return;
else if (ec){
if (logger_)
logger_->e("[proxy:server] [stats] error printing: %s", ec.message().c_str());
}
if (httpServer_->io_context().stopped()) if (httpServer_->io_context().stopped())
return; return;
...@@ -189,7 +195,7 @@ DhtProxyServer::asyncPrintStats() ...@@ -189,7 +195,7 @@ DhtProxyServer::asyncPrintStats()
logger_->d("[proxy:server] [stats] %s", str.c_str()); logger_->d("[proxy:server] [stats] %s", str.c_str());
} }
printStatsTimer_->expires_at(printStatsTimer_->expiry() + PRINT_STATS_PERIOD); printStatsTimer_->expires_at(printStatsTimer_->expiry() + PRINT_STATS_PERIOD);
printStatsTimer_->async_wait(std::bind(&DhtProxyServer::asyncPrintStats, this)); printStatsTimer_->async_wait(std::bind(&DhtProxyServer::handlePrintStats, this, std::placeholders::_1));
} }
template <typename HttpResponse> template <typename HttpResponse>
...@@ -518,28 +524,25 @@ DhtProxyServer::subscribe(restinio::request_handle_t request, ...@@ -518,28 +524,25 @@ DhtProxyServer::subscribe(restinio::request_handle_t request,
return true; return true;
} }
); );
// Init & set timers
auto &ctx = httpServer_->io_context();
listener.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout);
listener.expireNotifyTimer = std::make_unique<asio::steady_timer>(ctx,
timeout - proxy::OP_MARGIN);
// Launch timers // Launch timers
listener.expireTimer->async_wait(std::bind( auto &ctx = httpServer_->io_context();
&DhtProxyServer::cancelPushListen, this, pushToken, infoHash, clientId)); // expire notify
if (!listener.expireNotifyTimer)
listener.expireNotifyTimer->async_wait( listener.expireNotifyTimer = std::make_unique<asio::steady_timer>(ctx, timeout - proxy::OP_MARGIN);
[this, infoHash, pushToken, isAndroid, clientId](const asio::error_code &ec){ else
if (logger_) listener.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN);
logger_->d("[proxy:server] [subscribe] sending refresh %s", infoHash.toString().c_str());
if (ec){
if (logger_)
logger_->d("[proxy:server] [subscribe] error sending refresh: %s", ec.message().c_str());
}
Json::Value json; Json::Value json;
json["timeout"] = infoHash.toString(); json["timeout"] = infoHash.toString();
json["to"] = clientId; json["to"] = clientId;
sendPushNotification(pushToken, std::move(json), isAndroid); listener.expireNotifyTimer->async_wait(std::bind(&DhtProxyServer::handleNotifyPushListenExpire, this,
}); std::placeholders::_1, pushToken, json, isAndroid));
// cancel push listen
if (!listener.expireTimer)
listener.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout);
else
listener.expireTimer->expires_at(timeout);
listener.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPushListen, this,
std::placeholders::_1, pushToken, infoHash, clientId));
auto response = this->initHttpResponse(request->create_response()); auto response = this->initHttpResponse(request->create_response());
response.set_body("{}\n"); response.set_body("{}\n");
return response.done(); return response.done();
...@@ -569,6 +572,8 @@ DhtProxyServer::unsubscribe(restinio::request_handle_t request, ...@@ -569,6 +572,8 @@ DhtProxyServer::unsubscribe(restinio::request_handle_t request,
response.set_body(RESP_MSG_SERVICE_UNAVAILABLE); response.set_body(RESP_MSG_SERVICE_UNAVAILABLE);
return response.done(); return response.done();
} }
if (logger_)
logger_->d("[proxy:server] [unsubscribe %s]", infoHash.toString().c_str());
try { try {
std::string err; std::string err;
...@@ -588,7 +593,7 @@ DhtProxyServer::unsubscribe(restinio::request_handle_t request, ...@@ -588,7 +593,7 @@ DhtProxyServer::unsubscribe(restinio::request_handle_t request,
return restinio::request_handling_status_t::rejected; return restinio::request_handling_status_t::rejected;
auto clientId = root["client_id"].asString(); auto clientId = root["client_id"].asString();
cancelPushListen(pushToken, infoHash, clientId); handleCancelPushListen(asio::error_code() /*success*/, pushToken, infoHash, clientId);
auto response = this->initHttpResponse(request->create_response()); auto response = this->initHttpResponse(request->create_response());
return response.done(); return response.done();
} }
...@@ -601,8 +606,31 @@ DhtProxyServer::unsubscribe(restinio::request_handle_t request, ...@@ -601,8 +606,31 @@ DhtProxyServer::unsubscribe(restinio::request_handle_t request,
} }
void void
DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHash& key, const std::string& clientId) DhtProxyServer::handleNotifyPushListenExpire(const asio::error_code &ec, const std::string pushToken,
Json::Value json, const bool isAndroid)
{
if (ec == asio::error::operation_aborted)
return;
else if (ec){
if (logger_)
logger_->e("[proxy:server] [subscribe] error sending put refresh: %s", ec.message().c_str());
}
if (logger_)
logger_->d("[proxy:server] [subscribe] sending put refresh to %s token", pushToken.c_str());
sendPushNotification(pushToken, std::move(json), isAndroid);
}
void
DhtProxyServer::handleCancelPushListen(const asio::error_code &ec, const std::string pushToken,
const dht::InfoHash key, const std::string clientId)
{ {
if (ec == asio::error::operation_aborted)
return;
else if (ec){
if (logger_)
logger_->e("[proxy:server] [listen:push %s] error cancel: %s",
key.toString().c_str(), ec.message().c_str());
}
if (logger_) if (logger_)
logger_->d("[proxy:server] [listen:push %s] cancelled for %s", logger_->d("[proxy:server] [listen:push %s] cancelled for %s",
key.toString().c_str(), clientId.c_str()); key.toString().c_str(), clientId.c_str());
...@@ -710,10 +738,16 @@ DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& jso ...@@ -710,10 +738,16 @@ DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& jso
#endif //OPENDHT_PUSH_NOTIFICATIONS #endif //OPENDHT_PUSH_NOTIFICATIONS
void void
DhtProxyServer::cancelPut(const InfoHash& key, Value::Id vid) DhtProxyServer::handleCancelPermamentPut(const asio::error_code &ec, const InfoHash& key, Value::Id vid)
{ {
if (ec == asio::error::operation_aborted)
return;
else if (ec){
if (logger_) if (logger_)
logger_->d("[proxy:server] [put %s] cancel put %i", key.toString().c_str(), vid); logger_->e("[proxy:server] [put:permament] error sending put refresh: %s", ec.message().c_str());
}
if (logger_)
logger_->d("[proxy:server] [put %s] cancel permament put %i", key.toString().c_str(), vid);
auto sPuts = puts_.find(key); auto sPuts = puts_.find(key);
if (sPuts == puts_.end()) if (sPuts == puts_.end())
return; return;
...@@ -781,36 +815,29 @@ DhtProxyServer::put(restinio::request_handle_t request, ...@@ -781,36 +815,29 @@ DhtProxyServer::put(restinio::request_handle_t request,
auto& pput = r.first->second; auto& pput = r.first->second;
if (r.second){ if (r.second){
auto &ctx = httpServer_->io_context(); auto &ctx = httpServer_->io_context();
// cancel permanent put
if (!pput.expireTimer)
pput.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout); pput.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout);
pput.expireTimer->async_wait([this, infoHash, vid](const asio::error_code &ec){ else
if (logger_) pput.expireTimer->expires_at(timeout);
logger_->d("[proxy:server] [put %s] permanent expired: %i", infoHash.toString().c_str(), vid); pput.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this,
if (ec){ std::placeholders::_1, infoHash, vid));
if (logger_)
logger_->e("[proxy:server] error in permanent put: %s", ec.message().c_str());
}
cancelPut(infoHash, vid);
});
#ifdef OPENDHT_PUSH_NOTIFICATIONS #ifdef OPENDHT_PUSH_NOTIFICATIONS
if (not pushToken.empty()){ if (not pushToken.empty()){
// notify push listen expire
bool isAndroid = platform == "android"; bool isAndroid = platform == "android";
pput.expireNotifyTimer = std::make_unique<asio::steady_timer>(ctx,
timeout - proxy::OP_MARGIN);
pput.expireNotifyTimer->async_wait(
[this, infoHash, vid, pushToken, clientId, isAndroid](const asio::error_code &ec)
{
if (logger_)
logger_->d("[proxy:server] [put %s] refresh: %i", infoHash.toString().c_str(), vid);
if (ec){
if (logger_)
logger_->e("[proxy:server] error in refresh put: %s", ec.message().c_str());
}
Json::Value json; Json::Value json;
json["timeout"] = infoHash.toString(); json["timeout"] = infoHash.toString();
json["to"] = clientId; json["to"] = clientId;
json["vid"] = std::to_string(vid); json["vid"] = std::to_string(vid);
sendPushNotification(pushToken, std::move(json), isAndroid); if (!pput.expireNotifyTimer)
}); pput.expireNotifyTimer = std::make_unique<asio::steady_timer>(ctx,
timeout - proxy::OP_MARGIN);
else
pput.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN);
pput.expireNotifyTimer->async_wait(std::bind(
&DhtProxyServer::handleNotifyPushListenExpire, this,
std::placeholders::_1, pushToken, json, isAndroid));
} }
#endif #endif
} else { } else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment