Skip to content
Snippets Groups Projects
Commit a4a4eee4 authored by kaldoran's avatar kaldoran Committed by Adrien Béraud
Browse files

pht: introduction of complet node splitting

parent d185428d
No related branches found
No related tags found
No related merge requests found
......@@ -159,7 +159,9 @@ public:
* serialization order of fields. */
using KeySpec = std::map<std::string, size_t>;
using RealInsertCallback = std::function<void(std::shared_ptr<Prefix> p, IndexEntry entry )>;
using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>;
typedef void (*LookupCallbackRaw)(std::vector<std::shared_ptr<Value>>* values, Prefix* p, void *user_data);
static LookupCallback
bindLookupCb(LookupCallbackRaw raw_cb, void* user_data) {
......@@ -268,7 +270,103 @@ private:
};
/**
* Tells if the key is valid according to the key spec.
* Check if there is a new canary on the next bit of the prefix p,
* If there is a new one then it will re-put the data there.
*
* @param p Prefix that need to be kept update
* @paran entry The entry to put back
*/
void checkPhtUpdate(const Prefix &p, const IndexEntry &entry) {
/* Don't try to go further than the end of the prefix */
if ( p.size_ >= p.content_.size() * 8 ) return;
auto next_prefix = p.getPrefix( p.size_ + 1);
dht_->listen(next_prefix.hash(),
[=](const std::shared_ptr<dht::Value> &value) {
if (value->user_type == canary_) {
dht_->put(next_prefix.hash(), std::move(entry));
checkPhtUpdate(next_prefix, entry);
std::cerr << "New canary found !" << std::endl;
/* Cancel listen since we found where we need to update*/
return false;
}
std::cerr << "Next value since no canary_ found !" << std::endl;
return true;
},
[=](const dht::Value& v) {
/* Filter value v thats start with the same name as ours */
return v.user_type.compare(0, name_.size(), name_) == 0;
}
);
}
/**
* Looking where to put the data cause if there i free space on the node above then this node will became the real leave.
*
* @param p Share_ptr on the Prefix to check
* @param entry The entry to put at the prefix p
* @param end_cb Callback to use at the end of counting
*/
void getRealPrefix(std::shared_ptr<Prefix> p, IndexEntry entry, RealInsertCallback end_cb ) {
auto total = std::make_shared<int>(0); /* Will contains the total number of data on 3 nodes */
auto ended = std::make_shared<int>(0); /* Just indicate how many have end */
auto parent = std::make_shared<Prefix>(p->getPrefix(-1));
auto sibling = std::make_shared<Prefix>(p->getSibling());
auto pht_filter = [&](const dht::Value& v) {
return v.user_type.compare(0, name_.size(), name_) == 0;
};
/* Lambda will count total number of data node */
auto count = [=]( const std::shared_ptr<dht::Value> val ) {
if ( val->user_type != canary_)
(*total)++;
std::cerr << "Total data " << *total << std::endl;
return true;
};
auto on_done = [=] ( bool ) {
(*ended)++;
/* Only the last one do the CallBack*/
if ( *ended == 3 ) {
if ( *total < MAX_NODE_ENTRY_COUNT )
end_cb(parent, std::move(entry));
else
end_cb(p, std::move(entry));
std::cerr << "Total" << *total << std::endl;
}
};
dht_->get(parent->hash(),
count,
on_done,
pht_filter
);
dht_->get(p->hash(),
count,
on_done,
pht_filter
);
dht_->get(sibling->hash(),
count,
on_done,
pht_filter
);
}
/**
* Performs a step in the lookup operation. Each steps are performed
* asynchronously.
*/
bool validKey(const Key& k) const {
return k.size() == keySpec_.size() and
......
......@@ -247,6 +247,7 @@ void Pht::updateCanary(Prefix p) {
// TODO: change this... copy value
dht::Value canary_value;
canary_value.user_type = canary_;
dht_->put(p.hash(), std::move(canary_value),
[=](bool){
static std::bernoulli_distribution d(0.5);
......@@ -271,6 +272,7 @@ void Pht::insert(Key k, Value v, DoneCallbackSimple done_cb) {
auto vals = std::make_shared<std::vector<std::shared_ptr<Value>>>();
auto final_prefix = std::make_shared<Prefix>();
lookupStep(kp, lo, hi, vals,
[=](std::vector<std::shared_ptr<Value>>&, Prefix p) {
*final_prefix = Prefix(p);
......@@ -280,16 +282,26 @@ void Pht::insert(Key k, Value v, DoneCallbackSimple done_cb) {
if (done_cb)
done_cb(false);
} else {
if (vals->size() >= MAX_NODE_ENTRY_COUNT)
*final_prefix = kp.getPrefix(final_prefix->size_+1);
IndexEntry entry;
entry.value = v;
entry.prefix = kp.content_;
entry.name = name_;
updateCanary(*final_prefix);
dht_->put(final_prefix->hash(), std::move(entry), done_cb);
RealInsertCallback real_insert = [=]( std::shared_ptr<Prefix> p, IndexEntry entry ) {
updateCanary(*p);
checkPhtUpdate(*p, entry);
dht_->put(p->hash(), std::move(entry), done_cb);
};
std::cerr << "Insert prefix" << p.toString() << std::endl;
if ( vals->size() <= MAX_NODE_ENTRY_COUNT )
getRealPrefix( final_prefix, std::move(entry), real_insert);
else {
*final_prefix = kp.getPrefix(final_prefix->size_+1);
real_insert( final_prefix, std::move(entry) );
}
}
}, nullptr, cache_.lookup(kp), true
);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment