diff --git a/include/opendht/http.h b/include/opendht/http.h index af1f1b9472bc792bf17d983e3e671d15d1edc1ca..e8aba77395237ab43de00c6ac3f7baff2f7953c7 100644 --- a/include/opendht/http.h +++ b/include/opendht/http.h @@ -35,6 +35,7 @@ #include <memory> #include <queue> +#include <mutex> namespace Json { class Value; @@ -122,6 +123,8 @@ private: }; } + mutable std::mutex mutex_; + unsigned int id_; static std::atomic_uint ids_; @@ -185,7 +188,7 @@ public: private: void resolve(const std::string& host, const std::string& service); - std::mutex mutex_; + mutable std::mutex mutex_; Url url_; asio::error_code ec_; @@ -312,6 +315,8 @@ private: void onBody(const char* at, size_t length); void onComplete(); + mutable std::mutex mutex_; + std::shared_ptr<dht::Logger> logger_; restinio::http_request_header_t header_; diff --git a/src/http.cpp b/src/http.cpp index ef3c4bdedef29d515c548702f6ed2e484afc6fe4..2eb644bbf318b6fca0dc00159575f9ea275efbe1 100644 --- a/src/http.cpp +++ b/src/http.cpp @@ -130,6 +130,7 @@ Connection::~Connection() { void Connection::close() { + std::lock_guard<std::mutex> lock(mutex_); asio::error_code ec; if (ssl_socket_) { if (ssl_socket_->is_open()) @@ -160,6 +161,7 @@ Connection::is_ssl() const void Connection::set_ssl_verification(const asio::ip::tcp::endpoint& endpoint, const asio::ssl::verify_mode verify_mode) { + std::lock_guard<std::mutex> lock(mutex_); if (ssl_socket_ and verify_mode != asio::ssl::verify_none) { ssl_socket_->asio_ssl_stream().set_verify_mode(verify_mode); ssl_socket_->asio_ssl_stream().set_verify_callback([ @@ -192,6 +194,7 @@ Connection::input() std::string Connection::read_bytes(size_t bytes) { + std::lock_guard<std::mutex> lock(mutex_); if (bytes == 0) bytes = read_buf_.in_avail(); std::string content; @@ -212,6 +215,7 @@ Connection::read_until(const char delim) void Connection::async_connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, ConnectHandlerCb cb) { + std::lock_guard<std::mutex> lock(mutex_); if (ssl_socket_) asio::async_connect(ssl_socket_->lowest_layer(), std::move(endpoints), wrapCallabck(std::move(cb))); else if (socket_) @@ -223,6 +227,7 @@ Connection::async_connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, Conn void Connection::async_handshake(HandlerCb cb) { + std::lock_guard<std::mutex> lock(mutex_); if (ssl_socket_) { std::weak_ptr<Connection> wthis = shared_from_this(); ssl_socket_->async_handshake(asio::ssl::stream<asio::ip::tcp::socket>::client, @@ -254,44 +259,61 @@ Connection::async_handshake(HandlerCb cb) void Connection::async_write(BytesHandlerCb cb) { - if (!is_open()) return; + std::lock_guard<std::mutex> lock(mutex_); + if (!is_open()) { + if (cb) ctx_.post([cb](){ cb(asio::error::broken_pipe, 0); }); + return; + } if (ssl_socket_) asio::async_write(*ssl_socket_, write_buf_, wrapCallabck(std::move(cb))); else if (socket_) asio::async_write(*socket_, write_buf_, wrapCallabck(std::move(cb))); - else if (cb) cb(asio::error::operation_aborted, 0); + else if (cb) ctx_.post([cb](){ cb(asio::error::operation_aborted, 0); }); } void Connection::async_read_until(const char* delim, BytesHandlerCb cb) { - if (!is_open()) return; + std::lock_guard<std::mutex> lock(mutex_); + if (!is_open()) { + if (cb) ctx_.post([cb](){ cb(asio::error::broken_pipe, 0); }); + return; + } if (ssl_socket_) asio::async_read_until(*ssl_socket_, read_buf_, delim, wrapCallabck(std::move(cb))); else if (socket_) asio::async_read_until(*socket_, read_buf_, delim, wrapCallabck(std::move(cb))); - else if (cb) cb(asio::error::operation_aborted, 0); + else if (cb) ctx_.post([cb](){ cb(asio::error::operation_aborted, 0); }); } void Connection::async_read_until(char delim, BytesHandlerCb cb) { - if (!is_open()) return; + std::lock_guard<std::mutex> lock(mutex_); + if (!is_open()) { + if (cb) ctx_.post([cb](){ cb(asio::error::broken_pipe, 0); }); + return; + } if (ssl_socket_) asio::async_read_until(*ssl_socket_, read_buf_, delim, wrapCallabck(std::move(cb))); else if (socket_) asio::async_read_until(*socket_, read_buf_, delim, wrapCallabck(std::move(cb))); - else if (cb) cb(asio::error::operation_aborted, 0); + else if (cb) ctx_.post([cb](){ cb(asio::error::operation_aborted, 0); }); } void Connection::async_read(size_t bytes, BytesHandlerCb cb) { - if (!is_open()) return; + std::lock_guard<std::mutex> lock(mutex_); + if (!is_open()) { + if (cb) ctx_.post([cb](){ cb(asio::error::broken_pipe, 0); }); + return; + } if (ssl_socket_) asio::async_read(*ssl_socket_, read_buf_, asio::transfer_exactly(bytes), wrapCallabck(std::move(cb))); else if (socket_) asio::async_read(*socket_, read_buf_, asio::transfer_exactly(bytes), wrapCallabck(std::move(cb))); - else if (cb) cb(asio::error::operation_aborted, 0); + else if (cb) ctx_.post([cb](){ cb(asio::error::operation_aborted, 0); }); } void Connection::async_read_some(size_t bytes, BytesHandlerCb cb) { + std::lock_guard<std::mutex> lock(mutex_); if (!is_open()) { - if (cb) cb(asio::error::operation_aborted, 0); + if (cb) ctx_.post([cb](){ cb(asio::error::broken_pipe, 0); }); return; } auto buf = read_buf_.prepare(bytes); @@ -754,6 +776,7 @@ Request::connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, HandlerCb cb) if (not sthis) return; auto& this_ = *sthis; + std::lock_guard<std::mutex> lock(this_.mutex_); if (ec == asio::error::operation_aborted){ this_.terminate(ec); return; @@ -810,6 +833,7 @@ Request::send() std::vector<asio::ip::tcp::endpoint> endpoints) { if (auto sthis = wthis.lock()) { auto& this_ = *sthis; + std::lock_guard<std::mutex> lock(this_.mutex_); if (ec){ if (this_.logger_) this_.logger_->e("[http:request:%i] resolve error: %s", this_.id_, ec.message().c_str()); @@ -882,6 +906,7 @@ Request::terminate(const asio::error_code& ec) void Request::handle_request(const asio::error_code& ec) { + std::lock_guard<std::mutex> lock(mutex_); if (ec and ec != asio::error::eof){ terminate(ec); return; @@ -905,6 +930,7 @@ Request::handle_request(const asio::error_code& ec) void Request::handle_response(const asio::error_code& ec, size_t n_bytes) { + std::lock_guard<std::mutex> lock(mutex_); if (ec && ec != asio::error::eof){ terminate(ec); return;