diff --git a/include/opendht/indexation/pht.h b/include/opendht/indexation/pht.h index 7d1e5e644029e2ddd7f141ca3e8c0bfa4ca43a4f..95fadd537697cd2c5e02daf7683885b88cbf8499 100644 --- a/include/opendht/indexation/pht.h +++ b/include/opendht/indexation/pht.h @@ -159,7 +159,9 @@ public: * serialization order of fields. */ using KeySpec = std::map<std::string, size_t>; + using RealInsertCallback = std::function<void(std::shared_ptr<Prefix> p, IndexEntry entry )>; using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>; + typedef void (*LookupCallbackRaw)(std::vector<std::shared_ptr<Value>>* values, Prefix* p, void *user_data); static LookupCallback bindLookupCb(LookupCallbackRaw raw_cb, void* user_data) { @@ -268,7 +270,103 @@ private: }; /** - * Tells if the key is valid according to the key spec. + * Check if there is a new canary on the next bit of the prefix p, + * If there is a new one then it will re-put the data there. + * + * @param p Prefix that need to be kept update + * @paran entry The entry to put back + */ + void checkPhtUpdate(const Prefix &p, const IndexEntry &entry) { + + /* Don't try to go further than the end of the prefix */ + if ( p.size_ >= p.content_.size() * 8 ) return; + + auto next_prefix = p.getPrefix( p.size_ + 1); + + dht_->listen(next_prefix.hash(), + [=](const std::shared_ptr<dht::Value> &value) { + if (value->user_type == canary_) { + dht_->put(next_prefix.hash(), std::move(entry)); + checkPhtUpdate(next_prefix, entry); + + std::cerr << "New canary found !" << std::endl; + /* Cancel listen since we found where we need to update*/ + return false; + } + + std::cerr << "Next value since no canary_ found !" << std::endl; + return true; + }, + [=](const dht::Value& v) { + /* Filter value v thats start with the same name as ours */ + return v.user_type.compare(0, name_.size(), name_) == 0; + } + ); + } + + /** + * Looking where to put the data cause if there i free space on the node above then this node will became the real leave. + * + * @param p Share_ptr on the Prefix to check + * @param entry The entry to put at the prefix p + * @param end_cb Callback to use at the end of counting + */ + void getRealPrefix(std::shared_ptr<Prefix> p, IndexEntry entry, RealInsertCallback end_cb ) { + + auto total = std::make_shared<int>(0); /* Will contains the total number of data on 3 nodes */ + auto ended = std::make_shared<int>(0); /* Just indicate how many have end */ + + auto parent = std::make_shared<Prefix>(p->getPrefix(-1)); + auto sibling = std::make_shared<Prefix>(p->getSibling()); + + auto pht_filter = [&](const dht::Value& v) { + return v.user_type.compare(0, name_.size(), name_) == 0; + }; + + /* Lambda will count total number of data node */ + auto count = [=]( const std::shared_ptr<dht::Value> val ) { + if ( val->user_type != canary_) + (*total)++; + + std::cerr << "Total data " << *total << std::endl; + return true; + }; + + auto on_done = [=] ( bool ) { + (*ended)++; + /* Only the last one do the CallBack*/ + if ( *ended == 3 ) { + if ( *total < MAX_NODE_ENTRY_COUNT ) + end_cb(parent, std::move(entry)); + else + end_cb(p, std::move(entry)); + + std::cerr << "Total" << *total << std::endl; + } + }; + + dht_->get(parent->hash(), + count, + on_done, + pht_filter + ); + + dht_->get(p->hash(), + count, + on_done, + pht_filter + ); + + dht_->get(sibling->hash(), + count, + on_done, + pht_filter + ); + } + + /** + * Performs a step in the lookup operation. Each steps are performed + * asynchronously. */ bool validKey(const Key& k) const { return k.size() == keySpec_.size() and diff --git a/src/indexation/pht.cpp b/src/indexation/pht.cpp index 7fa0a042935c2876b543abe1ae08070e0e139232..6a1cff62fcb58f08119a7eef32a3bc2e3a8457f9 100644 --- a/src/indexation/pht.cpp +++ b/src/indexation/pht.cpp @@ -247,6 +247,7 @@ void Pht::updateCanary(Prefix p) { // TODO: change this... copy value dht::Value canary_value; canary_value.user_type = canary_; + dht_->put(p.hash(), std::move(canary_value), [=](bool){ static std::bernoulli_distribution d(0.5); @@ -271,6 +272,7 @@ void Pht::insert(Key k, Value v, DoneCallbackSimple done_cb) { auto vals = std::make_shared<std::vector<std::shared_ptr<Value>>>(); auto final_prefix = std::make_shared<Prefix>(); + lookupStep(kp, lo, hi, vals, [=](std::vector<std::shared_ptr<Value>>&, Prefix p) { *final_prefix = Prefix(p); @@ -280,16 +282,26 @@ void Pht::insert(Key k, Value v, DoneCallbackSimple done_cb) { if (done_cb) done_cb(false); } else { - if (vals->size() >= MAX_NODE_ENTRY_COUNT) - *final_prefix = kp.getPrefix(final_prefix->size_+1); IndexEntry entry; entry.value = v; entry.prefix = kp.content_; entry.name = name_; - updateCanary(*final_prefix); - dht_->put(final_prefix->hash(), std::move(entry), done_cb); + RealInsertCallback real_insert = [=]( std::shared_ptr<Prefix> p, IndexEntry entry ) { + updateCanary(*p); + checkPhtUpdate(*p, entry); + dht_->put(p->hash(), std::move(entry), done_cb); + }; + + std::cerr << "Insert prefix" << p.toString() << std::endl; + + if ( vals->size() <= MAX_NODE_ENTRY_COUNT ) + getRealPrefix( final_prefix, std::move(entry), real_insert); + else { + *final_prefix = kp.getPrefix(final_prefix->size_+1); + real_insert( final_prefix, std::move(entry) ); + } } }, nullptr, cache_.lookup(kp), true );