/* * Copyright (C) 2014-2020 Savoir-faire Linux Inc. * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> * Adrien Béraud <adrien.beraud@savoirfairelinux.com> * Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see <https://www.gnu.org/licenses/>. */ #include "dht_proxy_client.h" #include "dhtrunner.h" #include "op_cache.h" #include "utils.h" #include <http_parser.h> #include <deque> namespace dht { struct DhtProxyClient::InfoState { std::atomic_uint ipv4 {0}, ipv6 {0}; std::atomic_bool cancel {false}; }; struct DhtProxyClient::OperationState { std::atomic_bool ok {true}; std::atomic_bool stop {false}; }; struct DhtProxyClient::Listener { Listener(OpValueCache&& c): cache(std::move(c)) {} unsigned callbackId; OpValueCache cache; CacheValueCallback cb; Sp<OperationState> opstate; std::shared_ptr<http::Request> request; std::unique_ptr<asio::steady_timer> refreshSubscriberTimer; }; struct PermanentPut { PermanentPut(const Sp<Value>& v, std::unique_ptr<asio::steady_timer>&& j, const Sp<std::atomic_bool>& o): value(v), refreshPutTimer(std::move(j)), ok(o) {} Sp<Value> value; std::unique_ptr<asio::steady_timer> refreshPutTimer; Sp<std::atomic_bool> ok; }; struct DhtProxyClient::ProxySearch { SearchCache ops {}; std::unique_ptr<asio::steady_timer> opExpirationTimer; std::map<size_t, Listener> listeners {}; std::map<Value::Id, PermanentPut> puts {}; std::set<Sp<Value>> pendingPuts {}; }; struct LineSplit { void append(const char* d, size_t l) { buf_.insert(buf_.end(), d, d+l); } bool getLine(char c) { auto it = buf_.begin(); while (it != buf_.end()) { if (*(it++) == c) { line_.clear(); line_.insert(line_.end(), buf_.begin(), it); buf_.erase(buf_.begin(), it); return true; } } return false; } const std::string& line() const { return line_; } private: std::deque<char> buf_ {}; std::string line_ {}; }; std::string getRandomSessionId(size_t length = 8) { static constexpr const char chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!#$%&()*+,./:;<=>?@[]^_`{|}~"; std::string str(length, 0); crypto::random_device rdev; std::uniform_int_distribution<> dist(0, (sizeof(chars)/sizeof(char)) - 2); std::generate_n( str.begin(), length, [&]{ return chars[dist(rdev)]; } ); return str; } DhtProxyClient::DhtProxyClient() {} DhtProxyClient::DhtProxyClient( std::shared_ptr<dht::crypto::Certificate> serverCA, dht::crypto::Identity clientIdentity, std::function<void()> signal, const std::string& serverHost, const std::string& pushClientId, std::shared_ptr<dht::Logger> logger) : DhtInterface(logger) , proxyUrl_(serverHost) , clientIdentity_(clientIdentity), serverCertificate_(serverCA) , pushClientId_(pushClientId), pushSessionId_(getRandomSessionId()) , loopSignal_(signal) , jsonReader_(Json::CharReaderBuilder{}.newCharReader()) { jsonBuilder_["commentStyle"] = "None"; jsonBuilder_["indentation"] = ""; if (logger_) { if (serverCertificate_) logger_->d("[proxy:client] using ca certificate for ssl:\n%s", serverCertificate_->toString(false/*chain*/).c_str()); if (clientIdentity_.first and clientIdentity_.second) logger_->d("[proxy:client] using client certificate for ssl:\n%s", clientIdentity_.second->toString(false/*chain*/).c_str()); } // run http client httpClientThread_ = std::thread([this](){ try { if (logger_) logger_->d("[proxy:client] starting io_context"); // Ensures the httpContext_ won't run out of work auto work = asio::make_work_guard(httpContext_); httpContext_.run(); if (logger_) logger_->d("[proxy:client] http client io_context stopped"); } catch(const std::exception& ex){ if (logger_) logger_->e("[proxy:client] run error: %s", ex.what()); } }); if (!proxyUrl_.empty()) startProxy(); } void DhtProxyClient::startProxy() { if (proxyUrl_.empty()) return; if (logger_) logger_->d("[proxy:client] start proxy with %s", proxyUrl_.c_str()); nextProxyConfirmationTimer_ = std::make_unique<asio::steady_timer>(httpContext_, std::chrono::steady_clock::now()); nextProxyConfirmationTimer_->async_wait(std::bind(&DhtProxyClient::handleProxyConfirm, this, std::placeholders::_1)); listenerRestartTimer_ = std::make_unique<asio::steady_timer>(httpContext_); loopSignal_(); } void DhtProxyClient::handleProxyConfirm(const asio::error_code &ec) { if (ec == asio::error::operation_aborted) return; else if (ec){ if (logger_) logger_->e("[proxy:client] confirm error: %s", ec.message().c_str()); return; } if (proxyUrl_.empty()) return; getConnectivityStatus(); } DhtProxyClient::~DhtProxyClient() { stop(); } void DhtProxyClient::stop() { if (not isDestroying_.exchange(true)) { resolver_.reset(); cancelAllListeners(); if (infoState_) infoState_->cancel = true; { std::lock_guard<std::mutex> lock(requestLock_); for (auto& request : requests_) request.second->cancel(); } if (not httpContext_.stopped()) httpContext_.stop(); if (httpClientThread_.joinable()) httpClientThread_.join(); requests_.clear(); } } std::vector<Sp<Value>> DhtProxyClient::getLocal(const InfoHash& k, const Value::Filter& filter) const { std::lock_guard<std::mutex> lock(searchLock_); auto s = searches_.find(k); if (s == searches_.end()) return {}; return s->second.ops.get(filter); } Sp<Value> DhtProxyClient::getLocalById(const InfoHash& k, Value::Id id) const { std::lock_guard<std::mutex> lock(searchLock_); auto s = searches_.find(k); if (s == searches_.end()) return {}; return s->second.ops.get(id); } void DhtProxyClient::cancelAllListeners() { std::lock_guard<std::mutex> lock(searchLock_); if (logger_) logger_->d("[proxy:client] [listeners] [%zu searches] cancel all", searches_.size()); for (auto& s: searches_) { s.second.ops.cancelAll([&](size_t token){ auto l = s.second.listeners.find(token); if (l == s.second.listeners.end()) return; l->second.opstate->stop.store(true); l->second.request->cancel(); // implicit request.reset() s.second.listeners.erase(token); }); } } void DhtProxyClient::shutdown(ShutdownCallback cb, bool) { stop(); if (cb) cb(); } NodeStatus DhtProxyClient::getStatus(sa_family_t af) const { std::lock_guard<std::mutex> l(lockCurrentProxyInfos_); switch (af) { case AF_INET: return statusIpv4_; case AF_INET6: return statusIpv6_; default: return NodeStatus::Disconnected; } } bool DhtProxyClient::isRunning(sa_family_t af) const { std::lock_guard<std::mutex> l(lockCurrentProxyInfos_); switch (af) { case AF_INET: return statusIpv4_ != NodeStatus::Disconnected; case AF_INET6: return statusIpv6_ != NodeStatus::Disconnected; default: return false; } } time_point DhtProxyClient::periodic(const uint8_t*, size_t, SockAddr, const time_point& /*now*/) { // Exec all currently stored callbacks decltype(callbacks_) callbacks; { std::lock_guard<std::mutex> lock(lockCallbacks_); callbacks = std::move(callbacks_); } for (auto& callback : callbacks) callback(); callbacks.clear(); return time_point::max(); } void DhtProxyClient::setHeaderFields(http::Request& request){ request.set_header_field(restinio::http_field_t::accept, "*/*"); request.set_header_field(restinio::http_field_t::content_type, "application/json"); } void DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Value::Filter&& f, Where&& w) { if (logger_) logger_->d("[proxy:client] [get] [search %s]", key.to_c_str()); if (isDestroying_) { if (donecb) donecb(false, {}); return; } try { auto request = buildRequest("/" + key.toString()); auto reqid = request->id(); //request->set_connection_type(restinio::http_connection_header_t::keep_alive); request->set_method(restinio::http_method_get()); setHeaderFields(*request); auto opstate = std::make_shared<OperationState>(); Value::Filter filter = w.empty() ? f : f.chain(w.getFilter()); auto rxBuf = std::make_shared<LineSplit>(); request->add_on_body_callback([this, key, opstate, filter, rxBuf, cb](const char* at, size_t length){ try { auto& b = *rxBuf; b.append(at, length); // one value per body line std::vector<Sp<Value>> values; while (b.getLine('\n') and !opstate->stop) { std::string err; Json::Value json; const auto& line = b.line(); if (!jsonReader_->parse(line.data(), line.data() + line.size(), &json, &err)){ opstate->ok.store(false); return; } auto value = std::make_shared<Value>(json); if ((not filter or filter(*value)) and cb) values.emplace_back(std::move(value)); } if (not values.empty() and cb) { { std::lock_guard<std::mutex> lock(lockCallbacks_); callbacks_.emplace_back([opstate, cb, values = std::move(values)](){ if (not opstate->stop.load() and not cb(values)){ opstate->stop.store(true); } }); } loopSignal_(); } } catch(const std::exception& e) { if (logger_) logger_->e("[proxy:client] [get %s] body parsing error: %s", key.to_c_str(), e.what()); opstate->ok.store(false); } }); request->add_on_done_callback([this, reqid, opstate, donecb, key] (const http::Response& response){ if (response.status_code != 200) { if (logger_) logger_->e("[proxy:client] [get %s] failed with code=%i", key.to_c_str(), response.status_code); opstate->ok.store(false); if (not response.aborted and response.status_code == 0) opFailed(); } if (donecb) { { std::lock_guard<std::mutex> lock(lockCallbacks_); callbacks_.emplace_back([donecb, opstate](){ donecb(opstate->ok, {}); opstate->stop.store(true); }); } loopSignal_(); } if (not isDestroying_) { std::lock_guard<std::mutex> l(requestLock_); requests_.erase(reqid); } }); { std::lock_guard<std::mutex> l(requestLock_); requests_[reqid] = request; } request->send(); } catch (const std::exception &e){ if (logger_) logger_->e("[proxy:client] [get %s] error: %s", key.to_c_str(), e.what()); } } void DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point created, bool permanent) { if (not val or isDestroying_) { if (cb) cb(false, {}); return; } if (logger_) logger_->d("[proxy:client] [put] [search %s]", key.to_c_str()); std::shared_ptr<std::atomic_bool> ok; if (permanent) { std::lock_guard<std::mutex> lock(searchLock_); ok = std::make_shared<std::atomic_bool>(true); auto& search = searches_[key]; if (val->id) { auto id = val->id; auto refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, proxy::OP_TIMEOUT - proxy::OP_MARGIN); refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id)); search.puts.erase(id); search.puts.emplace(std::piecewise_construct, std::forward_as_tuple(id), std::forward_as_tuple(val, std::move(refreshPutTimer), ok)); } else { search.pendingPuts.emplace(val); } } doPut(key, val, [this, cb, ok](bool result){ if (ok) *ok = result; if (cb) { std::lock_guard<std::mutex> lock(lockCallbacks_); callbacks_.emplace_back([cb, result](){ cb(result, {}); }); } loopSignal_(); }, created, permanent); } void DhtProxyClient::handleRefreshPut(const asio::error_code &ec, InfoHash key, Value::Id id) { if (ec == asio::error::operation_aborted) return; else if (ec){ if (logger_) logger_->e("[proxy:client] [put] [refresh %s] %s", key.toString().c_str(), ec.message().c_str()); return; } if (logger_) logger_->d("[proxy:client] [put] [refresh %s]", key.to_c_str()); std::lock_guard<std::mutex> lock(searchLock_); auto search = searches_.find(key); if (search != searches_.end()) { auto p = search->second.puts.find(id); if (p != search->second.puts.end()){ doPut(key, p->second.value, [ok = p->second.ok](bool result){ *ok = result; }, time_point::max(), true); p->second.refreshPutTimer->expires_after(proxy::OP_TIMEOUT - proxy::OP_MARGIN); p->second.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id)); } } } std::shared_ptr<http::Request> DhtProxyClient::buildRequest(const std::string& target) { auto resolver = resolver_; if (not resolver) resolver = std::make_shared<http::Resolver>(httpContext_, proxyUrl_, logger_); auto request = target.empty() ? std::make_shared<http::Request>(httpContext_, resolver) : std::make_shared<http::Request>(httpContext_, resolver, target); if (serverCertificate_) request->set_certificate_authority(serverCertificate_); if (clientIdentity_.first and clientIdentity_.second) request->set_identity(clientIdentity_); request->set_header_field(restinio::http_field_t::user_agent, "RESTinio client"); return request; } void DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallbackSimple cb, time_point /*created*/, bool permanent) { if (logger_) logger_->d("[proxy:client] [put] [search %s] executing for %s", key.to_c_str(), val->toString().c_str()); try { auto request = buildRequest("/" + key.toString()); auto reqid = request->id(); request->set_method(restinio::http_method_post()); setHeaderFields(*request); auto json = val->toJson(); if (permanent) { if (deviceKey_.empty()) { json["permanent"] = true; } else { #ifdef OPENDHT_PUSH_NOTIFICATIONS Json::Value refresh; getPushRequest(refresh); json["permanent"] = refresh; #else json["permanent"] = true; #endif } } request->set_body(Json::writeString(jsonBuilder_, json)); request->add_on_done_callback([this, reqid, cb, val, key, permanent] (const http::Response& response){ bool ok = response.status_code == 200; if (ok) { if (val->id == Value::INVALID_ID) { std::string err; Json::Value parsedValue; if (jsonReader_->parse(response.body.data(), response.body.data() + response.body.size(), &parsedValue, &err)){ auto id = dht::Value(parsedValue).id; val->id = id; if (permanent) { std::lock_guard<std::mutex> lock(searchLock_); auto& search = searches_[key]; auto it = search.pendingPuts.find(val); if (it != search.pendingPuts.end()) { auto sok = std::make_shared<std::atomic_bool>(ok); auto refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, proxy::OP_TIMEOUT - proxy::OP_MARGIN); refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, id)); search.puts.emplace(std::piecewise_construct, std::forward_as_tuple(id), std::forward_as_tuple(val, std::move(refreshPutTimer), sok)); search.pendingPuts.erase(it); } } } else { if (logger_) logger_->e("[proxy:client] [status] failed to parse value from server", response.status_code); } } } else { if (logger_) logger_->e("[proxy:client] [status] failed with code=%i", response.status_code); if (not response.aborted and response.status_code == 0) opFailed(); } if (cb) cb(ok); if (not isDestroying_) { std::lock_guard<std::mutex> l(requestLock_); requests_.erase(reqid); } }); { std::lock_guard<std::mutex> l(requestLock_); requests_[reqid] = request; } request->send(); } catch (const std::exception &e){ if (logger_) logger_->e("[proxy:client] [put %s] error: %s", key.to_c_str(), e.what()); } } /** * Get data currently being put at the given hash. */ std::vector<Sp<Value>> DhtProxyClient::getPut(const InfoHash& key) const { std::vector<Sp<Value>> ret; auto search = searches_.find(key); if (search != searches_.end()) { ret.reserve(search->second.puts.size()); for (const auto& put : search->second.puts) ret.emplace_back(put.second.value); } return ret; } /** * Get data currently being put at the given hash with the given id. */ Sp<Value> DhtProxyClient::getPut(const InfoHash& key, const Value::Id& id) const { auto search = searches_.find(key); if (search == searches_.end()) return {}; auto val = search->second.puts.find(id); if (val == search->second.puts.end()) return {}; return val->second.value; } /** * Stop any put/announce operation at the given location, * for the value with the given id. */ bool DhtProxyClient::cancelPut(const InfoHash& key, const Value::Id& id) { auto search = searches_.find(key); if (search == searches_.end()) return false; if (logger_) logger_->d("[proxy:client] [put] [search %s] cancel", key.to_c_str()); return search->second.puts.erase(id) > 0; } NodeStats DhtProxyClient::getNodesStats(sa_family_t af) const { return af == AF_INET ? stats4_ : stats6_; } void DhtProxyClient::getProxyInfos() { if (logger_) logger_->d("[proxy:client] [info] requesting proxy server node information"); auto infoState = std::make_shared<InfoState>(); { std::lock_guard<std::mutex> l(lockCurrentProxyInfos_); if (infoState_) infoState_->cancel = true; infoState_ = infoState; if (statusIpv4_ == NodeStatus::Disconnected) statusIpv4_ = NodeStatus::Connecting; if (statusIpv6_ == NodeStatus::Disconnected) statusIpv6_ = NodeStatus::Connecting; } if (logger_) logger_->d("[proxy:client] [status] sending request"); auto resolver = std::make_shared<http::Resolver>(httpContext_, proxyUrl_, logger_); queryProxyInfo(infoState, resolver, AF_INET); queryProxyInfo(infoState, resolver, AF_INET6); resolver_ = resolver; } void DhtProxyClient::queryProxyInfo(const Sp<InfoState>& infoState, const Sp<http::Resolver>& resolver, sa_family_t family) { if (logger_) logger_->d("[proxy:client] [status] query ipv%i info", family == AF_INET ? 4 : 6); try { auto request = std::make_shared<http::Request>(httpContext_, resolver, family); if (serverCertificate_) request->set_certificate_authority(serverCertificate_); auto reqid = request->id(); request->set_method(restinio::http_method_get()); setHeaderFields(*request); request->add_on_done_callback([this, reqid, family, infoState] (const http::Response& response){ if (infoState->cancel.load()) return; if (response.status_code != 200) { if (logger_) logger_->e("[proxy:client] [status] ipv%i failed with code=%i", family == AF_INET ? 4 : 6, response.status_code); // pass along the failures if ((family == AF_INET and infoState->ipv4 == 0) or (family == AF_INET6 and infoState->ipv6 == 0)) onProxyInfos(Json::Value{}, family); } else { std::string err; Json::Value proxyInfos; if (!jsonReader_->parse(response.body.data(), response.body.data() + response.body.size(), &proxyInfos, &err)){ onProxyInfos(Json::Value{}, family); } else if (not infoState->cancel) { onProxyInfos(proxyInfos, family); } } if (not isDestroying_) { std::lock_guard<std::mutex> l(requestLock_); requests_.erase(reqid); } }); if (infoState->cancel.load()) return; { std::lock_guard<std::mutex> l(requestLock_); requests_[reqid] = request; } request->send(); } catch (const std::exception &e){ if (logger_) logger_->e("[proxy:client] [status] error sending request: %s", e.what()); } } void DhtProxyClient::onProxyInfos(const Json::Value& proxyInfos, const sa_family_t family) { if (isDestroying_) return; std::unique_lock<std::mutex> l(lockCurrentProxyInfos_); auto oldStatus = std::max(statusIpv4_, statusIpv6_); auto& status = family == AF_INET ? statusIpv4_ : statusIpv6_; if (not proxyInfos.isMember("node_id")) { if (logger_) logger_->e("[proxy:client] [info] request failed for %s", family == AF_INET ? "ipv4" : "ipv6"); status = NodeStatus::Disconnected; } else { if (logger_) logger_->d("[proxy:client] [info] got proxy reply for %s", family == AF_INET ? "ipv4" : "ipv6"); try { myid = InfoHash(proxyInfos["node_id"].asString()); stats4_ = NodeStats(proxyInfos["ipv4"]); stats6_ = NodeStats(proxyInfos["ipv6"]); if (stats4_.good_nodes + stats6_.good_nodes) status = NodeStatus::Connected; else if (stats4_.dubious_nodes + stats6_.dubious_nodes) status = NodeStatus::Connecting; else status = NodeStatus::Disconnected; auto publicIp = parsePublicAddress(proxyInfos["public_ip"]); auto publicFamily = publicIp.getFamily(); if (publicFamily == AF_INET) publicAddressV4_ = publicIp; else if (publicFamily == AF_INET6) publicAddressV6_ = publicIp; } catch (const std::exception& e) { if (logger_) logger_->e("[proxy:client] [info] error processing: %s", e.what()); } } auto newStatus = std::max(statusIpv4_, statusIpv6_); if (newStatus == NodeStatus::Connected) { if (oldStatus == NodeStatus::Disconnected || oldStatus == NodeStatus::Connecting) { listenerRestartTimer_->expires_at(std::chrono::steady_clock::now()); listenerRestartTimer_->async_wait(std::bind(&DhtProxyClient::restartListeners, this, std::placeholders::_1)); if (not onConnectCallbacks_.empty()) { std::lock_guard<std::mutex> lock(lockCallbacks_); callbacks_.emplace_back([cbs = std::move(onConnectCallbacks_)]() mutable { while (not cbs.empty()) { cbs.front()(); cbs.pop(); } }); } } nextProxyConfirmationTimer_->expires_at(std::chrono::steady_clock::now() + std::chrono::minutes(15)); nextProxyConfirmationTimer_->async_wait(std::bind(&DhtProxyClient::handleProxyConfirm, this, std::placeholders::_1)); } else if (newStatus == NodeStatus::Disconnected) { nextProxyConfirmationTimer_->expires_at(std::chrono::steady_clock::now() + std::chrono::minutes(1)); nextProxyConfirmationTimer_->async_wait(std::bind(&DhtProxyClient::handleProxyConfirm, this, std::placeholders::_1)); } l.unlock(); loopSignal_(); } SockAddr DhtProxyClient::parsePublicAddress(const Json::Value& val) { auto public_ip = val.asString(); auto hostAndService = splitPort(public_ip); auto sa = SockAddr::resolve(hostAndService.first); if (sa.empty()) return {}; return sa.front().getMappedIPv4(); } std::vector<SockAddr> DhtProxyClient::getPublicAddress(sa_family_t family) { std::lock_guard<std::mutex> l(lockCurrentProxyInfos_); std::vector<SockAddr> result; if (publicAddressV6_ && family != AF_INET) result.emplace_back(publicAddressV6_); if (publicAddressV4_ && family != AF_INET6) result.emplace_back(publicAddressV4_); return result; } size_t DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filter, Where where) { if (logger_) logger_->d("[proxy:client] [listen] [search %s]", key.to_c_str()); if (isDestroying_) return 0; std::lock_guard<std::mutex> lock(searchLock_); auto& search = searches_[key]; auto query = std::make_shared<Query>(Select{}, std::move(where)); return search.ops.listen(cb, query, filter, [this, key](Sp<Query>, ValueCallback cb, SyncCallback) -> size_t { // Find search auto search = searches_.find(key); if (search == searches_.end()) { if (logger_) logger_->e("[proxy:client] [listen] [search %s] search not found", key.to_c_str()); return 0; } if (logger_) logger_->d("[proxy:client] [listen] [search %s] sending %s", key.to_c_str(), deviceKey_.empty() ? "listen" : "subscribe"); // Add listener auto token = ++listenerToken_; auto l = search->second.listeners.find(token); if (l == search->second.listeners.end()) { l = search->second.listeners.emplace(std::piecewise_construct, std::forward_as_tuple(token), std::forward_as_tuple(std::move(cb))).first; } else { if (l->second.opstate) l->second.opstate->stop = true; } // Add cache callback auto opstate = std::make_shared<OperationState>(); l->second.opstate = opstate; l->second.cb = [this,key,token,opstate](const std::vector<Sp<Value>>& values, bool expired, system_clock::time_point t){ if (opstate->stop) return false; std::lock_guard<std::mutex> lock(searchLock_); auto s = searches_.find(key); if (s != searches_.end()) { auto l = s->second.listeners.find(token); if (l != s->second.listeners.end()) { return l->second.cache.onValue(values, expired, t); } } return false; }; if (not deviceKey_.empty()) { /* * Relaunch push listeners even if a timeout is not received * (if the proxy crash for any reason) */ if (!l->second.refreshSubscriberTimer) l->second.refreshSubscriberTimer = std::make_unique<asio::steady_timer>(httpContext_); l->second.refreshSubscriberTimer->expires_at(std::chrono::steady_clock::now() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); l->second.refreshSubscriberTimer->async_wait(std::bind(&DhtProxyClient::handleResubscribe, this, std::placeholders::_1, key, token, opstate)); } ListenMethod method; restinio::http_request_header_t header; if (deviceKey_.empty()){ // listen method = ListenMethod::LISTEN; header.method(restinio::http_method_get()); header.request_target("/key/" + key.toString() + "/listen"); } else { method = ListenMethod::SUBSCRIBE; header.method(restinio::http_method_subscribe()); header.request_target("/" + key.toString()); } sendListen(header, l->second.cb, opstate, l->second, method); return token; }); } void DhtProxyClient::handleResubscribe(const asio::error_code &ec, const InfoHash& key, const size_t token, std::shared_ptr<OperationState> opstate) { if (ec == asio::error::operation_aborted) return; else if (ec){ if (logger_) logger_->e("[proxy:client] [resubscribe %s] %s", key.toString().c_str(), ec.message().c_str()); return; } if (opstate->stop) return; std::lock_guard<std::mutex> lock(searchLock_); auto s = searches_.find(key); if (s != searches_.end()){ auto l = s->second.listeners.find(token); if (l != s->second.listeners.end()) { resubscribe(key, token, l->second); } else { if (logger_) logger_->e("[proxy:client] [resubscribe %s] token not found", key.toString().c_str()); } } } bool DhtProxyClient::cancelListen(const InfoHash& key, size_t gtoken) { if (logger_) logger_->d(key, "[proxy:client] [search %s] cancel listen %zu", key.to_c_str(), gtoken); std::lock_guard<std::mutex> lock(searchLock_); // find the listener in cache auto it = searches_.find(key); if (it == searches_.end()) return false; auto& ops = it->second.ops; bool canceled = ops.cancelListen(gtoken, std::chrono::steady_clock::now()); // define real cancel listen only once if (not it->second.opExpirationTimer) it->second.opExpirationTimer = std::make_unique<asio::steady_timer>(httpContext_, ops.getExpiration()); else it->second.opExpirationTimer->expires_at(ops.getExpiration()); it->second.opExpirationTimer->async_wait(std::bind(&DhtProxyClient::handleExpireListener, this, std::placeholders::_1, key)); return canceled; } void DhtProxyClient::handleExpireListener(const asio::error_code &ec, const InfoHash& key) { if (ec == asio::error::operation_aborted) return; else if (ec){ if (logger_) logger_->e("[proxy:client] [listen %s] error in cancel: %s", key.toString().c_str(), ec.message().c_str()); return; } if (logger_) logger_->d("[proxy:client] [listen %s] expire listener", key.toString().c_str()); std::lock_guard<std::mutex> lock(searchLock_); auto search = searches_.find(key); if (search == searches_.end()) return; // everytime a new expiry is set, a previous gets aborted time_point next = search->second.ops.expire(std::chrono::steady_clock::now(), [&](size_t ltoken) { auto it = search->second.listeners.find(ltoken); if (it == search->second.listeners.end()) return; auto& listener = it->second; listener.opstate->stop = true; if (not deviceKey_.empty()) { // UNSUBSCRIBE auto request = buildRequest("/" + key.toString()); auto reqid = request->id(); try { request->set_method(restinio::http_method_unsubscribe()); setHeaderFields(*request); Json::Value body; body["key"] = deviceKey_; body["client_id"] = pushClientId_; request->set_body(Json::writeString(jsonBuilder_, body)); request->add_on_done_callback([this, reqid, key] (const http::Response& response){ if (response.status_code != 200) { if (logger_) logger_->e("[proxy:client] [unsubscribe %s] failed with code=%i", key.to_c_str(), response.status_code); if (not response.aborted and response.status_code == 0) opFailed(); } if (not isDestroying_) { std::lock_guard<std::mutex> l(requestLock_); requests_.erase(reqid); } }); { std::lock_guard<std::mutex> l(requestLock_); requests_[reqid] = request; } request->send(); } catch (const std::exception &e){ if (logger_) logger_->e("[proxy:client] [unsubscribe %s] failed: %s", key.to_c_str(), e.what()); } } else { // stop the request listener.request.reset(); } search->second.listeners.erase(it); if (logger_) logger_->d("[proxy:client] [listen:cancel] [search %s] %zu listener remaining", key.to_c_str(), search->second.listeners.size()); }); if (next != time_point::max()){ search->second.opExpirationTimer->expires_at(next); search->second.opExpirationTimer->async_wait(std::bind( &DhtProxyClient::handleExpireListener, this, std::placeholders::_1, key)); } if (search->second.listeners.empty()){ searches_.erase(search); } } void DhtProxyClient::sendListen(const restinio::http_request_header_t& header, const CacheValueCallback& cb, const Sp<OperationState>& opstate, Listener& listener, ListenMethod method) { if (logger_) logger_->e("[proxy:client] [listen] sendListen: %d", (int)method); try { auto request = buildRequest(); listener.request = request; auto reqid = request->id(); request->set_header(header); setHeaderFields(*request); if (method == ListenMethod::LISTEN) request->set_connection_type(restinio::http_connection_header_t::keep_alive); #ifdef OPENDHT_PUSH_NOTIFICATIONS std::string body; if (method != ListenMethod::LISTEN) body = fillBody(method == ListenMethod::RESUBSCRIBE); request->set_body(body); #endif auto rxBuf = std::make_shared<LineSplit>(); request->add_on_body_callback([this, reqid, opstate, rxBuf, cb](const char* at, size_t length){ try { auto& b = *rxBuf; b.append(at, length); // one value per body line while (b.getLine('\n') and !opstate->stop) { std::string err; Json::Value json; const auto& line = b.line(); if (!jsonReader_->parse(line.data(), line.data() + line.size(), &json, &err)){ opstate->ok.store(false); return; } if (json.size() == 0) { // it's the end break; } auto value = std::make_shared<Value>(json); if (cb){ auto expired = json.get("expired", Json::Value(false)).asBool(); { std::lock_guard<std::mutex> lock(lockCallbacks_); callbacks_.emplace_back([cb, value, opstate, expired]() { if (not opstate->stop.load() and not cb({value}, expired, system_clock::time_point::min())) opstate->stop.store(true); }); } loopSignal_(); } } } catch(const std::exception& e) { if (logger_) logger_->e("[proxy:client] [listen] request #%i error in parsing: %s", reqid, e.what()); opstate->ok.store(false); } }); request->add_on_done_callback([this, opstate, reqid] (const http::Response& response) { if (response.status_code != 200) { if (logger_) logger_->e("[proxy:client] [listen] send request #%i failed with code=%i", reqid, response.status_code); opstate->ok.store(false); if (not response.aborted and response.status_code == 0) opFailed(); } if (not isDestroying_) { std::lock_guard<std::mutex> l(requestLock_); requests_.erase(reqid); } }); { std::lock_guard<std::mutex> l(requestLock_); requests_[reqid] = request; } request->send(); } catch (const std::exception &e){ if (logger_) logger_->e("[proxy:client] [listen] request failed: %s", e.what()); } } void DhtProxyClient::opFailed() { if (isDestroying_) return; if (logger_) logger_->e("[proxy:client] proxy request failed"); { std::lock_guard<std::mutex> l(lockCurrentProxyInfos_); statusIpv4_ = NodeStatus::Disconnected; statusIpv6_ = NodeStatus::Disconnected; } getConnectivityStatus(); loopSignal_(); } void DhtProxyClient::getConnectivityStatus() { if (logger_) logger_->d("[proxy:client] [connectivity] get status"); if (!isDestroying_) getProxyInfos(); } void DhtProxyClient::restartListeners(const asio::error_code &ec) { if (ec == asio::error::operation_aborted) return; else if (ec){ if (logger_) logger_->e("[proxy:client] restart error: %s", ec.message().c_str()); return; } if (isDestroying_) return; if (logger_) logger_->d("[proxy:client] [listeners] refresh permanent puts"); std::lock_guard<std::mutex> lock(searchLock_); for (auto& search : searches_) { auto key = search.first; for (auto& put : search.second.puts) { doPut(key, put.second.value, [ok = put.second.ok](bool result){ *ok = result; }, time_point::max(), true); if (!put.second.refreshPutTimer) { put.second.refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_); } put.second.refreshPutTimer->expires_at(std::chrono::steady_clock::now() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); put.second.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, put.first)); } } if (not deviceKey_.empty()) { if (logger_) logger_->d("[proxy:client] [listeners] resubscribe due to a connectivity change"); // Connectivity changed, refresh all subscribe for (auto& search : searches_) for (auto& listener : search.second.listeners) if (!listener.second.opstate->ok) resubscribe(search.first, listener.first, listener.second); return; } if (logger_) logger_->d("[proxy:client] [listeners] restarting listeners"); for (auto& search: searches_) { for (auto& l: search.second.listeners) { auto& listener = l.second; if (auto opstate = listener.opstate) opstate->stop = true; listener.request->cancel(); listener.request.reset(); } } for (auto& search: searches_) { for (auto& l: search.second.listeners) { auto& listener = l.second; auto opstate = listener.opstate; // Redo listen opstate->stop.store(false); opstate->ok.store(true); auto cb = listener.cb; // define header restinio::http_request_header_t header; header.method(restinio::http_method_get()); header.request_target("/key/" + search.first.toString() + "/listen"); sendListen(header, cb, opstate, listener, ListenMethod::LISTEN); } } } void DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string>& notification) { #ifdef OPENDHT_PUSH_NOTIFICATIONS { // If a push notification is received, the proxy is up and running std::lock_guard<std::mutex> l(lockCurrentProxyInfos_); statusIpv4_ = NodeStatus::Connected; statusIpv6_ = NodeStatus::Connected; } auto launchLoop = false; try { auto sessionId = notification.find("s"); if (sessionId != notification.end() and sessionId->second != pushSessionId_) { if (logger_) logger_->d("[proxy:client] [push] ignoring push for other session"); return; } std::lock_guard<std::mutex> lock(searchLock_); auto timeout = notification.find("timeout"); if (timeout != notification.cend()) { InfoHash key(timeout->second); auto& search = searches_.at(key); auto vidIt = notification.find("vid"); if (vidIt != notification.end()) { // Refresh put auto vid = std::stoull(vidIt->second); auto& put = search.puts.at(vid); if (!put.refreshPutTimer) put.refreshPutTimer = std::make_unique<asio::steady_timer>(httpContext_, std::chrono::steady_clock::now()); else put.refreshPutTimer->expires_at(std::chrono::steady_clock::now()); put.refreshPutTimer->async_wait(std::bind(&DhtProxyClient::handleRefreshPut, this, std::placeholders::_1, key, vid)); } else { // Refresh listen for (auto& list : search.listeners) resubscribe(key, list.first, list.second); } } else { auto key = InfoHash(notification.at("key")); system_clock::time_point sendTime = system_clock::time_point::min(); try { sendTime = system_clock::time_point(std::chrono::milliseconds(std::stoull(notification.at("t")))); } catch (...) {} auto& search = searches_.at(key); for (auto& list : search.listeners) { if (list.second.opstate->stop) continue; if (logger_) logger_->d("[proxy:client] [push] [search %s] received", key.to_c_str()); auto expired = notification.find("exp"); auto token = list.first; auto opstate = list.second.opstate; if (expired == notification.end()) { auto cb = list.second.cb; auto oldValues = list.second.cache.getValues(); get(key, [cb, sendTime](const std::vector<Sp<Value>>& vals) { return cb(vals, false, sendTime); }, [cb, oldValues, sendTime](bool /*ok*/) { // Decrement old values refcount to expire values not // present in the new list cb(oldValues, true, sendTime); }); } else { std::stringstream ss(expired->second); std::vector<Value::Id> ids; while(ss.good()) { std::string substr; getline(ss, substr, ','); ids.emplace_back(std::stoull(substr)); } { std::lock_guard<std::mutex> lock(lockCallbacks_); callbacks_.emplace_back([this, key, token, opstate, ids, sendTime]() { if (opstate->stop) return; std::lock_guard<std::mutex> lock(searchLock_); auto s = searches_.find(key); if (s == searches_.end()) return; auto l = s->second.listeners.find(token); if (l == s->second.listeners.end()) return; if (not opstate->stop and not l->second.cache.onValuesExpired(ids, sendTime)) opstate->stop = true; }); } launchLoop = true; } } } } catch (const std::exception& e) { if (logger_) logger_->e("[proxy:client] [push] receive error: %s", e.what()); } if (launchLoop) loopSignal_(); #else (void) notification; #endif } void DhtProxyClient::resubscribe(const InfoHash& key, const size_t token, Listener& listener) { #ifdef OPENDHT_PUSH_NOTIFICATIONS if (deviceKey_.empty()) return; if (logger_) logger_->d("[proxy:client] [resubscribe] [search %s]", key.to_c_str()); auto opstate = listener.opstate; listener.request.reset(); // This will update ok to false opstate->ok = true; restinio::http_request_header_t header; header.method(restinio::http_method_subscribe()); header.request_target("/" + key.toString()); if (!listener.refreshSubscriberTimer){ listener.refreshSubscriberTimer = std::make_unique<asio::steady_timer>(httpContext_); } listener.refreshSubscriberTimer->expires_at(std::chrono::steady_clock::now() + proxy::OP_TIMEOUT - proxy::OP_MARGIN); listener.refreshSubscriberTimer->async_wait(std::bind(&DhtProxyClient::handleResubscribe, this, std::placeholders::_1, key, token, opstate)); auto vcb = listener.cb; sendListen(header, vcb, opstate, listener, ListenMethod::RESUBSCRIBE); #else (void) key; (void) listener; #endif } #ifdef OPENDHT_PUSH_NOTIFICATIONS void DhtProxyClient::getPushRequest(Json::Value& body) const { body["key"] = deviceKey_; body["client_id"] = pushClientId_; body["session_id"] = pushSessionId_; #ifdef __ANDROID__ body["platform"] = "android"; #endif #ifdef __APPLE__ body["platform"] = "apple"; #endif } std::string DhtProxyClient::fillBody(bool resubscribe) { // Fill body with // { // "key":"device_key", // } Json::Value body; getPushRequest(body); if (resubscribe) { // This is the first listen, we want to retrieve previous values. body["refresh"] = true; } auto content = Json::writeString(jsonBuilder_, body) + "\n"; std::replace(content.begin(), content.end(), '\n', ' '); return content; } #endif // OPENDHT_PUSH_NOTIFICATIONS } // namespace dht