diff --git a/README.md b/README.md index e77ba894958fec6cfe47f92baf3e2c74df925185..3ab36d4831331d4fe6991aaa2315b0c7400b8314 100644 --- a/README.md +++ b/README.md @@ -6,12 +6,17 @@ A lightweight C++11 Distributed Hash Table implementation. - * Light and fast C++11 Kademlia DHT library - * Distributed shared key->value data-store - * Clean and powerful distributed map API with storage of arbitrary binary values of up to 56 KB +OpenDHT provides an easy to use distributed in-memory data store. +Every node in the network can read and write values to the store. +Values are distributed over the network, with redundancy. + + * Lightweight and scalable, designed for large networks and small devices + * High resilience to network disruption * Public key cryptography layer providing optional data signature and encryption (using GnuTLS) * IPv4 and IPv6 support - * Python binding + * Clean and powerful C++11 map API + * Python 3 bindings + * REST API ## Documentation See the wiki: <https://github.com/savoirfairelinux/opendht/wiki> diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 982d41c63807208ae2bdccbaa647ac28b2b21d91..0c09fd4c114ccc0791d97396fef218ebcb9b689f 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -414,7 +414,7 @@ private: void expireStorage(InfoHash h); void expireStore(decltype(store)::iterator); - void storageChanged(const InfoHash& id, Storage& st, ValueStorage&); + void storageChanged(const InfoHash& id, Storage& st, ValueStorage&, bool newValue); std::string printStorageLog(const decltype(store)::value_type&) const; /** @@ -535,7 +535,9 @@ private: * * @param sr The search to execute its operations. */ - void searchStep(Sp<Search> sr); + void searchStep(Sp<Search>); + void searchSynchedNodeListen(const Sp<Search>&, SearchNode&); + void dumpSearch(const Search& sr, std::ostream& out) const; bool neighbourhoodMaintenance(RoutingTable&); diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index df3fe3638307a25b7440c53464650e67b55585ca..abb0c0882e3fee7908ccab7827b53cc6be8f66e8 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -205,18 +205,18 @@ public: /** * Get data currently being put at the given hash. */ - std::vector<Sp<Value>> getPut(const InfoHash&) { return {}; } + std::vector<Sp<Value>> getPut(const InfoHash&); /** * Get data currently being put at the given hash with the given id. */ - Sp<Value> getPut(const InfoHash&, const Value::Id&) { return {}; } + Sp<Value> getPut(const InfoHash&, const Value::Id&); /** * Stop any put/announce operation at the given location, * for the value with the given id. */ - bool cancelPut(const InfoHash&, const Value::Id&) { return false; } + bool cancelPut(const InfoHash&, const Value::Id&); void pingNode(const sockaddr*, socklen_t, DoneCallbackSimple&& /*cb*/={}) { } @@ -271,6 +271,8 @@ private: size_t doListen(const InfoHash& key, ValueCallback, Value::Filter); bool doCancelListen(const InfoHash& key, size_t token); + void doPut(const InfoHash&, Sp<Value>, DoneCallback, time_point created, bool permanent); + /** * Initialize statusIpvX_ */ @@ -304,7 +306,7 @@ private: */ struct Listener; struct ProxySearch; - std::map<InfoHash, ProxySearch> listeners_; + std::map<InfoHash, ProxySearch> searches_; size_t listener_token_ {0}; std::mutex lockListener_; diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index eab3490ce28ed28a791e08529c9824a819dc19b4..cb15ba55303aa068c3e3d92b34a38d73ce814eb4 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -23,6 +23,8 @@ #include "def.h" #include "sockaddr.h" #include "infohash.h" +#include "scheduler.h" +#include "value.h" #include <thread> #include <memory> @@ -59,6 +61,27 @@ public: DhtProxyServer& operator=(const DhtProxyServer& other) = delete; DhtProxyServer& operator=(DhtProxyServer&& other) = delete; + struct ServerStats { + /** Current number of listen operations */ + size_t listenCount; + /** Current number of permanent put operations */ + size_t putCount; + /** Current number of push tokens with at least one listen operation */ + size_t pushListenersCount; + /** Average requests per second */ + double requestRate; + + std::string toString() const { + std::ostringstream ss; + ss << "Listens: " << listenCount << " Puts: " << putCount << " PushListeners: " << pushListenersCount << std::endl; + ss << "Requests: " << requestRate << " per second." << std::endl; + return ss.str(); + } + }; + ServerStats getStats() const; + + std::shared_ptr<DhtRunner> getNode() const { return dht_; } + /** * Stop the DhtProxyServer */ @@ -96,7 +119,7 @@ private: * On error: HTTP 503, body: {"err":"xxxx"} * @param session */ - void listen(const std::shared_ptr<restbed::Session>& session) const; + void listen(const std::shared_ptr<restbed::Session>& session); /** * Put a value on the DHT @@ -107,7 +130,9 @@ private: * HTTP 400, body: {"err":"xxxx"} if bad json or HTTP 502 if put fails * @param session */ - void put(const std::shared_ptr<restbed::Session>& session) const; + void put(const std::shared_ptr<restbed::Session>& session); + + void cancelPut(const InfoHash& key, Value::Id vid); #if OPENDHT_PROXY_SERVER_IDENTITY /** @@ -167,7 +192,7 @@ private: * on same hash for same device and must be > 0 * @param session */ - void subscribe(const std::shared_ptr<restbed::Session>& session) const; + void subscribe(const std::shared_ptr<restbed::Session>& session); /** * Unsubscribe to push notifications for an iOS or Android device. * Method: UNSUBSCRIBE "/{InfoHash: .*}" @@ -178,7 +203,7 @@ private: * on same hash for same device * @param session */ - void unsubscribe(const std::shared_ptr<restbed::Session>& session) const; + void unsubscribe(const std::shared_ptr<restbed::Session>& session); /** * Send a push notification via a gorush push gateway * @param key of the device @@ -187,10 +212,18 @@ private: void sendPushNotification(const std::string& key, const Json::Value& json, bool isAndroid) const; #endif //OPENDHT_PUSH_NOTIFICATIONS + using clock = std::chrono::steady_clock; + using time_point = clock::time_point; + std::thread server_thread {}; std::unique_ptr<restbed::Service> service_; std::shared_ptr<DhtRunner> dht_; + std::mutex schedulerLock_; + std::condition_variable schedulerCv_; + Scheduler scheduler_; + std::thread schedulerThread_; + // Handle client quit for listen. // NOTE: can be simplified when we will supports restbed 5.0 std::thread listenThread_; @@ -199,37 +232,33 @@ private: InfoHash hash; std::future<size_t> token; }; - mutable std::vector<SessionToHashToken> currentListeners_; - mutable std::mutex lockListener_; + std::vector<SessionToHashToken> currentListeners_; + std::mutex lockListener_; std::atomic_bool stopListeners {false}; + struct PermanentPut; + struct SearchPuts; + std::map<InfoHash, SearchPuts> puts_; + #if OPENDHT_PUSH_NOTIFICATIONS - struct PushListener { - std::string pushToken; - InfoHash hash; - unsigned token; - std::future<size_t> internalToken; - std::chrono::steady_clock::time_point deadline; - bool started {false}; - unsigned callbackId {0}; - std::string clientId {}; - bool isAndroid {true}; - }; - mutable std::mutex lockPushListeners_; - mutable std::vector<PushListener> pushListeners_; - mutable unsigned tokenPushNotif_ {0}; + struct Listener; + struct PushListener; + std::mutex lockPushListeners_; + std::map<std::string, PushListener> pushListeners_; + unsigned tokenPushNotif_ {0}; + + void cancelPushListen(const std::string& pushToken, const InfoHash& key, unsigned token, unsigned callbackId); #endif //OPENDHT_PUSH_NOTIFICATIONS const std::string pushServer_; + mutable std::atomic<size_t> requestNum_ {0}; + mutable std::atomic<time_point> lastStatsReset_ {time_point::min()}; + /** * Remove finished listeners * @param testSession if we remove the listener only if the session is closed */ void removeClosedListeners(bool testSession = true); - /** - * Launch or remove push listeners if needed - */ - void handlePushListeners(); }; } diff --git a/include/opendht/scheduler.h b/include/opendht/scheduler.h index 8f61d4cd8fdac3a8bac930d8a9197b2003e01afb..bd1a77629bc986c5777e9aad497dc4a65f2e2301 100644 --- a/include/opendht/scheduler.h +++ b/include/opendht/scheduler.h @@ -36,11 +36,10 @@ namespace dht { */ class Scheduler { public: - Scheduler(const Logger& l) : DHT_LOG(l) {} - struct Job { Job(std::function<void()>&& f) : do_(std::move(f)) {} std::function<void()> do_; + void cancel() { do_ = {}; } }; /** @@ -58,17 +57,19 @@ public: return job; } + void add(const Sp<Scheduler::Job>& job, time_point t) { + if (t != time_point::max()) + timers.emplace(std::move(t), job); + } + /** * Reschedules a job. * - * @param time The time at which the job shall be rescheduled. * @param job The job to edit. - * - * @return pointer to the newly scheduled job. + * @param t The time at which the job shall be rescheduled. */ void edit(Sp<Scheduler::Job>& job, time_point t) { if (not job) { - DHT_LOG.ERR("editing an empty job"); return; } // std::function move doesn't garantee to leave the object empty. @@ -118,7 +119,6 @@ public: private: time_point now {clock::now()}; std::multimap<time_point, Sp<Job>> timers {}; /* the jobs ordered by time */ - const Logger& DHT_LOG; }; } diff --git a/src/dht.cpp b/src/dht.cpp index 298267d43319b6514528d7df89279f674975716b..fac3284995c2d563bcc9eb42fc352224adc197e9 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -482,6 +482,72 @@ void Dht::searchSendAnnounceValue(const Sp<Search>& sr) { } } +void +Dht::searchSynchedNodeListen(const Sp<Search>& sr, SearchNode& n) +{ + std::weak_ptr<Search> ws = sr; + for (const auto& l : sr->listeners) { + const auto& query = l.second.query; + auto list_token = l.first; + if (n.getListenTime(query) > scheduler.time()) + continue; + DHT_LOG.w(sr->id, n.node->id, "[search %s] [node %s] sending 'listen'", + sr->id.toString().c_str(), n.node->toString().c_str()); + + auto r = n.listenStatus.find(query); + if (r == n.listenStatus.end()) { + r = n.listenStatus.emplace(query, SearchNode::CachedListenStatus{ + [this,ws,list_token](const std::vector<Sp<Value>>& values, bool expired){ + if (auto sr = ws.lock()) { + auto l = sr->listeners.find(list_token); + if (l != sr->listeners.end()) { + l->second.get_cb(l->second.filter.filter(values), expired); + } + } + } + }).first; + auto node = n.node; + r->second.cacheExpirationJob = scheduler.add(time_point::max(), [this,ws,query,node]{ + if (auto sr = ws.lock()) { + if (auto sn = sr->getNode(node)) { + sn->expireValues(query, scheduler); + } + } + }); + } + auto prev_req = r != n.listenStatus.end() ? r->second.req : nullptr; + r->second.req = network_engine.sendListen(n.node, sr->id, *query, n.token, prev_req, + [this,ws,query](const net::Request& req, net::RequestAnswer&& answer) mutable + { /* on done */ + if (auto sr = ws.lock()) { + scheduler.edit(sr->nextSearchStep, scheduler.time()); + if (auto sn = sr->getNode(req.node)) + scheduler.add(sn->getListenTime(query), std::bind(&Dht::searchStep, this, sr)); + onListenDone(req.node, answer, sr); + } + }, + [this,ws,query](const net::Request& req, bool over) mutable + { /* on request expired */ + if (auto sr = ws.lock()) { + scheduler.edit(sr->nextSearchStep, scheduler.time()); + if (over) + if (auto sn = sr->getNode(req.node)) + sn->listenStatus.erase(query); + } + }, + [this,ws,query](const Sp<Node>& node, net::RequestAnswer&& answer) mutable + { /* on new values */ + if (auto sr = ws.lock()) { + scheduler.edit(sr->nextSearchStep, scheduler.time()); + if (auto sn = sr->getNode(node)) { + sn->onValues(query, std::move(answer), types, scheduler); + } + } + } + ); + } +} + /* When a search is in progress, we periodically call search_step to send further requests. */ void @@ -538,59 +604,7 @@ Dht::searchStep(Sp<Search> sr) for (auto& n : sr->nodes) { if (not n.isSynced(now)) continue; - std::weak_ptr<Search> ws = sr; - for (const auto& l : sr->listeners) { - const auto& query = l.second.query; - auto list_token = l.first; - if (n.getListenTime(query) > now) - continue; - DHT_LOG.w(sr->id, n.node->id, "[search %s] [node %s] sending 'listen'", - sr->id.toString().c_str(), n.node->toString().c_str()); - - auto r = n.listenStatus.find(query); - if (r == n.listenStatus.end()) { - r = n.listenStatus.emplace(query, SearchNode::CachedListenStatus{ - [this,ws,list_token](const std::vector<Sp<Value>>& values, bool expired){ - if (auto sr = ws.lock()) { - auto l = sr->listeners.find(list_token); - if (l != sr->listeners.end()) { - l->second.get_cb(l->second.filter.filter(values), expired); - } - } - } - }).first; - } - auto prev_req = r != n.listenStatus.end() ? r->second.req : nullptr; - r->second.req = network_engine.sendListen(n.node, sr->id, *query, n.token, prev_req, - [this,ws,query](const net::Request& req, net::RequestAnswer&& answer) mutable - { /* on done */ - if (auto sr = ws.lock()) { - scheduler.edit(sr->nextSearchStep, scheduler.time()); - if (auto sn = sr->getNode(req.node)) - scheduler.add(sn->getListenTime(query), std::bind(&Dht::searchStep, this, sr)); - onListenDone(req.node, answer, sr); - } - }, - [this,ws,query](const net::Request& req, bool over) mutable - { /* on expired */ - if (auto sr = ws.lock()) { - scheduler.edit(sr->nextSearchStep, scheduler.time()); - if (over) - if (auto sn = sr->getNode(req.node)) - sn->listenStatus.erase(query); - } - }, - [this,ws,query](const Sp<Node>& node, net::RequestAnswer&& answer) mutable - { /* on new values */ - if (auto sr = ws.lock()) { - scheduler.edit(sr->nextSearchStep, scheduler.time()); - if (auto sn = sr->getNode(node)) - sn->onValues(query, std::move(answer), types, scheduler.time()); - //onGetValuesDone(node, answer, sr, query); - } - } - ); - } + searchSynchedNodeListen(sr, n); if (not n.candidate and ++i == LISTEN_NODES) break; } @@ -1125,25 +1139,27 @@ Dht::cancelPut(const InfoHash& id, const Value::Id& vid) // Storage void -Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v) -{ - if (not st.local_listeners.empty()) { - DHT_LOG.d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size()); - std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> cbs; - for (const auto& l : st.local_listeners) { - std::vector<Sp<Value>> vals; - if (not l.second.filter or l.second.filter(*v.data)) - vals.push_back(v.data); - if (not vals.empty()) { - DHT_LOG.d(id, "[store %s] sending update local listener with token %lu", - id.toString().c_str(), - l.first); - cbs.emplace_back(l.second.get_cb, std::move(vals)); +Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v, bool newValue) +{ + if (newValue) { + if (not st.local_listeners.empty()) { + DHT_LOG.d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size()); + std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> cbs; + for (const auto& l : st.local_listeners) { + std::vector<Sp<Value>> vals; + if (not l.second.filter or l.second.filter(*v.data)) + vals.push_back(v.data); + if (not vals.empty()) { + DHT_LOG.d(id, "[store %s] sending update local listener with token %lu", + id.toString().c_str(), + l.first); + cbs.emplace_back(l.second.get_cb, std::move(vals)); + } } + // listeners are copied: they may be deleted by the callback + for (auto& cb : cbs) + cb.first(cb.second, false); } - // listeners are copied: they may be deleted by the callback - for (auto& cb : cbs) - cb.first(cb.second, false); } if (not st.listeners.empty()) { @@ -1197,7 +1213,7 @@ Dht::storageStore(const InfoHash& id, const Sp<Value>& value, time_point created if (total_store_size > max_store_size) { expireStore(); } - storageChanged(id, st->second, *vs); + storageChanged(id, st->second, *vs, store.second.values_diff > 0); } return std::get<0>(store); @@ -1258,6 +1274,9 @@ Dht::expireStore(decltype(store)::iterator i) } } } + for (const auto& local_listeners : st.local_listeners) { + local_listeners.second.get_cb(stats.second, true); + } } } @@ -1665,13 +1684,12 @@ Dht::~Dht() s.second->clear(); } -Dht::Dht() : store(), scheduler(DHT_LOG), network_engine(DHT_LOG, scheduler) {} +Dht::Dht() : store(), network_engine(DHT_LOG, scheduler) {} Dht::Dht(const int& s, const int& s6, Config config) : myid(config.node_id ? config.node_id : InfoHash::getRandom()), is_bootstrap(config.is_bootstrap), maintain_storage(config.maintain_storage), store(), store_quota(), - scheduler(DHT_LOG), network_engine(myid, config.network, s, s6, DHT_LOG, scheduler, std::bind(&Dht::onError, this, _1, _2), std::bind(&Dht::onNewNode, this, _1, _2), diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 9a5b2d05b6738973bc82a52e1c85bf455c9b43bf..757d938153d3cc1b6429153f0580127c1589f1ca 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -21,21 +21,24 @@ #include "dht_proxy_client.h" -#include <chrono> -#include <json/json.h> -#include <restbed> -#include <vector> - #include "dhtrunner.h" #include "op_cache.h" +#include <restbed> +#include <json/json.h> + +#include <chrono> +#include <vector> + constexpr const char* const HTTP_PROTO {"http://"}; +constexpr const std::chrono::seconds OP_TIMEOUT {1 * 60 * 60 - 60}; // one hour minus margin namespace dht { struct DhtProxyClient::Listener { ValueCache cache; + Sp<Scheduler::Job> cacheExpirationJob {}; ValueCallback cb; Value::Filter filter; Sp<restbed::Request> req; @@ -43,20 +46,27 @@ struct DhtProxyClient::Listener unsigned callbackId; Sp<bool> isCanceledViaClose; Sp<unsigned> pushNotifToken; // NOTE: unused if not using push notifications + Sp<Scheduler::Job> refreshJob; Listener(ValueCache&& c, const Sp<restbed::Request>& r, Value::Filter&& f, unsigned cid) : cache(std::move(c)), filter(std::move(f)), req(r), callbackId(cid), isCanceledViaClose(std::make_shared<bool>(false)) {} }; +struct PermanentPut { + Sp<Value> value; + Sp<Scheduler::Job> refreshJob; +}; + struct DhtProxyClient::ProxySearch { SearchCache ops {}; std::map<size_t, Listener> listeners {}; + std::map<Value::Id, PermanentPut> puts {}; }; -DhtProxyClient::DhtProxyClient() : scheduler(DHT_LOG) {} +DhtProxyClient::DhtProxyClient() {} DhtProxyClient::DhtProxyClient(std::function<void()> signal, const std::string& serverHost, const std::string& pushClientId) -: serverHost_(serverHost), pushClientId_(pushClientId), scheduler(DHT_LOG), loopSignal_(signal) +: serverHost_(serverHost), pushClientId_(pushClientId), loopSignal_(signal) { if (!serverHost_.empty()) startProxy(); @@ -86,16 +96,16 @@ DhtProxyClient::~DhtProxyClient() std::vector<Sp<Value>> DhtProxyClient::getLocal(const InfoHash& k, Value::Filter filter) const { - auto s = listeners_.find(k); - if (s == listeners_.end()) + auto s = searches_.find(k); + if (s == searches_.end()) return {}; return s->second.ops.get(filter); } Sp<Value> DhtProxyClient::getLocalById(const InfoHash& k, Value::Id id) const { - auto s = listeners_.find(k); - if (s == listeners_.end()) + auto s = searches_.find(k); + if (s == searches_.end()) return {}; return s->second.ops.get(id); } @@ -121,7 +131,7 @@ void DhtProxyClient::cancelAllListeners() { std::lock_guard<std::mutex> lock(lockListener_); - for (auto& s: listeners_) { + for (auto& s: searches_) { for (auto& l : s.second.listeners) if (l.second.thread.joinable()) { // Close connection to stop listener? @@ -276,11 +286,45 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Va } void -DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point, bool permanent) +DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point created, bool permanent) +{ + scheduler.syncTime(); + if (not val) { + if (cb) cb(false, {}); + return; + } + if (val->id == Value::INVALID_ID) { + crypto::random_device rdev; + std::uniform_int_distribution<Value::Id> rand_id {}; + val->id = rand_id(rdev); + } + if (permanent) { + auto id = val->id; + auto search = searches_.emplace(key, ProxySearch{}).first; + auto nextRefresh = scheduler.time() + OP_TIMEOUT; + search->second.puts.erase(id); + search->second.puts.emplace(id, PermanentPut {val, scheduler.add(nextRefresh, [this, key, id]{ + auto s = searches_.find(key); + if (s == searches_.end()) + return; + auto p = s->second.puts.find(id); + if (p == s->second.puts.end()) + return; + const auto& now = scheduler.time(); + doPut(key, p->second.value, {}, now, true); + scheduler.edit(p->second.refreshJob, now + OP_TIMEOUT); + })}); + } + doPut(key, val, std::move(cb), created, permanent); +} + +void +DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point created, bool permanent) { restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); auto req = std::make_shared<restbed::Request>(uri); req->set_method("POST"); + auto json = val->toJson(); if (permanent) json["permanent"] = true; @@ -342,6 +386,49 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po } } +/** + * Get data currently being put at the given hash. + */ +std::vector<Sp<Value>> +DhtProxyClient::getPut(const InfoHash& key) { + std::vector<Sp<Value>> ret; + auto search = searches_.find(key); + if (search != searches_.end()) { + ret.reserve(search->second.puts.size()); + for (const auto& put : search->second.puts) + ret.emplace_back(put.second.value); + } + return ret; +} + +/** + * Get data currently being put at the given hash with the given id. + */ +Sp<Value> +DhtProxyClient::getPut(const InfoHash& key, const Value::Id& id) { + auto search = searches_.find(key); + if (search == searches_.end()) + return {}; + auto val = search->second.puts.find(id); + if (val == search->second.puts.end()) + return {}; + return val->second.value; +} + +/** + * Stop any put/announce operation at the given location, + * for the value with the given id. + */ +bool +DhtProxyClient::cancelPut(const InfoHash& key, const Value::Id& id) +{ + auto search = searches_.find(key); + if (search == searches_.end()) + return false; + std::cout << "cancelPut " << key << " " << id << std::endl; + return search->second.puts.erase(id) > 0; +} + NodeStats DhtProxyClient::getNodesStats(sa_family_t af) const { @@ -460,9 +547,9 @@ DhtProxyClient::getPublicAddress(sa_family_t family) size_t DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filter, Where where) { - auto it = listeners_.find(key); - if (it == listeners_.end()) { - it = listeners_.emplace(key, ProxySearch{}).first; + auto it = searches_.find(key); + if (it == searches_.end()) { + it = searches_.emplace(key, ProxySearch{}).first; } auto query = std::make_shared<Query>(Select{}, where); auto token = it->second.ops.listen(cb, query, filter, [&](Sp<Query> q, ValueCallback vcb){ @@ -473,8 +560,8 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt bool DhtProxyClient::cancelListen(const InfoHash& key, size_t gtoken) { - auto it = listeners_.find(key); - if (it == listeners_.end()) + auto it = searches_.find(key); + if (it == searches_.end()) return false; return it->second.ops.cancelListen(gtoken, [&](size_t ltoken){ doCancelListen(key, ltoken); @@ -489,8 +576,8 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi req->set_method(deviceKey_.empty() ? "LISTEN" : "SUBSCRIBE"); std::lock_guard<std::mutex> lock(lockListener_); - auto search = listeners_.find(key); - if (search == listeners_.end()) { + auto search = searches_.find(key); + if (search == searches_.end()) { std::cerr << "doListen: search not found" << std::endl; return 0; } @@ -500,9 +587,24 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi auto l = search->second.listeners.emplace(token, Listener{ ValueCache(cb), req, std::move(filter), callbackId }).first; + if (not l->second.cacheExpirationJob) { + l->second.cacheExpirationJob = scheduler.add(time_point::max(), [this, key, token]{ + auto s = searches_.find(key); + if (s == searches_.end()) { + return; + } + auto l = s->second.listeners.find(token); + if (l == s->second.listeners.end()) { + return; + } + auto next = l->second.cache.expireValues(scheduler.time()); + scheduler.edit(l->second.cacheExpirationJob, next); + }); + } ValueCache& cache = l->second.cache; - l->second.cb = [this,&cache](const std::vector<Sp<Value>>& values, bool expired) { + auto& job = l->second.cacheExpirationJob; + l->second.cb = [this,&cache,&job,key,token](const std::vector<Sp<Value>>& values, bool expired) { const std::vector<Sp<Value>> new_values_empty; std::vector<Value::Id> expired_ids; if (expired) { @@ -510,11 +612,13 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi for (const auto& v : values) expired_ids.emplace_back(v->id); } - cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); + auto next = cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); + scheduler.edit(job, next); return true; }; std::weak_ptr<bool> isCanceledViaClose(l->second.isCanceledViaClose); auto pushNotifToken = std::make_shared<unsigned>(0); + auto vcb = l->second.cb; l->second.pushNotifToken = pushNotifToken; l->second.thread = std::thread([=]() { @@ -534,7 +638,7 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi }; auto state = std::make_shared<State>(); restbed::Http::async(req, - [this, filter, cb, pushNotifToken, state](const std::shared_ptr<restbed::Request>& req, + [this, filter, vcb, pushNotifToken, state](const std::shared_ptr<restbed::Request>& req, const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); if (code == 200) { @@ -571,8 +675,8 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi auto value = std::make_shared<Value>(json); if (not filter or filter(*value)) { std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([cb, value, state]() { - if (not state->cancel and not cb({value}, false)) + callbacks_.emplace_back([vcb, value, state]() { + if (not state->cancel and not vcb({value}, false)) state->cancel = true; }); loopSignal_(); @@ -603,8 +707,8 @@ DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken) { std::lock_guard<std::mutex> lock(lockListener_); - auto search = listeners_.find(key); - if (search == listeners_.end()) + auto search = searches_.find(key); + if (search == searches_.end()) return false; auto it = search->second.listeners.find(ltoken); @@ -648,7 +752,7 @@ DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken) } search->second.listeners.erase(it); if (search->second.listeners.empty()) { - listeners_.erase(search); + searches_.erase(search); } return true; @@ -677,7 +781,7 @@ void DhtProxyClient::restartListeners() { std::lock_guard<std::mutex> lock(lockListener_); - for (auto& search: listeners_) { + for (auto& search: searches_) { for (auto& l: search.second.listeners) { auto& listener = l.second; if (listener.thread.joinable()) @@ -753,7 +857,7 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string #if OPENDHT_PUSH_NOTIFICATIONS try { auto token = std::stoul(notification.at("token")); - for (auto& search: listeners_) { + for (auto& search: searches_) { for (auto& list : search.second.listeners) { auto& listener = list.second; if (*listener.pushNotifToken!= token) diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 497609ab392ed55aaac40f918396f76019290f4e..0b77f96c5da2a5657e3e8abd346e2e774588d113 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -21,15 +21,16 @@ #include "default_types.h" #include "dhtrunner.h" -#include "msgpack.hpp" + +#include <msgpack.hpp> +#include <json/json.h> #include <chrono> #include <functional> -#include <json/json.h> #include <limits> +constexpr const std::chrono::seconds OP_TIMEOUT {1 * 60 * 60}; // one hour #if OPENDHT_PUSH_NOTIFICATIONS -constexpr int const TIMEOUT {6 * 60 * 60}; // in seconds (so six hours) constexpr const char* const HTTP_PROTO {"http://"}; // TODO, https for prod #endif //OPENDHT_PUSH_NOTIFICATIONS @@ -37,6 +38,15 @@ using namespace std::placeholders; namespace dht { +struct DhtProxyServer::PermanentPut { + time_point expiration; + Sp<Scheduler::Job> expireJob; +}; +struct DhtProxyServer::SearchPuts { + std::map<dht::Value::Id, PermanentPut> puts; +}; + + DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , const std::string& pushServer) : dht_(dht) , pushServer_(pushServer) { @@ -61,12 +71,12 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , resource = std::make_shared<restbed::Resource>(); resource->set_path("/{hash: .*}"); resource->set_method_handler("GET", std::bind(&DhtProxyServer::get, this, _1)); - resource->set_method_handler("LISTEN", std::bind(&DhtProxyServer::listen, this, _1)); + resource->set_method_handler("LISTEN", [this](const Sp<restbed::Session>& session) mutable { listen(session); } ); #if OPENDHT_PUSH_NOTIFICATIONS - resource->set_method_handler("SUBSCRIBE", std::bind(&DhtProxyServer::subscribe, this, _1)); - resource->set_method_handler("UNSUBSCRIBE", std::bind(&DhtProxyServer::unsubscribe, this, _1)); + resource->set_method_handler("SUBSCRIBE", [this](const Sp<restbed::Session>& session) mutable { subscribe(session); } ); + resource->set_method_handler("UNSUBSCRIBE", [this](const Sp<restbed::Session>& session) mutable { unsubscribe(session); } ); #endif //OPENDHT_PUSH_NOTIFICATIONS - resource->set_method_handler("POST", std::bind(&DhtProxyServer::put, this, _1)); + resource->set_method_handler("POST", [this](const Sp<restbed::Session>& session) mutable { put(session); }); #if OPENDHT_PROXY_SERVER_IDENTITY resource->set_method_handler("SIGN", std::bind(&DhtProxyServer::putSigned, this, _1)); resource->set_method_handler("ENCRYPT", std::bind(&DhtProxyServer::putEncrypted, this, _1)); @@ -88,6 +98,7 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , settings->set_port(port); auto maxThreads = std::thread::hardware_concurrency() - 1; settings->set_worker_limit(maxThreads > 1 ? maxThreads : 1); + lastStatsReset_ = clock::now(); try { service_->start(settings); } catch(std::system_error& e) { @@ -96,20 +107,26 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , }); listenThread_ = std::thread([this]() { - while (!service_->is_up() && !stopListeners) { + while (not service_->is_up() and not stopListeners) { std::this_thread::sleep_for(std::chrono::seconds(1)); } - while (service_->is_up() && !stopListeners) { + while (service_->is_up() and not stopListeners) { removeClosedListeners(); - // add listener from push notifs -#if OPENDHT_PUSH_NOTIFICATIONS - handlePushListeners(); -#endif //OPENDHT_PUSH_NOTIFICATIONS std::this_thread::sleep_for(std::chrono::seconds(1)); } // Remove last listeners removeClosedListeners(false); }); + schedulerThread_ = std::thread([this]() { + while (not service_->is_up() and not stopListeners) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + while (service_->is_up() and not stopListeners) { + std::unique_lock<std::mutex> lock(schedulerLock_); + auto next = scheduler_.run(); + schedulerCv_.wait_until(lock, next); + } + }); dht->forwardAllMessages(true); } @@ -132,16 +149,37 @@ DhtProxyServer::stop() } } stopListeners = true; + schedulerCv_.notify_all(); // listenThreads_ will stop because there is no more sessions if (listenThread_.joinable()) listenThread_.join(); + if (schedulerThread_.joinable()) + schedulerThread_.join(); if (server_thread.joinable()) server_thread.join(); } +DhtProxyServer::ServerStats +DhtProxyServer::getStats() const +{ + ServerStats ret {}; + auto now = clock::now(); + auto last = lastStatsReset_.exchange(now); + auto count = requestNum_.exchange(0); + auto dt = std::chrono::duration<double>(now - last); + ret.requestRate = count / dt.count(); +#if OPENDHT_PUSH_NOTIFICATIONS + ret.pushListenersCount = pushListeners_.size(); +#endif + ret.putCount = puts_.size(); + ret.listenCount = currentListeners_.size(); + return ret; +} + void DhtProxyServer::getNodeInfo(const std::shared_ptr<restbed::Session>& session) const { + requestNum_++; const auto request = session->get_request(); int content_length = std::stoi(request->get_header("Content-Length", "0")); session->fetch(content_length, @@ -171,6 +209,7 @@ DhtProxyServer::getNodeInfo(const std::shared_ptr<restbed::Session>& session) co void DhtProxyServer::get(const std::shared_ptr<restbed::Session>& session) const { + requestNum_++; const auto request = session->get_request(); int content_length = std::stoi(request->get_header("Content-Length", "0")); auto hash = request->get_path_parameter("hash"); @@ -205,13 +244,12 @@ DhtProxyServer::get(const std::shared_ptr<restbed::Session>& session) const } } ); - } void -DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) const +DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) { - + requestNum_++; const auto request = session->get_request(); int content_length = std::stoi(request->get_header("Content-Length", "0")); auto hash = request->get_path_parameter("hash"); @@ -261,9 +299,22 @@ DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) const } #if OPENDHT_PUSH_NOTIFICATIONS + +struct DhtProxyServer::Listener { + unsigned token; + unsigned callbackId {0}; + std::future<size_t> internalToken; + Sp<Scheduler::Job> expireJob; +}; +struct DhtProxyServer::PushListener { + std::map<InfoHash, std::vector<Listener>> listeners; + bool isAndroid; +}; + void -DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) const +DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) { + requestNum_++; const auto request = session->get_request(); int content_length = std::stoi(request->get_header("Content-Length", "0")); auto hash = request->get_path_parameter("hash"); @@ -271,7 +322,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) cons if (!infoHash) infoHash = InfoHash::get(hash); session->fetch(content_length, - [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b) + [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b) mutable { try { restbed::Bytes buf(b); @@ -297,28 +348,56 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) cons { std::lock_guard<std::mutex> lock(lockListener_); // Check if listener is already present and refresh timeout if launched - for(auto& listener: pushListeners_) { - if (listener.pushToken == pushToken && listener.hash == infoHash - && listener.callbackId == callbackId) { - if (listener.started) - listener.deadline = std::chrono::steady_clock::now() - + std::chrono::seconds(TIMEOUT); + auto pushListener = pushListeners_.emplace(pushToken, PushListener{}).first; + auto listeners = pushListener->second.listeners.emplace(infoHash, std::vector<Listener>{}).first; + for (auto& listener: listeners->second) { + if (listener.callbackId == callbackId) { + { + std::lock_guard<std::mutex> l(schedulerLock_); + scheduler_.edit(listener.expireJob, scheduler_.time() + OP_TIMEOUT); + } s->close(restbed::OK, "{\"token\": " + std::to_string(listener.token) + "}\n"); + schedulerCv_.notify_one(); return; } } + pushListener->second.isAndroid = isAndroid; + // The listener is not found, so add it. ++tokenPushNotif_; token = tokenPushNotif_; - PushListener listener; - listener.pushToken = pushToken; - listener.hash = std::move(infoHash); + Listener listener; listener.token = token; - listener.started = false; listener.callbackId = callbackId; - listener.clientId = clientId; - listener.isAndroid = isAndroid; - pushListeners_.emplace_back(std::move(listener)); + listener.internalToken = dht_->listen(infoHash, + [this, pushToken, callbackId, token, isAndroid, clientId](std::vector<std::shared_ptr<Value>> /*value*/) { + // Build message content. + Json::Value json; + if (callbackId > 0) { + json["callback_id"] = callbackId; + } + json["to"] = clientId; + json["token"] = token; + sendPushNotification(pushToken, json, isAndroid); + return true; + } + ); + std::lock_guard<std::mutex> l(schedulerLock_); + listener.expireJob = scheduler_.add(scheduler_.time() + OP_TIMEOUT, + [this, pushToken, infoHash, token, callbackId, clientId, isAndroid] { + cancelPushListen(pushToken, infoHash, token, callbackId); + // Send a push notification to inform the client that this listen has timeout + Json::Value json; + json["timeout"] = infoHash.toString(); + if (callbackId > 0) { + json["callback_id"] = callbackId; + } + json["to"] = clientId; + json["token"] = token; + sendPushNotification(pushToken, json, isAndroid); + } + ); + listeners->second.emplace_back(std::move(listener)); } s->close(restbed::OK, "{\"token\": " + std::to_string(token) + "}\n"); } catch (...) { @@ -329,8 +408,9 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) cons } void -DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) const +DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) { + requestNum_++; const auto request = session->get_request(); int content_length = std::stoi(request->get_header("Content-Length", "0")); auto hash = request->get_path_parameter("hash"); @@ -358,19 +438,7 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) co if (token == 0) return; auto callbackId = root.isMember("callback_id") ? root["callback_id"].asLargestUInt() : 0; - std::lock_guard<std::mutex> lock(lockListener_); - // Check if listener is already present and refresh timeout if launched - auto listener = pushListeners_.begin(); - while (listener != pushListeners_.end()) { - if (listener->pushToken == pushToken && listener->token == token - && listener->hash == infoHash && listener->callbackId == callbackId) { - if (dht_ && listener->started) - dht_->cancelListen(listener->hash, std::move(listener->internalToken.get())); - listener = pushListeners_.erase(listener); - } else { - ++listener; - } - } + cancelPushListen(pushToken, infoHash, token, callbackId); } catch (...) { // do nothing } @@ -378,6 +446,34 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) co ); } +void +DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHash& key, unsigned token, unsigned callbackId) +{ + std::lock_guard<std::mutex> lock(lockListener_); + auto pushListener = pushListeners_.find(pushToken); + if (pushListener == pushListeners_.end()) + return; + auto listeners = pushListener->second.listeners.find(key); + if (listeners == pushListener->second.listeners.end()) + return; + for (auto listener = listeners->second.begin(); listener != listeners->second.end();) { + if (listener->token == token && listener->callbackId == callbackId) { + std::cout << "cancelPushListen " << key << " token:" << token << " cid:" << callbackId << std::endl; + if (dht_) + dht_->cancelListen(key, std::move(listener->internalToken)); + listener = listeners->second.erase(listener); + } else { + ++listener; + } + } + if (listeners->second.empty()) { + pushListener->second.listeners.erase(listeners); + } + if (pushListener->second.listeners.empty()) { + pushListeners_.erase(pushListener); + } +} + void DhtProxyServer::sendPushNotification(const std::string& token, const Json::Value& json, bool isAndroid) const { @@ -415,58 +511,30 @@ DhtProxyServer::sendPushNotification(const std::string& token, const Json::Value restbed::Http::async(req, {}); } +#endif //OPENDHT_PUSH_NOTIFICATIONS + void -DhtProxyServer::handlePushListeners() +DhtProxyServer::cancelPut(const InfoHash& key, Value::Id vid) { - std::lock_guard<std::mutex> lock(lockListener_); - auto pushListener = pushListeners_.begin(); - while (pushListener != pushListeners_.end()) { - if (dht_ && !pushListener->started) { - // Try to start unstarted listeners - auto key = pushListener->pushToken; - auto token = pushListener->token; - auto callbackId = pushListener->callbackId; - auto isAndroid = pushListener->isAndroid; - auto clientId = pushListener->clientId; - pushListener->internalToken = dht_->listen(pushListener->hash, - [this, key, callbackId, token, isAndroid, clientId](std::vector<std::shared_ptr<Value>> /*value*/) { - // Build message content. - Json::Value json; - if (callbackId > 0) { - json["callback_id"] = callbackId; - } - json["to"] = clientId; - json["token"] = token; - sendPushNotification(key, json, isAndroid); - return true; - } - ); - pushListener->deadline = std::chrono::steady_clock::now() + std::chrono::seconds(TIMEOUT); - pushListener->started = true; - pushListener++; - } else if (dht_ && pushListener->started && pushListener->deadline < std::chrono::steady_clock::now()) { - // Cancel listen if deadline has been reached - dht_->cancelListen(pushListener->hash, std::move(pushListener->internalToken.get())); - // Send a push notification to inform the client that this listen has timeout - Json::Value json; - json["timeout"] = pushListener->hash.toString(); - if (pushListener->callbackId > 0) { - json["callback_id"] = pushListener->callbackId; - } - json["to"] = pushListener->clientId; - json["token"] = pushListener->token; - sendPushNotification(pushListener->pushToken, json, pushListener->isAndroid); - pushListener = pushListeners_.erase(pushListener); - } else { - pushListener++; - } - } + std::cout << "cancelPut " << key << " " << vid << std::endl; + auto sPuts = puts_.find(key); + if (sPuts == puts_.end()) + return; + auto& sPutsMap = sPuts->second.puts; + auto put = sPutsMap.find(vid); + if (put == sPutsMap.end()) + return; + if (dht_) + dht_->cancelPut(key, vid); + sPutsMap.erase(put); + if (sPutsMap.empty()) + puts_.erase(sPuts); } -#endif //OPENDHT_PUSH_NOTIFICATIONS void -DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) const +DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) { + requestNum_++; const auto request = session->get_request(); int content_length = std::stoi(request->get_header("Content-Length", "0")); auto hash = request->get_path_parameter("hash"); @@ -495,6 +563,24 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) const auto value = std::make_shared<Value>(root); auto permanent = root.isMember("permanent") ? root["permanent"].asBool() : false; + if (permanent) { + std::unique_lock<std::mutex> lock(schedulerLock_); + scheduler_.syncTime(); + auto sPuts = puts_.emplace(infoHash, SearchPuts{}).first; + auto timeout = scheduler_.time() + OP_TIMEOUT; + auto vid = value->id; + auto r = sPuts->second.puts.emplace(vid, PermanentPut{}); + if (r.second) { + r.first->second.expireJob = scheduler_.add(timeout, [this, infoHash, vid]{ + cancelPut(infoHash, vid); + }); + } else { + scheduler_.edit(r.first->second.expireJob, timeout); + } + lock.unlock(); + schedulerCv_.notify_one(); + } + dht_->put(infoHash, value, [s, value](bool ok) { if (ok) { Json::StreamWriterBuilder wbuilder; @@ -522,6 +608,7 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) const void DhtProxyServer::putSigned(const std::shared_ptr<restbed::Session>& session) const { + requestNum_++; const auto request = session->get_request(); int content_length = std::stoi(request->get_header("Content-Length", "0")); auto hash = request->get_path_parameter("hash"); @@ -568,6 +655,7 @@ DhtProxyServer::putSigned(const std::shared_ptr<restbed::Session>& session) cons void DhtProxyServer::putEncrypted(const std::shared_ptr<restbed::Session>& session) const { + requestNum_++; const auto request = session->get_request(); int content_length = std::stoi(request->get_header("Content-Length", "0")); auto hash = request->get_path_parameter("hash"); @@ -619,6 +707,7 @@ DhtProxyServer::putEncrypted(const std::shared_ptr<restbed::Session>& session) c void DhtProxyServer::handleOptionsMethod(const std::shared_ptr<restbed::Session>& session) const { + requestNum_++; #if OPENDHT_PROXY_SERVER_IDENTITY const auto allowed = "OPTIONS, GET, POST, LISTEN, SIGN, ENCRYPT"; #else @@ -632,6 +721,7 @@ DhtProxyServer::handleOptionsMethod(const std::shared_ptr<restbed::Session>& ses void DhtProxyServer::getFiltered(const std::shared_ptr<restbed::Session>& session) const { + requestNum_++; const auto request = session->get_request(); int content_length = std::stoi(request->get_header("Content-Length", "0")); auto hash = request->get_path_parameter("hash"); diff --git a/src/network_engine.cpp b/src/network_engine.cpp index 9fbc806aeeefa2f4eea6dd8c3c8f44899f277dbe..cb305e170ab12bb91aaeea92c3147918eabaffab 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -140,7 +140,7 @@ RequestAnswer::RequestAnswer(ParsedMessage&& msg) {} NetworkEngine::NetworkEngine(Logger& log, Scheduler& scheduler, const int& s, const int& s6) - : myid(zeroes), DHT_LOG(log), scheduler(scheduler), dht_socket(s), dht_socket6(s6) + : myid(zeroes), DHT_LOG(log), dht_socket(s), dht_socket6(s6), scheduler(scheduler) {} NetworkEngine::NetworkEngine(InfoHash& myid, NetId net, const int& s, const int& s6, Logger& log, Scheduler& scheduler, decltype(NetworkEngine::onError) onError, @@ -610,6 +610,7 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro DHT_LOG.d(msg->info_hash, node->id, "[node %s] got 'listen' request for %s", node->toString().c_str(), msg->info_hash.toString().c_str()); ++in_stats.listen; RequestAnswer answer = onListen(node, msg->info_hash, msg->token, msg->socket_id, std::move(msg->query)); + auto nnodes = bufferNodes(from.getFamily(), msg->info_hash, msg->want, answer.nodes4, answer.nodes6); sendListenConfirmation(from, msg->tid); break; } diff --git a/src/op_cache.h b/src/op_cache.h index fa87b9804ff6a0ea3fde68570101b1a53f695735..2598b4ab897d078c7bb2a15e74103a4200005799 100644 --- a/src/op_cache.h +++ b/src/op_cache.h @@ -54,13 +54,11 @@ public: auto viop = values.emplace(v->id, OpCacheValueStorage{v}); if (viop.second) { newValues.emplace_back(v); - //std::cout << "onValuesAdded: new value " << v->id << std::endl; } else { viop.first->second.refCount++; - //std::cout << "onValuesAdded: " << viop.first->second.refCount << " refs for value " << v->id << std::endl; } } - return callback(newValues, false); + return newValues.empty() ? true : callback(newValues, false); } bool onValuesExpired(const std::vector<Sp<Value>>& vals) { std::vector<Sp<Value>> expiredValues; @@ -68,13 +66,39 @@ public: auto vit = values.find(v->id); if (vit != values.end()) { vit->second.refCount--; - //std::cout << "onValuesExpired: " << vit->second.refCount << " refs remaining for value " << v->id << std::endl; - if (not vit->second.refCount) + if (not vit->second.refCount) { + expiredValues.emplace_back(std::move(vit->second.data)); values.erase(vit); + } } } - return callback(expiredValues, true); + return expiredValues.empty() ? true : callback(expiredValues, true); } + std::vector<Sp<Value>> get(Value::Filter& filter) const { + std::vector<Sp<Value>> ret; + if (not filter) + ret.reserve(values.size()); + for (const auto& v : values) + if (not filter or filter(*v.second.data)) + ret.emplace_back(v.second.data); + return ret; + } + + Sp<Value> get(Value::Id id) const { + auto v = values.find(id); + if (v == values.end()) + return {}; + return v->second.data; + } + + std::vector<Sp<Value>> getValues() const { + std::vector<Sp<Value>> ret; + ret.reserve(values.size()); + for (const auto& v : values) + ret.emplace_back(v.second.data); + return ret; + } + private: std::map<Value::Id, OpCacheValueStorage> values {}; ValueCallback callback; @@ -82,63 +106,43 @@ private: class OpCache { public: - bool onValue(const std::vector<Sp<Value>>& vals, bool expired) { + OpCache() : cache([this](const std::vector<Sp<Value>>& vals, bool expired){ if (expired) onValuesExpired(vals); else onValuesAdded(vals); + return true; + }) {} + + bool onValue(const std::vector<Sp<Value>>& vals, bool expired) { + cache.onValue(vals, expired); return not listeners.empty(); } void onValuesAdded(const std::vector<Sp<Value>>& vals) { - std::vector<Sp<Value>> newValues; - for (const auto& v : vals) { - auto viop = values.emplace(v->id, OpCacheValueStorage{v}); - if (viop.second) { - newValues.emplace_back(v); - //std::cout << "onValuesAdded: new value " << v->id << std::endl; - } else { - viop.first->second.refCount++; - //std::cout << "onValuesAdded: " << viop.first->second.refCount << " refs for value " << v->id << std::endl; - } - } if (not listeners.empty()) { std::vector<LocalListener> list; list.reserve(listeners.size()); for (const auto& l : listeners) list.emplace_back(l.second); for (auto& l : list) - l.get_cb(l.filter.filter(newValues), false); + l.get_cb(l.filter.filter(vals), false); } } void onValuesExpired(const std::vector<Sp<Value>>& vals) { - std::vector<Sp<Value>> expiredValues; - for (const auto& v : vals) { - auto vit = values.find(v->id); - if (vit != values.end()) { - vit->second.refCount--; - //std::cout << "onValuesExpired: " << vit->second.refCount << " refs remaining for value " << v->id << std::endl; - if (not vit->second.refCount) - values.erase(vit); - } - } if (not listeners.empty()) { std::vector<LocalListener> list; list.reserve(listeners.size()); for (const auto& l : listeners) list.emplace_back(l.second); for (auto& l : list) - l.get_cb(l.filter.filter(expiredValues), true); + l.get_cb(l.filter.filter(vals), true); } } void addListener(size_t token, ValueCallback cb, Sp<Query> q, Value::Filter filter) { listeners.emplace(token, LocalListener{q, filter, cb}); - std::vector<Sp<Value>> newValues; - newValues.reserve(values.size()); - for (const auto& v : values) - newValues.emplace_back(v.second.data); - cb(newValues, false); + cb(cache.get(filter), false); } bool removeListener(size_t token) { @@ -150,26 +154,17 @@ public: } std::vector<Sp<Value>> get(Value::Filter& filter) const { - std::vector<Sp<Value>> ret; - if (not filter) - ret.reserve(values.size()); - for (const auto& v : values) - if (not filter or filter(*v.second.data)) - ret.emplace_back(v.second.data); - return ret; + return cache.get(filter); } Sp<Value> get(Value::Id id) const { - auto v = values.find(id); - if (v == values.end()) - return {}; - return v->second.data; + return cache.get(id); } size_t searchToken; private: + OpValueCache cache; std::map<size_t, LocalListener> listeners; - std::map<Value::Id, OpCacheValueStorage> values; }; class SearchCache { @@ -242,5 +237,4 @@ private: size_t nextToken_ {1}; }; - } diff --git a/src/search.h b/src/search.h index 9a492eb4d9cac4837dc3e7c2f3655aadfa14e4ea..faefa443c1d6fd4c4d7ca2337e70c134e8cd91ed 100644 --- a/src/search.h +++ b/src/search.h @@ -63,6 +63,7 @@ struct Dht::SearchNode { struct CachedListenStatus { ValueCache cache; + Sp<Scheduler::Job> cacheExpirationJob {}; Sp<net::Request> req {}; CachedListenStatus(ValueStateCallback&& cb) : cache(std::forward<ValueStateCallback>(cb)) {} CachedListenStatus(CachedListenStatus&&) = default; @@ -213,13 +214,22 @@ struct Dht::SearchNode { getStatus.clear(); } - void onValues(const Sp<Query>& q, net::RequestAnswer&& answer, const TypeStore& types, const time_point& now) + void onValues(const Sp<Query>& q, net::RequestAnswer&& answer, const TypeStore& types, Scheduler& scheduler) { auto l = listenStatus.find(q); if (l != listenStatus.end()) { - l->second.cache.onValues(answer.values, + auto next = l->second.cache.onValues(answer.values, answer.refreshed_values, - answer.expired_values, types, now); + answer.expired_values, types, scheduler.time()); + scheduler.edit(l->second.cacheExpirationJob, next); + } + } + + void expireValues(const Sp<Query>& q, Scheduler& scheduler) { + auto l = listenStatus.find(q); + if (l != listenStatus.end()) { + auto next = l->second.cache.expireValues(scheduler.time()); + scheduler.edit(l->second.cacheExpirationJob, next); } } diff --git a/src/value_cache.h b/src/value_cache.h index 06fe255f2eb61e64156f76730389a270704861c9..47446d896095213dc47fa98bded8a26887da956b 100644 --- a/src/value_cache.h +++ b/src/value_cache.h @@ -54,14 +54,26 @@ public: return ret; } - CallbackQueue expireValues(const time_point& now) { + time_point expireValues(const time_point& now) { + time_point ret = time_point::max(); + auto cbs = expireValues(now, ret); + while (not cbs.empty()) { + cbs.front()(); + cbs.pop_front(); + } + return ret; + } + + CallbackQueue expireValues(const time_point& now, time_point& next) { std::vector<Sp<Value>> expired_values; for (auto it = values.begin(); it != values.end();) { if (it->second.expiration < now) { expired_values.emplace_back(std::move(it->second.data)); it = values.erase(it); - } else + } else { + next = std::min(next, it->second.expiration); ++it; + } } while (values.size() >= MAX_VALUES) { // too many values, remove oldest values @@ -88,24 +100,26 @@ public: return ret; } - void onValues + time_point onValues (const std::vector<Sp<Value>>& values, const std::vector<Value::Id>& refreshed_values, const std::vector<Value::Id>& expired_values, const TypeStore& types, const time_point& now) { CallbackQueue cbs; + time_point ret = time_point::max(); if (not values.empty()) cbs.splice(cbs.end(), addValues(values, types, now)); for (const auto& vid : refreshed_values) refreshValue(vid, types, now); for (const auto& vid : expired_values) cbs.splice(cbs.end(), expireValue(vid)); - cbs.splice(cbs.end(), expireValues(now)); + cbs.splice(cbs.end(), expireValues(now, ret)); while (not cbs.empty()) { cbs.front()(); cbs.pop_front(); } + return ret; } private: @@ -134,10 +148,11 @@ private: for (const auto& value : new_values) { auto v = values.find(value->id); if (v == values.end()) { - nvals.emplace_back(value); // new value + nvals.emplace_back(value); values.emplace(value->id, CacheValueStorage(value, now, now + types.getType(value->type).expiration)); } else { + // refreshed value v->second.created = now; v->second.expiration = now + types.getType(v->second.data->type).expiration; } @@ -174,5 +189,4 @@ private: } }; - } diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index 777903a18492a09ee51db47ce0b93560885efe02..d2d81775591f15212f213fff7146f6fd4297de43 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -87,6 +87,7 @@ void print_help() { << " cl <key> <token> Cancel listen for <token> and <key>." << std::endl << " p <key> <str> Put string value at <key>." << std::endl << " pp <key> <str> Put string value at <key> (persistent version)." << std::endl + << " cpp <key> <id> Cancel persistent put operation for <key> and value <id>." << std::endl << " s <key> <str> Put string value at <key>, signed with our generated private key." << std::endl << " e <key> <dest> <str> Put string value at <key>, encrypted for <dest> with its public key (if found)." << std::endl; @@ -139,6 +140,12 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params std::cout << dht->getNodesStats(AF_INET).toString() << std::endl; std::cout << "IPv6 stats:" << std::endl; std::cout << dht->getNodesStats(AF_INET6).toString() << std::endl; +#if OPENDHT_PROXY_SERVER + for (const auto& proxy : proxies) { + std::cout << "Stats for proxy on port " << proxy.first << std::endl; + std::cout << " " << proxy.second->getStats().toString() << std::endl; + } +#endif continue; } else if (op == "lr") { std::cout << "IPv4 routing table:" << std::endl;