Skip to content
Snippets Groups Projects
Commit acc2b71c authored by Simon Désaulniers's avatar Simon Désaulniers Committed by Adrien Béraud
Browse files

base pht indexation

parent 09d849c9
No related branches found
No related tags found
No related merge requests found
...@@ -53,6 +53,7 @@ list (APPEND opendht_SOURCES ...@@ -53,6 +53,7 @@ list (APPEND opendht_SOURCES
src/network_engine.cpp src/network_engine.cpp
src/securedht.cpp src/securedht.cpp
src/dhtrunner.cpp src/dhtrunner.cpp
src/indexation/pht.cpp
src/argon2/argon2.c src/argon2/argon2.c
src/argon2/core.c src/argon2/core.c
src/argon2/blake2/blake2b.c src/argon2/blake2/blake2b.c
...@@ -78,6 +79,7 @@ list (APPEND opendht_HEADERS ...@@ -78,6 +79,7 @@ list (APPEND opendht_HEADERS
include/opendht/securedht.h include/opendht/securedht.h
include/opendht/log.h include/opendht/log.h
include/opendht.h include/opendht.h
include/opendht/indexation/pht.h
) )
configure_file ( configure_file (
......
...@@ -22,3 +22,4 @@ ...@@ -22,3 +22,4 @@
#include "opendht/dhtrunner.h" #include "opendht/dhtrunner.h"
#include "opendht/log.h" #include "opendht/log.h"
#include "opendht/default_types.h" #include "opendht/default_types.h"
#include "opendht/indexation/pht.h"
#pragma once
#include <string>
#include <vector>
#include <memory>
#include <map>
#include <functional>
#include <stdexcept>
#include <bitset>
#include <iostream>
#include <sstream>
#include "opendht.h"
namespace dht {
namespace indexation {
struct Prefix {
Prefix() {}
Prefix(InfoHash h) : size_(h.size() * 8), content_(h.begin(), h.end()) {}
Prefix(const Blob& d) : size_(d.size()*8), content_(d) {}
Prefix(const Prefix& p, size_t first) : size_(first), content_(p.content_.begin(), p.content_.begin()+first/8) {
auto rem = first % 8;
if (rem)
content_.push_back(p.content_[first/8] & (0xFF << (7 - rem)));
}
Prefix getPrefix(ssize_t len) const {
if ((size_t)std::abs(len) > size_)
throw std::out_of_range("len larger than prefix size.");
if (len < 0)
len += size_;
return Prefix(*this, len);
}
Prefix getSibling() const {
Prefix copy = *this;
if (size_) {
size_t last_bit = (8 - size_) % 8;
copy.content_.back() ^= (1 << last_bit);
}
return copy;
}
InfoHash hash() const {
Blob copy(content_);
copy.push_back(size_);
return InfoHash::get(copy);
}
std::string toString() const {
std::stringstream ss;
auto bn = size_ % 8;
auto n = size_ / 8;
for (size_t i = 0; i<n; i++)
ss << std::bitset<8>(content_[i]);
if (bn)
for (unsigned b=0; b<bn; b++)
ss << (char)((content_[n] & (1 << (7 - b))) ? '1':'0');
return ss.str();
}
size_t size_ {0};
Blob content_ {};
};
using Value = std::pair<InfoHash, dht::Value::Id>;
struct IndexEntry : public dht::Value::Serializable<IndexEntry> {
const ValueType& getType() const {
return ValueType::USER_DATA;
}
virtual void unpackValue(const dht::Value& v) {
Serializable<IndexEntry>::unpackValue(v);
name = v.user_type;
}
virtual dht::Value packValue() const {
auto pack = Serializable<IndexEntry>::packValue();
pack.user_type = name;
return pack;
}
Blob prefix;
Value value;
std::string name;
MSGPACK_DEFINE_MAP(prefix, value);
};
class Pht {
static constexpr const char* INDEX_PREFIX = "index.pht.";
static constexpr const size_t MAX_NODE_ENTRY_COUNT {128};
public:
using Key = std::map<std::string, Prefix>;
using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>;
typedef void (*LookupCallbackRaw)(std::vector<std::shared_ptr<Value>>* values, Prefix* p, void *user_data);
static LookupCallback
bindLookupCb(LookupCallbackRaw raw_cb, void* user_data) {
if (not raw_cb) return {};
return [=](std::vector<std::shared_ptr<Value>>& values, Prefix p) {
raw_cb((std::vector<std::shared_ptr<Value>>*) &values, (Prefix*) &p, user_data);
};
}
Pht(std::string name, std::shared_ptr<DhtRunner> dht)
: name_(INDEX_PREFIX + name), canary_(name_ + ".canary"), dht_(dht) { }
virtual ~Pht () { }
/**
* Lookup a key for a value.
*/
void lookup(Key k, LookupCallback cb = {}, Dht::DoneCallbackSimple doneCb = {});
/**
* Adds an entry into the index.
*/
void insert(Key k, Value v, Dht::DoneCallbackSimple cb = {});
private:
/**
* Linearizes the key into a unidimensional key. A pht only takes
* unidimensional key.
*
* @param Key The initial key.
*
* @return return The linearized key.
*/
static Prefix linearize(Key k) {
if (k.size() != 1) { throw std::invalid_argument("PHT only supports unidimensional data."); }
return k.begin()->second;
};
/**
* Performs a step in the lookup operation. Each steps are performed
* asynchronously.
*/
void lookupStep(Prefix k, std::shared_ptr<int> lo,
std::shared_ptr<int> hi,
std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals,
LookupCallback cb, Dht::DoneCallbackSimple done_cb);
/**
* Updates the canary token on the node responsible for the specified
* Prefix.
*/
void updateCanary(Prefix p);
const std::string name_;
const std::string canary_;
std::shared_ptr<DhtRunner> dht_;
};
} /* indexation */
} /* dht */
#include "indexation/pht.h"
#include "rng.h"
namespace dht {
namespace indexation {
void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo,
std::shared_ptr<int> hi,
std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals,
LookupCallback cb, Dht::DoneCallbackSimple done_cb)
{
struct node_lookup_result {
bool done {false};
bool is_pht {false};
};
auto mid = (*lo + *hi)/2;
auto first_res = std::make_shared<node_lookup_result>();
auto second_res = std::make_shared<node_lookup_result>();
auto on_done = [=](){
bool is_leaf = first_res->is_pht and not second_res->is_pht;
if (is_leaf or *lo > *hi) {
// leaf node
if (cb)
cb(*vals, p.getPrefix(mid));
if (done_cb)
done_cb(true);
} else {
// internal node
*lo = mid+1;
lookupStep(p, lo, hi, vals, cb, done_cb);
}
};
if (*lo <= *hi) {
auto pht_filter = [&](const dht::Value& v) {
return v.user_type.compare(0, name_.size(), name_) == 0;
};
auto on_get = [=](const std::shared_ptr<dht::Value>& value, std::shared_ptr<node_lookup_result> res) {
if (value->user_type == canary_)
res->is_pht = true;
else {
IndexEntry entry;
entry.unpackValue(*value);
if (entry.prefix == p.content_)
vals->emplace_back(std::make_shared<Value>(entry.value));
}
return true;
};
dht_->get(p.getPrefix(mid).hash(),
std::bind(on_get, std::placeholders::_1, first_res),
[=](bool ok) {
if (not ok) {
// DHT failed
if (done_cb)
done_cb(false);
}
else {
if (not first_res->is_pht) {
// Not a PHT node.
*hi = mid-1;
lookupStep(p, lo, hi, vals, cb, done_cb);
} else {
first_res->done = true;
if (second_res->done)
on_done();
}
}
}, pht_filter);
if (mid < p.size_)
dht_->get(p.getPrefix(mid+1).hash(),
std::bind(on_get, std::placeholders::_1, second_res),
[=](bool ok) {
if (not ok) {
// DHT failed
if (done_cb)
done_cb(false);
}
else {
second_res->done = true;
if (first_res->done)
on_done();
}
}, pht_filter);
} else {
on_done();
}
}
void Pht::lookup(Key k, Pht::LookupCallback cb, Dht::DoneCallbackSimple done_cb) {
auto values = std::make_shared<std::vector<std::shared_ptr<Value>>>();
auto prefix = linearize(k);
auto lo = std::make_shared<int>(0);
auto hi = std::make_shared<int>(prefix.size_);
lookupStep(prefix, lo, hi, values, cb, done_cb);
}
void Pht::updateCanary(Prefix p) {
// TODO: change this... copy value
dht::Value canary_value;
canary_value.user_type = canary_;
dht_->put(p.hash(), std::move(canary_value),
[=](bool ok){
static std::bernoulli_distribution d(0.5);
crypto::random_device rd;
if (p.size_ && d(rd))
updateCanary(p.getPrefix(-1));
}
);
if (p.size_) {
dht::Value canary_second_value;
canary_second_value.user_type = canary_;
dht_->put(p.getSibling().hash(), std::move(canary_second_value));
}
}
void Pht::insert(Key k, Value v, Dht::DoneCallbackSimple done_cb) {
Prefix kp = linearize(k);
auto lo = std::make_shared<int>(0);
auto hi = std::make_shared<int>(kp.size_);
auto vals = std::make_shared<std::vector<std::shared_ptr<Value>>>();
auto final_prefix = std::make_shared<Prefix>();
lookupStep(kp, lo, hi, vals,
[=](std::vector<std::shared_ptr<Value>>& values, Prefix p) {
*final_prefix = Prefix(p);
return true;
},
[=](bool ok){
if (not ok) {
if (done_cb)
done_cb(false);
} else {
if (vals->size() > MAX_NODE_ENTRY_COUNT)
*final_prefix = kp.getPrefix(final_prefix->size_+1);
IndexEntry entry;
entry.value = v;
entry.prefix = kp.content_;
entry.name = name_;
updateCanary(*final_prefix);
dht_->put(final_prefix->hash(), std::move(entry), done_cb);
}
}
);
}
} /* indexation */
} /* dht */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment