diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h index dc342bf9951c2e73a901ad1d02eddcd6f6543bda..101442266165265d51ddcdaf7a28f668afd90b23 100644 --- a/include/opendht/dhtrunner.h +++ b/include/opendht/dhtrunner.h @@ -156,16 +156,16 @@ public: void cancelListen(InfoHash h, size_t token); void cancelListen(InfoHash h, std::shared_future<size_t> token); - void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, bool permanent = false); - void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, bool permanent = false) { - put(hash, value, bindDoneCb(cb), permanent); + void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false); + void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) { + put(hash, value, bindDoneCb(cb), created, permanent); } - void put(InfoHash hash, Value&& value, DoneCallback cb={}, bool permanent = false); - void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, bool permanent = false) { - put(hash, std::forward<Value>(value), bindDoneCb(cb), permanent); + void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false); + void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) { + put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent); } - void put(const std::string& key, Value&& value, DoneCallbackSimple cb={}, bool permanent = false); + void put(const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(), bool permanent = false); void cancelPut(const InfoHash& h, const Value::Id& id); diff --git a/include/opendht/indexation/pht.h b/include/opendht/indexation/pht.h index 7d1e5e644029e2ddd7f141ca3e8c0bfa4ca43a4f..8af618f7dd6c95e94f4116808dd5c6a58a60fd44 100644 --- a/include/opendht/indexation/pht.h +++ b/include/opendht/indexation/pht.h @@ -40,7 +40,7 @@ struct Prefix { } Prefix getPrefix(ssize_t len) const { - if ((size_t)std::abs(len) > size_) + if ((size_t)std::abs(len) >= content_.size() * 8) throw std::out_of_range("len larger than prefix size."); if (len < 0) len += size_; @@ -55,7 +55,7 @@ struct Prefix { * @throw out_of_range Throw out of range if the bit at 'pos' does not exist */ bool isActiveBit(size_t pos) const { - if ( pos >= size_ ) + if ( pos >= content_.size() * 8 ) throw std::out_of_range("Can't detect active bit at pos, pos larger than prefix size or empty prefix"); return ((this->content_[pos / 8] >> (7 - (pos % 8)) ) & 1) == 1; @@ -69,7 +69,9 @@ struct Prefix { * @return The prefix of this sibling. */ Prefix getSibling() const { - return swapBit(size_ - 1); + if ( not size_ ) + return Prefix(*this); + return swapBit(size_); } InfoHash hash() const { @@ -90,6 +92,13 @@ struct Prefix { return ss.str(); } + /** + * This method count total of bit in common between 2 prefix + * + * @param p1 first prefix to compared + * @param p2 second prefix to compared + * @return Lenght of the larger common prefix between both + */ static inline unsigned commonBits(const Prefix& p1, const Prefix& p2) { unsigned i, j; uint8_t x; @@ -118,17 +127,14 @@ struct Prefix { * This method swap the bit a the position 'bit' and return the new prefix * * @param bit Position of the bit to swap - * * @return The prefix with the bit at position 'bit' swapped - * * @throw out_of_range Throw out of range if bit does not exist */ Prefix swapBit(size_t bit) const { - if ( bit >= content_.size() * 8 ) + if ( bit > content_.size() * 8 ) throw std::out_of_range("bit larger than prefix size."); Prefix copy = *this; - size_t offset_bit = (8 - bit) % 8; copy.content_[bit / 8] ^= (1 << offset_bit); @@ -137,10 +143,32 @@ struct Prefix { size_t size_ {0}; Blob content_ {}; + }; using Value = std::pair<InfoHash, dht::Value::Id>; +struct IndexEntry : public dht::Value::Serializable<IndexEntry> { + static const ValueType TYPE; + + virtual void unpackValue(const dht::Value& v) { + Serializable<IndexEntry>::unpackValue(v); + name = v.user_type; + } + + virtual dht::Value packValue() const { + auto pack = Serializable<IndexEntry>::packValue(); + pack.user_type = name; + return pack; + } + + Blob prefix; + Value value; + std::string name; + MSGPACK_DEFINE_MAP(prefix, value); +}; + + class Pht { static constexpr const char* INVALID_KEY = "Key does not match the PHT key spec."; @@ -158,8 +186,8 @@ public: /* Specifications of the keys. It defines the number, the length and the * serialization order of fields. */ using KeySpec = std::map<std::string, size_t>; - using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>; + typedef void (*LookupCallbackRaw)(std::vector<std::shared_ptr<Value>>* values, Prefix* p, void *user_data); static LookupCallback bindLookupCb(LookupCallbackRaw raw_cb, void* user_data) { @@ -196,11 +224,43 @@ public: } /** - * Adds an entry into the index. + * Wrapper function which call the private one. + * + * @param k : Key to insert [i.e map of string, blob] + * @param v : Value to insert + * @param done_cb : Callbakc which going to be call when all the insert is done */ - void insert(Key k, Value v, DoneCallbackSimple cb = {}); + void insert(Key k, Value v, DoneCallbackSimple done_cb = {}) { + Prefix p = linearize(k); + + auto lo = std::make_shared<int>(0); + auto hi = std::make_shared<int>(p.size_); + + IndexEntry entry; + entry.value = v; + entry.prefix = p.content_; + entry.name = name_; + + Pht::insert(p, entry, lo, hi, clock::now(), true, done_cb); + } private: + + /** + * Insert function which really insert onto the pht + * + * @param kp : Prefix to insert (linearize the the key) + * @param entry : Entry created from the value + * @param lo : Lowest point to start in the prefix + * @param hi : Highest point to end in the prefix + * @param time_p : Timepoint to use for the insertion into the dht (must be < now) + * @param check_split : If this flag is true then the algoritm will not use the merge algorithm + * @param done_cb : Callback to call when the insert is done + */ + + void insert(Prefix kp, IndexEntry entry, std::shared_ptr<int> lo, std::shared_ptr<int> hi, time_point time_p, + bool check_split, DoneCallbackSimple done_cb = {}); + class Cache { public: /** @@ -213,13 +273,15 @@ private: * Lookup into the tree to return the maximum prefix length in the cache tree * * @param p : Prefix that we are looking for + * * @return : The size of the longest prefix known in the cache between 0 and p.size_ */ + int lookup(const Prefix& p); private: static constexpr const size_t MAX_ELEMENT {1024}; - static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {5}; + static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {10}; struct Node { time_point last_reply; /* Made the assocation between leaves and leaves multimap */ @@ -238,13 +300,28 @@ private: std::multimap<time_point, std::shared_ptr<Node>> leaves_; }; + /* Callback used for insert value by using the pht */ + using RealInsertCallback = std::function<void(std::shared_ptr<Prefix> p, IndexEntry entry)>; + using LookupCallbackWrapper = std::function<void(std::vector<std::shared_ptr<IndexEntry>>& values, Prefix p)>; + /** * Performs a step in the lookup operation. Each steps are performed * asynchronously. + * + * @param k : Prefix on which the lookup is performed + * @param lo : lowest bound on the prefix (where to start) + * @param hi : highest bound on the prefix (where to stop) + * @param vals : Shared ptr to a vector of IndexEntry (going to contains all values found) + * @param cb : Callback to use at the end of the lookupStep (call on the value of vals) + * @param done_cb : Callback at the end of the lookupStep + * @param max_common_prefix_len: used in the inexacte lookup match case, indicate the longest common prefix found + * @param start : If start is set then lo and hi will be ignore for the first step, if the step fail lo and hi will be used + * @param all_values : If all value is true then all value met during the lookupstep will be in the vector vals */ void lookupStep(Prefix k, std::shared_ptr<int> lo, std::shared_ptr<int> hi, - std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals, LookupCallback cb, - DoneCallbackSimple done_cb, std::shared_ptr<unsigned> max_common_prefix_len, + std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals, + LookupCallbackWrapper cb, DoneCallbackSimple done_cb, + std::shared_ptr<unsigned> max_common_prefix_len, int start = -1, bool all_values = false); /** @@ -255,17 +332,53 @@ private: * * @return the prefix of the linearized key. */ - virtual Prefix linearize(Key k) const { - if (not validKey(k)) { throw std::invalid_argument(INVALID_KEY); } + virtual Prefix linearize(Key k) const; + + /** + * Looking where to put the data cause if there is free space on the node + * above then this node will became the real leave. + * + * @param p Share_ptr on the Prefix to check + * @param entry The entry to put at the prefix p + * @param end_cb Callback to use at the end of counting + */ + void getRealPrefix(std::shared_ptr<Prefix> p, IndexEntry entry, RealInsertCallback end_cb ); - Prefix p = Blob {k.begin()->second.begin(), k.begin()->second.end()}; + /** + * Looking where to put the data cause if there is free space on the node + * above then this node will became the real leave. + * + * @param p Share_ptr on the Prefix to check + * @param entry The entry to put at the prefix p + * @param end_cb Callback to use at the end of counting + */ + void checkPhtUpdate(Prefix p, IndexEntry entry, time_point time_p); + + /** + * Search for the split location by comparing 'compared' to all values in vals. + * + * @param compared : Value which going to be compared + * @param vals : The vector of values to compare with comapred + * @return position compared diverge from all others + */ + size_t foundSplitLocation(Prefix compared, std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals) { + for ( size_t i = 0; i < compared.content_.size() * 8 - 1; i++ ) + for ( auto const& v : *vals) + if ( Prefix(v->prefix).isActiveBit(i) != compared.isActiveBit(i) ) + return i + 1; - auto bit_loc = p.size_ + 1; - for ( auto i = p.content_.size(); i < keySpec_.begin()->second + 1; i++ ) - p.content_.push_back(0); + return compared.content_.size() * 8 - 1; + } - return p.swapBit(bit_loc); - }; + /** + * Put canary from the split point until the last known canary and add the prefix at the good place + * + * @param insert : Prefix to insertm but also prefix which going to check where we need to split + * @param vals : Vector of vals for the comparaison + * @param entry : Entry to put on the pht + * @param end_cb : Callback to apply to the insert prefi (here does the insert) + */ + void split(Prefix insert, std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals, IndexEntry entry, RealInsertCallback end_cb); /** * Tells if the key is valid according to the key spec. diff --git a/src/dht.cpp b/src/dht.cpp index d9969fe32e4f10f0885bafb2c1ae0f2e6b61118b..982a089adafdb73dab6d5c52b4f8aa3173ef4458 100644 --- a/src/dht.cpp +++ b/src/dht.cpp @@ -841,7 +841,7 @@ Dht::searchStep(std::shared_ptr<Search> sr) continue; } } - if (in) storageStore(sr->id, a.value, a.created); + unsigned i = 0; for (auto& n : sr->nodes) { if (not n.isSynced(now)) @@ -1243,6 +1243,9 @@ Dht::announce(const InfoHash& id, sa_family_t af, std::shared_ptr<Value> value, callback(false, {}); return; } + + storageStore(id, value, created); + auto& srs = af == AF_INET ? searches4 : searches6; auto srp = srs.find(id); auto sr = srp == srs.end() ? search(id, af, nullptr, nullptr) : srp->second; diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index ed24473d877a296661aebbfdb04a3b463d003559..1d5354366d4c2dff54ee1615abadc0eebb51dc08 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -484,30 +484,30 @@ DhtRunner::cancelListen(InfoHash h, std::shared_future<size_t> token) } void -DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb, bool permanent) +DhtRunner::put(InfoHash hash, Value&& value, DoneCallback cb, time_point created, bool permanent) { std::lock_guard<std::mutex> lck(storage_mtx); auto sv = std::make_shared<Value>(std::move(value)); pending_ops.emplace([=](SecureDht& dht) { - dht.put(hash, sv, cb, {}, permanent); + dht.put(hash, sv, cb, created, permanent); }); cv.notify_all(); } void -DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, bool permanent) +DhtRunner::put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb, time_point created, bool permanent) { std::lock_guard<std::mutex> lck(storage_mtx); pending_ops.emplace([=](SecureDht& dht) { - dht.put(hash, value, cb, {}, permanent); + dht.put(hash, value, cb, created, permanent); }); cv.notify_all(); } void -DhtRunner::put(const std::string& key, Value&& value, DoneCallbackSimple cb, bool permanent) +DhtRunner::put(const std::string& key, Value&& value, DoneCallbackSimple cb, time_point created, bool permanent) { - put(InfoHash::get(key), std::forward<Value>(value), cb, permanent); + put(InfoHash::get(key), std::forward<Value>(value), cb, created, permanent); } void diff --git a/src/indexation/pht.cpp b/src/indexation/pht.cpp index 7fa0a042935c2876b543abe1ae08070e0e139232..65f2ea62f7831b028753e01d46a2df02d59a3295 100644 --- a/src/indexation/pht.cpp +++ b/src/indexation/pht.cpp @@ -4,36 +4,16 @@ namespace dht { namespace indexation { -struct IndexEntry : public dht::Value::Serializable<IndexEntry> { - static const ValueType TYPE; - - virtual void unpackValue(const dht::Value& v) { - Serializable<IndexEntry>::unpackValue(v); - name = v.user_type; - } - - virtual dht::Value packValue() const { - auto pack = Serializable<IndexEntry>::packValue(); - pack.user_type = name; - return pack; - } - - Blob prefix; - Value value; - std::string name; - MSGPACK_DEFINE_MAP(prefix, value); -}; - void Pht::Cache::insert(const Prefix& p) { size_t i = 0; auto now = clock::now(); std::shared_ptr<Node> curr_node; - while ((leaves_.size() > 0 && leaves_.begin()->first + NODE_EXPIRE_TIME < now) || leaves_.size() > MAX_ELEMENT) + while ((leaves_.size() > 0 and leaves_.begin()->first + NODE_EXPIRE_TIME < now) or leaves_.size() > MAX_ELEMENT) leaves_.erase(leaves_.begin()); - if (not (curr_node = root_.lock())) { + if (not (curr_node = root_.lock()) ) { /* Root does not exist, need to create one*/ curr_node = std::make_shared<Node>(); root_ = curr_node; @@ -46,7 +26,6 @@ void Pht::Cache::insert(const Prefix& p) { /* According to the bit define which node is the next one */ auto& next = ( p.isActiveBit(i) ) ? curr_node->right_child : curr_node->left_child; - /** * If lock, node exists * else create it @@ -69,11 +48,11 @@ void Pht::Cache::insert(const Prefix& p) { } int Pht::Cache::lookup(const Prefix& p) { - int pos = 0; + int pos = -1; auto now = clock::now(), last_node_time = now; /* Before lookup remove the useless one [i.e. too old] */ - while ( leaves_.size() > 0 && leaves_.begin()->first + NODE_EXPIRE_TIME < now ) { + while ( leaves_.size() > 0 and leaves_.begin()->first + NODE_EXPIRE_TIME < now ) { leaves_.erase(leaves_.begin()); } @@ -81,8 +60,9 @@ int Pht::Cache::lookup(const Prefix& p) { std::shared_ptr<Node> curr_node; while ( auto n = next.lock() ) { + ++pos; /* Safe since pos is equal to 0 until here */ - if ( (unsigned) pos >= p.size_ ) break; + if ( (unsigned) pos >= p.content_.size() * 8) break; curr_node = n; last_node_time = curr_node->last_reply; @@ -90,11 +70,9 @@ int Pht::Cache::lookup(const Prefix& p) { /* Get the Prefix bit by bit, starting from left */ next = ( p.isActiveBit(pos) ) ? curr_node->right_child : curr_node->left_child; - - ++pos; } - if ( pos > 0 ) { + if ( pos >= 0 ) { auto to_erase = leaves_.find(last_node_time); if ( to_erase != leaves_.end() ) leaves_.erase( to_erase ); @@ -102,16 +80,17 @@ int Pht::Cache::lookup(const Prefix& p) { leaves_.emplace( std::move(now), std::move(curr_node) ); } - return --pos; + return pos; } const ValueType IndexEntry::TYPE = ValueType::USER_DATA; constexpr std::chrono::minutes Pht::Cache::NODE_EXPIRE_TIME; void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, - std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals, - LookupCallback cb, DoneCallbackSimple done_cb, - std::shared_ptr<unsigned> max_common_prefix_len, int start, bool all_values) + std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals, + LookupCallbackWrapper cb, DoneCallbackSimple done_cb, + std::shared_ptr<unsigned> max_common_prefix_len, + int start, bool all_values) { struct node_lookup_result { bool done {false}; @@ -122,6 +101,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, auto mid = (start >= 0) ? (unsigned) start : (*lo + *hi)/2; auto first_res = std::make_shared<node_lookup_result>(); auto second_res = std::make_shared<node_lookup_result>(); + auto on_done = [=](bool ok) { bool is_leaf = first_res->is_pht and not second_res->is_pht; if (not ok) { @@ -130,8 +110,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, } else if (is_leaf or *lo > *hi) { // leaf node - auto to_insert = p.getPrefix(mid); - cache_.insert(to_insert); + Prefix to_insert = p.getPrefix(mid); if (cb) { if (vals->size() == 0 and max_common_prefix_len and mid > 0) { @@ -156,14 +135,16 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, done_cb(false); } }; + if (*lo <= *hi) { auto pht_filter = [&](const dht::Value& v) { return v.user_type.compare(0, name_.size(), name_) == 0; }; auto on_get = [=](const std::shared_ptr<dht::Value>& value, std::shared_ptr<node_lookup_result> res) { - if (value->user_type == canary_) + if (value->user_type == canary_) { res->is_pht = true; + } else { IndexEntry entry; entry.unpackValue(*value); @@ -171,20 +152,20 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, if (max_common_prefix_len) { /* inexact match case */ auto common_bits = Prefix::commonBits(p, entry.prefix); if (vals->empty()) { - vals->emplace_back(std::make_shared<Value>(entry.value)); + vals->emplace_back(std::make_shared<IndexEntry>(entry)); *max_common_prefix_len = common_bits; } else { if (common_bits == *max_common_prefix_len) /* this is the max so far */ - vals->emplace_back(std::make_shared<Value>(entry.value)); + vals->emplace_back(std::make_shared<IndexEntry>(entry)); else if (common_bits > *max_common_prefix_len) { /* new max found! */ vals->clear(); - vals->emplace_back(std::make_shared<Value>(entry.value)); + vals->emplace_back(std::make_shared<IndexEntry>(entry)); *max_common_prefix_len = common_bits; } } } else if (all_values or entry.prefix == p.content_) /* exact match case */ - vals->emplace_back(std::make_shared<Value>(entry.value)); + vals->emplace_back(std::make_shared<IndexEntry>(entry)); } return true; }; @@ -205,13 +186,13 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, lookupStep(p, lo, hi, vals, cb, done_cb, max_common_prefix_len, -1, all_values); } else { first_res->done = true; - if (second_res->done) + if (second_res->done or mid >= p.size_ - 1) on_done(true); } } }, pht_filter); - if (mid < p.size_) + if (mid < p.size_ - 1) dht_->get(p.getPrefix(mid+1).hash(), std::bind(on_get, std::placeholders::_1, second_res), [=](bool ok) { @@ -234,24 +215,36 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, } void Pht::lookup(Key k, Pht::LookupCallback cb, DoneCallbackSimple done_cb, bool exact_match) { - auto values = std::make_shared<std::vector<std::shared_ptr<Value>>>(); auto prefix = linearize(k); + auto values = std::make_shared<std::vector<std::shared_ptr<IndexEntry>>>(); + auto lo = std::make_shared<int>(0); auto hi = std::make_shared<int>(prefix.size_); std::shared_ptr<unsigned> max_common_prefix_len = not exact_match ? std::make_shared<unsigned>(0) : nullptr; - lookupStep(prefix, lo, hi, values, cb, done_cb, max_common_prefix_len, cache_.lookup(prefix)); + lookupStep(prefix, lo, hi, values, + [=](std::vector<std::shared_ptr<IndexEntry>>& entries, Prefix p) { + std::vector<std::shared_ptr<Value>> vals(entries.size()); + + std::transform(entries.begin(), entries.end(), vals.begin(), + [](const std::shared_ptr<IndexEntry>& ie) { + return std::make_shared<Value>(ie->value); + }); + + cb(vals, p); + }, done_cb, max_common_prefix_len, cache_.lookup(prefix)); } void Pht::updateCanary(Prefix p) { // TODO: change this... copy value dht::Value canary_value; canary_value.user_type = canary_; + dht_->put(p.hash(), std::move(canary_value), [=](bool){ static std::bernoulli_distribution d(0.5); crypto::random_device rd; - if (p.size_ && d(rd)) + if (p.size_ and d(rd)) updateCanary(p.getPrefix(-1)); } ); @@ -263,16 +256,16 @@ void Pht::updateCanary(Prefix p) { } } -void Pht::insert(Key k, Value v, DoneCallbackSimple done_cb) { - Prefix kp = linearize(k); +void Pht::insert(Prefix kp, IndexEntry entry, std::shared_ptr<int> lo, std::shared_ptr<int> hi, time_point time_p, + bool check_split, DoneCallbackSimple done_cb) { - auto lo = std::make_shared<int>(0); - auto hi = std::make_shared<int>(kp.size_); - auto vals = std::make_shared<std::vector<std::shared_ptr<Value>>>(); + if (time_p + ValueType::USER_DATA.expiration < clock::now()) return; + + auto vals = std::make_shared<std::vector<std::shared_ptr<IndexEntry>>>(); auto final_prefix = std::make_shared<Prefix>(); lookupStep(kp, lo, hi, vals, - [=](std::vector<std::shared_ptr<Value>>&, Prefix p) { + [=](std::vector<std::shared_ptr<IndexEntry>>&, Prefix p) { *final_prefix = Prefix(p); }, [=](bool ok){ @@ -280,20 +273,130 @@ void Pht::insert(Key k, Value v, DoneCallbackSimple done_cb) { if (done_cb) done_cb(false); } else { - if (vals->size() >= MAX_NODE_ENTRY_COUNT) - *final_prefix = kp.getPrefix(final_prefix->size_+1); - - IndexEntry entry; - entry.value = v; - entry.prefix = kp.content_; - entry.name = name_; - updateCanary(*final_prefix); - dht_->put(final_prefix->hash(), std::move(entry), done_cb); + RealInsertCallback real_insert = [=]( std::shared_ptr<Prefix> p, IndexEntry entry) { + updateCanary(*p); + checkPhtUpdate(*p, entry, time_p); + cache_.insert(*p); + dht_->put(p->hash(), std::move(entry), done_cb, time_p); + }; + + if ( not check_split or final_prefix->size_ == kp.size_ ) { + real_insert(final_prefix, std::move(entry)); + } else { + if ( vals->size() < MAX_NODE_ENTRY_COUNT ) + getRealPrefix(final_prefix, std::move(entry), real_insert); + else + split(*final_prefix, vals, entry, real_insert); + } } }, nullptr, cache_.lookup(kp), true ); } +Prefix Pht::linearize(Key k) const { + if (not validKey(k)) { throw std::invalid_argument(INVALID_KEY); } + + Prefix p = Blob {k.begin()->second.begin(), k.begin()->second.end()}; + + auto bit_loc = p.size_ + 1; + for ( auto i = p.content_.size(); i < keySpec_.begin()->second + 1; i++ ) + p.content_.push_back(0); + + return p.swapBit(bit_loc); +}; + +void Pht::getRealPrefix(std::shared_ptr<Prefix> p, IndexEntry entry, RealInsertCallback end_cb ) { + if ( p->size_ == 0 ) { + end_cb(p, std::move(entry)); + return; + } + + auto total = std::make_shared<unsigned int>(0); /* Will contains the total number of data on 3 nodes */ + auto ended = std::make_shared<unsigned int>(0); /* Just indicate how many have end */ + + auto parent = std::make_shared<Prefix>(p->getPrefix(-1)); + auto sibling = std::make_shared<Prefix>(p->getSibling()); + + auto pht_filter = [&](const dht::Value& v) { + return v.user_type.compare(0, name_.size(), name_) == 0; + }; + + /* Lambda will count total number of data node */ + auto count = [=]( const std::shared_ptr<dht::Value> value ) { + if ( value->user_type != canary_) + (*total)++; + + return true; + }; + + auto on_done = [=] ( bool ) { + (*ended)++; + /* Only the last one do the CallBack*/ + if ( *ended == 3 ) { + if ( *total < MAX_NODE_ENTRY_COUNT ) + end_cb(parent, std::move(entry)); + else + end_cb(p, std::move(entry)); + } + }; + + dht_->get(parent->hash(), + count, + on_done, + pht_filter + ); + + dht_->get(p->hash(), + count, + on_done, + pht_filter + ); + + dht_->get(sibling->hash(), + count, + on_done, + pht_filter + ); +} + +void Pht::checkPhtUpdate(Prefix p, IndexEntry entry, time_point time_p) { + + Prefix full = entry.prefix; + if ( p.content_.size() * 8 >= full.content_.size() * 8 ) return; + + auto next_prefix = full.getPrefix( p.size_ + 1 ); + + dht_->listen(next_prefix.hash(), + [=](const std::shared_ptr<dht::Value> &value) { + if (value->user_type == canary_) { + insert(full, entry, std::make_shared<int>(0), std::make_shared<int>(full.size_), time_p, false, nullptr); + + /* Cancel listen since we found where we need to update*/ + return false; + } + + return true; + }, + [=](const dht::Value& v) { + /* Filter value v thats start with the same name as ours */ + return v.user_type.compare(0, name_.size(), name_) == 0; + } + ); +} + +void Pht::split(Prefix insert, std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals, IndexEntry entry, RealInsertCallback end_cb ) { + auto full = Prefix(entry.prefix); + + auto loc = foundSplitLocation(full, vals); + auto prefix_to_insert = std::make_shared<Prefix>(full.getPrefix(loc)); + + for(;loc != insert.size_ - 1; loc--) + updateCanary(full.getPrefix(loc)); + + end_cb(prefix_to_insert, entry); +} + } /* indexation */ + } /* dht */