diff --git a/CMakeLists.txt b/CMakeLists.txt index 443a9963846813601463cb70cba6933f7c04a9e2..5395402a13a88f27494ec193c9adac04a42126ea 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -125,6 +125,8 @@ list (APPEND opendht_SOURCES src/storage.h src/listener.h src/search.h + src/value_cache.h + src/op_cache.h src/net.h src/parsed_message.h src/request.h diff --git a/include/opendht/callbacks.h b/include/opendht/callbacks.h index fd067c6ba23ce615957eba4336fc23d9a558202a..8507a77e09588459292e8453c1f8c352ec4b065a 100644 --- a/include/opendht/callbacks.h +++ b/include/opendht/callbacks.h @@ -98,6 +98,7 @@ using ValuesExport = std::pair<InfoHash, Blob>; using QueryCallback = std::function<bool(const std::vector<std::shared_ptr<FieldValueIndex>>& fields)>; using GetCallback = std::function<bool(const std::vector<std::shared_ptr<Value>>& values)>; +using ValueCallback = std::function<bool(const std::vector<std::shared_ptr<Value>>& values, bool expired)>; using GetCallbackSimple = std::function<bool(std::shared_ptr<Value> value)>; using ShutdownCallback = std::function<void()>; diff --git a/include/opendht/dht.h b/include/opendht/dht.h index aafd2f5606ea354d80b5e058641a8c8177771c7b..982d41c63807208ae2bdccbaa647ac28b2b21d91 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -230,8 +230,16 @@ public: * * @return a token to cancel the listener later. */ - virtual size_t listen(const InfoHash&, GetCallback, Value::Filter={}, Where w = {}); - virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) { + virtual size_t listen(const InfoHash&, ValueCallback, Value::Filter={}, Where={}); + + virtual size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f={}, Where w={}) { + return listen(key, [cb](const std::vector<Sp<Value>>& vals, bool expired){ + if (not expired) + return cb(vals); + return true; + }, std::forward<Value::Filter>(f), std::forward<Where>(w)); + } + virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w={}) { return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w)); } @@ -459,7 +467,7 @@ private: Sp<Search> search(const InfoHash& id, sa_family_t af, GetCallback = {}, QueryCallback = {}, DoneCallback = {}, Value::Filter = {}, Query q = {}); void announce(const InfoHash& id, sa_family_t af, Sp<Value> value, DoneCallback callback, time_point created=time_point::max(), bool permanent = false); - size_t listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter f = Value::AllFilter(), const Sp<Query>& q = {}); + size_t listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f = Value::AllFilter(), const Sp<Query>& q = {}); /** * Refill the search with good nodes if possible. diff --git a/include/opendht/dht_interface.h b/include/opendht/dht_interface.h index 7f3641968a333fc8e969399e9fed260ec9b82f99..30d0a60bba1cd504ef3c2d834159da8414972251 100644 --- a/include/opendht/dht_interface.h +++ b/include/opendht/dht_interface.h @@ -165,6 +165,7 @@ public: */ virtual size_t listen(const InfoHash&, GetCallback, Value::Filter={}, Where w = {}) = 0; virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) = 0; + virtual size_t listen(const InfoHash&, ValueCallback, Value::Filter={}, Where w = {}) = 0; virtual bool cancelListen(const InfoHash&, size_t token) = 0; @@ -235,11 +236,6 @@ public: * @param notification to process */ virtual void pushNotificationReceived(const std::map<std::string, std::string>& data) = 0; - /** - * Refresh a listen via a token - * @param token - */ - virtual void resubscribe(unsigned token) = 0; protected: bool logFilerEnable_ {}; diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 35083d358e65e0e525044b65ba5f4fb32e918e21..df3fe3638307a25b7440c53464650e67b55585ca 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -40,10 +40,12 @@ namespace Json { namespace dht { +class SearchCache; + class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface { public: - DhtProxyClient() : scheduler(DHT_LOG) {} + DhtProxyClient(); explicit DhtProxyClient(std::function<void()> loopSignal, const std::string& serverHost, const std::string& pushClientId = ""); @@ -102,8 +104,6 @@ public: get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w)); } - void get(const InfoHash& key, const GetCallback& cb, DoneCallback donecb, const Value::Filter& filterChain); - /** * Announce a value on all available protocols (IPv4, IPv6). * @@ -161,22 +161,25 @@ public: * * @return a token to cancel the listener later. */ - virtual size_t listen(const InfoHash&, GetCallback, Value::Filter={}, Where={}); - virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) { + virtual size_t listen(const InfoHash&, ValueCallback, Value::Filter={}, Where={}); + + virtual size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f={}, Where w={}) { + return listen(key, [cb](const std::vector<Sp<Value>>& vals, bool expired){ + if (not expired) + return cb(vals); + return true; + }, std::forward<Value::Filter>(f), std::forward<Where>(w)); + } + virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w={}) { return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w)); } - virtual bool cancelListen(const InfoHash&, size_t token); + virtual bool cancelListen(const InfoHash& key, size_t token); /** * Call linked callback with a push notification * @param notification to process */ void pushNotificationReceived(const std::map<std::string, std::string>& notification); - /** - * Refresh a listen via a token - * @param token - */ - void resubscribe(const unsigned token); time_point periodic(const uint8_t*, size_t, const SockAddr&); time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) { @@ -217,6 +220,16 @@ public: void pingNode(const sockaddr*, socklen_t, DoneCallbackSimple&& /*cb*/={}) { } + virtual void registerType(const ValueType& type) { + types.registerType(type); + } + const ValueType& getType(ValueType::Id type_id) const { + return types.getType(type_id); + } + + std::vector<Sp<Value>> getLocal(const InfoHash& k, Value::Filter filter) const; + Sp<Value> getLocalById(const InfoHash& k, Value::Id id) const; + /** * NOTE: The following methods will not be implemented because the * DhtProxyClient doesn't have any storage nor synchronization process @@ -225,10 +238,6 @@ public: void insertNode(const InfoHash&, const sockaddr*, socklen_t) { } void insertNode(const NodeExport&) { } std::pair<size_t, size_t> getStoreSize() const { return {}; } - virtual void registerType(const ValueType&) { } - const ValueType& getType(ValueType::Id) const { return NO_VALUE; } - std::vector<Sp<Value>> getLocal(const InfoHash&, Value::Filter) const { return {}; } - Sp<Value> getLocalById(const InfoHash&, Value::Id) const { return {}; } std::vector<NodeExport> exportNodes() { return {}; } std::vector<ValuesExport> exportValues() const { return {}; } void importValues(const std::vector<ValuesExport>&) {} @@ -244,8 +253,6 @@ public: void connectivityChanged() { } private: - const ValueType NO_VALUE; - /** * Start the connection with a server. */ @@ -261,6 +268,9 @@ private: void opFailed(); + size_t doListen(const InfoHash& key, ValueCallback, Value::Filter); + bool doCancelListen(const InfoHash& key, size_t token); + /** * Initialize statusIpvX_ */ @@ -273,6 +283,7 @@ private: * cancel all Operations */ void cancelAllOperations(); + std::string serverHost_; std::string pushClientId_; @@ -285,22 +296,16 @@ private: InfoHash myid {}; + // registred types + TypeStore types; + /** * Store listen requests. */ - struct Listener - { - size_t token; - std::shared_ptr<restbed::Request> req; - std::string key; - GetCallback cb; - Value::Filter filterChain; - std::thread thread; - unsigned callbackId; - std::shared_ptr<bool> isCanceledViaClose; // NOTE: unused if using push notifications - std::shared_ptr<unsigned> pushNotifToken; // NOTE: unused if not using push notifications - }; - std::vector<Listener> listeners_; + struct Listener; + struct ProxySearch; + std::map<InfoHash, ProxySearch> listeners_; + size_t listener_token_ {0}; std::mutex lockListener_; @@ -335,13 +340,18 @@ private: */ void restartListeners(); + /** + * Refresh a listen via a token + * @param token + */ + void resubscribe(const InfoHash& key, Listener& listener); + /** * If we want to use push notifications by default. * NOTE: empty by default to avoid to use services like FCM or APN. */ std::string deviceKey_ {}; - unsigned callbackId_ {0}; - std::mutex lockCallback_; + std::atomic_uint callbackId_ {0}; const std::function<void()> loopSignal_; diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 6814f517701bbd285776db662a39585e31cdeccb..d14a67257e77796a9e3d2d4d447f180c1dca3adb 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -136,7 +136,15 @@ public: query(hash, cb, bindDoneCb(done_cb), q); } - std::future<size_t> listen(InfoHash key, GetCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {}); + std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {}); + + std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) { + return listen(key, [cb](const std::vector<Sp<Value>>& vals, bool expired){ + if (not expired) + return cb(vals); + return true; + }, std::forward<Value::Filter>(f), std::forward<Where>(w)); + } std::future<size_t> listen(const std::string& key, GetCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {}); std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = Value::AllFilter(), Where w = {}) { return listen(key, bindGetCb(cb), f, w); @@ -307,7 +315,7 @@ public: * @param threaded: If false, ::loop() must be called periodically. Otherwise a thread is launched. * @param cb: Optional callback to receive general state information. */ - void run(in_port_t port, const crypto::Identity identity, bool threaded = false, NetId network = 0) { + void run(in_port_t port = 4222, const crypto::Identity identity = {}, bool threaded = false, NetId network = 0) { run(port, { /*.dht_config = */{ /*.node_config = */{ @@ -400,11 +408,6 @@ public: * Insert a push notification to process for OpenDHT */ void pushNotificationReceived(const std::map<std::string, std::string>& data) const; - /** - * Refresh a listen via a token - * @param token - */ - void resubscribe(unsigned token); /* Proxy server mothods */ void forwardAllMessages(bool forward); @@ -451,15 +454,8 @@ private: /** * Store current listeners and translates global tokens for each client. */ - struct Listener { - size_t tokenClassicDht; - size_t tokenProxyDht; - GetCallback gcb; - InfoHash hash; - Value::Filter f; - Where w; - }; - std::map<size_t, Listener> listeners_ {}; + struct Listener; + std::map<size_t, Listener> listeners_; size_t listener_token_ {1}; mutable std::mutex dht_mtx {}; diff --git a/include/opendht/network_engine.h b/include/opendht/network_engine.h index 2a75e9a3b11bf5ae080cd8290a8550d029ab64e2..1afc93e1eaf4731c6f1b175bc88a794b849188b0 100644 --- a/include/opendht/network_engine.h +++ b/include/opendht/network_engine.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2014-2017 Savoir-faire Linux Inc. + * Copyright (C) 2014-2018 Savoir-faire Linux Inc. * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com> * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com> * @@ -87,6 +87,8 @@ struct RequestAnswer { Blob ntoken {}; Value::Id vid {}; std::vector<Sp<Value>> values {}; + std::vector<Value::Id> refreshed_values {}; + std::vector<Value::Id> expired_values {}; std::vector<Sp<FieldValueIndex>> fields {}; std::vector<Sp<Node>> nodes4 {}; std::vector<Sp<Node>> nodes6 {}; @@ -235,6 +237,9 @@ public: std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6, std::vector<Sp<Value>>&& values, const Query& q); + void tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values); + void tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values); + bool isRunning(sa_family_t af) const; inline want_t want () const { return dht_socket >= 0 && dht_socket6 >= 0 ? (WANT4 | WANT6) : -1; } diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index 7d300fba2a0636935b4e7985706110027d58c738..287dca53fe96a410c8f9b5aefa6f3ab2ee2bbacd 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -100,8 +100,6 @@ public: get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w)); } - size_t listen(const InfoHash& id, GetCallback cb, Value::Filter = {}, Where w = {}); - /** * Will take ownership of the value, sign it using our private key and put it in the DHT. */ @@ -280,8 +278,11 @@ public: bool cancelPut(const InfoHash& h, const Value::Id& vid) { return dht_->cancelPut(h, vid); } + + size_t listen(const InfoHash& key, ValueCallback, Value::Filter={}, Where={}); + size_t listen(const InfoHash& key, GetCallback cb, Value::Filter = {}, Where w = {}); size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) { - return dht_->listen(key, cb, f, w); + return listen(key, bindGetCb(cb), f, w); } bool cancelListen(const InfoHash& h, size_t token) { return dht_->cancelListen(h, token); @@ -308,13 +309,6 @@ public: void pushNotificationReceived(const std::map<std::string, std::string>& notification) { dht_->pushNotificationReceived(notification); } - /** - * Refresh a listen via a token - * @param token - */ - void resubscribe(const unsigned token) { - dht_->resubscribe(token); - } void setLoggers(LogMethod error = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG) { @@ -338,6 +332,8 @@ private: SecureDht(const SecureDht&) = delete; SecureDht& operator=(const SecureDht&) = delete; + Sp<Value> checkValue(const Sp<Value>& v); + ValueCallback getCallbackFilter(ValueCallback, Value::Filter&&); GetCallback getCallbackFilter(GetCallback, Value::Filter&&); Sp<crypto::PrivateKey> key_ {}; diff --git a/include/opendht/value.h b/include/opendht/value.h index 388f8fb077ce81e785bd140adb94d08ffe8ab17d..056d5bf900acf417a78cc7d664cab57e9dbecb37 100644 --- a/include/opendht/value.h +++ b/include/opendht/value.h @@ -842,6 +842,10 @@ struct OPENDHT_PUBLIC Where return ss.str(); } + bool empty() const { + return filters_.empty(); + } + OPENDHT_PUBLIC friend std::ostream& operator<<(std::ostream& s, const dht::Where& q); private: diff --git a/src/Makefile.am b/src/Makefile.am index 0af5b39665f22afcd15ac3c0e0602c2103cc58fc..fc65afea4ab0dff1f06c39b33bdc0214b5c493d7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -9,6 +9,8 @@ libopendht_la_SOURCES = \ listener.h \ request.h \ search.h \ + value_cache.h \ + op_cache.h \ net.h \ parsed_message.h \ node_cache.cpp \ diff --git a/src/dht.cpp b/src/dht.cpp index 277911ce4f92ad9cfedd8e5031693c61cf01eeaf..298267d43319b6514528d7df89279f674975716b 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -538,18 +538,30 @@ Dht::searchStep(Sp<Search> sr) for (auto& n : sr->nodes) { if (not n.isSynced(now)) continue; + std::weak_ptr<Search> ws = sr; for (const auto& l : sr->listeners) { const auto& query = l.second.query; + auto list_token = l.first; if (n.getListenTime(query) > now) continue; DHT_LOG.w(sr->id, n.node->id, "[search %s] [node %s] sending 'listen'", sr->id.toString().c_str(), n.node->toString().c_str()); - const auto& r = n.listenStatus.find(query); - auto prev_req = r != n.listenStatus.end() ? r->second : nullptr; - - std::weak_ptr<Search> ws = sr; - n.listenStatus[query] = network_engine.sendListen(n.node, sr->id, *query, n.token, prev_req, + auto r = n.listenStatus.find(query); + if (r == n.listenStatus.end()) { + r = n.listenStatus.emplace(query, SearchNode::CachedListenStatus{ + [this,ws,list_token](const std::vector<Sp<Value>>& values, bool expired){ + if (auto sr = ws.lock()) { + auto l = sr->listeners.find(list_token); + if (l != sr->listeners.end()) { + l->second.get_cb(l->second.filter.filter(values), expired); + } + } + } + }).first; + } + auto prev_req = r != n.listenStatus.end() ? r->second.req : nullptr; + r->second.req = network_engine.sendListen(n.node, sr->id, *query, n.token, prev_req, [this,ws,query](const net::Request& req, net::RequestAnswer&& answer) mutable { /* on done */ if (auto sr = ws.lock()) { @@ -572,7 +584,9 @@ Dht::searchStep(Sp<Search> sr) { /* on new values */ if (auto sr = ws.lock()) { scheduler.edit(sr->nextSearchStep, scheduler.time()); - onGetValuesDone(node, answer, sr, query); + if (auto sn = sr->getNode(node)) + sn->onValues(query, std::move(answer), types, scheduler.time()); + //onGetValuesDone(node, answer, sr, query); } } ); @@ -772,9 +786,8 @@ Dht::announce(const InfoHash& id, } size_t -Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter f, const Sp<Query>& q) +Dht::listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f, const Sp<Query>& q) { - const auto& now = scheduler.time(); if (!isRunning(af)) return 0; // DHT_LOG_ERR("[search %s IPv%c] search_time is now in %lfs", sr->id.toString().c_str(), (sr->af == AF_INET) ? '4' : '6', print_dt(tm-clock::now())); @@ -786,15 +799,11 @@ Dht::listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter if (!sr) throw DhtException("Can't create search"); DHT_LOG.e(id, "[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); - sr->done = false; - auto token = ++sr->listener_token; - sr->listeners.emplace(token, LocalListener{q, f, cb}); - scheduler.edit(sr->nextSearchStep, now); - return token; + return sr->listen(cb, f, q, scheduler); } size_t -Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter f, Where where) +Dht::listen(const InfoHash& id, ValueCallback cb, Value::Filter f, Where where) { scheduler.syncTime(); @@ -802,27 +811,7 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter f, Where where) auto vals = std::make_shared<std::map<Value::Id, Sp<Value>>>(); auto token = ++listener_token; - auto gcb = [=](const std::vector<Sp<Value>>& values) { - std::vector<Sp<Value>> newvals; - for (const auto& v : values) { - auto it = vals->find(v->id); - if (it == vals->cend() || !(*it->second == *v)) - newvals.push_back(v); - } - if (!newvals.empty()) { - if (!cb(newvals)) { - // cancelListen is useful here, because we need to cancel on IPv4 and 6 - cancelListen(id, token); - return false; - } - for (const auto& v : newvals) { - auto it = vals->emplace(v->id, v); - if (not it.second) - it.first->second = v; - } - } - return true; - }; + auto gcb = OpValueCache::cacheCallback(std::move(cb)); auto query = std::make_shared<Query>(q); auto filter = f.chain(q.where.getFilter()); @@ -834,7 +823,7 @@ Dht::listen(const InfoHash& id, GetCallback cb, Value::Filter f, Where where) if (not st->second.empty()) { std::vector<Sp<Value>> newvals = st->second.get(filter); if (not newvals.empty()) { - if (!cb(newvals)) + if (!gcb(newvals, false)) return 0; for (const auto& v : newvals) { auto it = vals->emplace(v->id, v); @@ -1140,7 +1129,7 @@ Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v) { if (not st.local_listeners.empty()) { DHT_LOG.d(id, "[store %s] %lu local listeners", id.toString().c_str(), st.local_listeners.size()); - std::vector<std::pair<GetCallback, std::vector<Sp<Value>>>> cbs; + std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> cbs; for (const auto& l : st.local_listeners) { std::vector<Sp<Value>> vals; if (not l.second.filter or l.second.filter(*v.data)) @@ -1154,7 +1143,7 @@ Dht::storageChanged(const InfoHash& id, Storage& st, ValueStorage& v) } // listeners are copied: they may be deleted by the callback for (auto& cb : cbs) - cb.first(cb.second); + cb.first(cb.second, false); } if (not st.listeners.empty()) { @@ -1242,11 +1231,33 @@ Dht::storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t socket_ void Dht::expireStore(decltype(store)::iterator i) { - auto stats = i->second.expire(i->first, scheduler.time()); - total_store_size += stats.size_diff; - total_values += stats.values_diff; - if (stats.values_diff) { - DHT_LOG.d(i->first, "[store %s] discarded %ld expired values (%ld bytes)", i->first.toString().c_str(), -stats.values_diff, -stats.size_diff); + const auto& id = i->first; + auto& st = i->second; + auto stats = st.expire(id, scheduler.time()); + total_store_size += stats.first; + total_values -= stats.second.size(); + if (not stats.second.empty()) { + DHT_LOG.d(id, "[store %s] discarded %ld expired values (%ld bytes)", + id.toString().c_str(), stats.second.size(), -stats.first); + + if (not st.listeners.empty()) { + DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size()); + + std::vector<Value::Id> ids; + ids.reserve(stats.second.size()); + for (const auto& v : stats.second) + ids.emplace_back(v->id); + + for (const auto& node_listeners : st.listeners) { + for (const auto& l : node_listeners.second) { + DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending expired", + id.toString().c_str(), + node_listeners.first->toString().c_str()); + Blob ntoken = makeToken(node_listeners.first->getAddr(), false); + network_engine.tellListenerExpired(node_listeners.first, l.first, id, ntoken, ids); + } + } + } } } @@ -1319,7 +1330,7 @@ Dht::connectivityChanged(sa_family_t af) for (auto& sp : searches(af)) for (auto& sn : sp.second->nodes) { for (auto& ls : sn.listenStatus) - sn.node->cancelRequest(ls.second); + sn.node->cancelRequest(ls.second.req); sn.listenStatus.clear(); } reported_addr.erase(std::remove_if(reported_addr.begin(), reported_addr.end(), [&](const ReportedAddr& addr){ @@ -1872,7 +1883,7 @@ Dht::periodic(const uint8_t *buf, size_t buflen, const SockAddr& from) try { network_engine.processMessage(buf, buflen, from); } catch (const std::exception& e) { - DHT_LOG.e("Can't parse message from %s: %s", from.toString().c_str(), e.what()); + DHT_LOG.e("Can't process message from %s: %s", from.toString().c_str(), e.what()); } } return scheduler.run(); @@ -2169,7 +2180,7 @@ void Dht::onGetValuesDone(const Sp<Node>& node, } /* callbacks for local search listeners */ - std::vector<std::pair<GetCallback, std::vector<Sp<Value>>>> tmp_lists; + /*std::vector<std::pair<ValueCallback, std::vector<Sp<Value>>>> tmp_lists; for (auto& l : sr->listeners) { if (!l.second.get_cb or (orig_query and l.second.query and not l.second.query->isSatisfiedBy(*orig_query))) continue; @@ -2181,7 +2192,10 @@ void Dht::onGetValuesDone(const Sp<Node>& node, tmp_lists.emplace_back(l.second.get_cb, std::move(tmp)); } for (auto& l : tmp_lists) - l.first(l.second); + l.first(l.second, false);*/ + } else if (not a.expired_values.empty()) { + DHT_LOG.w(sr->id, node->id, "[search %s] [node %s] %u expired values", + sr->id.toString().c_str(), node->toString().c_str(), a.expired_values.size()); } } else { DHT_LOG.w(sr->id, "[node %s] no token provided. Ignoring response content.", node->toString().c_str()); diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index f0fd168c2cb21fa1546c157594d61d98292e615d..9a5b2d05b6738973bc82a52e1c85bf455c9b43bf 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -27,11 +27,34 @@ #include <vector> #include "dhtrunner.h" +#include "op_cache.h" constexpr const char* const HTTP_PROTO {"http://"}; namespace dht { +struct DhtProxyClient::Listener +{ + ValueCache cache; + ValueCallback cb; + Value::Filter filter; + Sp<restbed::Request> req; + std::thread thread; + unsigned callbackId; + Sp<bool> isCanceledViaClose; + Sp<unsigned> pushNotifToken; // NOTE: unused if not using push notifications + Listener(ValueCache&& c, const Sp<restbed::Request>& r, Value::Filter&& f, unsigned cid) + : cache(std::move(c)), filter(std::move(f)), req(r), callbackId(cid), isCanceledViaClose(std::make_shared<bool>(false)) + {} +}; + +struct DhtProxyClient::ProxySearch { + SearchCache ops {}; + std::map<size_t, Listener> listeners {}; +}; + +DhtProxyClient::DhtProxyClient() : scheduler(DHT_LOG) {} + DhtProxyClient::DhtProxyClient(std::function<void()> signal, const std::string& serverHost, const std::string& pushClientId) : serverHost_(serverHost), pushClientId_(pushClientId), scheduler(DHT_LOG), loopSignal_(signal) { @@ -61,6 +84,22 @@ DhtProxyClient::~DhtProxyClient() cancelAllListeners(); } +std::vector<Sp<Value>> +DhtProxyClient::getLocal(const InfoHash& k, Value::Filter filter) const { + auto s = listeners_.find(k); + if (s == listeners_.end()) + return {}; + return s->second.ops.get(filter); +} + +Sp<Value> +DhtProxyClient::getLocalById(const InfoHash& k, Value::Id id) const { + auto s = listeners_.find(k); + if (s == listeners_.end()) + return {}; + return s->second.ops.get(id); +} + void DhtProxyClient::cancelAllOperations() { @@ -82,13 +121,14 @@ void DhtProxyClient::cancelAllListeners() { std::lock_guard<std::mutex> lock(lockListener_); - for (auto& listener: listeners_) { - if (listener.thread.joinable()) { - // Close connection to stop listener? - if (listener.req) - restbed::Http::close(listener.req); - listener.thread.join(); - } + for (auto& s: listeners_) { + for (auto& l : s.second.listeners) + if (l.second.thread.joinable()) { + // Close connection to stop listener? + if (l.second.req) + restbed::Http::close(l.second.req); + l.second.thread.join(); + } } } @@ -163,10 +203,11 @@ DhtProxyClient::periodic(const uint8_t*, size_t, const SockAddr&) } void -DhtProxyClient::get(const InfoHash& key, const GetCallback& cb, DoneCallback donecb, const Value::Filter& filterChain) +DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, Value::Filter&& f, Where&& w) { restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); auto req = std::make_shared<restbed::Request>(uri); + Value::Filter filter = w.empty() ? f : f.chain(w.getFilter()); auto finished = std::make_shared<std::atomic_bool>(false); Operation o; @@ -198,7 +239,7 @@ DhtProxyClient::get(const InfoHash& key, const GetCallback& cb, DoneCallback don auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); if (reader->parse(char_data, char_data + body.size(), &json, &err)) { auto value = std::make_shared<Value>(json); - if ((not filterChain or filterChain(*value)) && cb) { + if ((not filter or filter(*value)) && cb) { std::lock_guard<std::mutex> lock(lockCallbacks); callbacks_.emplace_back([cb, value, finished]() { if (not *finished and not cb({value})) @@ -234,15 +275,6 @@ DhtProxyClient::get(const InfoHash& key, const GetCallback& cb, DoneCallback don } } -void -DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, - Value::Filter&& filter, Where&& where) -{ - Query query {{}, where}; - auto filterChain = filter.chain(query.where.getFilter()); - get(key, cb, donecb, filterChain); -} - void DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point, bool permanent) { @@ -427,61 +459,64 @@ DhtProxyClient::getPublicAddress(sa_family_t family) } size_t -DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter filter, Where where) +DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filter, Where where) { + auto it = listeners_.find(key); + if (it == listeners_.end()) { + it = listeners_.emplace(key, ProxySearch{}).first; + } + auto query = std::make_shared<Query>(Select{}, where); + auto token = it->second.ops.listen(cb, query, filter, [&](Sp<Query> q, ValueCallback vcb){ + return doListen(key, vcb, filter); + }); + return token; +} + +bool +DhtProxyClient::cancelListen(const InfoHash& key, size_t gtoken) { + auto it = listeners_.find(key); + if (it == listeners_.end()) + return false; + return it->second.ops.cancelListen(gtoken, [&](size_t ltoken){ + doCancelListen(key, ltoken); + }); +} + +size_t +DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter filter/*, Where where*/) { restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); auto req = std::make_shared<restbed::Request>(uri); req->set_method(deviceKey_.empty() ? "LISTEN" : "SUBSCRIBE"); - Query query {{}, where}; - auto filterChain = filter.chain(query.where.getFilter()); - auto pushNotifToken = std::make_shared<unsigned>(0); - - unsigned callbackId = 0; - { - std::lock_guard<std::mutex> lock(lockCallback_); - callbackId_ += 1; - callbackId = callbackId_; + std::lock_guard<std::mutex> lock(lockListener_); + auto search = listeners_.find(key); + if (search == listeners_.end()) { + std::cerr << "doListen: search not found" << std::endl; + return 0; } - Listener l; - ++listener_token_; - auto isCanceled = std::make_shared<bool>(false); - - // Wrap cb in another callback to avoid value duplication. - auto vals = std::make_shared<std::map<Value::Id, Sp<Value>>>(); - auto gcb = [=](const std::vector<Sp<Value>>& values) { - std::vector<Sp<Value>> newvals; - for (const auto& v : values) { - auto it = vals->find(v->id); - if (it == vals->cend() || !(*it->second == *v)) - newvals.push_back(v); - } - if (!newvals.empty()) { - if (not *isCanceled and !cb(newvals)) { - *isCanceled = true; - cancelListen(key, listener_token_); - return false; - } - for (const auto& v : newvals) { - auto it = vals->emplace(v->id, v); - if (not it.second) - it.first->second = v; - } + auto token = ++listener_token_; + auto callbackId = ++callbackId_; + auto l = search->second.listeners.emplace(token, Listener{ + ValueCache(cb), req, std::move(filter), callbackId + }).first; + + ValueCache& cache = l->second.cache; + l->second.cb = [this,&cache](const std::vector<Sp<Value>>& values, bool expired) { + const std::vector<Sp<Value>> new_values_empty; + std::vector<Value::Id> expired_ids; + if (expired) { + expired_ids.reserve(values.size()); + for (const auto& v : values) + expired_ids.emplace_back(v->id); } + cache.onValues(expired ? new_values_empty : values, std::vector<Value::Id>{}, expired_ids, types, scheduler.time()); return true; }; - - l.key = key.toString(); - l.token = listener_token_; - l.req = req; - l.cb = gcb; - l.callbackId = callbackId; - l.pushNotifToken = pushNotifToken; - l.filterChain = std::move(filterChain); - l.isCanceledViaClose = isCanceled; - std::weak_ptr<bool> isCanceledViaClose(l.isCanceledViaClose); - l.thread = std::thread([=]() + std::weak_ptr<bool> isCanceledViaClose(l->second.isCanceledViaClose); + auto pushNotifToken = std::make_shared<unsigned>(0); + l->second.pushNotifToken = pushNotifToken; + l->second.thread = std::thread([=]() { auto settings = std::make_shared<restbed::Settings>(); if (deviceKey_.empty()) { @@ -499,7 +534,7 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter filter }; auto state = std::make_shared<State>(); restbed::Http::async(req, - [this, filterChain, gcb, pushNotifToken, state](const std::shared_ptr<restbed::Request>& req, + [this, filter, cb, pushNotifToken, state](const std::shared_ptr<restbed::Request>& req, const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); if (code == 200) { @@ -534,10 +569,10 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter filter auto* char_data = reinterpret_cast<const char*>(&body[0]); if (reader->parse(char_data, char_data + body.size(), &json, &err)) { auto value = std::make_shared<Value>(json); - if (not filterChain or filterChain(*value)) { + if (not filter or filter(*value)) { std::lock_guard<std::mutex> lock(lockCallbacks); - callbacks_.emplace_back([gcb, value, state]() { - if (not state->cancel and not gcb({value})) + callbacks_.emplace_back([cb, value, state]() { + if (not state->cancel and not cb({value}, false)) state->cancel = true; }); loopSignal_(); @@ -560,65 +595,63 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter filter } } ); - { - std::lock_guard<std::mutex> lock(lockListener_); - listeners_.emplace_back(std::move(l)); - } - return listener_token_; + return token; } bool -DhtProxyClient::cancelListen(const InfoHash&, size_t token) +DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken) { std::lock_guard<std::mutex> lock(lockListener_); - for (auto it = listeners_.begin(); it != listeners_.end(); ++it) { - auto& listener = *it; - if (listener.token == token) { - if (!deviceKey_.empty()) { - // First, be sure to have a token - if (listener.thread.joinable()) { - listener.thread.join(); - } - // UNSUBSCRIBE - restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key); - auto req = std::make_shared<restbed::Request>(uri); - req->set_method("UNSUBSCRIBE"); - // fill request body - Json::Value body; - body["key"] = deviceKey_; - body["client_id"] = pushClientId_; - body["token"] = std::to_string(token); - body["callback_id"] = listener.callbackId; - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto content = Json::writeString(wbuilder, body) + "\n"; - std::replace(content.begin(), content.end(), '\n', ' '); - req->set_body(content); - req->set_header("Content-Length", std::to_string(content.size())); - restbed::Http::async(req, - [](const std::shared_ptr<restbed::Request>&, - const std::shared_ptr<restbed::Response>&){} - ); - // And remove - listeners_.erase(it); - return true; - } else { - // Just stop the request - if (listener.thread.joinable()) { - // Close connection to stop listener? - *(listener.isCanceledViaClose) = true; - if (listener.req) - restbed::Http::close(listener.req); - listener.thread.join(); - listeners_.erase(it); - return true; - } - } + auto search = listeners_.find(key); + if (search == listeners_.end()) + return false; + + auto it = search->second.listeners.find(ltoken); + if (it == search->second.listeners.end()) + return false; + + auto& listener = it->second; + if (!deviceKey_.empty()) { + // First, be sure to have a token + if (listener.thread.joinable()) { + listener.thread.join(); + } + // UNSUBSCRIBE + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); + auto req = std::make_shared<restbed::Request>(uri); + req->set_method("UNSUBSCRIBE"); + // fill request body + Json::Value body; + body["key"] = deviceKey_; + body["client_id"] = pushClientId_; + body["token"] = std::to_string(ltoken); + body["callback_id"] = listener.callbackId; + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto content = Json::writeString(wbuilder, body) + "\n"; + std::replace(content.begin(), content.end(), '\n', ' '); + req->set_body(content); + req->set_header("Content-Length", std::to_string(content.size())); + + restbed::Http::async(req, [](const std::shared_ptr<restbed::Request>&, const std::shared_ptr<restbed::Response>&){}); + } else { + // Just stop the request + if (listener.thread.joinable()) { + // Close connection to stop listener? + *(listener.isCanceledViaClose) = true; + if (listener.req) + restbed::Http::close(listener.req); + listener.thread.join(); } } - return false; + search->second.listeners.erase(it); + if (search->second.listeners.empty()) { + listeners_.erase(search); + } + + return true; } void @@ -644,17 +677,19 @@ void DhtProxyClient::restartListeners() { std::lock_guard<std::mutex> lock(lockListener_); - for (auto& listener: listeners_) { + for (auto& search: listeners_) { + for (auto& l: search.second.listeners) { + auto& listener = l.second; if (listener.thread.joinable()) listener.thread.join(); // Redo listen - auto filterChain = listener.filterChain; + auto filter = listener.filter; auto cb = listener.cb; - restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key); + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + search.first.toString()); auto req = std::make_shared<restbed::Request>(uri); req->set_method("LISTEN"); listener.req = req; - listener.thread = std::thread([this, filterChain, cb, req]() + listener.thread = std::thread([this, filter, cb, req]() { auto settings = std::make_shared<restbed::Settings>(); std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); @@ -662,7 +697,7 @@ DhtProxyClient::restartListeners() auto ok = std::make_shared<std::atomic_bool>(true); restbed::Http::async(req, - [this, filterChain, cb, ok](const std::shared_ptr<restbed::Request>& req, + [this, filter, cb, ok](const std::shared_ptr<restbed::Request>& req, const std::shared_ptr<restbed::Response>& reply) { auto code = reply->get_status_code(); @@ -681,13 +716,13 @@ DhtProxyClient::restartListeners() auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); if (reader->parse(char_data, char_data + body.size(), &json, &err)) { auto value = std::make_shared<Value>(json); - if ((not filterChain or filterChain(*value)) && cb) { + if ((not filter or filter(*value)) && cb) { auto okCb = std::make_shared<std::promise<bool>>(); auto futureCb = okCb->get_future(); { std::lock_guard<std::mutex> lock(lockCallbacks); callbacks_.emplace_back([cb, value, okCb](){ - okCb->set_value(cb({value})); + okCb->set_value(cb({value}, false)); }); loopSignal_(); } @@ -709,6 +744,7 @@ DhtProxyClient::restartListeners() } ); } + } } void @@ -717,78 +753,81 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string #if OPENDHT_PUSH_NOTIFICATIONS try { auto token = std::stoul(notification.at("token")); - for (const auto& listener: listeners_) { - if (*(listener.pushNotifToken) != token) - continue; - if (notification.find("timeout") == notification.cend()) { - // Wake up daemon and get values - get(InfoHash(listener.key), listener.cb, {}, listener.filterChain); - } else { - // A timeout has occured, we need to relaunch the listener - resubscribe(token); + for (auto& search: listeners_) { + for (auto& list : search.second.listeners) { + auto& listener = list.second; + if (*listener.pushNotifToken!= token) + continue; + if (notification.find("timeout") == notification.cend()) { + // Wake up daemon and get values + auto cb = listener.cb; + auto filter = listener.filter; + get(search.first, [cb](const std::vector<Sp<Value>>& vals){ + cb(vals, false); + return true; + }, DoneCallbackSimple{}, std::move(filter)); + } else { + // A timeout has occured, we need to relaunch the listener + resubscribe(search.first, listener); + } } - } - } catch (...) { - + } catch (const std::exception& e) { + std::cerr << "pushNotificationReceived: error " << e.what() << std::endl; } #endif } void -DhtProxyClient::resubscribe(const unsigned token) +DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener) { #if OPENDHT_PUSH_NOTIFICATIONS if (deviceKey_.empty()) return; - for (auto& listener: listeners_) { - if (*(listener.pushNotifToken) == token) { - // Subscribe - restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key); - auto req = std::make_shared<restbed::Request>(uri); - req->set_method("SUBSCRIBE"); - - auto pushNotifToken = std::make_shared<unsigned>(0); - - if (listener.thread.joinable()) - listener.thread.join(); - listener.req = req; - listener.pushNotifToken = pushNotifToken; - auto callbackId = listener.callbackId; - listener.thread = std::thread([=]() - { - fillBodyToGetToken(req, callbackId); - auto settings = std::make_shared<restbed::Settings>(); - auto ok = std::make_shared<std::atomic_bool>(true); - restbed::Http::async(req, - [this, pushNotifToken, ok](const std::shared_ptr<restbed::Request>&, - const std::shared_ptr<restbed::Response>& reply) { - auto code = reply->get_status_code(); - if (code == 200) { - try { - restbed::Http::fetch("\n", reply); - std::string body; - reply->get_body(body); + // Subscribe + restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString()); + auto req = std::make_shared<restbed::Request>(uri); + req->set_method("SUBSCRIBE"); - std::string err; - Json::Value json; - Json::CharReaderBuilder rbuilder; - auto* char_data = reinterpret_cast<const char*>(&body[0]); - auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); - if (reader->parse(char_data, char_data + body.size(), &json, &err)) { - if (!json.isMember("token")) return; - *pushNotifToken = json["token"].asLargestUInt(); - } - } catch (std::runtime_error&) { - // NOTE: Http::close() can occurs here. Ignore this. - } - } else { - *ok = false; + auto pushNotifToken = std::make_shared<unsigned>(0); + + if (listener.thread.joinable()) + listener.thread.join(); + listener.req = req; + listener.pushNotifToken = pushNotifToken; + auto callbackId = listener.callbackId; + listener.thread = std::thread([=]() + { + fillBodyToGetToken(req, callbackId); + auto settings = std::make_shared<restbed::Settings>(); + auto ok = std::make_shared<std::atomic_bool>(true); + restbed::Http::async(req, + [this, pushNotifToken, ok](const std::shared_ptr<restbed::Request>&, + const std::shared_ptr<restbed::Response>& reply) { + auto code = reply->get_status_code(); + if (code == 200) { + try { + restbed::Http::fetch("\n", reply); + std::string body; + reply->get_body(body); + + std::string err; + Json::Value json; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast<const char*>(&body[0]); + auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); + if (reader->parse(char_data, char_data + body.size(), &json, &err)) { + if (!json.isMember("token")) return; + *pushNotifToken = json["token"].asLargestUInt(); } - }, settings).get(); - if (!ok) opFailed(); - }); - } - } + } catch (std::runtime_error&) { + // NOTE: Http::close() can occurs here. Ignore this. + } + } else { + *ok = false; + } + }, settings).get(); + if (!ok) opFailed(); + }); #endif } diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 5f8ac96fc7a127357bf81206db8e54e7c8ca126f..adeb5a06a1afa8b0a4d45767db93017f5c861feb 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -43,6 +43,15 @@ namespace dht { constexpr std::chrono::seconds DhtRunner::BOOTSTRAP_PERIOD; +struct DhtRunner::Listener { + size_t tokenClassicDht; + size_t tokenProxyDht; + ValueCallback gcb; + InfoHash hash; + Value::Filter f; + Where w; +}; + DhtRunner::DhtRunner() : dht_() #if OPENDHT_PROXY_CLIENT , dht_via_proxy_() @@ -542,7 +551,7 @@ DhtRunner::query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb, Q } std::future<size_t> -DhtRunner::listen(InfoHash hash, GetCallback vcb, Value::Filter f, Where w) +DhtRunner::listen(InfoHash hash, ValueCallback vcb, Value::Filter f, Where w) { auto ret_token = std::make_shared<std::promise<size_t>>(); { @@ -553,8 +562,8 @@ DhtRunner::listen(InfoHash hash, GetCallback vcb, Value::Filter f, Where w) listener.hash = hash; listener.f = std::move(f); listener.w = std::move(w); - listener.gcb = [hash,vcb,tokenbGlobal,this](const std::vector<Sp<Value>>& vals){ - if (not vcb(vals)) { + listener.gcb = [hash,vcb,tokenbGlobal,this](const std::vector<Sp<Value>>& vals, bool expired){ + if (not vcb(vals, expired)) { #if OPENDHT_PROXY_CLIENT cancelListen(hash, tokenbGlobal); #endif @@ -952,13 +961,4 @@ DhtRunner::pushNotificationReceived(const std::map<std::string, std::string>& da #endif } -void -DhtRunner::resubscribe(unsigned token) -{ -#if OPENDHT_PROXY_CLIENT && OPENDHT_PUSH_NOTIFICATIONS - if (dht_via_proxy_) - dht_via_proxy_->resubscribe(token); -#endif -} - } diff --git a/src/listener.h b/src/listener.h index 3ce95cdfc0b89d20429dc43992b7846a91ff1899..f491929e03d2b0bfffbf2a096ce3278183fdb200 100644 --- a/src/listener.h +++ b/src/listener.h @@ -44,7 +44,14 @@ struct Listener { struct LocalListener { Sp<Query> query; Value::Filter filter; - GetCallback get_cb; + ValueCallback get_cb; +}; + + +struct SearchListener { + Sp<Query> query; + Value::Filter filter; + ValueCallback get_cb; }; } diff --git a/src/network_engine.cpp b/src/network_engine.cpp index c1ff5a12c27330d89630b2743a154f538380ed4b..9fbc806aeeefa2f4eea6dd8c3c8f44899f277dbe 100644 --- a/src/network_engine.cpp +++ b/src/network_engine.cpp @@ -122,9 +122,22 @@ serializeValues(const std::vector<Sp<Value>>& st) return svals; } +void +packToken(msgpack::packer<msgpack::sbuffer>& pk, const Blob& token) +{ + pk.pack_bin(token.size()); + pk.pack_bin_body((char*)token.data(), token.size()); +} + RequestAnswer::RequestAnswer(ParsedMessage&& msg) - : ntoken(std::move(msg.token)), values(std::move(msg.values)), fields(std::move(msg.fields)), - nodes4(std::move(msg.nodes4)), nodes6(std::move(msg.nodes6)) {} + : ntoken(std::move(msg.token)), + values(std::move(msg.values)), + refreshed_values(std::move(msg.refreshed_values)), + expired_values(std::move(msg.expired_values)), + fields(std::move(msg.fields)), + nodes4(std::move(msg.nodes4)), + nodes6(std::move(msg.nodes6)) +{} NetworkEngine::NetworkEngine(Logger& log, Scheduler& scheduler, const int& s, const int& s6) : myid(zeroes), DHT_LOG(log), scheduler(scheduler), dht_socket(s), dht_socket6(s6) @@ -171,6 +184,67 @@ NetworkEngine::tellListener(Sp<Node> node, Tid socket_id, const InfoHash& hash, } } +void +NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& token, const std::vector<Value::Id>& values) +{ + msgpack::sbuffer buffer; + msgpack::packer<msgpack::sbuffer> pk(&buffer); + pk.pack_map(4+(network?1:0)); + + pk.pack(std::string("u")); + pk.pack_map(1 + (not values.empty()?1:0) + (not token.empty()?1:0)); + pk.pack(std::string("id")); pk.pack(myid); + if (not token.empty()) { + pk.pack(std::string("token")); packToken(pk, token); + } + if (not values.empty()) { + pk.pack(std::string("re")); + pk.pack(values); + DHT_LOG.d(n->id, "[node %s] sending %zu refreshed values", n->toString().c_str(), values.size()); + } + + pk.pack(std::string("t")); pk.pack(socket_id); + pk.pack(std::string("y")); pk.pack(std::string("r")); + pk.pack(std::string("v")); pk.pack(my_v); + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } + + // send response + send(buffer.data(), buffer.size(), 0, n->getAddr()); +} + +void +NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& token, const std::vector<Value::Id>& values) +{ + msgpack::sbuffer buffer; + msgpack::packer<msgpack::sbuffer> pk(&buffer); + pk.pack_map(4+(network?1:0)); + + pk.pack(std::string("u")); + pk.pack_map(1 + (not values.empty()?1:0) + (not token.empty()?1:0)); + pk.pack(std::string("id")); pk.pack(myid); + if (not token.empty()) { + pk.pack(std::string("token")); packToken(pk, token); + } + if (not values.empty()) { + pk.pack(std::string("exp")); + pk.pack(values); + DHT_LOG.d(n->id, "[node %s] sending %zu expired values", n->toString().c_str(), values.size()); + } + + pk.pack(std::string("t")); pk.pack(socket_id); + pk.pack(std::string("y")); pk.pack(std::string("r")); + pk.pack(std::string("v")); pk.pack(my_v); + if (network) { + pk.pack(std::string("n")); pk.pack(network); + } + + // send response + send(buffer.data(), buffer.size(), 0, n->getAddr()); +} + + bool NetworkEngine::isRunning(sa_family_t af) const { @@ -337,7 +411,7 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const SockAddr& msgpack::unpacked msg_res = msgpack::unpack((const char*)buf, buflen); msg->msgpack_unpack(msg_res.get()); } catch (const std::exception& e) { - DHT_LOG.w("Can't process message of size %lu: %s", buflen, e.what()); + DHT_LOG.w("Can't parse message of size %lu: %s", buflen, e.what()); DHT_LOG.DEBUG.logPrintable(buf, buflen); return; } @@ -550,13 +624,6 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro } } -void -packToken(msgpack::packer<msgpack::sbuffer>& pk, const Blob& token) -{ - pk.pack_bin(token.size()); - pk.pack_bin_body((char*)token.data(), token.size()); -} - void insertAddr(msgpack::packer<msgpack::sbuffer>& pk, const SockAddr& addr) { diff --git a/src/op_cache.h b/src/op_cache.h new file mode 100644 index 0000000000000000000000000000000000000000..e2f26008723f444dc516e2850c471078b08afb2e --- /dev/null +++ b/src/op_cache.h @@ -0,0 +1,236 @@ +/* + * Copyright (C) 2018 Savoir-faire Linux Inc. + * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ +#pragma once + +#include "value.h" +#include "value_cache.h" +#include "listener.h" + +namespace dht { + +struct OpCacheValueStorage +{ + Sp<Value> data {}; + unsigned refCount {1}; + OpCacheValueStorage(Sp<Value> val = {}) : data(val) {} +}; + +class OpValueCache { +public: + OpValueCache(ValueCallback&& cb) : callback(std::forward<ValueCallback>(cb)) {} + + static ValueCallback cacheCallback(ValueCallback&& cb) { + auto cache = std::make_shared<OpValueCache>(std::forward<ValueCallback>(cb)); + return [cache](const std::vector<Sp<Value>>& vals, bool expired){ + return cache->onValue(vals, expired); + }; + } + + bool onValue(const std::vector<Sp<Value>>& vals, bool expired) { + if (expired) + return onValuesExpired(vals); + else + return onValuesAdded(vals); + } + + bool onValuesAdded(const std::vector<Sp<Value>>& vals) { + std::vector<Sp<Value>> newValues; + for (const auto& v : vals) { + auto viop = values.emplace(v->id, OpCacheValueStorage{v}); + if (viop.second) { + newValues.emplace_back(v); + //std::cout << "onValuesAdded: new value " << v->id << std::endl; + } else { + viop.first->second.refCount++; + //std::cout << "onValuesAdded: " << viop.first->second.refCount << " refs for value " << v->id << std::endl; + } + } + return callback(newValues, false); + } + bool onValuesExpired(const std::vector<Sp<Value>>& vals) { + std::vector<Sp<Value>> expiredValues; + for (const auto& v : vals) { + auto vit = values.find(v->id); + if (vit != values.end()) { + vit->second.refCount--; + //std::cout << "onValuesExpired: " << vit->second.refCount << " refs remaining for value " << v->id << std::endl; + if (not vit->second.refCount) + values.erase(vit); + } + } + return callback(expiredValues, true); + } +private: + std::map<Value::Id, OpCacheValueStorage> values {}; + ValueCallback callback; +}; + +class OpCache { +public: + bool onValue(const std::vector<Sp<Value>>& vals, bool expired) { + if (expired) + onValuesExpired(vals); + else + onValuesAdded(vals); + return not listeners.empty(); + } + + void onValuesAdded(const std::vector<Sp<Value>>& vals) { + std::vector<Sp<Value>> newValues; + for (const auto& v : vals) { + auto viop = values.emplace(v->id, OpCacheValueStorage{v}); + if (viop.second) { + newValues.emplace_back(v); + //std::cout << "onValuesAdded: new value " << v->id << std::endl; + } else { + viop.first->second.refCount++; + //std::cout << "onValuesAdded: " << viop.first->second.refCount << " refs for value " << v->id << std::endl; + } + } + auto list = listeners; + for (auto& l : list) + l.second.get_cb(l.second.filter.filter(newValues), false); + } + void onValuesExpired(const std::vector<Sp<Value>>& vals) { + std::vector<Sp<Value>> expiredValues; + for (const auto& v : vals) { + auto vit = values.find(v->id); + if (vit != values.end()) { + vit->second.refCount--; + //std::cout << "onValuesExpired: " << vit->second.refCount << " refs remaining for value " << v->id << std::endl; + if (not vit->second.refCount) + values.erase(vit); + } + } + auto list = listeners; + for (auto& l : list) + l.second.get_cb(l.second.filter.filter(expiredValues), true); + } + + void addListener(size_t token, ValueCallback cb, Sp<Query> q, Value::Filter filter) { + listeners.emplace(token, LocalListener{q, filter, cb}); + std::vector<Sp<Value>> newValues; + newValues.reserve(values.size()); + for (const auto& v : values) + newValues.emplace_back(v.second.data); + cb(newValues, false); + } + + bool removeListener(size_t token) { + return listeners.erase(token) > 0; + } + + bool isDone() { + return listeners.empty(); + } + + std::vector<Sp<Value>> get(Value::Filter& filter) const { + std::vector<Sp<Value>> ret; + if (not filter) + ret.reserve(values.size()); + for (const auto& v : values) + if (not filter or filter(*v.second.data)) + ret.emplace_back(v.second.data); + return ret; + } + + Sp<Value> get(Value::Id id) const { + auto v = values.find(id); + if (v == values.end()) + return {}; + return v->second.data; + } + + size_t searchToken; +private: + std::map<size_t, LocalListener> listeners; + std::map<Value::Id, OpCacheValueStorage> values; +}; + +class SearchCache { +public: + size_t listen(ValueCallback get_cb, Sp<Query> q, Value::Filter filter, std::function<size_t(Sp<Query>, ValueCallback)> onListen) { + // find exact match + auto op = ops.find(q); + if (op == ops.end()) { + // find satisfying query + for (auto it = ops.begin(); it != ops.end(); it++) { + if (q->isSatisfiedBy(*it->first)) { + op = it; + break; + } + } + } + if (op == ops.end()) { + // New query + op = ops.emplace(q, OpCache{}).first; + auto& cache = op->second; + cache.searchToken = onListen(q, [&](const std::vector<Sp<Value>>& values, bool expired){ + return cache.onValue(values, expired); + }); + } + auto token = nextToken_++; + if (token == 0) + token++; + op->second.addListener(token, get_cb, q, filter); + return token; + } + + bool cancelListen(size_t gtoken, std::function<void(size_t)> onCancel) { + for (auto it = ops.begin(); it != ops.end(); it++) { + if (it->second.removeListener(gtoken)) { + if (it->second.isDone()) { + auto ltoken = it->second.searchToken; + ops.erase(it); + onCancel(ltoken); + } + return true; + } + } + return false; + } + + std::vector<Sp<Value>> get(Value::Filter& filter) const { + if (ops.size() == 1) + return ops.begin()->second.get(filter); + std::map<Value::Id, Sp<Value>> c; + for (const auto& op : ops) { + for (const auto& v : op.second.get(filter)) + c.emplace(v->id, v); + } + std::vector<Sp<Value>> ret; + ret.reserve(c.size()); + for (auto& v : c) + ret.emplace_back(std::move(v.second)); + return ret; + } + + Sp<Value> get(Value::Id id) const { + for (const auto& op : ops) + if (auto v = op.second.get(id)) + return v; + return {}; + } + +private: + std::map<Sp<Query>, OpCache> ops; + size_t nextToken_ {1}; +}; + + +} diff --git a/src/parsed_message.h b/src/parsed_message.h index c4ac56cfd75f5755b3ecbe26930fe71065811663..37cefa3c66980c74161c2f0b22c416c5e5814d78 100644 --- a/src/parsed_message.h +++ b/src/parsed_message.h @@ -62,6 +62,8 @@ struct ParsedMessage { std::vector<Sp<Node>> nodes4, nodes6; /* values to store or retreive request */ std::vector<Sp<Value>> values; + std::vector<Value::Id> refreshed_values {}; + std::vector<Value::Id> expired_values {}; /* index for fields values */ std::vector<Sp<FieldValueIndex>> fields; /** When part of the message header: {index -> (total size, {})} @@ -282,6 +284,10 @@ ParsedMessage::msgpack_unpack(msgpack::object msg) } else { throw msgpack::type_error(); } + } else if (auto raw_fields = findMapValue(req, "exp")) { + expired_values = raw_fields->as<decltype(expired_values)>(); + } else if (auto raw_fields = findMapValue(req, "re")) { + refreshed_values = raw_fields->as<decltype(refreshed_values)>(); } if (auto w = findMapValue(req, "w")) { diff --git a/src/search.h b/src/search.h index b33dfdb5fea2657e024a16a0a40a2744144f0e4a..9a492eb4d9cac4837dc3e7c2f3655aadfa14e4ea 100644 --- a/src/search.h +++ b/src/search.h @@ -21,6 +21,8 @@ #include "value.h" #include "request.h" #include "listener.h" +#include "value_cache.h" +#include "op_cache.h" namespace dht { @@ -59,6 +61,16 @@ struct Dht::SearchNode { */ using SyncStatus = std::map<Sp<Query>, Sp<net::Request>>; + struct CachedListenStatus { + ValueCache cache; + Sp<net::Request> req {}; + CachedListenStatus(ValueStateCallback&& cb) : cache(std::forward<ValueStateCallback>(cb)) {} + CachedListenStatus(CachedListenStatus&&) = default; + CachedListenStatus(const CachedListenStatus&) = delete; + CachedListenStatus& operator=(const CachedListenStatus&) = delete; + }; + using NodeListenerStatus = std::map<Sp<Query>, CachedListenStatus>; + Sp<Node> node {}; /* the node info */ /* queries sent for finding out values hosted by the node */ @@ -67,7 +79,7 @@ struct Dht::SearchNode { std::map<Sp<Query>, std::vector<Sp<Query>>> pagination_queries {}; SyncStatus getStatus {}; /* get/sync status */ - SyncStatus listenStatus {}; /* listen status */ + NodeListenerStatus listenStatus {}; /* listen status */ AnnounceStatus acked {}; /* announcement status for a given value id */ Blob token {}; /* last token the node sent to us after a get request */ @@ -201,6 +213,16 @@ struct Dht::SearchNode { getStatus.clear(); } + void onValues(const Sp<Query>& q, net::RequestAnswer&& answer, const TypeStore& types, const time_point& now) + { + auto l = listenStatus.find(q); + if (l != listenStatus.end()) { + l->second.cache.onValues(answer.values, + answer.refreshed_values, + answer.expired_values, types, now); + } + } + /** * Tells if a request in the status map is expired. * @@ -228,6 +250,12 @@ struct Dht::SearchNode { return r.second and r.second->pending(); }) != status.end(); } + static bool pending(const NodeListenerStatus& status) { + return std::find_if(status.begin(), status.end(), + [](const NodeListenerStatus::value_type& r){ + return r.second.req and r.second.req->pending(); + }) != status.end(); + } bool pendingGet() const { return pending(getStatus); } @@ -263,20 +291,20 @@ struct Dht::SearchNode { else return isListening(now, ls); } - bool isListening(time_point now, SyncStatus::const_iterator listen_status) const { + bool isListening(time_point now, NodeListenerStatus::const_iterator listen_status) const { if (listen_status == listenStatus.end()) return false; - return listen_status->second->reply_time + LISTEN_EXPIRE_TIME > now; + return listen_status->second.req->reply_time + LISTEN_EXPIRE_TIME > now; } void cancelListen() { for (const auto& status : listenStatus) - node->cancelRequest(status.second); + node->cancelRequest(status.second.req); listenStatus.clear(); } void cancelListen(const Sp<Query>& query) { auto it = listenStatus.find(query); if (it != listenStatus.end()) { - node->cancelRequest(it->second); + node->cancelRequest(it->second.req); listenStatus.erase(it); } } @@ -302,10 +330,10 @@ struct Dht::SearchNode { */ time_point getListenTime(const Sp<Query>& q) const { auto listen_status = listenStatus.find(q); - if (listen_status == listenStatus.end() or not listen_status->second) + if (listen_status == listenStatus.end() or not listen_status->second.req) return time_point::min(); - return listen_status->second->pending() ? time_point::max() : - listen_status->second->reply_time + LISTEN_EXPIRE_TIME - REANNOUNCE_MARGIN; + return listen_status->second.req->pending() ? time_point::max() : + listen_status->second.req->reply_time + LISTEN_EXPIRE_TIME - REANNOUNCE_MARGIN; } /** @@ -340,9 +368,12 @@ struct Dht::Search { std::multimap<time_point, Get> callbacks {}; /* listeners */ - std::map<size_t, LocalListener> listeners {}; + std::map<size_t, SearchListener> listeners {}; size_t listener_token = 1; + /* Cache */ + SearchCache cache; + ~Search() { for (auto& get : callbacks) { get.second.done_cb(false, {}); @@ -431,19 +462,32 @@ struct Dht::Search { bool isAnnounced(Value::Id id) const; bool isListening(time_point now) const; + size_t listen(ValueCallback cb, Value::Filter f, const Sp<Query>& q, Scheduler& scheduler) { + //DHT_LOG.e(id, "[search %s IPv%c] listen", id.toString().c_str(), (af == AF_INET) ? '4' : '6'); + return cache.listen(cb, q, f, [&](const Sp<Query>& q, ValueCallback vcb){ + done = false; + auto token = ++listener_token; + listeners.emplace(token, SearchListener{q, f, vcb}); + scheduler.edit(nextSearchStep, scheduler.time()); + return token; + }); + } + void cancelListen(size_t token) { - Sp<Query> query; - const auto& ll = listeners.find(token); - if (ll != listeners.cend()) { - query = ll->second.query; - listeners.erase(ll); - } - for (auto& sn : nodes) { - if (listeners.empty()) - sn.cancelListen(); - else if (query) - sn.cancelListen(query); - } + cache.cancelListen(token, [&](size_t t){ + Sp<Query> query; + const auto& ll = listeners.find(t); + if (ll != listeners.cend()) { + query = ll->second.query; + listeners.erase(ll); + } + for (auto& sn : nodes) { + if (listeners.empty()) + sn.cancelListen(); + else if (query) + sn.cancelListen(query); + } + }); } /** @@ -732,7 +776,7 @@ Dht::Search::isListening(time_point now) const for (const auto& n : nodes) { if (n.isBad()) continue; - SearchNode::SyncStatus::const_iterator ls {}; + SearchNode::NodeListenerStatus::const_iterator ls {}; for (ls = n.listenStatus.begin(); ls != n.listenStatus.end() ; ++ls) { if (n.isListening(now, ls)) break; diff --git a/src/securedht.cpp b/src/securedht.cpp index 7dd8dc86909002e28fd1a0c3b2e0553bc7ed349f..3b67abdb7e9d47dce3cf6a4c5ccac3784485457f 100644 --- a/src/securedht.cpp +++ b/src/securedht.cpp @@ -221,50 +221,73 @@ SecureDht::findPublicKey(const InfoHash& node, std::function<void(const Sp<const }); } +Sp<Value> +SecureDht::checkValue(const Sp<Value>& v) +{ + // Decrypt encrypted values + if (v->isEncrypted()) { + if (not key_) { +#if OPENDHT_PROXY_SERVER + if (forward_all_) // We are currently a proxy, send messages to clients. + return v; +#endif + return {}; + } + try { + Value decrypted_val (decrypt(*v)); + if (decrypted_val.recipient == getId()) { + nodesPubKeys_[decrypted_val.owner->getId()] = decrypted_val.owner; + return std::make_shared<Value>(std::move(decrypted_val)); + } + // Ignore values belonging to other people + } catch (const std::exception& e) { + DHT_LOG.WARN("Could not decrypt value %s : %s", v->toString().c_str(), e.what()); + } + } + // Check signed values + else if (v->isSigned()) { + if (v->owner and v->owner->checkSignature(v->getToSign(), v->signature)) { + nodesPubKeys_[v->owner->getId()] = v->owner; + return v; + } + else + DHT_LOG.WARN("Signature verification failed for %s", v->toString().c_str()); + } + // Forward normal values + else { + return v; + } + return {}; +} + +ValueCallback +SecureDht::getCallbackFilter(ValueCallback cb, Value::Filter&& filter) +{ + return [=](const std::vector<Sp<Value>>& values, bool expired) { + std::vector<Sp<Value>> tmpvals {}; + for (const auto& v : values) { + if (auto nv = checkValue(v)) + if (not filter or filter(*nv)) + tmpvals.emplace_back(std::move(nv)); + } + if (cb and not tmpvals.empty()) + return cb(tmpvals, expired); + return true; + }; +} + + GetCallback SecureDht::getCallbackFilter(GetCallback cb, Value::Filter&& filter) { return [=](const std::vector<Sp<Value>>& values) { std::vector<Sp<Value>> tmpvals {}; for (const auto& v : values) { - // Decrypt encrypted values - if (v->isEncrypted()) { - if (not key_) { -#if OPENDHT_PROXY_SERVER - if (forward_all_) // We are currently a proxy, send messages to clients. - tmpvals.push_back(v); -#endif - continue; - } - try { - Value decrypted_val (decrypt(*v)); - if (decrypted_val.recipient == getId()) { - nodesPubKeys_[decrypted_val.owner->getId()] = decrypted_val.owner; - if (not filter or filter(decrypted_val)) - tmpvals.push_back(std::make_shared<Value>(std::move(decrypted_val))); - } - // Ignore values belonging to other people - } catch (const std::exception& e) { - DHT_LOG.WARN("Could not decrypt value %s : %s", v->toString().c_str(), e.what()); - } - } - // Check signed values - else if (v->isSigned()) { - if (v->owner and v->owner->checkSignature(v->getToSign(), v->signature)) { - nodesPubKeys_[v->owner->getId()] = v->owner; - if (not filter or filter(*v)) - tmpvals.push_back(v); - } - else - DHT_LOG.WARN("Signature verification failed for %s", v->toString().c_str()); - } - // Forward normal values - else { - if (not filter or filter(*v)) - tmpvals.push_back(v); - } + if (auto nv = checkValue(v)) + if (not filter or filter(*nv)) + tmpvals.emplace_back(std::move(nv)); } - if (cb && not tmpvals.empty()) + if (cb and not tmpvals.empty()) return cb(tmpvals); return true; }; @@ -276,6 +299,13 @@ SecureDht::get(const InfoHash& id, GetCallback cb, DoneCallback donecb, Value::F dht_->get(id, getCallbackFilter(cb, std::forward<Value::Filter>(f)), donecb, {}, std::forward<Where>(w)); } +size_t +SecureDht::listen(const InfoHash& id, ValueCallback cb, Value::Filter f, Where w) +{ + return dht_->listen(id, getCallbackFilter(cb, std::forward<Value::Filter>(f)), {}, std::forward<Where>(w)); +} + + size_t SecureDht::listen(const InfoHash& id, GetCallback cb, Value::Filter f, Where w) { diff --git a/src/storage.h b/src/storage.h index 0d4763bbd2832b47924f61b08dba7cd22635c36a..1d5896579cde5ca05c62178e98910410583e9b36 100644 --- a/src/storage.h +++ b/src/storage.h @@ -167,7 +167,7 @@ struct Storage { StoreDiff remove(const InfoHash& id, Value::Id); - StoreDiff expire(const InfoHash& id, time_point now); + std::pair<ssize_t, std::vector<Sp<Value>>> expire(const InfoHash& id, time_point now); private: Storage(const Storage&) = delete; @@ -245,7 +245,7 @@ Storage::clear() return {-tot_size, -num_values, 0}; } -Storage::StoreDiff +std::pair<ssize_t, std::vector<Sp<Value>>> Storage::expire(const InfoHash& id, time_point now) { // expire listeners @@ -271,16 +271,18 @@ Storage::expire(const InfoHash& id, time_point now) auto r = std::partition(values.begin(), values.end(), [&](const ValueStorage& v) { return v.expiration > now; }); - ssize_t del_num = -std::distance(r, values.end()); + std::vector<Sp<Value>> ret; + ret.reserve(std::distance(r, values.end())); ssize_t size_diff {}; std::for_each(r, values.end(), [&](const ValueStorage& v) { size_diff -= v.data->size(); if (v.store_bucket) v.store_bucket->erase(id, *v.data, v.expiration); + ret.emplace_back(std::move(v.data)); }); total_size += size_diff; values.erase(r, values.end()); - return {size_diff, del_num, del_listen}; + return {size_diff, std::move(ret)}; } } diff --git a/src/value.cpp b/src/value.cpp index 54b96f3ec543d1f646c6f5d9b0563f4713012ed9..a6f66c5d0f7a3db85f049010432058601c462faa 100644 --- a/src/value.cpp +++ b/src/value.cpp @@ -230,7 +230,7 @@ Value::toJson() const if (has_owner) { // isSigned val["seq"] = seq; val["owner"] = owner->toString(); - if (recipient != InfoHash()) + if (recipient) val["to"] = recipient.toString(); } val["type"] = type; diff --git a/src/value_cache.h b/src/value_cache.h new file mode 100644 index 0000000000000000000000000000000000000000..06fe255f2eb61e64156f76730389a270704861c9 --- /dev/null +++ b/src/value_cache.h @@ -0,0 +1,178 @@ +/* + * Copyright (C) 2018 Savoir-faire Linux Inc. + * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ +#pragma once + +#include "value.h" + +namespace dht { + +using ValueStateCallback = std::function<void(const std::vector<Sp<Value>>&, bool)>; +using CallbackQueue = std::list<std::function<void()>>; + +class ValueCache { +public: + ValueCache(ValueStateCallback&& cb) { + callbacks.emplace_back(std::forward<ValueStateCallback>(cb)); + } + ValueCache(ValueCache&&) = default; + + ~ValueCache() { + auto q = clear(); + for (auto& cb: q) + cb(); + } + + CallbackQueue clear() { + std::vector<Sp<Value>> expired_values; + expired_values.reserve(values.size()); + for (const auto& v : values) + expired_values.emplace_back(std::move(v.second.data)); + values.clear(); + CallbackQueue ret; + if (not expired_values.empty() and not callbacks.empty()) { + auto cbs = callbacks; + ret.emplace_back([expired_values, cbs]{ + for (auto& cb : cbs) + cb(expired_values, true); + }); + } + return ret; + } + + CallbackQueue expireValues(const time_point& now) { + std::vector<Sp<Value>> expired_values; + for (auto it = values.begin(); it != values.end();) { + if (it->second.expiration < now) { + expired_values.emplace_back(std::move(it->second.data)); + it = values.erase(it); + } else + ++it; + } + while (values.size() >= MAX_VALUES) { + // too many values, remove oldest values + time_point oldest_creation = time_point::max(); + auto oldest_value = values.end(); + for (auto it = values.begin(); it != values.end(); ++it) + if (it->second.created < oldest_creation) { + oldest_value = it; + oldest_creation = it->second.created; + } + if (oldest_value != values.end()) { + expired_values.emplace_back(std::move(oldest_value->second.data)); + values.erase(oldest_value); + } + } + CallbackQueue ret; + if (not expired_values.empty()) { + auto cbs = callbacks; + ret.emplace_back([cbs, expired_values]{ + for (auto& cb : cbs) + if (cb) cb(expired_values, true); + }); + } + return ret; + } + + void onValues + (const std::vector<Sp<Value>>& values, + const std::vector<Value::Id>& refreshed_values, + const std::vector<Value::Id>& expired_values, + const TypeStore& types, const time_point& now) + { + CallbackQueue cbs; + if (not values.empty()) + cbs.splice(cbs.end(), addValues(values, types, now)); + for (const auto& vid : refreshed_values) + refreshValue(vid, types, now); + for (const auto& vid : expired_values) + cbs.splice(cbs.end(), expireValue(vid)); + cbs.splice(cbs.end(), expireValues(now)); + while (not cbs.empty()) { + cbs.front()(); + cbs.pop_front(); + } + } + +private: + // prevent copy + ValueCache(const ValueCache&) = delete; + ValueCache& operator=(const ValueCache&) = delete; + + /* The maximum number of values we store in the cache. */ + static constexpr unsigned MAX_VALUES {1024}; + + struct CacheValueStorage { + Sp<Value> data {}; + time_point created {}; + time_point expiration {}; + + CacheValueStorage() {} + CacheValueStorage(const Sp<Value>& v, time_point t, time_point e) + : data(v), created(t), expiration(e) {} + }; + + std::map<Value::Id, CacheValueStorage> values; + std::vector<ValueStateCallback> callbacks; + + CallbackQueue addValues(const std::vector<Sp<Value>>& new_values, const TypeStore& types, const time_point& now) { + std::vector<Sp<Value>> nvals; + for (const auto& value : new_values) { + auto v = values.find(value->id); + if (v == values.end()) { + nvals.emplace_back(value); + // new value + values.emplace(value->id, CacheValueStorage(value, now, now + types.getType(value->type).expiration)); + } else { + v->second.created = now; + v->second.expiration = now + types.getType(v->second.data->type).expiration; + } + } + auto cbs = callbacks; + CallbackQueue ret; + if (not nvals.empty()) + ret.emplace_back([cbs, nvals]{ + for (auto& cb : cbs) + if (cb) cb(nvals, false); + }); + return ret; + } + CallbackQueue expireValue(Value::Id vid) { + auto v = values.find(vid); + if (v == values.end()) + return {}; + const std::vector<Sp<Value>> val {std::move(v->second.data)}; + values.erase(v); + auto cbs = callbacks; + CallbackQueue ret; + ret.emplace_back([cbs, val]{ + for (auto& cb : cbs) + if (cb) cb(val, true); + }); + return ret; + } + void refreshValue(Value::Id vid, const TypeStore& types, const time_point& now) { + auto v = values.find(vid); + if (v == values.end()) + return; + v->second.created = now; + v->second.expiration = now + types.getType(v->second.data->type).expiration; + } +}; + + +} diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index b5a7640539d6e25e0cda619add4a93294b40357b..777903a18492a09ee51db47ce0b93560885efe02 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -232,13 +232,6 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params iss >> value; dht->pushNotificationReceived({{"token", value}}); continue; - } else if (op == "re") { - iss >> value; - try { - unsigned token = std::stoul(value); - dht->resubscribe(token); - } catch (...) { } - continue; } #endif // OPENDHT_PUSH_NOTIFICATIONS #endif //OPENDHT_PROXY_CLIENT @@ -254,11 +247,7 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params } dht::InfoHash id; - if (op == "cl") { - std::string hash, rem; - iss >> hash >> rem; - dht->cancelListen(dht::InfoHash(hash), std::stoul(rem)); - } + if (false) {} #ifdef OPENDHT_INDEXATION else if (op == "il" or op == "ii") { // Pht syntax @@ -268,7 +257,7 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params return i.first == index; }) == indexes.end(); if (not index.size()) { - std::cout << "You must enter the index name." << std::endl; + std::cerr << "You must enter the index name." << std::endl; continue; } else if (new_index) { using namespace dht::indexation; @@ -330,14 +319,27 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params else if (op == "l") { std::string rem; std::getline(iss, rem); - auto token = dht->listen(id, [](std::shared_ptr<Value> value) { - std::cout << "Listen: found value:" << std::endl; - std::cout << "\t" << *value << std::endl; + auto token = dht->listen(id, [](const std::vector<std::shared_ptr<Value>>& values, bool expired) { + std::cout << "Listen: found " << values.size() << " values" << (expired ? " expired" : "") << std::endl; + for (const auto& value : values) + std::cout << "\t" << *value << std::endl; return true; }, {}, dht::Where {std::move(rem)}); auto t = token.get(); std::cout << "Listening, token: " << t << std::endl; } + if (op == "cl") { + std::string rem; + iss >> rem; + size_t token; + try { + token = std::stoul(rem); + } catch(...) { + std::cerr << "Syntax: cl [key] [token]" << std::endl; + continue; + } + dht->cancelListen(id, token); + } else if (op == "p") { std::string v; iss >> v;