/*
 * Review notes for the pht.h part of this patch (Prefix flags and bit interleaving).
 * Only added ('+') lines were edited and no patch line was added or removed, so the
 * hunk headers keep their line counts.
 *   * Prefix(const Prefix&, size_t) tested its own, still empty, flags_; it now tests p.flags_.
 *   * swapBit(size_t) had lost its body and closing brace, which nested the three
 *     following methods inside it; the body is restored on top of swapBit(Blob&, size_t).
 *   * updateFlags() no longer wraps around when flags_ already covers size_ bits.
 *   * isActiveBit() and blobToString() check the blob length before indexing.
 *   * zcurve() returns an empty Prefix for an empty input.
 * NOTE(review): commonBits() compares content_ bytes with index i but hands the same i
 * to isFlagActive(), which reads it as a bit position; confirm the intended unit.
 */
diff --git a/include/opendht/indexation/pht.h b/include/opendht/indexation/pht.h index 8af618f7dd6c95e94f4116808dd5c6a58a60fd44..e6b37f8ebaa5161084e6a32a6bc18a3bd9a85cce 100644 --- a/include/opendht/indexation/pht.h +++ b/include/opendht/indexation/pht.h @@ -28,13 +28,20 @@ namespace indexation { */ struct Prefix { Prefix() {} - Prefix(InfoHash h) : size_(h.size() * 8), content_(h.begin(), h.end()) {} - Prefix(const Blob& d) : size_(d.size()*8), content_(d) {} + Prefix(InfoHash h) : size_(h.size() * 8), content_(h.begin(), h.end()) { } + Prefix(const Blob& d, const Blob& f={}) : size_(d.size()*8), flags_(f), content_(d) { } + Prefix(const Prefix& p, size_t first) : size_(std::min(first, p.content_.size()*8)), content_(Blob(p.content_.begin(), p.content_.begin()+size_/8)) { auto rem = size_ % 8; + if ( not p.flags_.empty() ) { /* this->flags_ is still empty here, so the source's flags decide */ + flags_ = Blob(p.flags_.begin(), p.flags_.begin()+size_/8); + if (rem) + flags_.push_back(p.flags_[size_/8] & (0xFF << (8 - rem))); + } + if (rem) content_.push_back(p.content_[size_/8] & (0xFF << (8 - rem))); } @@ -54,11 +61,12 @@ struct Prefix { * false otherwise * @throw out_of_range Throw out of range if the bit at 'pos' does not exist */ - bool isActiveBit(size_t pos) const { - if ( pos >= content_.size() * 8 ) - throw std::out_of_range("Can't detect active bit at pos, pos larger than prefix size or empty prefix"); + bool isFlagActive(size_t pos) const { + return flags_.empty() or isActiveBit(flags_, pos); + } - return ((this->content_[pos / 8] >> (7 - (pos % 8)) ) & 1) == 1; + /** @return true if the content bit at 'pos' is set; throws std::out_of_range through isActiveBit() for an invalid 'pos'. */ bool isContentBitActive(size_t pos) const { + return isActiveBit(content_, pos); } Prefix getFullSize() { return Prefix(*this, content_.size()*8); } @@ -82,13 +90,15 @@ struct Prefix { std::string toString() const { std::stringstream ss; - auto bn = size_ % 8; - auto n = size_ / 8; - for (size_t i = 0; i<n; i++) - ss << std::bitset<8>(content_[i]); - if (bn) - for (unsigned b=0; b<bn; b++) - ss << (char)((content_[n] & (1 << (7 - b))) ? 
'1':'0'); + + ss << "Prefix : " << std::endl << "\tContent_ : "; + ss << blobToString(content_); + ss << std::endl; + + ss << "\tFlags_ : "; + ss << blobToString(flags_); + ss << std::endl; + return ss.str(); } @@ -105,8 +115,12 @@ struct Prefix { auto longest_prefix_size = std::min(p1.size_, p2.size_); for (i = 0; i < longest_prefix_size; i++) { - if (p1.content_.data()[i] != p2.content_.data()[i]) - break; + /* NOTE(review): i indexes content_ by byte here but is a bit position for isFlagActive(); confirm the unit. */ if (p1.content_.data()[i] != p2.content_.data()[i] + or not p1.isFlagActive(i) + or not p2.isFlagActive(i) ) { + + break; + } } if (i == longest_prefix_size) @@ -124,30 +138,87 @@ struct Prefix { } /** - * This method swap the bit a the position 'bit' and return the new prefix + * This method swaps the bit at the position 'bit' and returns the new prefix * * @param bit Position of the bit to swap * @return The prefix with the bit at position 'bit' swapped * @throw out_of_range Throw out of range if bit does not exist */ Prefix swapBit(size_t bit) const { - if ( bit > content_.size() * 8 ) + if ( bit >= content_.size() * 8 ) throw std::out_of_range("bit larger than prefix size."); - Prefix copy = *this; - size_t offset_bit = (8 - bit) % 8; - copy.content_[bit / 8] ^= (1 << offset_bit); + Prefix copy = *this; copy.swapBit(copy.content_, bit); return copy; } /** Flips one bit of flags_; same numbering and bounds check as swapBit(Blob&, size_t). */ void swapFlagBit(size_t bit) { + swapBit(flags_, bit); + } - return copy; + /** Replaces content_ with addPadding(content_, size). */ void addPaddingContent(size_t size) { + content_ = addPadding(content_, size); + } + + /** Appends set flag bits until size_ bits are covered, then 0xFF bytes until flags_ has as many bytes as content_. */ void updateFlags() { + // Fill first known bit + auto csize = size_ > flags_.size() * 8 ? size_ - flags_.size() * 8 : 0; /* unsigned: never let the subtraction wrap */ + while(csize >= 8) { + flags_.push_back(0xFF); + csize -= 8; + } + + // if needed fill remaining bit + if ( csize ) + flags_.push_back(0xFF << (8 - csize)); + + // Complete the remaining vector space + for ( auto i = flags_.size(); i < content_.size(); i++ ) + flags_.push_back(0xFF); } size_t size_ {0}; + + Blob flags_ {}; Blob content_ {}; +private: + /** Renders every byte of 'bl' as 8 binary digits followed by a space, then the first size_ % 8 bits of byte size_ / 8 when 'bl' has such a byte. */ std::string blobToString(const Blob &bl) const { + std::stringstream ss; + + auto bn = size_ % 8; + auto n = size_ / 8; + + for (size_t i = 0; i < bl.size(); i++) + ss << std::bitset<8>(bl[i]) << " "; + if (bn and n < bl.size()) + 
for (unsigned b=0; b < bn; b++) + ss << (char)((bl[n] & (1 << (7 - b))) ? '1':'0'); + + return ss.str(); + } + + /** Returns a copy of 'toP' zero-padded to 'size' bytes with bit size_+1 flipped through swapBit(), which throws std::out_of_range if that bit lies outside the padded blob. */ Blob addPadding(Blob toP, size_t size) { + Blob copy = toP; + for ( auto i = copy.size(); i < size; i++ ) + copy.push_back(0); + + swapBit(copy, size_ + 1); + return copy; + } + + /** @return true if bit 'pos' of 'b' is 1, position 0 being the most significant bit of byte 0. @throw std::out_of_range if pos >= size_ or 'b' has no byte pos/8. */ bool isActiveBit(const Blob &b, size_t pos) const { + if ( pos >= size_ or pos / 8 >= b.size() ) + throw std::out_of_range("Can't detect active bit at pos, pos larger than prefix size or empty prefix"); + + return ((b[pos / 8] >> (7 - (pos % 8)) ) & 1) == 1; + } + + /** Toggles one bit of 'b' in place. @throw std::out_of_range if bit >= b.size()*8. NOTE(review): the mask is 1 << ((8 - bit) % 8) while isActiveBit() shifts by 7 - (pos % 8); confirm the two numberings are meant to differ. */ void swapBit(Blob &b, size_t bit) { + if ( bit >= b.size() * 8 ) + throw std::out_of_range("bit larger than prefix size."); + size_t offset_bit = (8 - bit) % 8; + b[bit / 8] ^= (1 << offset_bit); + } }; using Value = std::pair<InfoHash, dht::Value::Id>; - struct IndexEntry : public dht::Value::Serializable<IndexEntry> { static const ValueType TYPE; @@ -168,7 +239,6 @@ struct IndexEntry : public dht::Value::Serializable<IndexEntry> { MSGPACK_DEFINE_MAP(prefix, value); }; - class Pht { static constexpr const char* INVALID_KEY = "Key does not match the PHT key spec."; @@ -176,6 +246,7 @@ class Pht { static constexpr const char* INDEX_PREFIX = "index.pht."; public: + /* This is the maximum number of entries per node. This parameter is * critical and influences the traffic a lot during a lookup operation. 
*/ @@ -207,11 +278,8 @@ public: } Pht(std::string name, KeySpec k_spec, std::shared_ptr<DhtRunner> dht) - : name_(INDEX_PREFIX + name), canary_(name_ + ".canary"), keySpec_(k_spec), dht_(dht) - { - if (k_spec.size() != 1) - throw std::invalid_argument("PHT only supports unidimensional data."); - } + : name_(INDEX_PREFIX + name), canary_(name_ + ".canary"), keySpec_(k_spec), dht_(dht) {} + virtual ~Pht () { } /** @@ -281,7 +349,7 @@ private: private: static constexpr const size_t MAX_ELEMENT {1024}; - static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {10}; + static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {5}; struct Node { time_point last_reply; /* Made the assocation between leaves and leaves multimap */ @@ -324,6 +392,33 @@ private: std::shared_ptr<unsigned> max_common_prefix_len, int start = -1, bool all_values = false); + /** Interleaves the bits of the given prefixes, content_ and flags_ alike, into one Prefix: output bits cycle through all_prefix, taking bit i of byte j from each prefix before moving on to bit i+1. An empty input yields an empty Prefix and a single prefix is returned unchanged. NOTE(review): the byte count comes from all_prefix[0]; presumably every prefix was padded to the same length and had its flags_ filled beforehand; confirm against the caller. */ Prefix zcurve(const std::vector<Prefix>& all_prefix) const { + Prefix p; + if ( all_prefix.size() <= 1 ) return all_prefix.empty() ? p : all_prefix[0]; + + for ( size_t j = 0, bit = 0; j < all_prefix[0].content_.size(); j++) { + uint8_t mask = 0x80; + for ( int i = 0; i < 8; ) { + uint8_t content = 0; + uint8_t flags = 0; + for ( int k = 0 ; k < 8; k++, bit++ ) { + auto diff = k - i; + auto x = all_prefix[bit].content_[j] & mask; + auto y = all_prefix[bit].flags_[j] & mask; + content |= ( diff >= 0 ) ? x >> diff : x << std::abs(diff); + flags |= ( diff >= 0 ) ? y >> diff : y << std::abs(diff); + + if ( bit == all_prefix.size() - 1 ) { bit = -1; ++i; mask >>= 1; } + } + p.content_.push_back(content); + p.flags_.push_back(flags); + p.size_ += 8; + } + } + + return p; + } + /** * Linearizes the key into a unidimensional key. A pht only takes * unidimensional key. 
/*
 * Review notes for the src/ part of this patch.
 * Only Pht::linearize() was edited, on added ('+') lines and without adding or removing
 * patch lines. The fix follows the disabled reference version that the patch keeps in a
 * comment after the return statement:
 *   * the definition is qualified with Pht:: again (it uses validKey, keySpec_, zcurve);
 *   * the widest keySpec_ entry is dereferenced (->second + 1) instead of handing the
 *     std::max_element iterator to addPaddingContent(size_t);
 *   * every dimension of the key is visited instead of reading k.begin() on each pass
 *     and comparing against the uncalled member k.size;
 *   * std::string is spelled with its namespace in the comparator.
 * That disabled copy is now redundant and can be dropped in a follow-up.
 * NOTE(review): dht_->put() no longer receives time_p although checkPhtUpdate() still
 * does; confirm this is intended.
 * NOTE(review): Makefile.am now hard-codes -version-info 6:0:2 and changes the case of
 * the *_LIBS substitutions; confirm both against configure.ac.
 * NOTE(review): std::max_element() returns end() for an empty keySpec_; presumably
 * validKey() rejects that case; verify.
 */
diff --git a/src/Makefile.am b/src/Makefile.am index 2eb007a5347284aed47e5618295c4a88abe8feec..0a4b80e999b6e79a788abdc360a23bfd372c8268 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -3,7 +3,8 @@ SUBDIRS = argon2 lib_LTLIBRARIES = libopendht.la AM_CPPFLAGS = -I../include/opendht -libopendht_la_LDFLAGS = @LDFLAGS@ @GnuTLS_LIBS@ @Nettle_LIBS@ -version-number @OPENDHT_MAJOR_VERSION@:@OPENDHT_MINOR_VERSION@:@OPENDHT_PATCH_VERSION@ +libopendht_la_CXXFLAGS = @CXXFLAGS@ +libopendht_la_LDFLAGS = @LDFLAGS@ @GNUTLS_LIBS@ @nettle_LIBS@ -version-info 6:0:2 libopendht_la_LIBADD = ./argon2/libargon2.la libopendht_la_SOURCES = \ diff --git a/src/indexation/pht.cpp b/src/indexation/pht.cpp index 65f2ea62f7831b028753e01d46a2df02d59a3295..df80d8b42c3dcd155c7960823622e419a5077235 100644 --- a/src/indexation/pht.cpp +++ b/src/indexation/pht.cpp @@ -10,8 +10,12 @@ void Pht::Cache::insert(const Prefix& p) { std::shared_ptr<Node> curr_node; - while ((leaves_.size() > 0 and leaves_.begin()->first + NODE_EXPIRE_TIME < now) or leaves_.size() > MAX_ELEMENT) + while ((leaves_.size() > 0 + and leaves_.begin()->first + NODE_EXPIRE_TIME < now) + or leaves_.size() > MAX_ELEMENT) { + leaves_.erase(leaves_.begin()); + } if (not (curr_node = root_.lock()) ) { /* Root does not exist, need to create one*/ @@ -25,7 +29,8 @@ void Pht::Cache::insert(const Prefix& p) { for ( i = 0; i < p.size_; i++ ) { /* According to the bit define which node is the next one */ - auto& next = ( p.isActiveBit(i) ) ? curr_node->right_child : curr_node->left_child; + auto& next = ( p.isContentBitActive(i) ) ? curr_node->right_child : curr_node->left_child; + /** * If lock, node exists * else create it @@ -52,7 +57,9 @@ int Pht::Cache::lookup(const Prefix& p) { auto now = clock::now(), last_node_time = now; /* Before lookup remove the useless one [i.e. 
too old] */ - while ( leaves_.size() > 0 and leaves_.begin()->first + NODE_EXPIRE_TIME < now ) { + while ( leaves_.size() > 0 + and leaves_.begin()->first + NODE_EXPIRE_TIME < now ) { + leaves_.erase(leaves_.begin()); } @@ -69,7 +76,7 @@ int Pht::Cache::lookup(const Prefix& p) { curr_node->last_reply = now; /* Get the Prefix bit by bit, starting from left */ - next = ( p.isActiveBit(pos) ) ? curr_node->right_child : curr_node->left_child; + next = ( p.isContentBitActive(pos) ) ? curr_node->right_child : curr_node->left_child; } if ( pos >= 0 ) { @@ -110,7 +117,6 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, } else if (is_leaf or *lo > *hi) { // leaf node - Prefix to_insert = p.getPrefix(mid); if (cb) { if (vals->size() == 0 and max_common_prefix_len and mid > 0) { @@ -120,7 +126,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, lookupStep(p_, lo, hi, vals, cb, done_cb, max_common_prefix_len, -1, all_values); } - cb(*vals, to_insert); + cb(*vals, p.getPrefix(mid)); } if (done_cb) @@ -151,6 +157,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, if (max_common_prefix_len) { /* inexact match case */ auto common_bits = Prefix::commonBits(p, entry.prefix); + if (vals->empty()) { vals->emplace_back(std::make_shared<IndexEntry>(entry)); *max_common_prefix_len = common_bits; @@ -167,6 +174,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi, } else if (all_values or entry.prefix == p.content_) /* exact match case */ vals->emplace_back(std::make_shared<IndexEntry>(entry)); } + return true; }; @@ -278,7 +286,7 @@ void Pht::insert(Prefix kp, IndexEntry entry, std::shared_ptr<int> lo, std::shar updateCanary(*p); checkPhtUpdate(*p, entry, time_p); cache_.insert(*p); - dht_->put(p->hash(), std::move(entry), done_cb, time_p); + dht_->put(p->hash(), std::move(entry), done_cb /*, time_p */); }; if ( not check_split or final_prefix->size_ == 
kp.size_ ) { @@ -294,19 +302,49 @@ void Pht::insert(Prefix kp, IndexEntry entry, std::shared_ptr<int> lo, std::shar ); } -Prefix Pht::linearize(Key k) const { +Prefix Pht::linearize(Key k) const { if (not validKey(k)) { throw std::invalid_argument(INVALID_KEY); } + /* One padded, flagged Prefix per key dimension; zcurve() merges them below. */ std::vector<Prefix> all_prefix; + /* Common padded length: the largest size in keySpec_ plus one element. */ auto max = std::max_element(keySpec_.begin(), keySpec_.end(), + [](const std::pair<std::string, size_t>& a, const std::pair<std::string, size_t>& b) { + return a.second < b.second; + })->second + 1; + + for ( auto const& it : k ) { + Prefix p = Blob {it.second.begin(), it.second.end()}; + p.addPaddingContent(max); + p.updateFlags(); + + all_prefix.push_back(p); + } - Prefix p = Blob {k.begin()->second.begin(), k.begin()->second.end()}; - - auto bit_loc = p.size_ + 1; - for ( auto i = p.content_.size(); i < keySpec_.begin()->second + 1; i++ ) - p.content_.push_back(0); - - return p.swapBit(bit_loc); + return zcurve(all_prefix); +/* + virtual Prefix linearize(Key k) const { + if (not validKey(k)) { throw std::invalid_argument(INVALID_KEY); } + + std::vector<Prefix> all_prefix; + auto max = std::max_element(keySpec_.begin(), keySpec_.end(), + [&](const std::pair<std::string, size_t>& a, const std::pair<std::string, size_t>& b) { + return a.second < b.second; + })->second + 1; + + for ( auto const& it : k ) { + Prefix p = Blob {it.second.begin(), it.second.end()}; + p.addPaddingContent(max); + p.updateFlags(); + all_prefix.push_back(p); + + auto bit_loc = p.size_ + 1; + for ( auto i = p.content_.size(); i < keySpec_.begin()->second + 1; i++ ) + p.content_.push_back(0); + + return zcurve(all_prefix); + };*/ }; void Pht::getRealPrefix(std::shared_ptr<Prefix> p, IndexEntry entry, RealInsertCallback end_cb ) { + if ( p->size_ == 0 ) { end_cb(p, std::move(entry)); return;