diff --git a/include/opendht/indexation/pht.h b/include/opendht/indexation/pht.h index c2b2e5c29a4db16e0d7db4b2ef8ae65ad25014a6..caaa7e5b505538e7716afc8ff43b1ff3b185021a 100644 --- a/include/opendht/indexation/pht.h +++ b/include/opendht/indexation/pht.h @@ -52,8 +52,12 @@ struct Prefix { * @param pos : Pos of the needed bit * @return : true if the bit is at 1 * false otherwise + * @throw out_of_range Throw out of range if the bit at 'pos' does not exist */ - bool isActivBit(size_t pos) const { + bool isActiveBit(size_t pos) const { + if ( pos >= size_ ) + throw std::out_of_range("Can't detect active bit at pos, pos larger than prefix size or empty prefix"); + return ((this->content_[pos / 8] >> (7 - (pos % 8)) ) & 1) == 1; } @@ -65,12 +69,7 @@ struct Prefix { * @return The prefix of this sibling. */ Prefix getSibling() const { - Prefix copy = *this; - if (size_) { - size_t last_bit = (8 - size_) % 8; - copy.content_.back() ^= (1 << last_bit); - } - return copy; + return swapBit(size_ - 1); } InfoHash hash() const { @@ -115,33 +114,37 @@ struct Prefix { return 8 * i + j; } - size_t size_ {0}; - Blob content_ {}; -}; - -using Value = std::pair<InfoHash, dht::Value::Id>; + /** + * This method swap the bit a the position 'bit' and return the new prefix + * + * @param bit Position of the bit to swap + * + * @return The prefix with the bit at position 'bit' swapped + * + * @throw out_of_range Throw out of range if bit does not exist + */ + Prefix swapBit(size_t bit) const { + if ( bit >= size_ ) + throw std::out_of_range("bit larger than prefix size."); -struct IndexEntry : public dht::Value::Serializable<IndexEntry> { - static const ValueType TYPE; + Prefix copy = *this; - virtual void unpackValue(const dht::Value& v) { - Serializable<IndexEntry>::unpackValue(v); - name = v.user_type; - } + size_t offset_bit = (8 - bit) % 8; + copy.content_[bit / 8] ^= (1 << offset_bit); - virtual dht::Value packValue() const { - auto pack = Serializable<IndexEntry>::packValue(); - pack.user_type = name; - return pack; + return copy; } - Blob prefix; - Value value; - std::string name; - MSGPACK_DEFINE_MAP(prefix, value); + size_t size_ {0}; + Blob content_ {}; }; +using Value = std::pair<InfoHash, dht::Value::Id>; + class Pht { + static constexpr const char* INVALID_KEY = "Key does not match the PHT key spec."; + + /* Prefixes the user_type for all dht values put on the DHT */ static constexpr const char* INDEX_PREFIX = "index.pht."; public: @@ -150,7 +153,11 @@ public: */ static constexpr const size_t MAX_NODE_ENTRY_COUNT {16}; - using Key = std::map<std::string, Prefix>; + /* A key for a an index entry */ + using Key = std::map<std::string, Blob>; + /* Specifications of the keys. It defines the number, the length and the + * serialization order of fields. */ + using KeySpec = std::map<std::string, size_t>; using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>; typedef void (*LookupCallbackRaw)(std::vector<std::shared_ptr<Value>>* values, Prefix* p, void *user_data); @@ -161,15 +168,33 @@ public: raw_cb((std::vector<std::shared_ptr<Value>>*) &values, (Prefix*) &p, user_data); }; } + using LookupCallbackSimple = std::function<void(std::vector<std::shared_ptr<Value>>& values)>; + typedef void (*LookupCallbackSimpleRaw)(std::vector<std::shared_ptr<Value>>* values, void *user_data); + static LookupCallbackSimple + bindLookupCbSimple(LookupCallbackSimpleRaw raw_cb, void* user_data) { + if (not raw_cb) return {}; + return [=](std::vector<std::shared_ptr<Value>>& values) { + raw_cb((std::vector<std::shared_ptr<Value>>*) &values, user_data); + }; + } - Pht(std::string name, std::shared_ptr<DhtRunner> dht) - : name_(INDEX_PREFIX + name), canary_(name_ + ".canary"), dht_(dht) { } + Pht(std::string name, KeySpec k_spec, std::shared_ptr<DhtRunner> dht) + : name_(INDEX_PREFIX + name), canary_(name_ + ".canary"), keySpec_(k_spec), dht_(dht) + { + if (k_spec.size() != 1) + throw std::invalid_argument("PHT only supports unidimensional data."); + } virtual ~Pht () { } /** * Lookup a key for a value. */ - void lookup(Key k, LookupCallback cb = {}, DoneCallbackSimple doneCb = {}, bool exact_match = true); + void lookup(Key k, LookupCallback cb = {}, DoneCallbackSimple done_cb = {}, bool exact_match = true); + void lookup(Key k, LookupCallbackSimple cb = {}, DoneCallbackSimple done_cb = {}, bool exact_match = true) + { + lookup(k, [=](std::vector<std::shared_ptr<Value>>& values, Prefix) { cb(values); }, done_cb, exact_match); + } + /** * Adds an entry into the index. */ @@ -182,53 +207,7 @@ private: * Insert all needed node into the tree according to a prefix * @param p : Prefix that we need to insert */ - void insert(const Prefix& p) { - size_t i = 0; - auto now = clock::now(); - - std::shared_ptr<Node> curr_node; - - while ( ( leaves_.size() > 0 && leaves_.begin()->first + NODE_EXPIRE_TIME < now ) - || leaves_.size() > MAX_ELEMENT ) { - - leaves_.erase(leaves_.begin()); - } - - if ( !(curr_node = root_.lock()) ) { - - /* Root does not exist, need to create one*/ - curr_node = std::make_shared<Node>(); - root_ = curr_node; - } - - curr_node->last_reply = now; - - /* Iterate through all bit of the Blob */ - for ( i = 0; i < p.size_; i++ ) { - - /* According to the bit define which node is the next one */ - auto& next = ( p.isActivBit(i) ) ? curr_node->right_child : curr_node->left_child; - - /** - * If lock, node exists - * else create it - */ - if (auto n = next.lock()) { - curr_node = std::move(n); - } else { - /* Create the next node if doesn't exist*/ - auto tmp_curr_node = std::make_shared<Node>(); - tmp_curr_node->parent = curr_node; - next = tmp_curr_node; - curr_node = std::move(tmp_curr_node); - } - - curr_node->last_reply = now; - } - - /* Insert the leaf (curr_node) into the multimap */ - leaves_.emplace(std::move(now), std::move(curr_node) ); - } + void insert(const Prefix& p); /** * Lookup into the tree to return the maximum prefix length in the cache tree @@ -236,43 +215,7 @@ private: * @param p : Prefix that we are looking for * @return : The size of the longest prefix known in the cache between 0 and p.size_ */ - int lookup(const Prefix& p) { - int pos = 0; - auto now = clock::now(), last_node_time = now; - - /* Before lookup remove the useless one [i.e. too old] */ - while ( leaves_.size() > 0 && leaves_.begin()->first + NODE_EXPIRE_TIME < now ) { - leaves_.erase(leaves_.begin()); - } - - auto next = root_; - std::shared_ptr<Node> curr_node; - - while ( auto n = next.lock() ) { - /* Safe since pos is equal to 0 until here */ - if ( (unsigned) pos >= p.size_ ) break; - - curr_node = n; - last_node_time = curr_node->last_reply; - curr_node->last_reply = now; - - /* Get the Prefix bit by bit, starting from left */ - next = ( p.isActivBit(pos) ) ? curr_node->right_child : curr_node->left_child; - - ++pos; - } - - if ( pos > 0 ) { - auto to_erase = leaves_.find(last_node_time); - if ( to_erase != leaves_.end() ) - leaves_.erase( to_erase ); - - leaves_.emplace( std::move(now), std::move(curr_node) ); - } - - return --pos; - } - + int lookup(const Prefix& p); private: static constexpr const size_t MAX_ELEMENT {1024}; @@ -295,27 +238,48 @@ private: std::multimap<time_point, std::shared_ptr<Node>> leaves_; }; + /** + * Performs a step in the lookup operation. Each steps are performed + * asynchronously. + */ + void lookupStep(Prefix k, std::shared_ptr<int> lo, std::shared_ptr<int> hi, + std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals, LookupCallback cb, + DoneCallbackSimple done_cb, std::shared_ptr<unsigned> max_common_prefix_len, + int start = -1, bool all_values = false); + /** * Linearizes the key into a unidimensional key. A pht only takes * unidimensional key. * * @param Key The initial key. * - * @return return The linearized key. + * @return the prefix of the linearized key. */ - static Prefix linearize(Key k) { - if (k.size() != 1) { throw std::invalid_argument("PHT only supports unidimensional data."); } - return k.begin()->second; + virtual Prefix linearize(Key k) const { + if (not validKey(k)) { throw std::invalid_argument(INVALID_KEY); } + + Prefix p = Blob {k.begin()->second.begin(), k.begin()->second.end()}; + + auto bit_loc = p.size_ + 1; + for ( auto i = p.content_.size(); i <= keySpec_.begin()->second; i++ ) { + p.content_.push_back(0); + p.size_ += 8; + } + + return p.swapBit(bit_loc); }; /** - * Performs a step in the lookup operation. Each steps are performed - * asynchronously. + * Tells if the key is valid according to the key spec. */ - void lookupStep(Prefix k, std::shared_ptr<int> lo, std::shared_ptr<int> hi, - std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals, LookupCallback cb, - DoneCallbackSimple done_cb, std::shared_ptr<unsigned> max_common_prefix_len, - int start = -1, bool all_values = false); + bool validKey(const Key& k) const { + return k.size() == keySpec_.size() and + std::equal(k.begin(), k.end(), keySpec_.begin(), + [&](const Key::value_type& key, const KeySpec::value_type& key_spec) { + return key.first == key_spec.first and key.second.size() <= key_spec.second; + } + ); + } /** * Updates the canary token on the node responsible for the specified @@ -325,6 +289,7 @@ private: const std::string name_; const std::string canary_; + const KeySpec keySpec_; Cache cache_; std::shared_ptr<DhtRunner> dht_; }; diff --git a/python/opendht.pyx b/python/opendht.pyx index a2f408b2a250b64c85da20de56b3c74b2bf9f853..c1f493c154ba2eca59a5a217cc0f6dfdc410d6fa 100644 --- a/python/opendht.pyx +++ b/python/opendht.pyx @@ -406,8 +406,11 @@ cdef class IndexValue(object): cdef class Pht(object): cdef cpp.Pht* thisptr - def __cinit__(self, bytes name, DhtRunner dht): - self.thisptr = new cpp.Pht(name, dht.thisptr) + def __cinit__(self, bytes name, key_spec, DhtRunner dht): + cdef cpp.IndexKeySpec cpp_key_spec + for kk, size in key_spec.items(): + cpp_key_spec[bytes(kk, 'utf-8')] = size + self.thisptr = new cpp.Pht(name, cpp_key_spec, dht.thisptr) property MAX_NODE_ENTRY_COUNT: def __get__(self): return cpp.PHT_MAX_NODE_ENTRY_COUNT @@ -424,7 +427,7 @@ cdef class Pht(object): ref.Py_INCREF(cb_obj) cdef cpp.IndexKey cppk for kk, v in key.items(): - cppk[bytes(kk, 'utf-8')] = cpp.Prefix(bytes(v)) + cppk[bytes(kk, 'utf-8')] = bytes(v) self.thisptr.lookup( cppk, cpp.Pht.bindLookupCb(lookup_callback, <void*>cb_obj), @@ -441,7 +444,7 @@ cdef class Pht(object): ref.Py_INCREF(cb_obj) cdef cpp.IndexKey cppk for kk, v in key.items(): - cppk[bytes(kk, 'utf-8')] = cpp.Prefix(bytes(v)) + cppk[bytes(kk, 'utf-8')] = bytes(v) cdef cpp.IndexValue val val.first = (<InfoHash>value.getKey())._infohash val.second = value.getValueId() diff --git a/python/opendht_cpp.pxd b/python/opendht_cpp.pxd index 95b95500e7ba1bce1aeb3bacaf4592fa59b95307..d2f53026bb35c936a2ff84b2b10b45f40ca254a3 100644 --- a/python/opendht_cpp.pxd +++ b/python/opendht_cpp.pxd @@ -167,12 +167,13 @@ cdef extern from "opendht/indexation/pht.h" namespace "dht::indexation": Prefix(vector[uint8_t]) except + string toString() const ctypedef pair[InfoHash, uint64_t] IndexValue "dht::indexation::Value" - ctypedef map[string, Prefix] IndexKey "dht::indexation::Pht::Key" + ctypedef map[string, vector[uint8_t]] IndexKey "dht::indexation::Pht::Key" + ctypedef map[string, uint32_t] IndexKeySpec "dht::indexation::Pht::KeySpec" ctypedef void (*LookupCallbackRaw)(vector[shared_ptr[IndexValue]]* values, Prefix* p, void* user_data); cdef cppclass Pht: cppclass LookupCallback: LookupCallback() except + - Pht(string, shared_ptr[DhtRunner]) except + + Pht(string, IndexKeySpec, shared_ptr[DhtRunner]) except + void lookup(IndexKey k, LookupCallback cb, DoneCallbackSimple doneCb); void insert(IndexKey k, IndexValue v, DoneCallbackSimple cb) @staticmethod diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py index 67cdd6a4ea67c8ee6687745159d0a27ee7e28e14..91eb810ff260b9c499907191a7903ff227362967 100644 --- a/python/tools/dht/tests.py +++ b/python/tools/dht/tests.py @@ -10,6 +10,7 @@ import string import time import subprocess import re +import collections from matplotlib.ticker import FuncFormatter import math @@ -276,17 +277,18 @@ class PhtTest(FeatureTest): """ bootstrap = self._bootstrap bootstrap.resize(2) - dht = bootstrap.get(1) - pht = Pht(b'foo_index', dht) + + NUM_DIG = max(math.log(self._num_keys, 2)/4, 5) # at least 5 digit keys. + keyspec = collections.OrderedDict([('foo', NUM_DIG)]) + pht = Pht(b'foo_index', keyspec, dht) DhtNetwork.log('PHT has', pht.MAX_NODE_ENTRY_COUNT, 'node'+ ('s' if pht.MAX_NODE_ENTRY_COUNT > 1 else ''), 'per leaf bucket.') - NUM_DIG = max(math.log(self._num_keys, 2)/4, 5) # at least 5 digit keys. keys = [{ - 'foo' : + [_ for _ in keyspec.keys()][0] : ''.join(random.SystemRandom().choice(string.hexdigits) for _ in range(NUM_DIG)).encode() } for n in range(self._num_keys)] diff --git a/src/indexation/pht.cpp b/src/indexation/pht.cpp index 17d92b157a86e52e1d8d09a7c97aa6f149a2ff5f..7fa0a042935c2876b543abe1ae08070e0e139232 100644 --- a/src/indexation/pht.cpp +++ b/src/indexation/pht.cpp @@ -4,6 +4,107 @@ namespace dht { namespace indexation { +struct IndexEntry : public dht::Value::Serializable<IndexEntry> { + static const ValueType TYPE; + + virtual void unpackValue(const dht::Value& v) { + Serializable<IndexEntry>::unpackValue(v); + name = v.user_type; + } + + virtual dht::Value packValue() const { + auto pack = Serializable<IndexEntry>::packValue(); + pack.user_type = name; + return pack; + } + + Blob prefix; + Value value; + std::string name; + MSGPACK_DEFINE_MAP(prefix, value); +}; + +void Pht::Cache::insert(const Prefix& p) { + size_t i = 0; + auto now = clock::now(); + + std::shared_ptr<Node> curr_node; + + while ((leaves_.size() > 0 && leaves_.begin()->first + NODE_EXPIRE_TIME < now) || leaves_.size() > MAX_ELEMENT) + leaves_.erase(leaves_.begin()); + + if (not (curr_node = root_.lock())) { + /* Root does not exist, need to create one*/ + curr_node = std::make_shared<Node>(); + root_ = curr_node; + } + + curr_node->last_reply = now; + + /* Iterate through all bit of the Blob */ + for ( i = 0; i < p.size_; i++ ) { + + /* According to the bit define which node is the next one */ + auto& next = ( p.isActiveBit(i) ) ? curr_node->right_child : curr_node->left_child; + + /** + * If lock, node exists + * else create it + */ + if (auto n = next.lock()) { + curr_node = std::move(n); + } else { + /* Create the next node if doesn't exist*/ + auto tmp_curr_node = std::make_shared<Node>(); + tmp_curr_node->parent = curr_node; + next = tmp_curr_node; + curr_node = std::move(tmp_curr_node); + } + + curr_node->last_reply = now; + } + + /* Insert the leaf (curr_node) into the multimap */ + leaves_.emplace(std::move(now), std::move(curr_node) ); +} + +int Pht::Cache::lookup(const Prefix& p) { + int pos = 0; + auto now = clock::now(), last_node_time = now; + + /* Before lookup remove the useless one [i.e. too old] */ + while ( leaves_.size() > 0 && leaves_.begin()->first + NODE_EXPIRE_TIME < now ) { + leaves_.erase(leaves_.begin()); + } + + auto next = root_; + std::shared_ptr<Node> curr_node; + + while ( auto n = next.lock() ) { + /* Safe since pos is equal to 0 until here */ + if ( (unsigned) pos >= p.size_ ) break; + + curr_node = n; + last_node_time = curr_node->last_reply; + curr_node->last_reply = now; + + /* Get the Prefix bit by bit, starting from left */ + next = ( p.isActiveBit(pos) ) ? curr_node->right_child : curr_node->left_child; + + ++pos; + } + + if ( pos > 0 ) { + auto to_erase = leaves_.find(last_node_time); + if ( to_erase != leaves_.end() ) + leaves_.erase( to_erase ); + + leaves_.emplace( std::move(now), std::move(curr_node) ); + } + + return --pos; +} + const ValueType IndexEntry::TYPE = ValueType::USER_DATA; constexpr std::chrono::minutes Pht::Cache::NODE_EXPIRE_TIME; diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index 7f5e714484d10463ffd66c0392356f6b33e84f45..c3a59ade28570f71e0a33fd9fe709762776926b4 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -73,7 +73,7 @@ void print_help() { << std::endl; } -void cmd_loop(std::shared_ptr<DhtRunner>& dht, std::map<std::string, dht::indexation::Pht> indexes, dht_params& params) +void cmd_loop(std::shared_ptr<DhtRunner>& dht, std::map<std::string, indexation::Pht> indexes, dht_params& params) { print_node_info(dht, params); std::cout << " (type 'h' or 'help' for a list of possible commands)" << std::endl << std::endl; @@ -161,11 +161,23 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, std::map<std::string, dht::indexa if (op == "il" or op == "ii") { // Pht syntax iss >> index >> keystr; + auto new_index = std::find_if(indexes.begin(), indexes.end(), + [&](std::pair<const std::string, indexation::Pht>& i) { + return i.first == index; + }) == indexes.end(); if (not index.size()) { std::cout << "You must enter the index name." << std::endl; continue; - } else { - indexes.emplace(index, dht::indexation::Pht {index, dht}); + } else if (new_index) { + using namespace dht::indexation; + try { + auto key = createPhtKey(parseStringMap(keystr)); + Pht::KeySpec ks; + std::transform(key.begin(), key.end(), std::inserter(ks, ks.end()), [](Pht::Key::value_type& f) { + return std::make_pair(f.first, f.second.size()); + }); + indexes.emplace(index, Pht {index, std::move(ks), dht}); + } catch (std::invalid_argument& e) { std::cout << e.what() << std::endl; } } } else { @@ -250,25 +262,29 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, std::map<std::string, dht::indexa else if (op == "il") { std::string exact_match; iss >> exact_match; - indexes.at(index).lookup(createPhtKey(parseStringMap(keystr)), - [=](std::vector<std::shared_ptr<indexation::Value>>& vals, indexation::Prefix p) { - std::cout << "Pht::lookup: at prefix \"" << p.toString() << "\"" << ", hash is " << p.hash() << std::endl; - for (auto v : vals) { - std::cout << "Pht::lookup: found hash" << std::endl - << "\t" << v->first << "[vid: " << v->second << "]" << std::endl; - } - std::cout << "Pht::lookup: done." << std::endl; - }, - [start](bool ok) { - if (not ok) { - std::cout << "Pht::lookup: dht Get failed." << std::endl; - } - - auto end = std::chrono::high_resolution_clock::now(); - std::cout << "Pht::lookup: took " << print_dt(end-start) << "s)" << std::endl; - - }, exact_match.size() != 0 and exact_match == "false" ? false : true - ); + try { + auto key = createPhtKey(parseStringMap(keystr)); + indexes.at(index).lookup(key, + [=](std::vector<std::shared_ptr<indexation::Value>>& vals, indexation::Prefix p) { + if (vals.empty()) + return; + std::cout << "Pht::lookup: found entries!" << std::endl + << " prefix: \"" << p.toString() << "\"" << std::endl + << " hash: " << p.hash() << std::endl; + std::cout << " entries:" << std::endl; + for (auto v : vals) + std::cout << " " << v->first.toString() << "[vid: " << v->second << "]" << std::endl; + }, + [start](bool ok) { + auto end = std::chrono::high_resolution_clock::now(); + std::cout << "Pht::lookup: " << (ok ? "done." : "failed.") + << " took " << print_dt(end-start) << "s)" << std::endl; + + }, exact_match.size() != 0 and exact_match == "false" ? false : true + ); + } + catch (std::invalid_argument& e) { std::cout << e.what() << std::endl; } + catch (std::out_of_range& e) { } } else if (op == "ii") { iss >> idstr; @@ -277,15 +293,16 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, std::map<std::string, dht::indexa continue; indexation::Value v {h, 0}; - indexes.at(index).insert(createPhtKey(parseStringMap(keystr)), v, - [=](bool success) { - if (not success) { - std::cout << "Pht::insert: failed." << std::endl; - return; + try { + auto key = createPhtKey(parseStringMap(keystr)); + indexes.at(index).insert(key, v, + [=](bool ok) { + std::cout << "Pht::insert: " << (ok ? "done." : "failed.") << std::endl; } - std::cout << "Pht::insert: done." << std::endl; - } - ); + ); + } + catch (std::invalid_argument& e) { std::cout << e.what() << std::endl; } + catch (std::out_of_range& e) { } } } @@ -299,7 +316,7 @@ main(int argc, char **argv) throw std::runtime_error(std::string("Error initializing GnuTLS: ")+gnutls_strerror(rc)); auto dht = std::make_shared<DhtRunner>(); - std::map<std::string, dht::indexation::Pht> indexes; + std::map<std::string, indexation::Pht> indexes; try { auto params = parseArgs(argc, argv); diff --git a/tools/tools_common.h b/tools/tools_common.h index 700440fb431a0c96eac7668834364f35a2b9ffb2..05c5c0dc9a14ac795fc1b0cf526acff47cb1933e 100644 --- a/tools/tools_common.h +++ b/tools/tools_common.h @@ -161,7 +161,7 @@ dht::indexation::Pht::Key createPhtKey(std::map<std::string, std::string> pht_ke dht::indexation::Pht::Key pht_key; for (auto f : pht_key_str_map) { dht::Blob prefix {f.second.begin(), f.second.end()}; - pht_key.emplace(f.first, prefix); + pht_key.emplace(f.first, std::move(prefix)); } return pht_key; }