diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index ce70a48d093bdacf473e34f050f1931d5df0ddc0..08bfb05fd281e27bdf9b87b57021f6230cc8ef83 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -344,6 +344,8 @@ private: NodeStatus statusIpv6_ {NodeStatus::Disconnected}; NodeStats stats4_ {}; NodeStats stats6_ {}; + SockAddr localAddrv4_; + SockAddr localAddrv6_; SockAddr publicAddressV4_; SockAddr publicAddressV6_; std::atomic_bool launchConnectedCbs_ {false}; diff --git a/include/opendht/http.h b/include/opendht/http.h index baeeed2e9cfafab4c5e094d1e377b3d9e322f416..2d8a561cc01f99fbf38d0fff55d9bd5145cdc8b0 100644 --- a/include/opendht/http.h +++ b/include/opendht/http.h @@ -114,6 +114,8 @@ public: void async_read(size_t bytes, BytesHandlerCb cb); void async_read_some(size_t bytes, BytesHandlerCb cb); + const asio::ip::address& local_address() const; + void timeout(const std::chrono::seconds timeout, HandlerCb cb = {}); void close(); @@ -142,6 +144,8 @@ private: asio::streambuf read_buf_; std::istream istream_; + asio::ip::address local_address_; + std::unique_ptr<asio::steady_timer> timeout_timer_; std::shared_ptr<dht::Logger> logger_; bool checkOcsp_ {false}; diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 6085741c4c93ed5e3389c19b8827f968816b393a..dc4d3f07c82ca8b539bd5c2df35558c28bc1da0e 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -117,6 +117,9 @@ DhtProxyClient::DhtProxyClient( , loopSignal_(signal) , jsonReader_(Json::CharReaderBuilder{}.newCharReader()) { + localAddrv4_.setFamily(AF_INET); + localAddrv6_.setFamily(AF_INET6); + jsonBuilder_["commentStyle"] = "None"; jsonBuilder_["indentation"] = ""; if (logger_) { @@ -658,6 +661,12 @@ DhtProxyClient::queryProxyInfo(const Sp<InfoState>& infoState, const Sp<http::Re if (!jsonReader_->parse(response.body.data(), response.body.data() + response.body.size(), &proxyInfos, &err)){ onProxyInfos(Json::Value{}, family); } else if (not infoState->cancel) { + if (auto req = response.request.lock()) { + if (auto conn = req->get_connection()) { + const auto& localAddr = conn->local_address(); + proxyInfos["local_ip"] = localAddr.to_string(); + } + } onProxyInfos(proxyInfos, family); } } @@ -689,10 +698,17 @@ DhtProxyClient::onProxyInfos(const Json::Value& proxyInfos, const sa_family_t fa std::unique_lock<std::mutex> l(lockCurrentProxyInfos_); auto oldStatus = std::max(statusIpv4_, statusIpv6_); auto& status = family == AF_INET ? statusIpv4_ : statusIpv6_; + auto ipChanged = false; + auto& pubAddress = family == AF_INET? publicAddressV4_ : publicAddressV6_; + auto& localAddress = family == AF_INET? localAddrv4_ : localAddrv6_; if (not proxyInfos.isMember("node_id")) { if (logger_) logger_->e("[proxy:client] [info] request failed for %s", family == AF_INET ? "ipv4" : "ipv6"); status = NodeStatus::Disconnected; + if (pubAddress) { + pubAddress = {}; + ipChanged = true; + } } else { if (logger_) logger_->d("[proxy:client] [info] got proxy reply for %s", @@ -701,19 +717,24 @@ DhtProxyClient::onProxyInfos(const Json::Value& proxyInfos, const sa_family_t fa myid = InfoHash(proxyInfos["node_id"].asString()); stats4_ = NodeStats(proxyInfos["ipv4"]); stats6_ = NodeStats(proxyInfos["ipv6"]); - if (stats4_.good_nodes + stats6_.good_nodes) + auto publicIp = parsePublicAddress(proxyInfos["public_ip"]); + ipChanged = pubAddress && pubAddress.toString() != publicIp.toString(); + pubAddress = publicIp; + + if (auto localIp = proxyInfos["local_ip"]) { + if (localAddress.toString() != localIp.asString()) { + localAddress.setAddress(localIp.asString().c_str()); + ipChanged = (bool)localAddress; + } + } + + if (!ipChanged && stats4_.good_nodes + stats6_.good_nodes) status = NodeStatus::Connected; - else if (stats4_.dubious_nodes + stats6_.dubious_nodes) + else if (!ipChanged && stats4_.dubious_nodes + stats6_.dubious_nodes) status = NodeStatus::Connecting; else status = NodeStatus::Disconnected; - auto publicIp = parsePublicAddress(proxyInfos["public_ip"]); - auto publicFamily = publicIp.getFamily(); - if (publicFamily == AF_INET) - publicAddressV4_ = publicIp; - else if (publicFamily == AF_INET6) - publicAddressV6_ = publicIp; } catch (const std::exception& e) { if (logger_) logger_->e("[proxy:client] [info] error processing: %s", e.what()); @@ -739,7 +760,10 @@ DhtProxyClient::onProxyInfos(const Json::Value& proxyInfos, const sa_family_t fa nextProxyConfirmationTimer_->async_wait(std::bind(&DhtProxyClient::handleProxyConfirm, this, std::placeholders::_1)); } else if (newStatus == NodeStatus::Disconnected) { - nextProxyConfirmationTimer_->expires_at(std::chrono::steady_clock::now() + std::chrono::minutes(1)); + auto next = std::chrono::steady_clock::now(); + if (!ipChanged) + next += std::chrono::minutes(1); + nextProxyConfirmationTimer_->expires_at(next); nextProxyConfirmationTimer_->async_wait(std::bind(&DhtProxyClient::handleProxyConfirm, this, std::placeholders::_1)); } l.unlock(); diff --git a/src/http.cpp b/src/http.cpp index c0d2f25f876d66133fef3ae4e67b584363ecae6c..93d2eb4e3ab963a9622e7104e8bca1db897d53c5 100644 --- a/src/http.cpp +++ b/src/http.cpp @@ -569,9 +569,10 @@ Connection::async_connect(std::vector<asio::ip::tcp::endpoint>&& endpoints, Conn #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-variable" - ConnectHandlerCb wcb = [&base, cb=std::move(cb)](const asio::error_code& ec, const asio::ip::tcp::endpoint& endpoint) { + ConnectHandlerCb wcb = [this, &base, cb=std::move(cb)](const asio::error_code& ec, const asio::ip::tcp::endpoint& endpoint) { if (!ec) { auto socket = base.native_handle(); + local_address_ = base.local_endpoint().address(); // Once connected, set a keep alive on the TCP socket with 30 seconds delay // This will generate broken pipes as soon as possible. // Note this needs to be done once connected to have a valid native_handle() @@ -727,6 +728,12 @@ Connection::async_read_some(size_t bytes, BytesHandlerCb cb) else socket_->async_read_some(buf, onEnd); } +const asio::ip::address& +Connection::local_address() const +{ + return local_address_; +} + void Connection::timeout(const std::chrono::seconds timeout, HandlerCb cb) {