diff --git a/include/opendht/indexation/pht.h b/include/opendht/indexation/pht.h index 8af618f7dd6c95e94f4116808dd5c6a58a60fd44..1c5557af5dd66e762c643a287996e7e47625d352 100644 --- a/include/opendht/indexation/pht.h +++ b/include/opendht/indexation/pht.h @@ -28,37 +28,61 @@ namespace indexation { */ struct Prefix { Prefix() {} - Prefix(InfoHash h) : size_(h.size() * 8), content_(h.begin(), h.end()) {} - Prefix(const Blob& d) : size_(d.size()*8), content_(d) {} + Prefix(InfoHash h) : size_(h.size() * 8), content_(h.begin(), h.end()) { } + Prefix(const Blob& d, const Blob& f={}) : size_(d.size()*8), flags_(f), content_(d) { } + Prefix(const Prefix& p, size_t first) : size_(std::min(first, p.content_.size()*8)), content_(Blob(p.content_.begin(), p.content_.begin()+size_/8)) { + auto rem = size_ % 8; + if ( not p.flags_.empty() ) { + flags_ = Blob(p.flags_.begin(), p.flags_.begin()+size_/8); + if (rem) + flags_.push_back(p.flags_[size_/8] & (0xFF << (8 - rem))); + } + if (rem) content_.push_back(p.content_[size_/8] & (0xFF << (8 - rem))); } + /** + * Get a sub prefix of the Prefix + * + * @param len lenght of the prefix to get, could be negative + * if len is negativ then you will get the prefix + * of size of the previous prefix minus len + * + * @return Sub-prefix of size len or if len is negative sub-prefix of size + * of prefix minus len + * + * @throw out_of_range if len is larger than size of the content + */ Prefix getPrefix(ssize_t len) const { if ((size_t)std::abs(len) >= content_.size() * 8) throw std::out_of_range("len larger than prefix size."); if (len < 0) len += size_; + return Prefix(*this, len); } /** - * Method for getting the state of the bit at the position pos. - * @param pos : Pos of the needed bit - * @return : true if the bit is at 1 - * false otherwise - * @throw out_of_range Throw out of range if the bit at 'pos' does not exist + * Flags are considered as active if flag is empty or if the flag + * at pos 'pos' is active +ee * + * @see isActiveBit in private function */ - bool isActiveBit(size_t pos) const { - if ( pos >= content_.size() * 8 ) - throw std::out_of_range("Can't detect active bit at pos, pos larger than prefix size or empty prefix"); + bool isFlagActive(size_t pos) const { + return flags_.empty() or isActiveBit(flags_, pos); + } - return ((this->content_[pos / 8] >> (7 - (pos % 8)) ) & 1) == 1; + /** + * @see isActiveBit in private function + */ + bool isContentBitActive(size_t pos) const { + return isActiveBit(content_, pos); } Prefix getFullSize() { return Prefix(*this, content_.size()*8); } @@ -69,9 +93,11 @@ struct Prefix { * @return The prefix of this sibling. */ Prefix getSibling() const { - if ( not size_ ) - return Prefix(*this); - return swapBit(size_); + Prefix copy = *this; + if ( size_ ) + copy.swapContentBit(size_ - 1); + + return copy; } InfoHash hash() const { @@ -80,18 +106,6 @@ struct Prefix { return InfoHash::get(copy); } - std::string toString() const { - std::stringstream ss; - auto bn = size_ % 8; - auto n = size_ / 8; - for (size_t i = 0; i<n; i++) - ss << std::bitset<8>(content_[i]); - if (bn) - for (unsigned b=0; b<bn; b++) - ss << (char)((content_[n] & (1 << (7 - b))) ? '1':'0'); - return ss.str(); - } - /** * This method count total of bit in common between 2 prefix * @@ -105,8 +119,12 @@ struct Prefix { auto longest_prefix_size = std::min(p1.size_, p2.size_); for (i = 0; i < longest_prefix_size; i++) { - if (p1.content_.data()[i] != p2.content_.data()[i]) - break; + if (p1.content_.data()[i] != p2.content_.data()[i] + or not p1.isFlagActive(i) + or not p2.isFlagActive(i) ) { + + break; + } } if (i == longest_prefix_size) @@ -124,30 +142,109 @@ struct Prefix { } /** - * This method swap the bit a the position 'bit' and return the new prefix - * - * @param bit Position of the bit to swap - * @return The prefix with the bit at position 'bit' swapped - * @throw out_of_range Throw out of range if bit does not exist + * @see doc of swap private function */ - Prefix swapBit(size_t bit) const { - if ( bit > content_.size() * 8 ) - throw std::out_of_range("bit larger than prefix size."); + void swapContentBit(size_t bit) { + swapBit(content_, bit); + } - Prefix copy = *this; - size_t offset_bit = (8 - bit) % 8; - copy.content_[bit / 8] ^= (1 << offset_bit); + /** + * @see doc of swap private function + */ + void swapFlagBit(size_t bit) { + swapBit(flags_, bit); + } - return copy; + /** + * @see doc of addPadding private function + */ + void addPaddingContent(size_t size) { + content_ = addPadding(content_, size); + } + + void updateFlags() { + /* Fill first known bit */ + auto csize = size_ - flags_.size() * 8; + while(csize >= 8) { + flags_.push_back(0xFF); + csize -= 8; + } + + /* if needed fill remaining bit */ + if ( csize ) + flags_.push_back(0xFF << (8 - csize)); + + /* Complet vector space missing */ + for ( auto i = flags_.size(); i < content_.size(); i++ ) + flags_.push_back(0xFF); } + std::string toString() const; + size_t size_ {0}; + + /* Will contain flags according to content_. + If flags_[i] == 0, then content_[i] is unknown + else if flags_[i] == 1, then content_[i] is known */ + Blob flags_ {}; Blob content_ {}; +private: + + /** + * Add a padding to the input blob + * + * @param toP : Prefix where to add a padding + * @param size : Final size of the prefix with padding + * + * @return Copy of the input Blob but with a padding + */ + Blob addPadding(Blob toP, size_t size) { + Blob copy = toP; + for ( auto i = copy.size(); i < size; i++ ) + copy.push_back(0); + + swapBit(copy, size_ + 1); + return copy; + } + + /** + * Check if the bit a pos 'pos' is active, i.e. equal to 1 + * + * @param b : Blob to check + * @param pos : Position to check + * + * @return true if the bit is equal to 1, false otherwise + * + * @throw out_of_range if bit is superior to blob size * 8 + */ + bool isActiveBit(const Blob &b, size_t pos) const { + if ( pos >= content_.size() * 8 ) + throw std::out_of_range("Can't detect active bit at pos, pos larger than prefix size or empty prefix"); + + return ((b[pos / 8] >> (7 - (pos % 8)) ) & 1) == 1; + } + + /** + * Swap bit at position bit [from 0 to 1 and vice-versa] + * + * @param b : Blob to swap + * @param bit : Bit to swap on b + * + * @return the input prefix with the bit at pos 'bit' swapped + * + * @throw out_of_range if bit is superior to blob size * 8 + */ + void swapBit(Blob &b, size_t bit) { + if ( bit >= b.size() * 8 ) + throw std::out_of_range("bit larger than prefix size."); + + size_t offset_bit = (8 - bit) % 8; + b[bit / 8] ^= (1 << offset_bit); + } }; using Value = std::pair<InfoHash, dht::Value::Id>; - struct IndexEntry : public dht::Value::Serializable<IndexEntry> { static const ValueType TYPE; @@ -168,7 +265,6 @@ struct IndexEntry : public dht::Value::Serializable<IndexEntry> { MSGPACK_DEFINE_MAP(prefix, value); }; - class Pht { static constexpr const char* INVALID_KEY = "Key does not match the PHT key spec."; @@ -176,6 +272,7 @@ class Pht { static constexpr const char* INDEX_PREFIX = "index.pht."; public: + /* This is the maximum number of entries per node. This parameter is * critical and influences the traffic a lot during a lookup operation. */ @@ -183,6 +280,7 @@ public: /* A key for a an index entry */ using Key = std::map<std::string, Blob>; + /* Specifications of the keys. It defines the number, the length and the * serialization order of fields. */ using KeySpec = std::map<std::string, size_t>; @@ -207,11 +305,8 @@ public: } Pht(std::string name, KeySpec k_spec, std::shared_ptr<DhtRunner> dht) - : name_(INDEX_PREFIX + name), canary_(name_ + ".canary"), keySpec_(k_spec), dht_(dht) - { - if (k_spec.size() != 1) - throw std::invalid_argument("PHT only supports unidimensional data."); - } + : name_(INDEX_PREFIX + name), canary_(name_ + ".canary"), keySpec_(k_spec), dht_(dht) {} + virtual ~Pht () { } /** @@ -281,7 +376,7 @@ private: private: static constexpr const size_t MAX_ELEMENT {1024}; - static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {10}; + static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {5}; struct Node { time_point last_reply; /* Made the assocation between leaves and leaves multimap */ @@ -324,6 +419,15 @@ private: std::shared_ptr<unsigned> max_common_prefix_len, int start = -1, bool all_values = false); + /** + * Apply the zcurve algorithm on the list of input prefix + * + * @param all_prefix : Vector of prefix to interleave + * + * @return The output prefix where all flags and content are interleaves + */ + Prefix zcurve(const std::vector<Prefix>& all_prefix) const; + /** * Linearizes the key into a unidimensional key. A pht only takes * unidimensional key. @@ -364,7 +468,7 @@ private: size_t foundSplitLocation(Prefix compared, std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals) { for ( size_t i = 0; i < compared.content_.size() * 8 - 1; i++ ) for ( auto const& v : *vals) - if ( Prefix(v->prefix).isActiveBit(i) != compared.isActiveBit(i) ) + if ( Prefix(v->prefix).isContentBitActive(i) != compared.isContentBitActive(i) ) return i + 1; return compared.content_.size() * 8 - 1; diff --git a/src/Makefile.am b/src/Makefile.am index 0340392f2fa6b88b458e1171c4b317241c38c6d1..74451a36d45775bedc0bd58b5f28a1669cf79b24 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -32,7 +32,7 @@ nobase_include_HEADERS = \ ../include/opendht/dht.h \ ../include/opendht/callbacks.h \ ../include/opendht/node_cache.h \ - ../include/opendht/routing_table.h \ + ../include/opendht/routing_table.h \ ../include/opendht/network_engine.h \ ../include/opendht/scheduler.h \ ../include/opendht/utils.h \ diff --git a/src/indexation/pht.cpp b/src/indexation/pht.cpp index 65f2ea62f7831b028753e01d46a2df02d59a3295..f3ef596acbe1793575ded663daffd3a522304b98 100644 --- a/src/indexation/pht.cpp +++ b/src/indexation/pht.cpp @@ -4,14 +4,53 @@ namespace dht { namespace indexation { +/** + * Output the blob into string and readable way + * + * @param bl : Blob to print + * + * @return string that represent the blob into a readable way + */ +static std::string blobToString(const Blob &bl) { + std::stringstream ss; + auto bn = bl.size() % 8; + auto n = bl.size() / 8; + + for (size_t i = 0; i < bl.size(); i++) + ss << std::bitset<8>(bl[i]) << " "; + if (bn) + for (unsigned b=0; b < bn; b++) + ss << (char)((bl[n] & (1 << (7 - b))) ? '1':'0'); + + return ss.str(); +} + +std::string Prefix::toString() const { + std::stringstream ss; + + ss << "Prefix : " << std::endl << "\tContent_ : "; + ss << blobToString(content_); + ss << std::endl; + + ss << "\tFlags_ : "; + ss << blobToString(flags_); + ss << std::endl; + + return ss.str(); +} + void Pht::Cache::insert(const Prefix& p) { size_t i = 0; auto now = clock::now(); std::shared_ptr<Node> curr_node; - while ((leaves_.size() > 0 and leaves_.begin()->first + NODE_EXPIRE_TIME < now) or leaves_.size() > MAX_ELEMENT) + while ((leaves_.size() > 0 + and leaves_.begin()->first + NODE_EXPIRE_TIME < now) + or leaves_.size() > MAX_ELEMENT) { + leaves_.erase(leaves_.begin()); + } if (not (curr_node = root_.lock()) ) { /* Root does not exist, need to create one*/ @@ -25,7 +64,8 @@ void Pht::Cache::insert(const Prefix& p) { for ( i = 0; i < p.size_; i++ ) { /* According to the bit define which node is the next one */ - auto& next = ( p.isActiveBit(i) ) ? curr_node->right_child : curr_node->left_child; + auto& next = ( p.isContentBitActive(i) ) ? curr_node->right_child : curr_node->left_child; + /** * If lock, node exists * else create it @@ -52,7 +92,9 @@ int Pht::Cache::lookup(const Prefix& p) { auto now = clock::now(), last_node_time = now; /* Before lookup remove the useless one [i.e. too old] */ - while ( leaves_.size() > 0 and leaves_.begin()->first + NODE_EXPIRE_TIME < now ) { + while ( leaves_.size() > 0 + and leaves_.begin()->first + NODE_EXPIRE_TIME < now ) { + leaves_.erase(leaves_.begin()); } @@ -69,7 +111,7 @@ int Pht::Cache::lookup(const Prefix& p) { curr_node->last_reply = now; /* Get the Prefix bit by bit, starting from left */ - next = ( p.isActiveBit(pos) ) ? curr_node->right_child : curr_node->left_child; + next = ( p.isContentBitActive(pos) ) ? curr_node->right_child : curr_node->left_child; } if ( pos >= 0 ) { @@ -99,6 +141,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, /* start could be under 0 but after the compare it to 0 it always will be unsigned, so we can cast it*/ auto mid = (start >= 0) ? (unsigned) start : (*lo + *hi)/2; + auto first_res = std::make_shared<node_lookup_result>(); auto second_res = std::make_shared<node_lookup_result>(); @@ -111,6 +154,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, else if (is_leaf or *lo > *hi) { // leaf node Prefix to_insert = p.getPrefix(mid); + cache_.insert(to_insert); if (cb) { if (vals->size() == 0 and max_common_prefix_len and mid > 0) { @@ -151,6 +195,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, if (max_common_prefix_len) { /* inexact match case */ auto common_bits = Prefix::commonBits(p, entry.prefix); + if (vals->empty()) { vals->emplace_back(std::make_shared<IndexEntry>(entry)); *max_common_prefix_len = common_bits; @@ -167,6 +212,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, } else if (all_values or entry.prefix == p.content_) /* exact match case */ vals->emplace_back(std::make_shared<IndexEntry>(entry)); } + return true; }; @@ -207,8 +253,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, if (first_res->done) on_done(true); } - }, pht_filter); - + }, pht_filter); } else { on_done(true); } @@ -278,35 +323,97 @@ void Pht::insert(Prefix kp, IndexEntry entry, std::shared_ptr<int> lo, std::shar updateCanary(*p); checkPhtUpdate(*p, entry, time_p); cache_.insert(*p); - dht_->put(p->hash(), std::move(entry), done_cb, time_p); + dht_->put(p->hash(), std::move(entry), done_cb , time_p); }; if ( not check_split or final_prefix->size_ == kp.size_ ) { real_insert(final_prefix, std::move(entry)); } else { - if ( vals->size() < MAX_NODE_ENTRY_COUNT ) + if ( vals->size() < MAX_NODE_ENTRY_COUNT ) { getRealPrefix(final_prefix, std::move(entry), real_insert); - else + } + else { split(*final_prefix, vals, entry, real_insert); + } } } }, nullptr, cache_.lookup(kp), true ); } +Prefix Pht::zcurve(const std::vector<Prefix>& all_prefix) const { + Prefix p; + + if ( all_prefix.size() == 1 ) + return all_prefix[0]; + + /* All prefix got the same size (thanks to padding) */ + size_t prefix_size = all_prefix[0].content_.size(); + + /* Loop on all uint8_t of the input prefix */ + for ( size_t j = 0, bit = 0; j < prefix_size; j++) { + + uint8_t mask = 0x80; + /* For each of the 8 bits of the input uint8_t */ + for ( int i = 0; i < 8; ) { + + uint8_t flags = 0; + uint8_t content = 0; + + /* For each bit of the output uint8_t */ + for ( int k = 0 ; k < 8; k++ ) { + + auto diff = k - i; + + /*get the content 'c', and the flag 'f' of the input prefix */ + auto c = all_prefix[bit].content_[j] & mask; + auto f = all_prefix[bit].flags_[j] & mask; + + /* Move this bit at the right position according to the diff + and merge it into content and flags in the same way */ + content |= ( diff >= 0 ) ? c >> diff : c << std::abs(diff); + flags |= ( diff >= 0 ) ? f >> diff : f << std::abs(diff); + + /* If we are on the last prefix of the vector get back to the first and + ,move the mask in order to get the n + 1nth bit */ + if ( ++bit == all_prefix.size() ) { bit = 0; ++i; mask >>= 1; } + } + + /* Add the next flags + content to the output prefix */ + p.content_.push_back(content); + p.flags_.push_back(flags); + p.size_ += 8; + } + } + + return p; +} + Prefix Pht::linearize(Key k) const { if (not validKey(k)) { throw std::invalid_argument(INVALID_KEY); } - Prefix p = Blob {k.begin()->second.begin(), k.begin()->second.end()}; + std::vector<Prefix> all_prefix; + all_prefix.reserve(k.size()); + + /* Get the max size of the keyspec and take it for size limit (for padding) */ + auto max = std::max_element(keySpec_.begin(), keySpec_.end(), + [](const std::pair<std::string, size_t>& a, const std::pair<std::string, size_t>& b) { + return a.second < b.second; + })->second + 1; - auto bit_loc = p.size_ + 1; - for ( auto i = p.content_.size(); i < keySpec_.begin()->second + 1; i++ ) - p.content_.push_back(0); + for ( auto const& it : k ) { + Prefix p = Blob {it.second.begin(), it.second.end()}; + p.addPaddingContent(max); + p.updateFlags(); - return p.swapBit(bit_loc); -}; + all_prefix.emplace_back(std::move(p)); + } + + return zcurve(all_prefix); +} void Pht::getRealPrefix(std::shared_ptr<Prefix> p, IndexEntry entry, RealInsertCallback end_cb ) { + if ( p->size_ == 0 ) { end_cb(p, std::move(entry)); return; @@ -391,8 +498,9 @@ void Pht::split(Prefix insert, std::shared_ptr<std::vector<std::shared_ptr<Index auto loc = foundSplitLocation(full, vals); auto prefix_to_insert = std::make_shared<Prefix>(full.getPrefix(loc)); - for(;loc != insert.size_ - 1; loc--) + for(;loc != insert.size_ - 1; loc--) { updateCanary(full.getPrefix(loc)); + } end_cb(prefix_to_insert, entry); } diff --git a/tools/dhtnode.cpp b/tools/dhtnode.cpp index c3a59ade28570f71e0a33fd9fe709762776926b4..85e09f846837289d1a43ebaf3fd03c22ed8584f6 100644 --- a/tools/dhtnode.cpp +++ b/tools/dhtnode.cpp @@ -269,7 +269,7 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, std::map<std::string, indexation: if (vals.empty()) return; std::cout << "Pht::lookup: found entries!" << std::endl - << " prefix: \"" << p.toString() << "\"" << std::endl + << p.toString() << std::endl << " hash: " << p.hash() << std::endl; std::cout << " entries:" << std::endl; for (auto v : vals)