diff --git a/include/opendht/dht.h b/include/opendht/dht.h index 2e301dac009f5d88e9212aa00cf9e07cd2e04413..24176029900d8dd0e5707fb89fabbb401199d8e4 100644 --- a/include/opendht/dht.h +++ b/include/opendht/dht.h @@ -163,16 +163,35 @@ public: * reannounced on a regular basis. * User can call #cancelPut(InfoHash, Value::Id) to cancel a put operation. */ - void put(const InfoHash& key, std::shared_ptr<Value>, DoneCallback cb=nullptr, time_point created=time_point::max()); - void put(const InfoHash& key, const std::shared_ptr<Value>& v, DoneCallbackSimple cb, time_point created=time_point::max()) { - put(key, v, bindDoneCb(cb), created); + void put(const InfoHash& key, + std::shared_ptr<Value>, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false); + void put(const InfoHash& key, + const std::shared_ptr<Value>& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) + { + put(key, v, bindDoneCb(cb), created, permanent); } - void put(const InfoHash& key, Value&& v, DoneCallback cb=nullptr, time_point created=time_point::max()) { - put(key, std::make_shared<Value>(std::move(v)), cb, created); + void put(const InfoHash& key, + Value&& v, + DoneCallback cb=nullptr, + time_point created=time_point::max(), + bool permanent = false) + { + put(key, std::make_shared<Value>(std::move(v)), cb, created, permanent); } - void put(const InfoHash& key, Value&& v, DoneCallbackSimple cb, time_point created=time_point::max()) { - put(key, std::forward<Value>(v), bindDoneCb(cb), created); + void put(const InfoHash& key, + Value&& v, + DoneCallbackSimple cb, + time_point created=time_point::max(), + bool permanent = false) + { + put(key, std::forward<Value>(v), bindDoneCb(cb), created, permanent); } /** @@ -302,6 +321,7 @@ private: * A single "put" operation data */ struct Announce { + bool permanent; std::shared_ptr<Value> value; time_point created; DoneCallback callback; @@ -453,7 +473,7 @@ private: * The values can be filtered by an arbitrary provided filter. */ std::shared_ptr<Search> search(const InfoHash& id, sa_family_t af, GetCallback = nullptr, DoneCallback = nullptr, Value::Filter = Value::AllFilter()); - void announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, DoneCallback callback, time_point created=time_point::max()); + void announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, DoneCallback callback, time_point created=time_point::max(), bool permanent = false); size_t listenTo(const InfoHash& id, sa_family_t af, GetCallback cb, Value::Filter f = Value::AllFilter()); void bootstrapSearch(Search& sr); diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index 3bb98ea7a59800c297dc5d36f8a73d53f278286a..2bc114f4be2a1df5e2909726e68fbb08ad01be89 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -156,16 +156,16 @@ public: void cancelListen(InfoHash h, size_t token); void cancelListen(InfoHash h, std::shared_future<size_t> token); - void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}); - void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb) { - put(hash, value, bindDoneCb(cb)); + void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false); + void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) { + put(hash, value, bindDoneCb(cb), permanent); } - void put(InfoHash hash, Value&& value, DoneCallback cb={}); - void put(InfoHash hash, Value&& value, DoneCallbackSimple cb) { - put(hash, std::forward<Value>(value), bindDoneCb(cb)); + void put(InfoHash hash, Value&& value, DoneCallback cb={}, bool permanent = false); + void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, bool permanent = false) { + put(hash, std::forward<Value>(value), bindDoneCb(cb), permanent); } - void put(const std::string& key, Value&& value, DoneCallbackSimple cb={}); + void put(const std::string& key, Value&& value, DoneCallbackSimple cb={}, bool permanent = false); void cancelPut(const InfoHash& h, const Value::Id& id); diff --git a/include/opendht/securedht.h b/include/opendht/securedht.h index b61b3238932ec8ec870adecbdd8dea13ad1a1f82..a41ecf26413e619462cf7a57b86ddd513746979f 100644 --- a/include/opendht/securedht.h +++ b/include/opendht/securedht.h @@ -101,9 +101,9 @@ public: /** * Will take ownership of the value, sign it using our private key and put it in the DHT. */ - void putSigned(const InfoHash& hash, std::shared_ptr<Value> val, DoneCallback callback); - void putSigned(const InfoHash& hash, Value&& v, DoneCallback callback) { - putSigned(hash, std::make_shared<Value>(std::move(v)), callback); + void putSigned(const InfoHash& hash, std::shared_ptr<Value> val, DoneCallback callback, bool permanent = false); + void putSigned(const InfoHash& hash, Value&& v, DoneCallback callback, bool permanent = false) { + putSigned(hash, std::make_shared<Value>(std::move(v)), callback, permanent); } /** @@ -111,9 +111,9 @@ public: * and put it in the DHT. * The operation will be immediate if the recipient' public key is known (otherwise it will be retrived first). */ - void putEncrypted(const InfoHash& hash, const InfoHash& to, std::shared_ptr<Value> val, DoneCallback callback); - void putEncrypted(const InfoHash& hash, const InfoHash& to, Value&& v, DoneCallback callback) { - putEncrypted(hash, to, std::make_shared<Value>(std::move(v)), callback); + void putEncrypted(const InfoHash& hash, const InfoHash& to, std::shared_ptr<Value> val, DoneCallback callback, bool permanent = false); + void putEncrypted(const InfoHash& hash, const InfoHash& to, Value&& v, DoneCallback callback, bool permanent = false) { + putEncrypted(hash, to, std::make_shared<Value>(std::move(v)), callback, permanent); } /** diff --git a/src/dht.cpp b/src/dht.cpp index c915d7da1bba0c66526a197fe110538cfba2e3ac..44a50cce100d9184869aed4a1e46e7659dbe6b3b 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -830,8 +830,10 @@ Dht::searchStep(std::shared_ptr<Search> sr) a.callback(true, sr->getNodes()); a.callback = nullptr; } - ait = sr->announce.erase(ait); - continue; + if (not a.permanent) { + ait = sr->announce.erase(ait); + continue; + } } if (in) storageStore(sr->id, a.value, a.created); unsigned i = 0; @@ -1222,7 +1224,8 @@ Dht::search(const InfoHash& id, sa_family_t af, GetCallback callback, DoneCallba } void -Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, DoneCallback callback, time_point created) +Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, DoneCallback callback, + time_point created, bool permanent) { const auto& now = scheduler.time(); if (!value) { @@ -1244,7 +1247,7 @@ Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, return a.value->id == value->id; }); if (a_sr == sr->announce.end()) { - sr->announce.emplace_back(Announce {value, std::min(now, created), callback}); + sr->announce.emplace_back(Announce {permanent, value, std::min(now, created), callback}); for (auto& n : sr->nodes) n.acked[value->id] = {}; } @@ -1399,7 +1402,7 @@ Dht::cancelListen(const InfoHash& id, size_t token) } void -Dht::put(const InfoHash& id, std::shared_ptr<Value> val, DoneCallback callback, time_point created) +Dht::put(const InfoHash& id, std::shared_ptr<Value> val, DoneCallback callback, time_point created, bool permanent) { scheduler.syncTime(); @@ -1427,13 +1430,13 @@ Dht::put(const InfoHash& id, std::shared_ptr<Value> val, DoneCallback callback, *done4 = true; *ok |= ok4; donecb(nodes); - }, created); + }, created, permanent); announce(id, AF_INET6, val, [=](bool ok6, const std::vector<std::shared_ptr<Node>>& nodes) { DHT_LOG.DEBUG("Announce done IPv6 %d", ok6); *done6 = true; *ok |= ok6; donecb(nodes); - }, created); + }, created, permanent); } struct OpStatus { @@ -2703,7 +2706,8 @@ Dht::onAnnounceDone(const Request&, NetworkEngine::RequestAnswer& answer, a.callback(true, sr->getNodes()); a.callback = nullptr; } - return true; + if (not a.permanent) + return true; } return false; }), sr->announce.end()); diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index 18088d0477bc3a1fe3d431251b704be3779985f6..cd4231cb769d781286bc2fbf1876993450f446f0 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -480,30 +480,30 @@ DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> token) } void -DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb) +DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb, bool permanent) { std::lock_guard<std::mutex> lck(storage_mtx); auto sv = std::make_shared<Value>(std::move(value)); pending_ops.emplace([=](SecureDht& dht) { - dht.put(hash, sv, cb); + dht.put(hash, sv, cb, {}, permanent); }); cv.notify_all(); } void -DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb) +DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, bool permanent) { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops.emplace([=](SecureDht& dht) { - dht.put(hash, value, cb); + dht.put(hash, value, cb, {}, permanent); }); cv.notify_all(); } void -DhtRunner::put(const std::string& key, Value&& value, DoneCallbackSimple cb) +DhtRunner::put(const std::string& key, Value&& value, DoneCallbackSimple cb, bool permanent) { - put(InfoHash::get(key), std::forward<Value>(value), cb); + put(InfoHash::get(key), std::forward<Value>(value), cb, permanent); } void diff --git a/src/securedht.cpp b/src/securedht.cpp index 93098f2a8e6b72082776805ab0277a4c01560ed2..c004dbbf8cf7bc3d3b31a1c63ea424d583c6d5b4 100644 --- a/src/securedht.cpp +++ b/src/securedht.cpp @@ -303,7 +303,7 @@ SecureDht::listen(const InfoHash& id, GetCallback cb, Value::Filter&& f) } void -SecureDht::putSigned(const InfoHash& hash, std::shared_ptr<Value> val, DoneCallback callback) +SecureDht::putSigned(const InfoHash& hash, std::shared_ptr<Value> val, DoneCallback callback, bool permanent) { if (val->id == Value::INVALID_ID) { crypto::random_device rdev; @@ -331,16 +331,16 @@ SecureDht::putSigned(const InfoHash& hash, std::shared_ptr<Value> val, DoneCallb } return true; }, - [hash,val,this,callback] (bool /* ok */) { + [hash,val,this,callback,permanent] (bool /* ok */) { sign(*val); - put(hash, val, callback); + put(hash, val, callback, {}, permanent); }, Value::IdFilter(val->id) ); } void -SecureDht::putEncrypted(const InfoHash& hash, const InfoHash& to, std::shared_ptr<Value> val, DoneCallback callback) +SecureDht::putEncrypted(const InfoHash& hash, const InfoHash& to, std::shared_ptr<Value> val, DoneCallback callback, bool permanent) { findPublicKey(to, [=](const std::shared_ptr<crypto::PublicKey> pk) { if(!pk || !*pk) { @@ -350,7 +350,7 @@ SecureDht::putEncrypted(const InfoHash& hash, const InfoHash& to, std::shared_pt } DHT_LOG.WARN("Encrypting data for PK: %s", pk->getId().toString().c_str()); try { - put(hash, encrypt(*val, *pk), callback); + put(hash, encrypt(*val, *pk), callback, {}, permanent); } catch (const std::exception& e) { DHT_LOG.ERR("Error putting encrypted data: %s", e.what()); if (callback)