diff --git a/include/opendht/value.h b/include/opendht/value.h index dac624f55dc5fea3e0bded97a717ebc29bb72506..64c5220628ad600f72e4134794deda4c7b69dfde 100644 --- a/include/opendht/value.h +++ b/include/opendht/value.h @@ -427,7 +427,7 @@ struct OPENDHT_PUBLIC Value /** * Returns true if value contents are equals (not considering the value ID) */ - inline bool contentEquals(const Value& o) { + inline bool contentEquals(const Value& o) const { return isEncrypted() ? cypher == o.cypher : ((owner == o.owner || (owner and o.owner and *owner == *o.owner)) && type == o.type @@ -436,9 +436,12 @@ struct OPENDHT_PUBLIC Value && signature == o.signature); } - inline bool operator== (const Value& o) { + inline bool operator== (const Value& o) const { return id == o.id and contentEquals(o); } + inline bool operator!= (const Value& o) const { + return !(*this == o); + } inline void setRecipient(const InfoHash& r) { recipient = r; diff --git a/src/dht.cpp b/src/dht.cpp index 946102c2a967f487980fb51ac08f2ae8f6fea680..84495267e022a52ee7d4e681634a129c3df8fbe1 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -1289,10 +1289,11 @@ Dht::storageStore(const InfoHash& id, const Sp<Value>& value, time_point created if (total_store_size > max_store_size) { auto value = vs->data; auto value_diff = store.second.values_diff; + auto value_edit = store.second.edited_values; expireStore(); - storageChanged(id, st->second, value, value_diff > 0); + storageChanged(id, st->second, value, value_diff > 0 || value_edit > 0); } else { - storageChanged(id, st->second, vs->data, store.second.values_diff > 0); + storageChanged(id, st->second, vs->data, store.second.values_diff > 0 || store.second.edited_values > 0); } } diff --git a/src/op_cache.cpp b/src/op_cache.cpp index de2c4606db2810fd7e3664aad99dd84615242d14..fa66583c97c1da08e21126cd62b44a07f4ad3ea7 100644 --- a/src/op_cache.cpp +++ b/src/op_cache.cpp @@ -28,6 +28,12 @@ OpValueCache::onValuesAdded(const std::vector<Sp<Value>>& vals, const system_clo auto viop = values.emplace(v->id, v); if (viop.second) { newValues.emplace_back(v); + } else if (*viop.first->second.data != *v) { + // Special case for edition + if (v->seq > viop.first->second.data->seq) { + viop.first->second.data = v; + newValues.emplace_back(v); + } } else { viop.first->second.refCount++; } diff --git a/src/storage.h b/src/storage.h index bff04ab5cdcdcdb56a7f0faeefbb659e79dbeba0..2533214e142c067b76ee9a1f6302e3860ec973e1 100644 --- a/src/storage.h +++ b/src/storage.h @@ -100,6 +100,8 @@ struct Storage { ssize_t values_diff; /** Difference in number of listeners */ ssize_t listeners_diff; + /** Number of edited values */ + size_t edited_values; }; Storage() {} @@ -230,7 +232,7 @@ Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, t sb->insert(id, *value, expiration); it->data = value; total_size += size_diff; - return std::make_pair(&(*it), StoreDiff{size_diff, 0, 0}); + return std::make_pair(&(*it), StoreDiff{size_diff, 0, 0, 1}); } } else { //DHT_LOG.DEBUG("Storing %s -> %s", id.toString().c_str(), value->toString().c_str()); @@ -240,7 +242,7 @@ Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, t values.back().store_bucket = sb; if (sb) sb->insert(id, *value, expiration); - return std::make_pair(&values.back(), StoreDiff{size_new, 1, 0}); + return std::make_pair(&values.back(), StoreDiff{size_new, 1, 0, 0}); } } return std::make_pair(nullptr, StoreDiff{}); @@ -272,7 +274,7 @@ Storage::clear() ssize_t tot_size = total_size; values.clear(); total_size = 0; - return {-tot_size, -num_values, 0}; + return {-tot_size, -num_values, 0, 0}; } std::pair<ssize_t, std::vector<Sp<Value>>> diff --git a/tests/dhtrunnertester.cpp b/tests/dhtrunnertester.cpp index 79513ad97b223fe602525dfaaea12ca8b4831c5e..f56caad452d96cfb49d23c5a18b8bdc33c8d5afa 100644 --- a/tests/dhtrunnertester.cpp +++ b/tests/dhtrunnertester.cpp @@ -209,6 +209,7 @@ DhtRunnerTester::testIdOps() { std::mutex mutex; std::condition_variable cv; unsigned valueCount(0); + unsigned valueCountEdit(0); dht::DhtRunner::Config config2; config2.dht_config.node_config.max_peer_req_per_sec = -1; @@ -280,10 +281,44 @@ DhtRunnerTester::testIdOps() { return true; }); + auto key2 = dht::InfoHash::get("key2"); + auto editValue = std::make_shared<dht::Value>("v1"); + node1.putSigned(key2, editValue, [&](bool ok){ + CPPUNIT_ASSERT(ok); + std::lock_guard<std::mutex> lk(mutex); + valueCountEdit++; + cv.notify_all(); + }); + node2.listen(key2, [&](const std::vector<std::shared_ptr<dht::Value>>& values, bool expired){ + for (const auto& v : values) { + if (v->seq == 0) + CPPUNIT_ASSERT_EQUAL("v1"s, dht::unpackMsg<std::string>(v->data)); + else if (v->seq == 1) + CPPUNIT_ASSERT_EQUAL("v2"s, dht::unpackMsg<std::string>(v->data)); + CPPUNIT_ASSERT_EQUAL(v->owner->getLongId(), node1.getPublicKey()->getLongId()); + } + std::lock_guard<std::mutex> lk(mutex); + valueCountEdit += values.size(); + cv.notify_all(); + return true; + }); + { std::unique_lock<std::mutex> lk(mutex); - CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 7; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 7 && valueCountEdit == 2; })); } + + // editValue->data = dht::packMsg("v2"); + editValue = std::make_shared<dht::Value>(editValue->id); + editValue->data = dht::packMsg("v2"); + node1.putSigned(key2, editValue, [&](bool ok){ + CPPUNIT_ASSERT(ok); + std::lock_guard<std::mutex> lk(mutex); + valueCountEdit++; + cv.notify_all(); + }); + std::unique_lock<std::mutex> lk(mutex); + CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCountEdit == 4; })); } void @@ -297,7 +332,7 @@ DhtRunnerTester::testListenLotOfBytes() { std::string data(10000, 'a'); auto foo = dht::InfoHash::get("foo"); - constexpr unsigned N = 50; + constexpr unsigned N = 1024; for (unsigned i=0; i<N; i++) { node2.put(foo, data, [&](bool ok) {