Skip to content
Snippets Groups Projects
Unverified Commit a0d77e7b authored by Adrien Béraud's avatar Adrien Béraud Committed by GitHub
Browse files

Merge pull request #283 from savoirfairelinux/fixPush

proxy server, fix push listeners and avoid flooding nodes
parents b47a85fe 9421984a
No related branches found
No related tags found
No related merge requests found
...@@ -213,7 +213,13 @@ private: ...@@ -213,7 +213,13 @@ private:
*/ */
void sendPushNotification(const std::string& key, const Json::Value& json, bool isAndroid) const; void sendPushNotification(const std::string& key, const Json::Value& json, bool isAndroid) const;
void cancelPushListen(const std::string& pushToken, const InfoHash& key, proxy::ListenToken token); /**
* Remove a push listener between a client and a hash
* @param pushToken
* @param key
* @param clientId
*/
void cancelPushListen(const std::string& pushToken, const InfoHash& key, const std::string& clientId);
#endif //OPENDHT_PUSH_NOTIFICATIONS #endif //OPENDHT_PUSH_NOTIFICATIONS
......
...@@ -391,6 +391,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) ...@@ -391,6 +391,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
} }
listeners->second.emplace_back(Listener{}); listeners->second.emplace_back(Listener{});
auto& listener = listeners->second.back(); auto& listener = listeners->second.back();
listener.clientId = clientId;
// New listener // New listener
pushListener->second.isAndroid = isAndroid; pushListener->second.isAndroid = isAndroid;
...@@ -410,8 +411,8 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) ...@@ -410,8 +411,8 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
} }
); );
listener.expireJob = scheduler_.add(timeout, listener.expireJob = scheduler_.add(timeout,
[this, token, infoHash, pushToken] { [this, clientId, infoHash, pushToken] {
cancelPushListen(pushToken, infoHash, *token); cancelPushListen(pushToken, infoHash, clientId);
} }
); );
listener.expireNotifyJob = scheduler_.add(timeout - proxy::OP_MARGIN, listener.expireNotifyJob = scheduler_.add(timeout - proxy::OP_MARGIN,
...@@ -460,10 +461,9 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) ...@@ -460,10 +461,9 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session)
} }
auto pushToken = root["key"].asString(); auto pushToken = root["key"].asString();
if (pushToken.empty()) return; if (pushToken.empty()) return;
auto token = unpackId(root, "token"); auto clientId = root["client_id"].asString();
if (token == 0) return;
cancelPushListen(pushToken, infoHash, token); cancelPushListen(pushToken, infoHash, clientId);
s->close(restbed::OK); s->close(restbed::OK);
} catch (...) { } catch (...) {
s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
...@@ -473,9 +473,9 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) ...@@ -473,9 +473,9 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session)
} }
void void
DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHash& key, proxy::ListenToken token) DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHash& key, const std::string& clientId)
{ {
std::cout << "cancelPushListen: " << key << " token:" << token << std::endl; std::cout << "cancelPushListen: " << key << " clientId:" << clientId << std::endl;
std::lock_guard<std::mutex> lock(lockListener_); std::lock_guard<std::mutex> lock(lockListener_);
auto pushListener = pushListeners_.find(pushToken); auto pushListener = pushListeners_.find(pushToken);
if (pushListener == pushListeners_.end()) if (pushListener == pushListeners_.end())
...@@ -484,7 +484,7 @@ DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHa ...@@ -484,7 +484,7 @@ DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHa
if (listeners == pushListener->second.listeners.end()) if (listeners == pushListener->second.listeners.end())
return; return;
for (auto listener = listeners->second.begin(); listener != listeners->second.end();) { for (auto listener = listeners->second.begin(); listener != listeners->second.end();) {
if (*listener->token == token) { if (listener->clientId == clientId) {
if (dht_) if (dht_)
dht_->cancelListen(key, std::move(listener->internalToken)); dht_->cancelListen(key, std::move(listener->internalToken));
listener = listeners->second.erase(listener); listener = listeners->second.erase(listener);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment