diff --git a/CMakeLists.txt b/CMakeLists.txt
index 605e349964d73546847fbcb2253c98a31aad8c45..545b449b2bd35925d262c301d1feb50361da120f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -53,6 +53,7 @@ list (APPEND opendht_SOURCES
     src/network_engine.cpp
     src/securedht.cpp
     src/dhtrunner.cpp
+    src/indexation/pht.cpp
     src/argon2/argon2.c
     src/argon2/core.c
     src/argon2/blake2/blake2b.c
@@ -78,6 +79,7 @@ list (APPEND opendht_HEADERS
     include/opendht/securedht.h
     include/opendht/log.h
     include/opendht.h
+    include/opendht/indexation/pht.h
 )
 
 configure_file (
diff --git a/include/opendht.h b/include/opendht.h
index 7765476c8f0db7e8f9225642183d161eee2441c5..28fef543e668b56d3db27787b3f854ede97c0b10 100644
--- a/include/opendht.h
+++ b/include/opendht.h
@@ -22,3 +22,4 @@
 #include "opendht/dhtrunner.h"
 #include "opendht/log.h"
 #include "opendht/default_types.h"
+#include "opendht/indexation/pht.h"
diff --git a/include/opendht/indexation/pht.h b/include/opendht/indexation/pht.h
new file mode 100644
index 0000000000000000000000000000000000000000..5e91ac9cdb35628e2cc38d23b4d97993724140da
--- /dev/null
+++ b/include/opendht/indexation/pht.h
@@ -0,0 +1,235 @@
+#pragma once
+
+#include <string>
+#include <vector>
+#include <memory>
+#include <map>
+#include <functional>
+#include <stdexcept>
+#include <bitset>
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <cstdlib>
+#include <utility>
+
+#include "opendht.h"
+
+namespace dht {
+namespace indexation {
+
+/**
+ * Bit string of arbitrary length used as a key, or key prefix, in the PHT.
+ *
+ * Bits are packed most-significant-bit first into the bytes of content_;
+ * size_ is the number of meaningful bits.
+ */
+struct Prefix {
+    Prefix() {}
+    Prefix(InfoHash h) : size_(h.size() * 8), content_(h.begin(), h.end()) {}
+    Prefix(const Blob& d) : size_(d.size()*8), content_(d) {}
+
+    /**
+     * Builds the prefix made of the first `first` bits of p.
+     *
+     * `first` is clamped to the number of bits p holds, so the copy never
+     * reads past the end of p.content_. When the resulting length is not a
+     * multiple of 8, the trailing byte keeps only its leading bits and the
+     * others are cleared, so that equal prefixes have equal content.
+     */
+    Prefix(const Prefix& p, size_t first)
+        : size_(std::min(first, std::min(p.size_, p.content_.size() * 8))),
+          content_(p.content_.begin(), p.content_.begin() + size_/8)
+    {
+        auto rem = size_ % 8;
+        if (rem)
+            content_.push_back(p.content_[size_/8] & (0xFF << (8 - rem)));
+    }
+
+    /**
+     * Returns the first len bits of this prefix. A negative len is counted
+     * from the end: getPrefix(-1) drops the last bit.
+     *
+     * @throws std::out_of_range if |len| is larger than the prefix size.
+     */
+    Prefix getPrefix(ssize_t len) const {
+        if ((size_t)std::abs(len) > size_)
+            throw std::out_of_range("len larger than prefix size.");
+        if (len < 0)
+            len += size_;
+        return Prefix(*this, len);
+    }
+
+    /**
+     * Returns a copy of this prefix with its last bit flipped. A prefix
+     * holding no bit is returned unchanged.
+     */
+    Prefix getSibling() const {
+        Prefix copy = *this;
+        if (size_ and not copy.content_.empty()) {
+            // Index, counted from the least significant bit of the trailing
+            // byte, of the last bit of the prefix.
+            size_t last_bit = (8 - size_ % 8) % 8;
+            copy.content_.back() ^= (1 << last_bit);
+        }
+        return copy;
+    }
+
+    /**
+     * Hash of the content bytes followed by the prefix length, so that
+     * prefixes with the same bytes but different lengths hash differently.
+     *
+     * NOTE(review): size_ is narrowed to one Blob element; presumably a
+     * byte, in which case only lengths below 256 bits are told apart.
+     */
+    InfoHash hash() const {
+        Blob copy(content_);
+        copy.push_back(size_);
+        return InfoHash::get(copy);
+    }
+
+    /** Renders the prefix as a string of '0' and '1', first bit first. */
+    std::string toString() const {
+        std::stringstream ss;
+        auto bn = size_ % 8;
+        auto n = size_ / 8;
+        for (size_t i = 0; i<n; i++)
+            ss << std::bitset<8>(content_[i]);
+        for (unsigned b=0; b<bn; b++)
+            ss << (char)((content_[n] & (1 << (7 - b))) ? '1':'0');
+        return ss.str();
+    }
+
+    size_t size_ {0};
+    Blob content_ {};
+};
+
+using Value = std::pair<InfoHash, dht::Value::Id>;
+
+/**
+ * Entry of the index as stored on the DHT: the complete key it was inserted
+ * under and the indexed value. `name` travels in the user_type of the
+ * underlying dht::Value rather than in the serialized payload.
+ */
+struct IndexEntry : public dht::Value::Serializable<IndexEntry> {
+    const ValueType& getType() const {
+        return ValueType::USER_DATA;
+    }
+
+    virtual void unpackValue(const dht::Value& v) {
+        Serializable<IndexEntry>::unpackValue(v);
+        name = v.user_type;
+    }
+
+    virtual dht::Value packValue() const {
+        auto pack = Serializable<IndexEntry>::packValue();
+        pack.user_type = name;
+        return pack;
+    }
+
+    Blob prefix;
+    Value value;
+    std::string name;
+    MSGPACK_DEFINE_MAP(prefix, value);
+};
+
+/**
+ * Prefix Hash Tree index stored on a DHT.
+ *
+ * Each node of the tree lives at the DHT location given by Prefix::hash()
+ * and is marked by a "canary" value.
+ */
+class Pht {
+    static constexpr const char* INDEX_PREFIX = "index.pht.";
+    static constexpr const size_t MAX_NODE_ENTRY_COUNT {128};
+
+public:
+    using Key = std::map<std::string, Prefix>;
+
+    using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>;
+    typedef void (*LookupCallbackRaw)(std::vector<std::shared_ptr<Value>>* values, Prefix* p, void *user_data);
+
+    /**
+     * Wraps a C-style callback and its user_data into a LookupCallback.
+     * Returns an empty callback when raw_cb is null.
+     */
+    static LookupCallback
+    bindLookupCb(LookupCallbackRaw raw_cb, void* user_data) {
+        if (not raw_cb) return {};
+        return [=](std::vector<std::shared_ptr<Value>>& values, Prefix p) {
+            raw_cb((std::vector<std::shared_ptr<Value>>*) &values, (Prefix*) &p, user_data);
+        };
+    }
+
+    Pht(std::string name, std::shared_ptr<DhtRunner> dht)
+        : name_(INDEX_PREFIX + name), canary_(name_ + ".canary"), dht_(dht) { }
+    virtual ~Pht () { }
+
+    /**
+     * Lookup a key for a value.
+     *
+     * @param k      The key; it must hold exactly one field.
+     * @param cb     Called with the values found and the prefix at which
+     *               the search ended.
+     * @param doneCb Called with true after cb, or with false if a DHT
+     *               request failed.
+     */
+    void lookup(Key k, LookupCallback cb = {}, Dht::DoneCallbackSimple doneCb = {});
+
+    /**
+     * Adds an entry into the index.
+     *
+     * @param k  The key; it must hold exactly one field.
+     * @param v  The value to index under k.
+     * @param cb Called with false if the preliminary lookup failed,
+     *           otherwise handed to the DHT put storing the entry.
+     */
+    void insert(Key k, Value v, Dht::DoneCallbackSimple cb = {});
+
+private:
+    /**
+     * Linearizes the key into a unidimensional key. A pht only takes
+     * unidimensional key.
+     *
+     * @param k The initial key.
+     *
+     * @return The linearized key.
+     *
+     * @throws std::invalid_argument if k does not hold exactly one field.
+     */
+    static Prefix linearize(Key k) {
+        if (k.size() != 1) { throw std::invalid_argument("PHT only supports unidimensional data."); }
+        return k.begin()->second;
+    }
+
+    /**
+     * Performs a step in the lookup operation. Each step is performed
+     * asynchronously.
+     *
+     * @param k       The complete key being looked up.
+     * @param lo      Smallest prefix length (in bits) still to consider.
+     * @param hi      Largest prefix length (in bits) still to consider.
+     * @param vals    Receives the entries found whose key equals k.
+     * @param cb      Called with *vals and the prefix the search ended at.
+     * @param done_cb Called with the outcome of the lookup.
+     */
+    void lookupStep(Prefix k, std::shared_ptr<int> lo,
+                              std::shared_ptr<int> hi,
+                              std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals,
+                              LookupCallback cb, Dht::DoneCallbackSimple done_cb);
+
+    /**
+     * Updates the canary token on the node responsible for the specified
+     * Prefix.
+     */
+    void updateCanary(Prefix p);
+
+    const std::string name_;
+    const std::string canary_;
+
+    std::shared_ptr<DhtRunner> dht_;
+};
+
+} /* indexation */
+} /* dht */
+
diff --git a/src/indexation/pht.cpp b/src/indexation/pht.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..07442a6781ac72cb7e5a70aba515c5ecd88c8df1
--- /dev/null
+++ b/src/indexation/pht.cpp
@@ -0,0 +1,194 @@
+#include "indexation/pht.h"
+#include "rng.h"
+
+#include <random>
+
+namespace dht {
+namespace indexation {
+
+/**
+ * Binary search step on the length of the prefix of p designating a leaf.
+ * The nodes for lengths mid and, when it exists, mid+1 are requested
+ * together:
+ *  - mid is not a PHT node: the search continues in [lo, mid-1];
+ *  - mid is a PHT node and mid+1 is not: mid is the leaf;
+ *  - both are PHT nodes: the search continues in [mid+1, hi].
+ */
+void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo,
+                               std::shared_ptr<int> hi,
+                               std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals,
+                               LookupCallback cb, Dht::DoneCallbackSimple done_cb)
+{
+    struct node_lookup_result {
+        bool done {false};
+        bool is_pht {false};
+    };
+
+    auto mid = (*lo + *hi)/2;
+    auto first_res = std::make_shared<node_lookup_result>();
+    auto second_res = std::make_shared<node_lookup_result>();
+    // Raised once this step has reached an outcome, so that the completion
+    // of the other request cannot report a second one (done_cb used to be
+    // callable twice when a request failed).
+    // NOTE(review): assumes the callbacks of both requests never run
+    // concurrently -- confirm against DhtRunner.
+    auto settled = std::make_shared<bool>(false);
+
+    auto on_done = [=](){
+        *settled = true;
+        bool is_leaf = first_res->is_pht and not second_res->is_pht;
+        if (is_leaf or *lo > *hi) {
+            // leaf node
+            if (cb)
+                cb(*vals, p.getPrefix(mid));
+            if (done_cb)
+                done_cb(true);
+        } else {
+            // internal node
+            *lo = mid+1;
+            lookupStep(p, lo, hi, vals, cb, done_cb);
+        }
+    };
+
+    if (*lo > *hi) {
+        // Search interval exhausted.
+        on_done();
+        return;
+    }
+
+    auto on_failure = [=](){
+        // DHT failed
+        *settled = true;
+        if (done_cb)
+            done_cb(false);
+    };
+
+    // Only values published by this index (entries and canaries).
+    auto pht_filter = [this](const dht::Value& v) {
+        return v.user_type.compare(0, name_.size(), name_) == 0;
+    };
+
+    auto on_get = [=](const std::shared_ptr<dht::Value>& value, std::shared_ptr<node_lookup_result> res) {
+        if (value->user_type == canary_)
+            res->is_pht = true;
+        else {
+            IndexEntry entry;
+            entry.unpackValue(*value);
+            if (entry.prefix == p.content_)
+                vals->emplace_back(std::make_shared<Value>(entry.value));
+        }
+        return true;
+    };
+
+    // A prefix as long as the key has no child to query. Mark the second
+    // request as completed up front, otherwise a leaf at full depth would
+    // wait for it forever and done_cb would never be called.
+    const bool has_child = static_cast<size_t>(mid) < p.size_;
+    if (not has_child)
+        second_res->done = true;
+
+    dht_->get(p.getPrefix(mid).hash(),
+        std::bind(on_get, std::placeholders::_1, first_res),
+        [=](bool ok) {
+            if (*settled)
+                return;
+            if (not ok) {
+                on_failure();
+            } else if (not first_res->is_pht) {
+                // Not a PHT node.
+                *settled = true;
+                *hi = mid-1;
+                lookupStep(p, lo, hi, vals, cb, done_cb);
+            } else {
+                first_res->done = true;
+                if (second_res->done)
+                    on_done();
+            }
+        }, pht_filter);
+
+    if (has_child)
+        dht_->get(p.getPrefix(mid+1).hash(),
+            std::bind(on_get, std::placeholders::_1, second_res),
+            [=](bool ok) {
+                if (*settled)
+                    return;
+                if (not ok) {
+                    on_failure();
+                } else {
+                    second_res->done = true;
+                    if (first_res->done)
+                        on_done();
+                }
+            }, pht_filter);
+}
+
+void Pht::lookup(Key k, Pht::LookupCallback cb, Dht::DoneCallbackSimple done_cb) {
+    auto values = std::make_shared<std::vector<std::shared_ptr<Value>>>();
+    auto prefix = linearize(k);
+    auto lo = std::make_shared<int>(0);
+    auto hi = std::make_shared<int>(prefix.size_);
+    lookupStep(prefix, lo, hi, values, cb, done_cb);
+}
+
+/**
+ * Puts a canary on the node of p and on its sibling; from the callback of
+ * the first put, does the same for the parent with probability 1/2.
+ */
+void Pht::updateCanary(Prefix p) {
+    // TODO: change this... copy value
+    dht::Value canary_value;
+    canary_value.user_type = canary_;
+    dht_->put(p.hash(), std::move(canary_value),
+        [=](bool){
+            static std::bernoulli_distribution d(0.5);
+            crypto::random_device rd;
+            if (p.size_ && d(rd))
+                updateCanary(p.getPrefix(-1));
+        }
+    );
+
+    if (p.size_) {
+        dht::Value canary_second_value;
+        canary_second_value.user_type = canary_;
+        dht_->put(p.getSibling().hash(), std::move(canary_second_value));
+    }
+}
+
+void Pht::insert(Key k, Value v, Dht::DoneCallbackSimple done_cb) {
+    Prefix kp = linearize(k);
+
+    auto lo = std::make_shared<int>(0);
+    auto hi = std::make_shared<int>(kp.size_);
+    auto vals = std::make_shared<std::vector<std::shared_ptr<Value>>>();
+    auto final_prefix = std::make_shared<Prefix>();
+
+    lookupStep(kp, lo, hi, vals,
+        [=](std::vector<std::shared_ptr<Value>>&, Prefix p) {
+            *final_prefix = Prefix(p);
+        },
+        [=](bool ok){
+            if (not ok) {
+                if (done_cb)
+                    done_cb(false);
+                return;
+            }
+
+            // Go one level deeper when more than MAX_NODE_ENTRY_COUNT
+            // entries were collected, unless the node already sits at the
+            // full key length: getPrefix() would throw from this callback.
+            if (vals->size() > MAX_NODE_ENTRY_COUNT and final_prefix->size_ < kp.size_)
+                *final_prefix = kp.getPrefix(final_prefix->size_+1);
+
+            IndexEntry entry;
+            entry.value = v;
+            entry.prefix = kp.content_;
+            entry.name = name_;
+
+            updateCanary(*final_prefix);
+            dht_->put(final_prefix->hash(), std::move(entry), done_cb);
+        }
+    );
+}
+
+} /* indexation */
+} /* dht */