diff --git a/src/dht.cpp b/src/dht.cpp index 946102c2a967f487980fb51ac08f2ae8f6fea680..84495267e022a52ee7d4e681634a129c3df8fbe1 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -1289,10 +1289,11 @@ Dht::storageStore(const InfoHash& id, const Sp<Value>& value, time_point created if (total_store_size > max_store_size) { auto value = vs->data; auto value_diff = store.second.values_diff; + auto value_edit = store.second.edited_values; expireStore(); - storageChanged(id, st->second, value, value_diff > 0); + storageChanged(id, st->second, value, value_diff > 0 || value_edit > 0); } else { - storageChanged(id, st->second, vs->data, store.second.values_diff > 0); + storageChanged(id, st->second, vs->data, store.second.values_diff > 0 || store.second.edited_values > 0); } } diff --git a/src/op_cache.cpp b/src/op_cache.cpp index de2c4606db2810fd7e3664aad99dd84615242d14..a5c626af1332f4079c55cf1b0bf2bc92a5432d2e 100644 --- a/src/op_cache.cpp +++ b/src/op_cache.cpp @@ -28,6 +28,10 @@ OpValueCache::onValuesAdded(const std::vector<Sp<Value>>& vals, const system_clo auto viop = values.emplace(v->id, v); if (viop.second) { newValues.emplace_back(v); + } else if (!(*viop.first->second.data == *v)) { + // Special case for edition + viop.first->second.data = v; + newValues.emplace_back(v); } else { viop.first->second.refCount++; } diff --git a/src/storage.h b/src/storage.h index bff04ab5cdcdcdb56a7f0faeefbb659e79dbeba0..7bfaa401d16fb389f6c91cfdb77978a32be3e885 100644 --- a/src/storage.h +++ b/src/storage.h @@ -100,6 +100,8 @@ struct Storage { ssize_t values_diff; /** Difference in number of listeners */ ssize_t listeners_diff; + /** Number of edited values */ + size_t edited_values; }; Storage() {} @@ -159,6 +161,7 @@ struct Storage { */ std::pair<ValueStorage*, time_point> refresh(const InfoHash& id, const time_point& now, const Value::Id& vid, const TypeStore& types) { + std::cout << "storage refresh " << id.to_view() << " " << vid << std::endl; for (auto& vs : values) if (vs.data->id == vid) { vs.created = now; @@ -209,6 +212,7 @@ Storage::listen(ValueCallback& gcb, Value::Filter& filter, const Sp<Query>& quer std::pair<ValueStorage*, Storage::StoreDiff> Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, time_point expiration, StorageBucket* sb) { + //std::cout << "storage store " << id.to_view() << " " << value->id << " " << value->seq << std::endl; auto it = std::find_if (values.begin(), values.end(), [&](const ValueStorage& vr) { return vr.data == value || vr.data->id == value->id; }); @@ -230,7 +234,7 @@ Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, t sb->insert(id, *value, expiration); it->data = value; total_size += size_diff; - return std::make_pair(&(*it), StoreDiff{size_diff, 0, 0}); + return std::make_pair(&(*it), StoreDiff{size_diff, 0, 0, 1}); } } else { //DHT_LOG.DEBUG("Storing %s -> %s", id.toString().c_str(), value->toString().c_str()); @@ -240,7 +244,7 @@ Storage::store(const InfoHash& id, const Sp<Value>& value, time_point created, t values.back().store_bucket = sb; if (sb) sb->insert(id, *value, expiration); - return std::make_pair(&values.back(), StoreDiff{size_new, 1, 0}); + return std::make_pair(&values.back(), StoreDiff{size_new, 1, 0, 0}); } } return std::make_pair(nullptr, StoreDiff{}); @@ -272,7 +276,7 @@ Storage::clear() ssize_t tot_size = total_size; values.clear(); total_size = 0; - return {-tot_size, -num_values, 0}; + return {-tot_size, -num_values, 0, 0}; } std::pair<ssize_t, std::vector<Sp<Value>>> diff --git a/tests/dhtrunnertester.cpp b/tests/dhtrunnertester.cpp index 79513ad97b223fe602525dfaaea12ca8b4831c5e..69d454ece4569f462dddcc5bbd47a10e7e8b722a 100644 --- a/tests/dhtrunnertester.cpp +++ b/tests/dhtrunnertester.cpp @@ -209,6 +209,7 @@ DhtRunnerTester::testIdOps() { std::mutex mutex; std::condition_variable cv; unsigned valueCount(0); + unsigned valueCountEdit(0); dht::DhtRunner::Config config2; config2.dht_config.node_config.max_peer_req_per_sec = -1; @@ -280,10 +281,47 @@ DhtRunnerTester::testIdOps() { return true; }); + auto key2 = dht::InfoHash::get("key2"); + auto editValue = std::make_shared<dht::Value>("v1"); + node1.putSigned(key2, editValue, [&](bool ok){ + CPPUNIT_ASSERT(ok); + std::lock_guard<std::mutex> lk(mutex); + valueCountEdit++; + cv.notify_all(); + }); + node2.listen(key2, [&](const std::vector<std::shared_ptr<dht::Value>>& values, bool expired){ + for (const auto& v : values) { + std::cout << "VVVVVV " << dht::unpackMsg<std::string>(v->data) << " seq " << v->seq << " exp:" << expired << std::endl; + if (v->seq == 0) + CPPUNIT_ASSERT_EQUAL("v1"s, dht::unpackMsg<std::string>(v->data)); + else if (v->seq == 1) + CPPUNIT_ASSERT_EQUAL("v2"s, dht::unpackMsg<std::string>(v->data)); + CPPUNIT_ASSERT_EQUAL(v->owner->getLongId(), node1.getPublicKey()->getLongId()); + } + std::lock_guard<std::mutex> lk(mutex); + valueCountEdit += values.size(); + cv.notify_all(); + return true; + }); + { std::unique_lock<std::mutex> lk(mutex); - CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 7; })); + CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCount == 7 && valueCountEdit == 2; })); } + + // editValue->data = dht::packMsg("v2"); + editValue = std::make_shared<dht::Value>(editValue->id); + editValue->data = dht::packMsg("v2"); + node1.putSigned(key2, editValue, [&](bool ok){ + std::cout << "putSigned " << ok << " " << editValue->seq << std::endl; + CPPUNIT_ASSERT(ok); + std::lock_guard<std::mutex> lk(mutex); + valueCountEdit++; + cv.notify_all(); + }); + std::unique_lock<std::mutex> lk(mutex); + CPPUNIT_ASSERT(cv.wait_for(lk, 20s, [&]{ return valueCountEdit == 4; })); + std::cout << "valueCountEdit " << valueCountEdit << std::endl; } void @@ -297,7 +335,7 @@ DhtRunnerTester::testListenLotOfBytes() { std::string data(10000, 'a'); auto foo = dht::InfoHash::get("foo"); - constexpr unsigned N = 50; + constexpr unsigned N = 1024; for (unsigned i=0; i<N; i++) { node2.put(foo, data, [&](bool ok) {