From 48c40c8a273d8dadae2f9101e44ce43a5e31b59c Mon Sep 17 00:00:00 2001 From: kaldoran <kaldoran@live.fr> Date: Wed, 15 Jun 2016 15:48:56 -0400 Subject: [PATCH] pht: correction on the algorithm of complet node splitting --- include/opendht/indexation/pht.h | 163 ++++++++++++----------- src/indexation/pht.cpp | 216 ++++++++++++++++++++++--------- 2 files changed, 241 insertions(+), 138 deletions(-) diff --git a/include/opendht/indexation/pht.h b/include/opendht/indexation/pht.h index 95fadd53..c372445d 100644 --- a/include/opendht/indexation/pht.h +++ b/include/opendht/indexation/pht.h @@ -128,7 +128,6 @@ struct Prefix { throw std::out_of_range("bit larger than prefix size."); Prefix copy = *this; - size_t offset_bit = (8 - bit) % 8; copy.content_[bit / 8] ^= (1 << offset_bit); @@ -137,10 +136,32 @@ struct Prefix { size_t size_ {0}; Blob content_ {}; + }; using Value = std::pair<InfoHash, dht::Value::Id>; +struct IndexEntry : public dht::Value::Serializable<IndexEntry> { + static const ValueType TYPE; + + virtual void unpackValue(const dht::Value& v) { + Serializable<IndexEntry>::unpackValue(v); + name = v.user_type; + } + + virtual dht::Value packValue() const { + auto pack = Serializable<IndexEntry>::packValue(); + pack.user_type = name; + return pack; + } + + Blob prefix; + Value value; + std::string name; + MSGPACK_DEFINE_MAP(prefix, value); +}; + + class Pht { static constexpr const char* INVALID_KEY = "Key does not match the PHT key spec."; @@ -151,15 +172,13 @@ public: /* This is the maximum number of entries per node. This parameter is * critical and influences the traffic a lot during a lookup operation. */ - static constexpr const size_t MAX_NODE_ENTRY_COUNT {16}; + static constexpr const size_t MAX_NODE_ENTRY_COUNT {2}; // FIX : 16 /* A key for a an index entry */ using Key = std::map<std::string, Blob>; /* Specifications of the keys. It defines the number, the length and the * serialization order of fields. */ using KeySpec = std::map<std::string, size_t>; - - using RealInsertCallback = std::function<void(std::shared_ptr<Prefix> p, IndexEntry entry )>; using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>; typedef void (*LookupCallbackRaw)(std::vector<std::shared_ptr<Value>>* values, Prefix* p, void *user_data); @@ -200,9 +219,24 @@ public: /** * Adds an entry into the index. */ - void insert(Key k, Value v, DoneCallbackSimple cb = {}); + void insert(Key k, Value v, DoneCallbackSimple done_cb = {}) { + Prefix p = linearize(k); + + auto lo = std::make_shared<int>(0); + auto hi = std::make_shared<int>(p.size_); + + IndexEntry entry; + entry.value = v; + entry.prefix = p.content_; + entry.name = name_; + + Pht::insert(p, entry, lo, hi, clock::now(), true, done_cb); + } private: + void insert( Prefix kp, IndexEntry entry, std::shared_ptr<int> lo, std::shared_ptr<int> hi, time_point time_p, + bool check_split, DoneCallbackSimple done_cb = {}); + class Cache { public: /** @@ -217,11 +251,12 @@ private: * @param p : Prefix that we are looking for * @return : The size of the longest prefix known in the cache between 0 and p.size_ */ + int lookup(const Prefix& p); private: static constexpr const size_t MAX_ELEMENT {1024}; - static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {5}; + static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {10}; struct Node { time_point last_reply; /* Made the assocation between leaves and leaves multimap */ @@ -240,12 +275,16 @@ private: std::multimap<time_point, std::shared_ptr<Node>> leaves_; }; + /* Callback used for insert value by using the pht */ + using RealInsertCallback = std::function<void(std::shared_ptr<Prefix> p, IndexEntry entry )>; + using LookupCallbackWrapper = std::function<void(std::vector<std::shared_ptr<IndexEntry>>& values, Prefix p)>; + /** * Performs a step in the lookup operation. Each steps are performed * asynchronously. */ void lookupStep(Prefix k, std::shared_ptr<int> lo, std::shared_ptr<int> hi, - std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals, LookupCallback cb, + std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals, LookupCallbackWrapper cb, DoneCallbackSimple done_cb, std::shared_ptr<unsigned> max_common_prefix_len, int start = -1, bool all_values = false); @@ -257,17 +296,7 @@ private: * * @return the prefix of the linearized key. */ - virtual Prefix linearize(Key k) const { - if (not validKey(k)) { throw std::invalid_argument(INVALID_KEY); } - - Prefix p = Blob {k.begin()->second.begin(), k.begin()->second.end()}; - - auto bit_loc = p.size_ + 1; - for ( auto i = p.content_.size(); i < keySpec_.begin()->second + 1; i++ ) - p.content_.push_back(0); - - return p.swapBit(bit_loc); - }; + virtual Prefix linearize(Key k) const; /** * Check if there is a new canary on the next bit of the prefix p, @@ -276,25 +305,30 @@ private: * @param p Prefix that need to be kept update * @paran entry The entry to put back */ - void checkPhtUpdate(const Prefix &p, const IndexEntry &entry) { + void checkPhtUpdate(Key k, Prefix p, IndexEntry entry) { - /* Don't try to go further than the end of the prefix */ - if ( p.size_ >= p.content_.size() * 8 ) return; + Prefix full = entry.prefix; + if ( p.size_ >= full.size_ ) return; - auto next_prefix = p.getPrefix( p.size_ + 1); + auto next_prefix = full.getPrefix( p.size_ + 1 ); dht_->listen(next_prefix.hash(), [=](const std::shared_ptr<dht::Value> &value) { if (value->user_type == canary_) { - dht_->put(next_prefix.hash(), std::move(entry)); - checkPhtUpdate(next_prefix, entry); + // updateCanary(next_prefix); + // checkPhtUpdate(next_prefix, entry); + // std::cerr << " LISTEN PUT HERE " << next_prefix.toString() << std::endl; + // dht_->put(next_prefix.hash(), std::move(entry)); + // checkPhtUpdate(next_prefix, entry); + // dht_->put(next_prefix.hash(), entry); + // std::cerr << "New canary found ! " << next_prefix.toString() << std::endl; + + insert(k, entry.value, nullptr); - std::cerr << "New canary found !" << std::endl; /* Cancel listen since we found where we need to update*/ return false; } - std::cerr << "Next value since no canary_ found !" << std::endl; return true; }, [=](const dht::Value& v) { @@ -311,62 +345,39 @@ private: * @param entry The entry to put at the prefix p * @param end_cb Callback to use at the end of counting */ - void getRealPrefix(std::shared_ptr<Prefix> p, IndexEntry entry, RealInsertCallback end_cb ) { + void getRealPrefix(std::shared_ptr<Prefix> p, IndexEntry entry, RealInsertCallback end_cb ); - auto total = std::make_shared<int>(0); /* Will contains the total number of data on 3 nodes */ - auto ended = std::make_shared<int>(0); /* Just indicate how many have end */ - - auto parent = std::make_shared<Prefix>(p->getPrefix(-1)); - auto sibling = std::make_shared<Prefix>(p->getSibling()); - - auto pht_filter = [&](const dht::Value& v) { - return v.user_type.compare(0, name_.size(), name_) == 0; - }; - - /* Lambda will count total number of data node */ - auto count = [=]( const std::shared_ptr<dht::Value> val ) { - if ( val->user_type != canary_) - (*total)++; - - std::cerr << "Total data " << *total << std::endl; - return true; - }; - - auto on_done = [=] ( bool ) { - (*ended)++; - /* Only the last one do the CallBack*/ - if ( *ended == 3 ) { - if ( *total < MAX_NODE_ENTRY_COUNT ) - end_cb(parent, std::move(entry)); - else - end_cb(p, std::move(entry)); - - std::cerr << "Total" << *total << std::endl; - } - }; - - dht_->get(parent->hash(), - count, - on_done, - pht_filter - ); + /** + * Looking where to put the data cause if there is free space on the node + * above then this node will became the real leave. + * + * @param p Share_ptr on the Prefix to check + * @param entry The entry to put at the prefix p + * @param end_cb Callback to use at the end of counting + */ + void checkPhtUpdate(Prefix p, IndexEntry entry, time_point time_p); - dht_->get(p->hash(), - count, - on_done, - pht_filter - ); + size_t foundSplitLocation(Prefix compared, std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals) { + for ( size_t i = 0; i < compared.content_.size() * 8; i++ ) + for ( auto const& v : *vals) + if ( Prefix(v->prefix).isActiveBit(i) != compared.isActiveBit(i) ) + return i; - dht_->get(sibling->hash(), - count, - on_done, - pht_filter - ); + return compared.size_ - 1; } /** - * Performs a step in the lookup operation. Each steps are performed - * asynchronously. + * Put canary from the split point until the last known canary and add the prefix at the good place + * + * @param insert : Prefix to insertm but also prefix which going to check where we need to split + * @param vals : Vector of vals for the comparaison + * @param entry : Entry to put on the pht + * @param end_cb : Callback to apply to the insert prefi (here does the insert) + */ + void split(Prefix insert, std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals, IndexEntry entry, RealInsertCallback end_cb ); + + /** + * Tells if the key is valid according to the key spec. */ bool validKey(const Key& k) const { return k.size() == keySpec_.size() and diff --git a/src/indexation/pht.cpp b/src/indexation/pht.cpp index 6a1cff62..bffada42 100644 --- a/src/indexation/pht.cpp +++ b/src/indexation/pht.cpp @@ -4,26 +4,6 @@ namespace dht { namespace indexation { -struct IndexEntry : public dht::Value::Serializable<IndexEntry> { - static const ValueType TYPE; - - virtual void unpackValue(const dht::Value& v) { - Serializable<IndexEntry>::unpackValue(v); - name = v.user_type; - } - - virtual dht::Value packValue() const { - auto pack = Serializable<IndexEntry>::packValue(); - pack.user_type = name; - return pack; - } - - Blob prefix; - Value value; - std::string name; - MSGPACK_DEFINE_MAP(prefix, value); -}; - void Pht::Cache::insert(const Prefix& p) { size_t i = 0; auto now = clock::now(); @@ -32,13 +12,13 @@ void Pht::Cache::insert(const Prefix& p) { while ((leaves_.size() > 0 && leaves_.begin()->first + NODE_EXPIRE_TIME < now) || leaves_.size() > MAX_ELEMENT) leaves_.erase(leaves_.begin()); - - if (not (curr_node = root_.lock())) { + + if (not (curr_node = root_.lock()) ) { /* Root does not exist, need to create one*/ curr_node = std::make_shared<Node>(); root_ = curr_node; } - + curr_node->last_reply = now; /* Iterate through all bit of the Blob */ @@ -69,7 +49,7 @@ void Pht::Cache::insert(const Prefix& p) { } int Pht::Cache::lookup(const Prefix& p) { - int pos = 0; + int pos = -1; auto now = clock::now(), last_node_time = now; /* Before lookup remove the useless one [i.e. too old] */ @@ -81,6 +61,8 @@ int Pht::Cache::lookup(const Prefix& p) { std::shared_ptr<Node> curr_node; while ( auto n = next.lock() ) { + ++pos; + /* Safe since pos is equal to 0 until here */ if ( (unsigned) pos >= p.size_ ) break; @@ -90,11 +72,9 @@ int Pht::Cache::lookup(const Prefix& p) { /* Get the Prefix bit by bit, starting from left */ next = ( p.isActiveBit(pos) ) ? curr_node->right_child : curr_node->left_child; - - ++pos; } - if ( pos > 0 ) { + if ( pos >= 0 ) { auto to_erase = leaves_.find(last_node_time); if ( to_erase != leaves_.end() ) leaves_.erase( to_erase ); @@ -102,15 +82,15 @@ int Pht::Cache::lookup(const Prefix& p) { leaves_.emplace( std::move(now), std::move(curr_node) ); } - return --pos; + return pos; } const ValueType IndexEntry::TYPE = ValueType::USER_DATA; constexpr std::chrono::minutes Pht::Cache::NODE_EXPIRE_TIME; void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, - std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals, - LookupCallback cb, DoneCallbackSimple done_cb, + std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals, + LookupCallbackWrapper cb, DoneCallbackSimple done_cb, std::shared_ptr<unsigned> max_common_prefix_len, int start, bool all_values) { struct node_lookup_result { @@ -122,6 +102,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, auto mid = (start >= 0) ? (unsigned) start : (*lo + *hi)/2; auto first_res = std::make_shared<node_lookup_result>(); auto second_res = std::make_shared<node_lookup_result>(); + auto on_done = [=](bool ok) { bool is_leaf = first_res->is_pht and not second_res->is_pht; if (not ok) { @@ -130,7 +111,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, } else if (is_leaf or *lo > *hi) { // leaf node - auto to_insert = p.getPrefix(mid); + Prefix to_insert = p.getPrefix(mid); cache_.insert(to_insert); if (cb) { @@ -162,8 +143,9 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, }; auto on_get = [=](const std::shared_ptr<dht::Value>& value, std::shared_ptr<node_lookup_result> res) { - if (value->user_type == canary_) + if (value->user_type == canary_) { res->is_pht = true; + } else { IndexEntry entry; entry.unpackValue(*value); @@ -171,20 +153,20 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, if (max_common_prefix_len) { /* inexact match case */ auto common_bits = Prefix::commonBits(p, entry.prefix); if (vals->empty()) { - vals->emplace_back(std::make_shared<Value>(entry.value)); + vals->emplace_back(std::make_shared<IndexEntry>(entry)); *max_common_prefix_len = common_bits; } else { if (common_bits == *max_common_prefix_len) /* this is the max so far */ - vals->emplace_back(std::make_shared<Value>(entry.value)); + vals->emplace_back(std::make_shared<IndexEntry>(entry)); else if (common_bits > *max_common_prefix_len) { /* new max found! */ vals->clear(); - vals->emplace_back(std::make_shared<Value>(entry.value)); + vals->emplace_back(std::make_shared<IndexEntry>(entry)); *max_common_prefix_len = common_bits; } } } else if (all_values or entry.prefix == p.content_) /* exact match case */ - vals->emplace_back(std::make_shared<Value>(entry.value)); + vals->emplace_back(std::make_shared<IndexEntry>(entry)); } return true; }; @@ -205,7 +187,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, lookupStep(p, lo, hi, vals, cb, done_cb, max_common_prefix_len, -1, all_values); } else { first_res->done = true; - if (second_res->done) + if (second_res->done or mid >= p.size_ ) on_done(true); } } @@ -234,20 +216,31 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, } void Pht::lookup(Key k, Pht::LookupCallback cb, DoneCallbackSimple done_cb, bool exact_match) { - auto values = std::make_shared<std::vector<std::shared_ptr<Value>>>(); auto prefix = linearize(k); + auto values = std::make_shared<std::vector<std::shared_ptr<IndexEntry>>>(); + auto lo = std::make_shared<int>(0); auto hi = std::make_shared<int>(prefix.size_); std::shared_ptr<unsigned> max_common_prefix_len = not exact_match ? std::make_shared<unsigned>(0) : nullptr; - lookupStep(prefix, lo, hi, values, cb, done_cb, max_common_prefix_len, cache_.lookup(prefix)); + lookupStep(prefix, lo, hi, values, + [=](std::vector<std::shared_ptr<IndexEntry>>& entries, Prefix p) { + std::vector<std::shared_ptr<Value>> vals(entries.size()); + + std::transform(entries.begin(), entries.end(), vals.begin(), + [](const std::shared_ptr<IndexEntry>& ie) { + return std::make_shared<Value>(ie->value); + }); + + cb(vals, p); + }, done_cb, max_common_prefix_len, cache_.lookup(prefix)); } void Pht::updateCanary(Prefix p) { // TODO: change this... copy value dht::Value canary_value; canary_value.user_type = canary_; - + dht_->put(p.hash(), std::move(canary_value), [=](bool){ static std::bernoulli_distribution d(0.5); @@ -264,17 +257,17 @@ void Pht::updateCanary(Prefix p) { } } -void Pht::insert(Key k, Value v, DoneCallbackSimple done_cb) { - Prefix kp = linearize(k); - auto lo = std::make_shared<int>(0); - auto hi = std::make_shared<int>(kp.size_); - auto vals = std::make_shared<std::vector<std::shared_ptr<Value>>>(); - auto final_prefix = std::make_shared<Prefix>(); +void Pht::insert(Prefix kp, IndexEntry entry, std::shared_ptr<int> lo, std::shared_ptr<int> hi, time_point time_p, + bool check_split, DoneCallbackSimple done_cb) { + if (time_p + ValueType::USER_DATA.expiration < clock::now()) return; + + auto vals = std::make_shared<std::vector<std::shared_ptr<IndexEntry>>>(); + auto final_prefix = std::make_shared<Prefix>(); lookupStep(kp, lo, hi, vals, - [=](std::vector<std::shared_ptr<Value>>&, Prefix p) { + [=](std::vector<std::shared_ptr<IndexEntry>>&, Prefix p) { *final_prefix = Prefix(p); }, [=](bool ok){ @@ -283,29 +276,128 @@ void Pht::insert(Key k, Value v, DoneCallbackSimple done_cb) { done_cb(false); } else { - IndexEntry entry; - entry.value = v; - entry.prefix = kp.content_; - entry.name = name_; - - RealInsertCallback real_insert = [=]( std::shared_ptr<Prefix> p, IndexEntry entry ) { + RealInsertCallback real_insert = [=]( std::shared_ptr<Prefix> p, IndexEntry entry) { updateCanary(*p); - checkPhtUpdate(*p, entry); - dht_->put(p->hash(), std::move(entry), done_cb); + checkPhtUpdate(*p, entry, time_p); + cache_.insert(*p); + dht_->put(p->hash(), std::move(entry), done_cb, time_p); }; - std::cerr << "Insert prefix" << p.toString() << std::endl; - - if ( vals->size() <= MAX_NODE_ENTRY_COUNT ) - getRealPrefix( final_prefix, std::move(entry), real_insert); - else { - *final_prefix = kp.getPrefix(final_prefix->size_+1); - real_insert( final_prefix, std::move(entry) ); + if ( not check_split or final_prefix->size_ == kp.size_ ) { + real_insert(final_prefix, std::move(entry)); + } else { + if ( vals->size() < MAX_NODE_ENTRY_COUNT ) + getRealPrefix(final_prefix, std::move(entry), real_insert); + else + split(*final_prefix, vals, entry, real_insert); } } }, nullptr, cache_.lookup(kp), true ); } +Prefix Pht::linearize(Key k) const { + if (not validKey(k)) { throw std::invalid_argument(INVALID_KEY); } + + Prefix p = Blob {k.begin()->second.begin(), k.begin()->second.end()}; + + auto bit_loc = p.size_ + 1; + for ( auto i = p.content_.size(); i < keySpec_.begin()->second + 1; i++ ) + p.content_.push_back(0); + + return p.swapBit(bit_loc); +}; + +void Pht::getRealPrefix(std::shared_ptr<Prefix> p, IndexEntry entry, RealInsertCallback end_cb ) { + if ( p->size_ == 0 ) { + end_cb(p, std::move(entry)); + return; + } + + auto total = std::make_shared<unsigned int>(0); /* Will contains the total number of data on 3 nodes */ + auto ended = std::make_shared<unsigned int>(0); /* Just indicate how many have end */ + + auto parent = std::make_shared<Prefix>(p->getPrefix(-1)); + auto sibling = std::make_shared<Prefix>(p->getSibling()); + + auto pht_filter = [&](const dht::Value& v) { + return v.user_type.compare(0, name_.size(), name_) == 0; + }; + + /* Lambda will count total number of data node */ + auto count = [=]( const std::shared_ptr<dht::Value> value ) { + if ( value->user_type != canary_) + (*total)++; + + return true; + }; + + auto on_done = [=] ( bool ) { + (*ended)++; + /* Only the last one do the CallBack*/ + if ( *ended == 3 ) { + if ( *total < MAX_NODE_ENTRY_COUNT ) + end_cb(parent, std::move(entry)); + else + end_cb(p, std::move(entry)); + } + }; + + dht_->get(parent->hash(), + count, + on_done, + pht_filter + ); + + dht_->get(p->hash(), + count, + on_done, + pht_filter + ); + + dht_->get(sibling->hash(), + count, + on_done, + pht_filter + ); +} + +void Pht::checkPhtUpdate(Prefix p, IndexEntry entry, time_point time_p) { + + Prefix full = entry.prefix; + if ( p.content_.size() * 8 >= full.content_.size() * 8 ) return; + + auto next_prefix = full.getPrefix( p.size_ + 1 ); + + dht_->listen(next_prefix.hash(), + [=](const std::shared_ptr<dht::Value> &value) { + if (value->user_type == canary_) { + insert(full, entry, std::make_shared<int>(0), std::make_shared<int>(full.size_), time_p, false, nullptr); + + /* Cancel listen since we found where we need to update*/ + return false; + } + + return true; + }, + [=](const dht::Value& v) { + /* Filter value v thats start with the same name as ours */ + return v.user_type.compare(0, name_.size(), name_) == 0; + } + ); +} + +void Pht::split(Prefix insert, std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals, IndexEntry entry, RealInsertCallback end_cb ) { + auto full = Prefix(entry.prefix); + + auto loc = foundSplitLocation(full, vals); + auto prefix_to_insert = std::make_shared<Prefix>(full.getPrefix(loc)); + + for(;loc != insert.size_ - 1; loc--) + updateCanary(full.getPrefix(loc)); + + end_cb(prefix_to_insert, entry); +} + } /* indexation */ } /* dht */ -- GitLab