diff --git a/include/opendht/indexation/pht.h b/include/opendht/indexation/pht.h index 2e687cbac6daa6181fde53f7ae41c8aee101fd13..c4013c5a1a612fc1caf65207fca62e9e436a9907 100644 --- a/include/opendht/indexation/pht.h +++ b/include/opendht/indexation/pht.h @@ -47,6 +47,16 @@ struct Prefix { return Prefix(*this, len); } + /** + * Method for getting the state of the bit at the position pos. + * @param pos : Pos of the needed bit + * @return : true if the bit is at 1 + * false otherwise + */ + bool isActivBit(size_t pos) const { + return ((this->content_[pos / 8] >> (7 - (pos % 8)) ) & 1) == 1; + } + Prefix getFullSize() { return Prefix(*this, content_.size()*8); } /** @@ -136,7 +146,7 @@ class Pht { public: /* This is the maximum number of entries per node. This parameter is - * critical and influences the traffic alot during a lookup operation. + * critical and influences the traffic a lot during a lookup operation. */ static constexpr const size_t MAX_NODE_ENTRY_COUNT {16}; @@ -166,6 +176,125 @@ public: void insert(Key k, Value v, Dht::DoneCallbackSimple cb = {}); private: + class Cache { + public: + /** + * Insert all needed node into the tree according to a prefix + * @param p : Prefix that we need to insert + */ + void insert(const Prefix& p) { + size_t i = 0; + auto now = clock::now(); + + std::shared_ptr<Node> curr_node; + + while ( ( leaves_.size() > 0 && leaves_.begin()->first + NODE_EXPIRE_TIME < now ) + || leaves_.size() > MAX_ELEMENT ) { + + leaves_.erase(leaves_.begin()); + } + + if ( !(curr_node = root_.lock()) ) { + + /* Root does not exist, need to create one*/ + curr_node = std::make_shared<Node>(); + root_ = curr_node; + } + + curr_node->last_reply = now; + + /* Iterate through all bit of the Blob */ + for ( i = 0; i < p.size_; i++ ) { + + /* According to the bit define which node is the next one */ + auto& next = ( p.isActivBit(i) ) ? curr_node->right_child : curr_node->left_child; + + /** + * If lock, node exists + * else create it + */ + if (auto n = next.lock()) { + curr_node = std::move(n); + } else { + /* Create the next node if doesn't exist*/ + auto tmp_curr_node = std::make_shared<Node>(); + tmp_curr_node->parent = curr_node; + next = tmp_curr_node; + curr_node = std::move(tmp_curr_node); + } + + curr_node->last_reply = now; + } + + /* Insert the leaf (curr_node) into the multimap */ + leaves_.emplace(std::move(now), std::move(curr_node) ); + } + + /** + * Lookup into the tree to return the maximum prefix length in the cache tree + * + * @param p : Prefix that we are looking for + * @return : The size of the longest prefix known in the cache between 0 and p.size_ + */ + int lookup(const Prefix& p) { + int pos = 0; + auto now = clock::now(), last_node_time = now; + + /* Before lookup remove the useless one [i.e. too old] */ + while ( leaves_.size() > 0 && leaves_.begin()->first + NODE_EXPIRE_TIME < now ) { + leaves_.erase(leaves_.begin()); + } + + auto next = root_; + std::shared_ptr<Node> curr_node; + + while ( auto n = next.lock() ) { + /* Safe since pos is equal to 0 until here */ + if ( (unsigned) pos >= p.size_ ) break; + + curr_node = n; + last_node_time = curr_node->last_reply; + curr_node->last_reply = now; + + /* Get the Prefix bit by bit, starting from left */ + next = ( p.isActivBit(pos) ) ? curr_node->right_child : curr_node->left_child; + + ++pos; + } + + if ( pos > 0 ) { + auto to_erase = leaves_.find(last_node_time); + if ( to_erase != leaves_.end() ) + leaves_.erase( to_erase ); + + leaves_.emplace( std::move(now), std::move(curr_node) ); + } + + return --pos; + } + + + private: + static constexpr const size_t MAX_ELEMENT {1024}; + static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {5}; + + struct Node { + time_point last_reply; /* Made the assocation between leaves and leaves multimap */ + std::shared_ptr<Node> parent; /* Share_ptr to the parent, it allow the self destruction of tree */ + std::weak_ptr<Node> left_child; /* Left child, for bit equal to 1 */ + std::weak_ptr<Node> right_child; /* Right child, for bit equal to 0 */ + }; + + std::weak_ptr<Node> root_; /* Root of the tree */ + + /** + * This mutlimap contains all prefix insert in the tree in time order + * We could then delete the last one if there is too much node + * The tree will self destroy is branch ( thanks to share_ptr ) + */ + std::multimap<time_point, std::shared_ptr<Node>> leaves_; + }; + /** * Linearizes the key into a unidimensional key. A pht only takes * unidimensional key. @@ -184,9 +313,9 @@ private: * asynchronously. */ void lookupStep(Prefix k, std::shared_ptr<int> lo, std::shared_ptr<int> hi, - std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals, - LookupCallback cb, Dht::DoneCallbackSimple done_cb, - std::shared_ptr<unsigned> max_common_prefix_len, bool all_values = false); + std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals, LookupCallback cb, + Dht::DoneCallbackSimple done_cb, std::shared_ptr<unsigned> max_common_prefix_len, + int start = -1, bool all_values = false); /** * Updates the canary token on the node responsible for the specified @@ -196,7 +325,7 @@ private: const std::string name_; const std::string canary_; - + Cache cache_; std::shared_ptr<DhtRunner> dht_; }; diff --git a/python/tools/benchmark.py b/python/tools/benchmark.py index f6fcc52251734f12bb028697da5cf8e58205ce5b..2352481757154a3511c2873cfce6d2978d69eaec 100755 --- a/python/tools/benchmark.py +++ b/python/tools/benchmark.py @@ -160,6 +160,7 @@ if __name__ == '__main__': featureArgs.add_argument('--pht', action='store_true', default=False, help='Launches PHT benchmark test. '\ 'Available args for "-t" are: insert. '\ + 'Timer available by adding "timer" to "-o" args'\ 'Use "-m" option for fixing number of keys to create during the test.') featureArgs.add_argument('--data-persistence', action='store_true', default=0, help='Launches data persistence benchmark test. '\ diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py index 2c65563e92da824c44a11aed7188d3679f6870b7..67cdd6a4ea67c8ee6687745159d0a27ee7e28e14 100644 --- a/python/tools/dht/tests.py +++ b/python/tools/dht/tests.py @@ -33,6 +33,24 @@ Mbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-2) + 'Mb') def random_hash(): return InfoHash(''.join(random.SystemRandom().choice(string.hexdigits) for _ in range(40)).encode()) +def timer(f, *args): + """ + Start a timer which count time taken for execute function f + + @param f : Function to time + @type f : function + + @param args : Arguments of the function f + @type args : list + + @rtype : timer + @return : Time taken by the function f + """ + start = time.time() + f(*args) + + return time.time() - start + def reset_before_test(featureTestMethod): """ This is a decorator for all test methods needing reset(). @@ -160,7 +178,6 @@ class FeatureTest(object): def run(self): raise NotImplementedError('This method must be implemented.') - ################################## # PHT # ################################## @@ -189,6 +206,7 @@ class PhtTest(FeatureTest): """ super(PhtTest, self).__init__(test, workbench) self._num_keys = opts['num_keys'] if 'num_keys' in opts else 32 + self._timer = True if 'timer' in opts else False def _reset(self): super(PhtTest, self)._reset() @@ -278,14 +296,18 @@ class PhtTest(FeatureTest): for key in keys: PhtTest.key = key with FeatureTest.lock: - pht.insert(key, IndexValue(random_hash()), PhtTest.insertDoneCb) + time_taken = timer(pht.insert, key, IndexValue(random_hash()), PhtTest.insertDoneCb) + if self._timer: + DhtNetwork.log('This insert step took : ', time_taken, 'second') FeatureTest.lock.wait() # Recover entries now that the trie is complete. for key in keys: PhtTest.key = key with FeatureTest.lock: - pht.lookup(key, PhtTest.lookupCb, PhtTest.lookupDoneCb) + time_taken = timer(pht.lookup, key, PhtTest.lookupCb, PhtTest.lookupDoneCb) + if self._timer: + DhtNetwork.log('This lookup step took : ', time_taken, 'second') FeatureTest.lock.wait() all_entries[PhtTest.prefix] = [e.__str__() @@ -296,7 +318,6 @@ class PhtTest(FeatureTest): DhtNetwork.log(all_entries[p]) PhtTest.drawTrie(all_entries) - ################################## # DHT # ################################## @@ -372,7 +393,6 @@ class DhtFeatureTest(FeatureTest): for n in DhtFeatureTest.foreignNodes: nodes.add(n) - class PersistenceTest(DhtFeatureTest): """ This tests persistence of data on the network. @@ -676,7 +696,7 @@ class PersistenceTest(DhtFeatureTest): hashes = [] - # Generating considerable amount of values of size 1KB. + # Generating considerable amount of values of size 1KB. VALUE_SIZE = 1024 NUM_VALUES = self._num_values if self._num_values else 50 values = [Value(''.join(random.choice(string.hexdigits) for _ in range(VALUE_SIZE)).encode()) for _ in range(NUM_VALUES)] diff --git a/src/indexation/pht.cpp b/src/indexation/pht.cpp index 80aad4b8c9ddaecf5d34702a6220d3af979ff0e3..2e0b467319f3393a1e25f9173cc2ba93a8104856 100644 --- a/src/indexation/pht.cpp +++ b/src/indexation/pht.cpp @@ -5,18 +5,20 @@ namespace dht { namespace indexation { const ValueType IndexEntry::TYPE = ValueType::USER_DATA; +constexpr std::chrono::minutes Pht::Cache::NODE_EXPIRE_TIME; void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals, LookupCallback cb, Dht::DoneCallbackSimple done_cb, - std::shared_ptr<unsigned> max_common_prefix_len, bool all_values) + std::shared_ptr<unsigned> max_common_prefix_len, int start, bool all_values) { struct node_lookup_result { bool done {false}; bool is_pht {false}; }; - auto mid = (*lo + *hi)/2; + /* start could be under 0 but after the compare it to 0 it always will be unsigned, so we can cast it*/ + auto mid = (start >= 0) ? (unsigned) start : (*lo + *hi)/2; auto first_res = std::make_shared<node_lookup_result>(); auto second_res = std::make_shared<node_lookup_result>(); auto on_done = [=](bool ok) { @@ -27,21 +29,26 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, } else if (is_leaf or *lo > *hi) { // leaf node + auto to_insert = p.getPrefix(mid); + cache_.insert(to_insert); + if (cb) { if (vals->size() == 0 and max_common_prefix_len and mid > 0) { auto p_ = (p.getPrefix(mid)).getSibling().getFullSize(); *lo = mid; *hi = p_.size_; - lookupStep(p_, lo, hi, vals, cb, done_cb, max_common_prefix_len, all_values); + lookupStep(p_, lo, hi, vals, cb, done_cb, max_common_prefix_len, -1, all_values); } - cb(*vals, p.getPrefix(mid)); + + cb(*vals, to_insert); } + if (done_cb) done_cb(true); } else if (first_res->is_pht) { // internal node *lo = mid+1; - lookupStep(p, lo, hi, vals, cb, done_cb, max_common_prefix_len, all_values); + lookupStep(p, lo, hi, vals, cb, done_cb, max_common_prefix_len, -1, all_values); } else { // first get failed before second. if (done_cb) @@ -83,6 +90,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, } return true; }; + dht_->get(p.getPrefix(mid).hash(), std::bind(on_get, std::placeholders::_1, first_res), [=](bool ok) { @@ -96,7 +104,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, if (not first_res->is_pht) { // Not a PHT node. *hi = mid-1; - lookupStep(p, lo, hi, vals, cb, done_cb, max_common_prefix_len, all_values); + lookupStep(p, lo, hi, vals, cb, done_cb, max_common_prefix_len, -1, all_values); } else { first_res->done = true; if (second_res->done) @@ -104,6 +112,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, } } }, pht_filter); + if (mid < p.size_) dht_->get(p.getPrefix(mid+1).hash(), std::bind(on_get, std::placeholders::_1, second_res), @@ -132,7 +141,8 @@ void Pht::lookup(Key k, Pht::LookupCallback cb, Dht::DoneCallbackSimple done_cb, auto lo = std::make_shared<int>(0); auto hi = std::make_shared<int>(prefix.size_); std::shared_ptr<unsigned> max_common_prefix_len = not exact_match ? std::make_shared<unsigned>(0) : nullptr; - lookupStep(prefix, lo, hi, values, cb, done_cb, max_common_prefix_len); + + lookupStep(prefix, lo, hi, values, cb, done_cb, max_common_prefix_len, cache_.lookup(prefix)); } void Pht::updateCanary(Prefix p) { @@ -140,7 +150,7 @@ void Pht::updateCanary(Prefix p) { dht::Value canary_value; canary_value.user_type = canary_; dht_->put(p.hash(), std::move(canary_value), - [=](bool ok){ + [=](bool){ static std::bernoulli_distribution d(0.5); crypto::random_device rd; if (p.size_ && d(rd)) @@ -164,7 +174,7 @@ void Pht::insert(Key k, Value v, Dht::DoneCallbackSimple done_cb) { auto final_prefix = std::make_shared<Prefix>(); lookupStep(kp, lo, hi, vals, - [=](std::vector<std::shared_ptr<Value>>& values, Prefix p) { + [=](std::vector<std::shared_ptr<Value>>&, Prefix p) { *final_prefix = Prefix(p); }, [=](bool ok){ @@ -183,7 +193,7 @@ void Pht::insert(Key k, Value v, Dht::DoneCallbackSimple done_cb) { updateCanary(*final_prefix); dht_->put(final_prefix->hash(), std::move(entry), done_cb); } - }, nullptr, true + }, nullptr, cache_.lookup(kp), true ); } diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index 0410397f1a60c64f1413cc4e32fef78f28565201..da075f503fd224cc87b9d5fbf770dfaf692f6550 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -259,10 +259,14 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, std::map<std::string, dht::indexa } std::cout << "Pht::lookup: done." << std::endl; }, - [](bool ok) { + [start](bool ok) { if (not ok) { std::cout << "Pht::lookup: dht Get failed." << std::endl; } + + auto end = std::chrono::high_resolution_clock::now(); + std::cout << "Pht::lookup: took " << print_dt(end-start) << "s)" << std::endl; + }, exact_match.size() != 0 and exact_match == "false" ? false : true ); }