Skip to content
Snippets Groups Projects
Commit 48c40c8a authored by kaldoran's avatar kaldoran Committed by Adrien Béraud
Browse files

pht: correction on the algorithm of complet node splitting

parent a4a4eee4
Branches
Tags
No related merge requests found
......@@ -128,7 +128,6 @@ struct Prefix {
throw std::out_of_range("bit larger than prefix size.");
Prefix copy = *this;
size_t offset_bit = (8 - bit) % 8;
copy.content_[bit / 8] ^= (1 << offset_bit);
......@@ -137,10 +136,32 @@ struct Prefix {
size_t size_ {0};
Blob content_ {};
};
using Value = std::pair<InfoHash, dht::Value::Id>;
struct IndexEntry : public dht::Value::Serializable<IndexEntry> {
static const ValueType TYPE;
virtual void unpackValue(const dht::Value& v) {
Serializable<IndexEntry>::unpackValue(v);
name = v.user_type;
}
virtual dht::Value packValue() const {
auto pack = Serializable<IndexEntry>::packValue();
pack.user_type = name;
return pack;
}
Blob prefix;
Value value;
std::string name;
MSGPACK_DEFINE_MAP(prefix, value);
};
class Pht {
static constexpr const char* INVALID_KEY = "Key does not match the PHT key spec.";
......@@ -151,15 +172,13 @@ public:
/* This is the maximum number of entries per node. This parameter is
* critical and influences the traffic a lot during a lookup operation.
*/
static constexpr const size_t MAX_NODE_ENTRY_COUNT {16};
static constexpr const size_t MAX_NODE_ENTRY_COUNT {2}; // FIX : 16
/* A key for a an index entry */
using Key = std::map<std::string, Blob>;
/* Specifications of the keys. It defines the number, the length and the
* serialization order of fields. */
using KeySpec = std::map<std::string, size_t>;
using RealInsertCallback = std::function<void(std::shared_ptr<Prefix> p, IndexEntry entry )>;
using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>;
typedef void (*LookupCallbackRaw)(std::vector<std::shared_ptr<Value>>* values, Prefix* p, void *user_data);
......@@ -200,9 +219,24 @@ public:
/**
* Adds an entry into the index.
*/
void insert(Key k, Value v, DoneCallbackSimple cb = {});
void insert(Key k, Value v, DoneCallbackSimple done_cb = {}) {
Prefix p = linearize(k);
auto lo = std::make_shared<int>(0);
auto hi = std::make_shared<int>(p.size_);
IndexEntry entry;
entry.value = v;
entry.prefix = p.content_;
entry.name = name_;
Pht::insert(p, entry, lo, hi, clock::now(), true, done_cb);
}
private:
void insert( Prefix kp, IndexEntry entry, std::shared_ptr<int> lo, std::shared_ptr<int> hi, time_point time_p,
bool check_split, DoneCallbackSimple done_cb = {});
class Cache {
public:
/**
......@@ -217,11 +251,12 @@ private:
* @param p : Prefix that we are looking for
* @return : The size of the longest prefix known in the cache between 0 and p.size_
*/
int lookup(const Prefix& p);
private:
static constexpr const size_t MAX_ELEMENT {1024};
static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {5};
static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {10};
struct Node {
time_point last_reply; /* Made the assocation between leaves and leaves multimap */
......@@ -240,12 +275,16 @@ private:
std::multimap<time_point, std::shared_ptr<Node>> leaves_;
};
/* Callback used for insert value by using the pht */
using RealInsertCallback = std::function<void(std::shared_ptr<Prefix> p, IndexEntry entry )>;
using LookupCallbackWrapper = std::function<void(std::vector<std::shared_ptr<IndexEntry>>& values, Prefix p)>;
/**
* Performs a step in the lookup operation. Each steps are performed
* asynchronously.
*/
void lookupStep(Prefix k, std::shared_ptr<int> lo, std::shared_ptr<int> hi,
std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals, LookupCallback cb,
std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals, LookupCallbackWrapper cb,
DoneCallbackSimple done_cb, std::shared_ptr<unsigned> max_common_prefix_len,
int start = -1, bool all_values = false);
......@@ -257,17 +296,7 @@ private:
*
* @return the prefix of the linearized key.
*/
virtual Prefix linearize(Key k) const {
if (not validKey(k)) { throw std::invalid_argument(INVALID_KEY); }
Prefix p = Blob {k.begin()->second.begin(), k.begin()->second.end()};
auto bit_loc = p.size_ + 1;
for ( auto i = p.content_.size(); i < keySpec_.begin()->second + 1; i++ )
p.content_.push_back(0);
return p.swapBit(bit_loc);
};
virtual Prefix linearize(Key k) const;
/**
* Check if there is a new canary on the next bit of the prefix p,
......@@ -276,25 +305,30 @@ private:
* @param p Prefix that need to be kept update
* @paran entry The entry to put back
*/
void checkPhtUpdate(const Prefix &p, const IndexEntry &entry) {
void checkPhtUpdate(Key k, Prefix p, IndexEntry entry) {
/* Don't try to go further than the end of the prefix */
if ( p.size_ >= p.content_.size() * 8 ) return;
Prefix full = entry.prefix;
if ( p.size_ >= full.size_ ) return;
auto next_prefix = p.getPrefix( p.size_ + 1);
auto next_prefix = full.getPrefix( p.size_ + 1 );
dht_->listen(next_prefix.hash(),
[=](const std::shared_ptr<dht::Value> &value) {
if (value->user_type == canary_) {
dht_->put(next_prefix.hash(), std::move(entry));
checkPhtUpdate(next_prefix, entry);
// updateCanary(next_prefix);
// checkPhtUpdate(next_prefix, entry);
// std::cerr << " LISTEN PUT HERE " << next_prefix.toString() << std::endl;
// dht_->put(next_prefix.hash(), std::move(entry));
// checkPhtUpdate(next_prefix, entry);
// dht_->put(next_prefix.hash(), entry);
// std::cerr << "New canary found ! " << next_prefix.toString() << std::endl;
insert(k, entry.value, nullptr);
std::cerr << "New canary found !" << std::endl;
/* Cancel listen since we found where we need to update*/
return false;
}
std::cerr << "Next value since no canary_ found !" << std::endl;
return true;
},
[=](const dht::Value& v) {
......@@ -311,62 +345,39 @@ private:
* @param entry The entry to put at the prefix p
* @param end_cb Callback to use at the end of counting
*/
void getRealPrefix(std::shared_ptr<Prefix> p, IndexEntry entry, RealInsertCallback end_cb ) {
void getRealPrefix(std::shared_ptr<Prefix> p, IndexEntry entry, RealInsertCallback end_cb );
auto total = std::make_shared<int>(0); /* Will contains the total number of data on 3 nodes */
auto ended = std::make_shared<int>(0); /* Just indicate how many have end */
auto parent = std::make_shared<Prefix>(p->getPrefix(-1));
auto sibling = std::make_shared<Prefix>(p->getSibling());
auto pht_filter = [&](const dht::Value& v) {
return v.user_type.compare(0, name_.size(), name_) == 0;
};
/* Lambda will count total number of data node */
auto count = [=]( const std::shared_ptr<dht::Value> val ) {
if ( val->user_type != canary_)
(*total)++;
std::cerr << "Total data " << *total << std::endl;
return true;
};
auto on_done = [=] ( bool ) {
(*ended)++;
/* Only the last one do the CallBack*/
if ( *ended == 3 ) {
if ( *total < MAX_NODE_ENTRY_COUNT )
end_cb(parent, std::move(entry));
else
end_cb(p, std::move(entry));
std::cerr << "Total" << *total << std::endl;
}
};
dht_->get(parent->hash(),
count,
on_done,
pht_filter
);
/**
* Looking where to put the data cause if there is free space on the node
* above then this node will became the real leave.
*
* @param p Share_ptr on the Prefix to check
* @param entry The entry to put at the prefix p
* @param end_cb Callback to use at the end of counting
*/
void checkPhtUpdate(Prefix p, IndexEntry entry, time_point time_p);
dht_->get(p->hash(),
count,
on_done,
pht_filter
);
size_t foundSplitLocation(Prefix compared, std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals) {
for ( size_t i = 0; i < compared.content_.size() * 8; i++ )
for ( auto const& v : *vals)
if ( Prefix(v->prefix).isActiveBit(i) != compared.isActiveBit(i) )
return i;
dht_->get(sibling->hash(),
count,
on_done,
pht_filter
);
return compared.size_ - 1;
}
/**
* Performs a step in the lookup operation. Each steps are performed
* asynchronously.
* Put canary from the split point until the last known canary and add the prefix at the good place
*
* @param insert : Prefix to insertm but also prefix which going to check where we need to split
* @param vals : Vector of vals for the comparaison
* @param entry : Entry to put on the pht
* @param end_cb : Callback to apply to the insert prefi (here does the insert)
*/
void split(Prefix insert, std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals, IndexEntry entry, RealInsertCallback end_cb );
/**
* Tells if the key is valid according to the key spec.
*/
bool validKey(const Key& k) const {
return k.size() == keySpec_.size() and
......
......@@ -4,26 +4,6 @@
namespace dht {
namespace indexation {
struct IndexEntry : public dht::Value::Serializable<IndexEntry> {
static const ValueType TYPE;
virtual void unpackValue(const dht::Value& v) {
Serializable<IndexEntry>::unpackValue(v);
name = v.user_type;
}
virtual dht::Value packValue() const {
auto pack = Serializable<IndexEntry>::packValue();
pack.user_type = name;
return pack;
}
Blob prefix;
Value value;
std::string name;
MSGPACK_DEFINE_MAP(prefix, value);
};
void Pht::Cache::insert(const Prefix& p) {
size_t i = 0;
auto now = clock::now();
......@@ -32,13 +12,13 @@ void Pht::Cache::insert(const Prefix& p) {
while ((leaves_.size() > 0 && leaves_.begin()->first + NODE_EXPIRE_TIME < now) || leaves_.size() > MAX_ELEMENT)
leaves_.erase(leaves_.begin());
if (not (curr_node = root_.lock())) {
if (not (curr_node = root_.lock()) ) {
/* Root does not exist, need to create one*/
curr_node = std::make_shared<Node>();
root_ = curr_node;
}
curr_node->last_reply = now;
/* Iterate through all bit of the Blob */
......@@ -69,7 +49,7 @@ void Pht::Cache::insert(const Prefix& p) {
}
int Pht::Cache::lookup(const Prefix& p) {
int pos = 0;
int pos = -1;
auto now = clock::now(), last_node_time = now;
/* Before lookup remove the useless one [i.e. too old] */
......@@ -81,6 +61,8 @@ int Pht::Cache::lookup(const Prefix& p) {
std::shared_ptr<Node> curr_node;
while ( auto n = next.lock() ) {
++pos;
/* Safe since pos is equal to 0 until here */
if ( (unsigned) pos >= p.size_ ) break;
......@@ -90,11 +72,9 @@ int Pht::Cache::lookup(const Prefix& p) {
/* Get the Prefix bit by bit, starting from left */
next = ( p.isActiveBit(pos) ) ? curr_node->right_child : curr_node->left_child;
++pos;
}
if ( pos > 0 ) {
if ( pos >= 0 ) {
auto to_erase = leaves_.find(last_node_time);
if ( to_erase != leaves_.end() )
leaves_.erase( to_erase );
......@@ -102,15 +82,15 @@ int Pht::Cache::lookup(const Prefix& p) {
leaves_.emplace( std::move(now), std::move(curr_node) );
}
return --pos;
return pos;
}
const ValueType IndexEntry::TYPE = ValueType::USER_DATA;
constexpr std::chrono::minutes Pht::Cache::NODE_EXPIRE_TIME;
void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi,
std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals,
LookupCallback cb, DoneCallbackSimple done_cb,
std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals,
LookupCallbackWrapper cb, DoneCallbackSimple done_cb,
std::shared_ptr<unsigned> max_common_prefix_len, int start, bool all_values)
{
struct node_lookup_result {
......@@ -122,6 +102,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi,
auto mid = (start >= 0) ? (unsigned) start : (*lo + *hi)/2;
auto first_res = std::make_shared<node_lookup_result>();
auto second_res = std::make_shared<node_lookup_result>();
auto on_done = [=](bool ok) {
bool is_leaf = first_res->is_pht and not second_res->is_pht;
if (not ok) {
......@@ -130,7 +111,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi,
}
else if (is_leaf or *lo > *hi) {
// leaf node
auto to_insert = p.getPrefix(mid);
Prefix to_insert = p.getPrefix(mid);
cache_.insert(to_insert);
if (cb) {
......@@ -162,8 +143,9 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi,
};
auto on_get = [=](const std::shared_ptr<dht::Value>& value, std::shared_ptr<node_lookup_result> res) {
if (value->user_type == canary_)
if (value->user_type == canary_) {
res->is_pht = true;
}
else {
IndexEntry entry;
entry.unpackValue(*value);
......@@ -171,20 +153,20 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi,
if (max_common_prefix_len) { /* inexact match case */
auto common_bits = Prefix::commonBits(p, entry.prefix);
if (vals->empty()) {
vals->emplace_back(std::make_shared<Value>(entry.value));
vals->emplace_back(std::make_shared<IndexEntry>(entry));
*max_common_prefix_len = common_bits;
}
else {
if (common_bits == *max_common_prefix_len) /* this is the max so far */
vals->emplace_back(std::make_shared<Value>(entry.value));
vals->emplace_back(std::make_shared<IndexEntry>(entry));
else if (common_bits > *max_common_prefix_len) { /* new max found! */
vals->clear();
vals->emplace_back(std::make_shared<Value>(entry.value));
vals->emplace_back(std::make_shared<IndexEntry>(entry));
*max_common_prefix_len = common_bits;
}
}
} else if (all_values or entry.prefix == p.content_) /* exact match case */
vals->emplace_back(std::make_shared<Value>(entry.value));
vals->emplace_back(std::make_shared<IndexEntry>(entry));
}
return true;
};
......@@ -205,7 +187,7 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi,
lookupStep(p, lo, hi, vals, cb, done_cb, max_common_prefix_len, -1, all_values);
} else {
first_res->done = true;
if (second_res->done)
if (second_res->done or mid >= p.size_ )
on_done(true);
}
}
......@@ -234,20 +216,31 @@ void Pht::lookupStep(Prefix p, std::shared_ptr<int> lo, std::shared_ptr<int> hi,
}
void Pht::lookup(Key k, Pht::LookupCallback cb, DoneCallbackSimple done_cb, bool exact_match) {
auto values = std::make_shared<std::vector<std::shared_ptr<Value>>>();
auto prefix = linearize(k);
auto values = std::make_shared<std::vector<std::shared_ptr<IndexEntry>>>();
auto lo = std::make_shared<int>(0);
auto hi = std::make_shared<int>(prefix.size_);
std::shared_ptr<unsigned> max_common_prefix_len = not exact_match ? std::make_shared<unsigned>(0) : nullptr;
lookupStep(prefix, lo, hi, values, cb, done_cb, max_common_prefix_len, cache_.lookup(prefix));
lookupStep(prefix, lo, hi, values,
[=](std::vector<std::shared_ptr<IndexEntry>>& entries, Prefix p) {
std::vector<std::shared_ptr<Value>> vals(entries.size());
std::transform(entries.begin(), entries.end(), vals.begin(),
[](const std::shared_ptr<IndexEntry>& ie) {
return std::make_shared<Value>(ie->value);
});
cb(vals, p);
}, done_cb, max_common_prefix_len, cache_.lookup(prefix));
}
void Pht::updateCanary(Prefix p) {
// TODO: change this... copy value
dht::Value canary_value;
canary_value.user_type = canary_;
dht_->put(p.hash(), std::move(canary_value),
[=](bool){
static std::bernoulli_distribution d(0.5);
......@@ -264,17 +257,17 @@ void Pht::updateCanary(Prefix p) {
}
}
void Pht::insert(Key k, Value v, DoneCallbackSimple done_cb) {
Prefix kp = linearize(k);
auto lo = std::make_shared<int>(0);
auto hi = std::make_shared<int>(kp.size_);
auto vals = std::make_shared<std::vector<std::shared_ptr<Value>>>();
auto final_prefix = std::make_shared<Prefix>();
void Pht::insert(Prefix kp, IndexEntry entry, std::shared_ptr<int> lo, std::shared_ptr<int> hi, time_point time_p,
bool check_split, DoneCallbackSimple done_cb) {
if (time_p + ValueType::USER_DATA.expiration < clock::now()) return;
auto vals = std::make_shared<std::vector<std::shared_ptr<IndexEntry>>>();
auto final_prefix = std::make_shared<Prefix>();
lookupStep(kp, lo, hi, vals,
[=](std::vector<std::shared_ptr<Value>>&, Prefix p) {
[=](std::vector<std::shared_ptr<IndexEntry>>&, Prefix p) {
*final_prefix = Prefix(p);
},
[=](bool ok){
......@@ -283,29 +276,128 @@ void Pht::insert(Key k, Value v, DoneCallbackSimple done_cb) {
done_cb(false);
} else {
IndexEntry entry;
entry.value = v;
entry.prefix = kp.content_;
entry.name = name_;
RealInsertCallback real_insert = [=]( std::shared_ptr<Prefix> p, IndexEntry entry ) {
RealInsertCallback real_insert = [=]( std::shared_ptr<Prefix> p, IndexEntry entry) {
updateCanary(*p);
checkPhtUpdate(*p, entry);
dht_->put(p->hash(), std::move(entry), done_cb);
checkPhtUpdate(*p, entry, time_p);
cache_.insert(*p);
dht_->put(p->hash(), std::move(entry), done_cb, time_p);
};
std::cerr << "Insert prefix" << p.toString() << std::endl;
if ( vals->size() <= MAX_NODE_ENTRY_COUNT )
getRealPrefix( final_prefix, std::move(entry), real_insert);
else {
*final_prefix = kp.getPrefix(final_prefix->size_+1);
real_insert( final_prefix, std::move(entry) );
if ( not check_split or final_prefix->size_ == kp.size_ ) {
real_insert(final_prefix, std::move(entry));
} else {
if ( vals->size() < MAX_NODE_ENTRY_COUNT )
getRealPrefix(final_prefix, std::move(entry), real_insert);
else
split(*final_prefix, vals, entry, real_insert);
}
}
}, nullptr, cache_.lookup(kp), true
);
}
Prefix Pht::linearize(Key k) const {
if (not validKey(k)) { throw std::invalid_argument(INVALID_KEY); }
Prefix p = Blob {k.begin()->second.begin(), k.begin()->second.end()};
auto bit_loc = p.size_ + 1;
for ( auto i = p.content_.size(); i < keySpec_.begin()->second + 1; i++ )
p.content_.push_back(0);
return p.swapBit(bit_loc);
};
void Pht::getRealPrefix(std::shared_ptr<Prefix> p, IndexEntry entry, RealInsertCallback end_cb ) {
if ( p->size_ == 0 ) {
end_cb(p, std::move(entry));
return;
}
auto total = std::make_shared<unsigned int>(0); /* Will contains the total number of data on 3 nodes */
auto ended = std::make_shared<unsigned int>(0); /* Just indicate how many have end */
auto parent = std::make_shared<Prefix>(p->getPrefix(-1));
auto sibling = std::make_shared<Prefix>(p->getSibling());
auto pht_filter = [&](const dht::Value& v) {
return v.user_type.compare(0, name_.size(), name_) == 0;
};
/* Lambda will count total number of data node */
auto count = [=]( const std::shared_ptr<dht::Value> value ) {
if ( value->user_type != canary_)
(*total)++;
return true;
};
auto on_done = [=] ( bool ) {
(*ended)++;
/* Only the last one do the CallBack*/
if ( *ended == 3 ) {
if ( *total < MAX_NODE_ENTRY_COUNT )
end_cb(parent, std::move(entry));
else
end_cb(p, std::move(entry));
}
};
dht_->get(parent->hash(),
count,
on_done,
pht_filter
);
dht_->get(p->hash(),
count,
on_done,
pht_filter
);
dht_->get(sibling->hash(),
count,
on_done,
pht_filter
);
}
void Pht::checkPhtUpdate(Prefix p, IndexEntry entry, time_point time_p) {
Prefix full = entry.prefix;
if ( p.content_.size() * 8 >= full.content_.size() * 8 ) return;
auto next_prefix = full.getPrefix( p.size_ + 1 );
dht_->listen(next_prefix.hash(),
[=](const std::shared_ptr<dht::Value> &value) {
if (value->user_type == canary_) {
insert(full, entry, std::make_shared<int>(0), std::make_shared<int>(full.size_), time_p, false, nullptr);
/* Cancel listen since we found where we need to update*/
return false;
}
return true;
},
[=](const dht::Value& v) {
/* Filter value v thats start with the same name as ours */
return v.user_type.compare(0, name_.size(), name_) == 0;
}
);
}
void Pht::split(Prefix insert, std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals, IndexEntry entry, RealInsertCallback end_cb ) {
auto full = Prefix(entry.prefix);
auto loc = foundSplitLocation(full, vals);
auto prefix_to_insert = std::make_shared<Prefix>(full.getPrefix(loc));
for(;loc != insert.size_ - 1; loc--)
updateCanary(full.getPrefix(loc));
end_cb(prefix_to_insert, entry);
}
} /* indexation */
} /* dht */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment