diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index de689cc84193b7b660937a78d76e2c2e44b5057e..0b260331d4619771b1573b1fdb392de2d6c285e6 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -307,10 +307,10 @@ private: */ struct Listener; struct ProxySearch; - std::map<InfoHash, ProxySearch> searches_; size_t listener_token_ {0}; - std::mutex lockListener_; + std::map<InfoHash, ProxySearch> searches_; + mutable std::mutex searchLock_; /** * Store current put and get requests. diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 8876b22a4c1e07c09371daf70dc3580f2da5af04..a70061367a11bcc74209788a0337ef091b62112d 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -48,8 +48,12 @@ struct DhtProxyClient::Listener Sp<bool> isCanceledViaClose; Sp<unsigned> pushNotifToken; // NOTE: unused if not using push notifications Sp<Scheduler::Job> refreshJob; - Listener(ValueCache&& c, const Sp<restbed::Request>& r, Value::Filter&& f) - : cache(std::move(c)), filter(std::move(f)), req(r), isCanceledViaClose(std::make_shared<bool>(false)) + Listener(ValueCache&& c, Sp<Scheduler::Job>&& j, const Sp<restbed::Request>& r, Value::Filter&& f) + : cache(std::move(c)), + cacheExpirationJob(std::move(j)), + filter(std::move(f)), + req(r), + isCanceledViaClose(std::make_shared<bool>(false)) {} }; @@ -97,6 +101,7 @@ DhtProxyClient::~DhtProxyClient() std::vector<Sp<Value>> DhtProxyClient::getLocal(const InfoHash& k, Value::Filter filter) const { + std::lock_guard<std::mutex> lock(searchLock_); auto s = searches_.find(k); if (s == searches_.end()) return {}; @@ -105,6 +110,7 @@ DhtProxyClient::getLocal(const InfoHash& k, Value::Filter filter) const { Sp<Value> DhtProxyClient::getLocalById(const InfoHash& k, Value::Id id) const { + std::lock_guard<std::mutex> lock(searchLock_); auto s = searches_.find(k); if (s == searches_.end()) return {}; @@ -131,7 +137,7 @@ DhtProxyClient::cancelAllOperations() void DhtProxyClient::cancelAllListeners() { - std::lock_guard<std::mutex> lock(lockListener_); + std::lock_guard<std::mutex> lock(searchLock_); for (auto& s: searches_) { for (auto& l : s.second.listeners) if (l.second.thread.joinable()) { @@ -300,6 +306,7 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po val->id = rand_id(rdev); } if (permanent) { + std::lock_guard<std::mutex> lock(searchLock_); auto id = val->id; auto search = searches_.emplace(key, ProxySearch{}).first; auto nextRefresh = scheduler.time() + OP_TIMEOUT; @@ -591,7 +598,7 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi auto req = std::make_shared<restbed::Request>(uri); req->set_method(deviceKey_.empty() ? "LISTEN" : "SUBSCRIBE"); - std::lock_guard<std::mutex> lock(lockListener_); + std::lock_guard<std::mutex> lock(searchLock_); auto search = searches_.find(key); if (search == searches_.end()) { std::cerr << "doListen: search not found" << std::endl; @@ -599,22 +606,24 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi } auto token = ++listener_token_; - auto l = search->second.listeners.emplace(token, Listener{ - ValueCache(cb), req, std::move(filter) - }).first; - if (not l->second.cacheExpirationJob) { - l->second.cacheExpirationJob = scheduler.add(time_point::max(), [this, key, token]{ - auto s = searches_.find(key); - if (s == searches_.end()) { - return; - } - auto l = s->second.listeners.find(token); - if (l == s->second.listeners.end()) { - return; - } - auto next = l->second.cache.expireValues(scheduler.time()); - scheduler.edit(l->second.cacheExpirationJob, next); - }); + auto l = search->second.listeners.find(token); + if (l == search->second.listeners.end()) { + auto f = filter; + l = search->second.listeners.emplace(token, Listener { + ValueCache(cb), scheduler.add(time_point::max(), [this, key, token]{ + std::lock_guard<std::mutex> lock(searchLock_); + auto s = searches_.find(key); + if (s == searches_.end()) { + return; + } + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) { + return; + } + auto next = l->second.cache.expireValues(scheduler.time()); + scheduler.edit(l->second.cacheExpirationJob, next); + }), req, std::move(f) + }).first; } ValueCache& cache = l->second.cache; @@ -720,7 +729,7 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi bool DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken) { - std::lock_guard<std::mutex> lock(lockListener_); + std::lock_guard<std::mutex> lock(searchLock_); auto search = searches_.find(key); if (search == searches_.end()) @@ -794,7 +803,7 @@ DhtProxyClient::getConnectivityStatus() void DhtProxyClient::restartListeners() { - std::lock_guard<std::mutex> lock(lockListener_); + std::lock_guard<std::mutex> lock(searchLock_); for (auto& search: searches_) { for (auto& l: search.second.listeners) { auto& listener = l.second; @@ -870,6 +879,7 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string { #if OPENDHT_PUSH_NOTIFICATIONS try { + std::lock_guard<std::mutex> lock(searchLock_); auto token = std::stoul(notification.at("token")); for (auto& search: searches_) { for (auto& list : search.second.listeners) {