diff --git a/include/opendht/dht.h b/include/opendht/dht.h index d8b6b7e8d473537f9e9ec8969505e1418929a16c..4fb8cbb6a4a51e2276cd65b624f174a2182972ac 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -163,7 +163,7 @@ public: /** * Get locally stored data for the given hash. */ - std::vector<Sp<Value>> getLocal(const InfoHash& key, Value::Filter f = {}) const; + std::vector<Sp<Value>> getLocal(const InfoHash& key, const Value::Filter& f = {}) const; /** * Get locally stored data for the given key and value id. @@ -210,12 +210,12 @@ public: /** * Get data currently being put at the given hash. */ - std::vector<Sp<Value>> getPut(const InfoHash&); + std::vector<Sp<Value>> getPut(const InfoHash&) const; /** * Get data currently being put at the given hash with the given id. */ - Sp<Value> getPut(const InfoHash&, const Value::Id&); + Sp<Value> getPut(const InfoHash&, const Value::Id&) const; /** * Stop any put/announce operation at the given location, diff --git a/include/opendht/dht_interface.h b/include/opendht/dht_interface.h index 41c463c05246bee1a3eeb6e752d5071aaa00ec80..7d1694bee7987abd9ca00ea96f9fbc2bea79f607 100644 --- a/include/opendht/dht_interface.h +++ b/include/opendht/dht_interface.h @@ -106,7 +106,7 @@ public: /** * Get locally stored data for the given hash. */ - virtual std::vector<Sp<Value>> getLocal(const InfoHash& key, Value::Filter f = {}) const = 0; + virtual std::vector<Sp<Value>> getLocal(const InfoHash& key, const Value::Filter& f = {}) const = 0; /** * Get locally stored data for the given key and value id. @@ -143,12 +143,12 @@ public: /** * Get data currently being put at the given hash. */ - virtual std::vector<Sp<Value>> getPut(const InfoHash&) = 0; + virtual std::vector<Sp<Value>> getPut(const InfoHash&) const = 0; /** * Get data currently being put at the given hash with the given id. */ - virtual Sp<Value> getPut(const InfoHash&, const Value::Id&) = 0; + virtual Sp<Value> getPut(const InfoHash&, const Value::Id&) const = 0; /** * Stop any put/announce operation at the given location, diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h index 07f5cac609d8ac01c71fa325f086b5119ceefa5e..aacc2fca6c21b12fe343b509c8421ff1ce145de6 100644 --- a/include/opendht/dht_proxy_client.h +++ b/include/opendht/dht_proxy_client.h @@ -202,12 +202,12 @@ public: /** * Get data currently being put at the given hash. */ - std::vector<Sp<Value>> getPut(const InfoHash&); + std::vector<Sp<Value>> getPut(const InfoHash&) const; /** * Get data currently being put at the given hash with the given id. */ - Sp<Value> getPut(const InfoHash&, const Value::Id&); + Sp<Value> getPut(const InfoHash&, const Value::Id&) const; /** * Stop any put/announce operation at the given location, @@ -224,7 +224,7 @@ public: return types.getType(type_id); } - std::vector<Sp<Value>> getLocal(const InfoHash& k, Value::Filter filter) const; + std::vector<Sp<Value>> getLocal(const InfoHash& k, const Value::Filter& filter) const; Sp<Value> getLocalById(const InfoHash& k, Value::Id id) const; /** diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index 5c6eb0b3b8e636f6655aaf89d03aef469bb58ed2..a0327146998b35dd9918109ef1a18aa4c69aff6d 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -231,7 +231,7 @@ public: void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) override { dht_->query(key, cb, done_cb, std::move(q)); } - std::vector<Sp<Value>> getLocal(const InfoHash& key, Value::Filter f = Value::AllFilter()) const override { + std::vector<Sp<Value>> getLocal(const InfoHash& key, const Value::Filter& f = {}) const override { return dht_->getLocal(key, f); } Sp<Value> getLocalById(const InfoHash& key, Value::Id vid) const override { @@ -270,10 +270,10 @@ public: { dht_->put(key, std::move(v), cb, created, permanent); } - std::vector<Sp<Value>> getPut(const InfoHash& h) override { + std::vector<Sp<Value>> getPut(const InfoHash& h) const override { return dht_->getPut(h); } - Sp<Value> getPut(const InfoHash& h, const Value::Id& vid) override { + Sp<Value> getPut(const InfoHash& h, const Value::Id& vid) const override { return dht_->getPut(h, vid); } bool cancelPut(const InfoHash& h, const Value::Id& vid) override { diff --git a/src/dht.cpp b/src/dht.cpp index 4862a067aaf82c6b8229e0e837305a21ef9f6b8c..87d0544b65d5dc2b4d8c4b2d2af5139d457babd3 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -986,19 +986,15 @@ Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filt { scheduler.syncTime(); - auto q = std::make_shared<Query>(Select {}, std::move(where)); auto op = std::make_shared<GetStatus<std::map<Value::Id, Sp<Value>>>>(); - auto f = filter.chain(q->where.getFilter()); - - auto gcb = [getcb, donecb, op, f](const std::vector<Sp<Value>>& vals) { + auto gcb = [getcb, donecb, op](const std::vector<Sp<Value>>& vals) { auto& o = *op; - return callbackWrapper(getcb, donecb, vals, [&o,&f](const std::vector<Sp<Value>>& values) { + return callbackWrapper(getcb, donecb, vals, [&o](const std::vector<Sp<Value>>& values) { std::vector<Sp<Value>> newvals {}; for (const auto& v : values) { auto it = o.values.find(v->id); if (it == o.values.cend() or (it->second != v && !(*it->second == *v))) { - if (not f or f(*v)) - newvals.push_back(v); + newvals.push_back(v); } } return newvals; @@ -1008,6 +1004,9 @@ Dht::get(const InfoHash& id, GetCallback getcb, DoneCallback donecb, Value::Filt }, o); }; + auto q = std::make_shared<Query>(Select {}, std::move(where)); + auto f = filter.chain(q->where.getFilter()); + /* Try to answer this search locally. */ gcb(getLocal(id, f)); @@ -1075,7 +1074,7 @@ void Dht::query(const InfoHash& id, QueryCallback cb, DoneCallback done_cb, Quer } std::vector<Sp<Value>> -Dht::getLocal(const InfoHash& id, Value::Filter f) const +Dht::getLocal(const InfoHash& id, const Value::Filter& f) const { auto s = store.find(id); if (s == store.end()) return {}; @@ -1092,17 +1091,14 @@ Dht::getLocalById(const InfoHash& id, Value::Id vid) const } std::vector<Sp<Value>> -Dht::getPut(const InfoHash& id) +Dht::getPut(const InfoHash& id) const { std::vector<Sp<Value>> ret; - auto find_values = [&](std::map<InfoHash, Sp<Search>> srs) { + auto find_values = [&](const std::map<InfoHash, Sp<Search>>& srs) { auto srp = srs.find(id); - if (srp == srs.end()) - return; - auto& search = srp->second; - ret.reserve(ret.size() + search->announce.size()); - for (const auto& a : search->announce) - ret.push_back(a.value); + if (srp == srs.end()) return; + auto vals = srp->second->getPut(); + ret.insert(ret.end(), vals.begin(), vals.end()); }; find_values(searches4); find_values(searches6); @@ -1110,18 +1106,11 @@ Dht::getPut(const InfoHash& id) } Sp<Value> -Dht::getPut(const InfoHash& id, const Value::Id& vid) +Dht::getPut(const InfoHash& id, const Value::Id& vid) const { - auto find_value = [&](std::map<InfoHash, Sp<Search>> srs) { + auto find_value = [&](const std::map<InfoHash, Sp<Search>>& srs) { auto srp = srs.find(id); - if (srp == srs.end()) - return Sp<Value> {}; - auto& search = srp->second; - for (auto& a : search->announce) { - if (a.value->id == vid) - return a.value; - } - return Sp<Value> {}; + return (srp != srs.end()) ? srp->second->getPut(vid) : Sp<Value> {}; }; if (auto v4 = find_value(searches4)) return v4; @@ -1134,29 +1123,17 @@ bool Dht::cancelPut(const InfoHash& id, const Value::Id& vid) { bool canceled {false}; - if (storageErase(id, vid)) - canceled = true; - auto sr_cancel_put = [&](std::map<InfoHash, Sp<Search>> srs) { + auto sr_cancel_put = [&](std::map<InfoHash, Sp<Search>>& srs) { auto srp = srs.find(id); - if (srp == srs.end()) - return; - - auto& sr = srp->second; - for (auto it = sr->announce.begin(); it != sr->announce.end();) { - if (it->value->id == vid) { - canceled = true; - it = sr->announce.erase(it); - } - else - ++it; - } + return (srp != srs.end()) ? srp->second->cancelPut(vid) : false; }; - sr_cancel_put(searches4); - sr_cancel_put(searches6); + canceled |= sr_cancel_put(searches4); + canceled |= sr_cancel_put(searches6); + if (canceled) + storageErase(id, vid); return canceled; } - // Storage void @@ -2016,14 +1993,15 @@ Dht::exportValues() const void Dht::importValues(const std::vector<ValuesExport>& import) { - for (const auto& h : import) { - if (h.second.empty()) + const auto& now = scheduler.time(); + + for (const auto& node : import) { + if (node.second.empty()) continue; - const auto& now = scheduler.time(); try { msgpack::unpacked msg; - msgpack::unpack(msg, (const char*)h.second.data(), h.second.size()); + msgpack::unpack(msg, (const char*)node.second.data(), node.second.size()); auto valarr = msg.get(); if (valarr.type != msgpack::type::ARRAY) throw msgpack::type_error(); @@ -2037,14 +2015,14 @@ Dht::importValues(const std::vector<ValuesExport>& import) val_time = time_point{time_point::duration{valel.via.array.ptr[0].as<time_point::duration::rep>()}}; tmp_val.msgpack_unpack(valel.via.array.ptr[1]); } catch (const std::exception&) { - DHT_LOG.e(h.first, "Error reading value at %s", h.first.toString().c_str()); + DHT_LOG.e(node.first, "Error reading value at %s", node.first.toString().c_str()); continue; } val_time = std::min(val_time, now); - storageStore(h.first, std::make_shared<Value>(std::move(tmp_val)), val_time); + storageStore(node.first, std::make_shared<Value>(std::move(tmp_val)), val_time); } } catch (const std::exception&) { - DHT_LOG.e(h.first, "Error reading values at %s", h.first.toString().c_str()); + DHT_LOG.e(node.first, "Error reading values at %s", node.first.toString().c_str()); continue; } } diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 1744c1d674ee3853ed9b54ccfe1eff3454bc9864..eb36df6367741ef841efeb5aa847921bd0292fdb 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -110,7 +110,7 @@ DhtProxyClient::~DhtProxyClient() } std::vector<Sp<Value>> -DhtProxyClient::getLocal(const InfoHash& k, Value::Filter filter) const { +DhtProxyClient::getLocal(const InfoHash& k, const Value::Filter& filter) const { std::lock_guard<std::mutex> lock(searchLock_); auto s = searches_.find(k); if (s == searches_.end()) @@ -445,7 +445,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ * Get data currently being put at the given hash. */ std::vector<Sp<Value>> -DhtProxyClient::getPut(const InfoHash& key) { +DhtProxyClient::getPut(const InfoHash& key) const { std::vector<Sp<Value>> ret; auto search = searches_.find(key); if (search != searches_.end()) { @@ -460,7 +460,7 @@ DhtProxyClient::getPut(const InfoHash& key) { * Get data currently being put at the given hash with the given id. */ Sp<Value> -DhtProxyClient::getPut(const InfoHash& key, const Value::Id& id) { +DhtProxyClient::getPut(const InfoHash& key, const Value::Id& id) const { auto search = searches_.find(key); if (search == searches_.end()) return {}; diff --git a/src/search.h b/src/search.h index da80f1af722373c10364e6ac7dd971b9aa8179b2..f292e5e681848746d3c55f374eb54e1667b518b3 100644 --- a/src/search.h +++ b/src/search.h @@ -532,6 +532,34 @@ struct Dht::Search { scheduler.edit(opExpirationJob, cache.getExpiration()); } + std::vector<Sp<Value>> getPut() const { + std::vector<Sp<Value>> ret; + ret.reserve(announce.size()); + for (const auto& a : announce) + ret.push_back(a.value); + return ret; + } + + Sp<Value> getPut(Value::Id vid) const { + for (auto& a : announce) { + if (a.value->id == vid) + return a.value; + } + } + + bool cancelPut(Value::Id vid) { + bool canceled {false}; + for (auto it = announce.begin(); it != announce.end();) { + if (it->value->id == vid) { + canceled = true; + it = announce.erase(it); + } + else + ++it; + } + return canceled; + } + /** * @return The number of non-good search nodes. */ diff --git a/src/storage.h b/src/storage.h index 6dcc88a58f149ecbb8a18346a01b7ea4a6f1e07f..fb4792e6d1b8ef26b3c0ad28aa10bf4d769d1cb7 100644 --- a/src/storage.h +++ b/src/storage.h @@ -115,7 +115,7 @@ struct Storage { return {}; } - std::vector<Sp<Value>> get(Value::Filter f = {}) const { + std::vector<Sp<Value>> get(const Value::Filter& f = {}) const { std::vector<Sp<Value>> newvals {}; if (not f) newvals.reserve(values.size()); for (auto& v : values) {