diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 91a1f350f513451210045912c42565ffc68e0466..3db40c8910175f786ffbb2c4cb0727a821e9df6b 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -372,7 +372,9 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) std::cout << "Subscribe " << infoHash << " token:" << tokenFromReq << " client:" << clientId << std::endl; { - std::lock_guard<std::mutex> lock(lockListener_); + std::lock(schedulerLock_, lockListener_); + std::lock_guard<std::mutex> lk1(lockListener_, std::adopt_lock); + std::lock_guard<std::mutex> lk2(schedulerLock_, std::adopt_lock); // Check if listener is already present and refresh timeout if launched // One push listener per pushToken.infoHash.clientId auto pushListener = pushListeners_.emplace(pushToken, PushListener{}).first; @@ -380,10 +382,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) for (auto& listener: listeners->second) { if (listener.clientId == clientId) { *listener.token = tokenFromReq; - { - std::lock_guard<std::mutex> l(schedulerLock_); - scheduler_.edit(listener.expireJob, scheduler_.time() + proxy::OP_TIMEOUT); - } + scheduler_.edit(listener.expireJob, scheduler_.time() + proxy::OP_TIMEOUT); s->close(restbed::OK, "{\"token\": " + std::to_string(tokenFromReq) + "}\n"); schedulerCv_.notify_one(); return; @@ -409,7 +408,6 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) return true; } ); - std::lock_guard<std::mutex> l(schedulerLock_); listener.expireJob = scheduler_.add(scheduler_.time() + proxy::OP_TIMEOUT, [this, token, infoHash, pushToken, isAndroid, clientId] { cancelPushListen(pushToken, infoHash, *token); @@ -422,6 +420,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) } ); } + schedulerCv_.notify_one(); s->close(restbed::OK, "{\"token\": " + std::to_string(tokenFromReq) + "}\n"); } catch (...) { s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); @@ -829,9 +828,9 @@ DhtProxyServer::removeClosedListeners(bool testSession) std::lock_guard<std::mutex> lock(lockListener_); auto listener = currentListeners_.begin(); while (listener != currentListeners_.end()) { - auto cancel = testSession ? dht_ && listener->session->is_closed() : static_cast<bool>(dht_); + auto cancel = dht_ and (not testSession or listener->session->is_closed()); if (cancel) { - dht_->cancelListen(listener->hash, std::move(listener->token.get())); + dht_->cancelListen(listener->hash, std::move(listener->token)); // Remove listener if unused listener = currentListeners_.erase(listener); } else {