diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 80198a7f4f0a46c11e8f71db3eda91f44e7c6a3a..8f26a0cda35a94240066e141d4bd8cd6e7fc7603 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -349,6 +349,8 @@ private: */ asio::io_context httpContext_; std::shared_ptr<http::Resolver> resolver_; + + mutable std::mutex requestLock_; std::map<unsigned int /*id*/, std::shared_ptr<http::Request>> requests_; /* * Thread for executing the http io_context.run() blocking call diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 275923dda998082d10a1ebcd5e92cb0437bcdafd..8159fb3e7071394242f9734967f31810d70b30fe 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -334,11 +334,15 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va } loopSignal_(); } + std::lock_guard<std::mutex> l(requestLock_); requests_.erase(reqid); } }); + { + std::lock_guard<std::mutex> l(requestLock_); + requests_[reqid] = request; + } request->send(); - requests_[reqid] = request; } catch (const std::exception &e){ if (logger_) @@ -470,11 +474,15 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallbackSimple cb, } if (cb) cb(ok); + std::lock_guard<std::mutex> l(requestLock_); requests_.erase(reqid); } }); + { + std::lock_guard<std::mutex> l(requestLock_); + requests_[reqid] = request; + } request->send(); - requests_[reqid] = request; } catch (const std::exception &e){ if (logger_) @@ -590,14 +598,17 @@ DhtProxyClient::queryProxyInfo(std::shared_ptr<InfoState> infoState, sa_family_t if (not infoState->cancel) onProxyInfos(proxyInfos, family); } + std::lock_guard<std::mutex> l(requestLock_); requests_.erase(reqid); } }); if (infoState->cancel.load()) return; - - requests_[reqid] = request; + { + std::lock_guard<std::mutex> l(requestLock_); + requests_[reqid] = request; + } request->send(); } catch (const std::exception &e){ @@ -686,10 +697,10 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt if (logger_) logger_->d("[proxy:client] [listen] [search %s]", key.to_c_str()); + std::lock_guard<std::mutex> lock(searchLock_); auto& search = searches_[key]; auto query = std::make_shared<Query>(Select{}, std::move(where)); return search.ops.listen(cb, query, filter, [this, key](Sp<Query>, ValueCallback cb, SyncCallback) -> size_t { - std::lock_guard<std::mutex> lock(searchLock_); // Find search auto search = searches_.find(key); if (search == searches_.end()) { @@ -863,13 +874,15 @@ DhtProxyClient::handleExpireListener(const asio::error_code &ec, const InfoHash& requests_.erase(reqid); } }); + { + std::lock_guard<std::mutex> l(requestLock_); + requests_[reqid] = request; + } request->send(); - requests_[reqid] = request; } catch (const std::exception &e){ if (logger_) logger_->e("[proxy:client] [unsubscribe %s] failed: %s", key.to_c_str(), e.what()); - requests_.erase(reqid); } } else { // stop the request @@ -962,11 +975,15 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t header, if (response.status_code == 0) opFailed(); } + std::lock_guard<std::mutex> l(requestLock_); requests_.erase(reqid); } }); + { + std::lock_guard<std::mutex> l(requestLock_); + requests_[reqid] = request; + } request->send(); - requests_[reqid] = request; } catch (const std::exception &e){ if (logger_)