Commit 40cae3db authored by Adrien Béraud's avatar Adrien Béraud
Browse files

namedirectory: use dht executor

Change-Id: I232a001d3e62b7470e38adc1aa74eabadcc8b210
parent 827c546d
......@@ -73,7 +73,8 @@ NameDirectory::lookupUri(const std::string& uri, const std::string& default_serv
NameDirectory::NameDirectory(const std::string& s)
: serverHost_(s),
cachePath_(fileutils::get_cache_dir()+DIR_SEPARATOR_STR+CACHE_DIRECTORY+DIR_SEPARATOR_STR+serverHost_)
cachePath_(fileutils::get_cache_dir()+DIR_SEPARATOR_STR+CACHE_DIRECTORY+DIR_SEPARATOR_STR+serverHost_),
executor_(std::make_shared<dht::Executor>(dht::ThreadPool::io(), 8))
{}
void
......@@ -110,67 +111,68 @@ size_t getContentLength(restbed::Response& reply)
void NameDirectory::lookupAddress(const std::string& addr, LookupCallback cb)
{
try {
std::string cacheResult = nameCache(addr);
if (not cacheResult.empty()) {
cb(cacheResult, Response::found);
return;
}
restbed::Uri uri(HTTPS_PROTO + serverHost_ + QUERY_ADDR + addr);
auto req = std::make_shared<restbed::Request>(uri);
req->set_header("Accept", "*/*");
req->set_header("Host", serverHost_);
JAMI_DBG("Address lookup for %s: %s", addr.c_str(), uri.to_string().c_str());
std::string cacheResult = nameCache(addr);
if (not cacheResult.empty()) {
cb(cacheResult, Response::found);
return;
}
auto ret = restbed::Http::async(req, [this,cb,addr](const std::shared_ptr<restbed::Request>&,
const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code();
if (code == 200) {
size_t length = getContentLength(*reply);
if (length > MAX_RESPONSE_SIZE) {
cb("", Response::error);
return;
}
restbed::Http::fetch(length, reply);
std::string body;
reply->get_body(body);
Json::Value json;
Json::CharReaderBuilder rbuilder;
auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
if (!reader->parse(&body[0], &body[body.size()], &json, nullptr)) {
JAMI_ERR("Address lookup for %s: can't parse server response: %s", addr.c_str(), body.c_str());
cb("", Response::error);
return;
}
auto name = json["name"].asString();
if (not name.empty()) {
JAMI_DBG("Found name for %s: %s", addr.c_str(), name.c_str());
{
std::lock_guard<std::mutex> l(lock_);
addrCache_.emplace(name, addr);
nameCache_.emplace(addr, name);
restbed::Uri uri(HTTPS_PROTO + serverHost_ + QUERY_ADDR + addr);
auto req = std::make_shared<restbed::Request>(uri);
req->set_header("Accept", "*/*");
req->set_header("Host", serverHost_);
JAMI_DBG("Address lookup for %s: %s", addr.c_str(), uri.to_string().c_str());
executor_->run([this, req, cb=std::move(cb), addr] {
try {
restbed::Http::async(req, [this, cb=std::move(cb), addr=std::move(addr)]
(const std::shared_ptr<restbed::Request>&,
const std::shared_ptr<restbed::Response>& reply)
{
auto code = reply->get_status_code();
if (code == 200) {
size_t length = getContentLength(*reply);
if (length > MAX_RESPONSE_SIZE) {
cb("", Response::error);
return;
}
cb(name, Response::found);
saveCache();
} else {
restbed::Http::fetch(length, reply);
std::string body;
reply->get_body(body);
Json::Value json;
Json::CharReaderBuilder rbuilder;
auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
if (!reader->parse(&body[0], &body[body.size()], &json, nullptr)) {
JAMI_ERR("Address lookup for %s: can't parse server response: %s", addr.c_str(), body.c_str());
cb("", Response::error);
return;
}
auto name = json["name"].asString();
if (not name.empty()) {
JAMI_DBG("Found name for %s: %s", addr.c_str(), name.c_str());
{
std::lock_guard<std::mutex> l(lock_);
addrCache_.emplace(name, addr);
nameCache_.emplace(addr, name);
}
cb(name, Response::found);
saveCache();
} else {
cb("", Response::notFound);
}
} else if (code >= 400 && code < 500) {
cb("", Response::notFound);
} else {
cb("", Response::error);
}
} else if (code >= 400 && code < 500) {
cb("", Response::notFound);
} else {
cb("", Response::error);
}
}).share();
// avoid blocking on future destruction
dht::ThreadPool::io().run([ret](){ ret.get(); });
} catch (const std::exception& e) {
JAMI_ERR("Error when performing address lookup: %s", e.what());
cb("", Response::error);
}
});
} catch (const std::exception& e) {
JAMI_ERR("Error when performing address lookup: %s", e.what());
cb("", Response::error);
}
});
}
bool
......@@ -181,95 +183,96 @@ NameDirectory::verify(const std::string& name, const dht::crypto::PublicKey& pk,
void NameDirectory::lookupName(const std::string& n, LookupCallback cb)
{
try {
std::string name {n};
if (not validateName(name)) {
cb(name, Response::invalidResponse);
return;
}
toLower(name);
std::string cacheResult = addrCache(name);
if (not cacheResult.empty()) {
cb(cacheResult, Response::found);
return;
}
std::string name {n};
if (not validateName(name)) {
cb(name, Response::invalidResponse);
return;
}
toLower(name);
restbed::Uri uri(HTTPS_PROTO + serverHost_ + QUERY_NAME + name);
JAMI_DBG("Name lookup for %s: %s", name.c_str(), uri.to_string().c_str());
auto request = std::make_shared<restbed::Request>(std::move(uri));
request->set_header("Accept", "*/*");
request->set_header("Host", serverHost_);
auto ret = restbed::Http::async(request, [this,cb,name](const std::shared_ptr<restbed::Request>&,
const std::shared_ptr<restbed::Response>& reply) {
auto code = reply->get_status_code();
if (code != 200)
JAMI_DBG("Name lookup for %s: got reply code %d", name.c_str(), code);
if (code >= 200 && code < 300) {
size_t length = getContentLength(*reply);
if (length > MAX_RESPONSE_SIZE) {
cb("", Response::error);
return;
}
restbed::Http::fetch(length, reply);
std::string body;
reply->get_body(body);
Json::Value json;
Json::CharReaderBuilder rbuilder;
auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
if (!reader->parse(&body[0], &body[body.size()], &json, nullptr)) {
JAMI_ERR("Name lookup for %s: can't parse server response: %s", name.c_str(), body.c_str());
cb("", Response::error);
return;
}
auto addr = json["addr"].asString();
auto publickey = json["publickey"].asString();
auto signature = json["signature"].asString();
std::string cacheResult = addrCache(name);
if (not cacheResult.empty()) {
cb(cacheResult, Response::found);
return;
}
if (!addr.compare(0, HEX_PREFIX.size(), HEX_PREFIX))
addr = addr.substr(HEX_PREFIX.size());
if (addr.empty()) {
cb("", Response::notFound);
return;
}
restbed::Uri uri(HTTPS_PROTO + serverHost_ + QUERY_NAME + name);
JAMI_DBG("Name lookup for %s: %s", name.c_str(), uri.to_string().c_str());
auto request = std::make_shared<restbed::Request>(std::move(uri));
request->set_header("Accept", "*/*");
request->set_header("Host", serverHost_);
executor_->run([this, request, cb=std::move(cb), name]{
try {
restbed::Http::async(request, [this, cb=std::move(cb), name=std::move(name)]
(const std::shared_ptr<restbed::Request>&,
const std::shared_ptr<restbed::Response>& reply)
{
auto code = reply->get_status_code();
if (code != 200)
JAMI_DBG("Name lookup for %s: got reply code %d", name.c_str(), code);
if (code >= 200 && code < 300) {
size_t length = getContentLength(*reply);
if (length > MAX_RESPONSE_SIZE) {
cb("", Response::error);
return;
}
restbed::Http::fetch(length, reply);
std::string body;
reply->get_body(body);
Json::Value json;
Json::CharReaderBuilder rbuilder;
auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
if (!reader->parse(&body[0], &body[body.size()], &json, nullptr)) {
JAMI_ERR("Name lookup for %s: can't parse server response: %s", name.c_str(), body.c_str());
cb("", Response::error);
return;
}
auto addr = json["addr"].asString();
auto publickey = json["publickey"].asString();
auto signature = json["signature"].asString();
if (!addr.compare(0, HEX_PREFIX.size(), HEX_PREFIX))
addr = addr.substr(HEX_PREFIX.size());
if (addr.empty()) {
cb("", Response::notFound);
return;
}
if (not publickey.empty() and not signature.empty()) {
try {
auto pk = dht::crypto::PublicKey(base64::decode(publickey));
if(pk.getId().toString() != addr or not verify(name, pk, signature)) {
if (not publickey.empty() and not signature.empty()) {
try {
auto pk = dht::crypto::PublicKey(base64::decode(publickey));
if(pk.getId().toString() != addr or not verify(name, pk, signature)) {
cb("", Response::invalidResponse);
return;
}
} catch (const std::exception& e) {
cb("", Response::invalidResponse);
return;
}
} catch (const std::exception& e) {
cb("", Response::invalidResponse);
return;
}
}
JAMI_DBG("Found address for %s: %s", name.c_str(), addr.c_str());
{
std::lock_guard<std::mutex> l(lock_);
addrCache_.emplace(name, addr);
nameCache_.emplace(addr, name);
JAMI_DBG("Found address for %s: %s", name.c_str(), addr.c_str());
{
std::lock_guard<std::mutex> l(lock_);
addrCache_.emplace(name, addr);
nameCache_.emplace(addr, name);
}
cb(addr, Response::found);
saveCache();
} else if (code >= 400 && code < 500) {
cb("", Response::notFound);
} else {
cb("", Response::error);
}
cb(addr, Response::found);
saveCache();
} else if (code >= 400 && code < 500) {
cb("", Response::notFound);
} else {
cb("", Response::error);
}
}).share();
// avoid blocking on future destruction
dht::ThreadPool::io().run([ret](){ ret.get(); });
} catch (const std::exception& e) {
JAMI_ERR("Error when performing name lookup: %s", e.what());
cb("", Response::error);
}
});
} catch (const std::exception& e) {
JAMI_ERR("Error when performing name lookup: %s", e.what());
cb("", Response::error);
}
});
}
bool NameDirectory::validateName(const std::string& name) const
......@@ -279,100 +282,100 @@ bool NameDirectory::validateName(const std::string& name) const
using Blob = std::vector<uint8_t>;
void NameDirectory::registerName(const std::string& addr, const std::string& n, const std::string& owner, RegistrationCallback cb, const std::string& signedname, const std::string& publickey)
{
try {
std::string name {n};
if (not validateName(name)) {
cb(RegistrationResponse::invalidName);
return;
}
toLower(name);
auto cacheResult = addrCache(name);
if (not cacheResult.empty()) {
if (cacheResult == addr)
cb(RegistrationResponse::success);
else
cb(RegistrationResponse::alreadyTaken);
return;
}
std::string name {n};
if (not validateName(name)) {
cb(RegistrationResponse::invalidName);
return;
}
toLower(name);
auto cacheResult = addrCache(name);
if (not cacheResult.empty()) {
if (cacheResult == addr)
cb(RegistrationResponse::success);
else
cb(RegistrationResponse::alreadyTaken);
return;
}
auto request = std::make_shared<restbed::Request>(restbed::Uri(HTTPS_PROTO + serverHost_ + QUERY_NAME + name));
request->set_header("Accept", "*/*");
request->set_header("Host", serverHost_);
request->set_header("Content-Type", "application/json");
request->set_method("POST");
std::string body;
{
std::stringstream ss;
ss << "{\"addr\":\"" << addr << "\",\"owner\":\"" << owner <<
"\",\"signature\":\"" << signedname << "\",\"publickey\":\"" << base64::encode(jami::Blob(publickey.begin(), publickey.end())) << "\"}";
body = ss.str();
}
request->set_body(body);
request->set_header("Content-Length", jami::to_string(body.size()));
auto params = std::make_shared<restbed::Settings>();
params->set_connection_timeout(std::chrono::seconds(120));
JAMI_WARN("registerName: sending request %s %s", addr.c_str(), name.c_str());
auto ret = restbed::Http::async(request,
[this,cb,addr,name](const std::shared_ptr<restbed::Request>&,
const std::shared_ptr<restbed::Response>& reply)
{
auto code = reply->get_status_code();
JAMI_DBG("Got reply for registration of %s -> %s: code %d", name.c_str(), addr.c_str(), code);
if (code >= 200 && code < 300) {
size_t length = getContentLength(*reply);
if (length > MAX_RESPONSE_SIZE) {
cb(RegistrationResponse::error);
return;
}
restbed::Http::fetch(length, reply);
std::string body;
reply->get_body(body);
Json::Value json;
Json::CharReaderBuilder rbuilder;
auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
if (!reader->parse(&body[0], &body[body.size()], &json, nullptr)) {
auto request = std::make_shared<restbed::Request>(restbed::Uri(HTTPS_PROTO + serverHost_ + QUERY_NAME + name));
request->set_header("Accept", "*/*");
request->set_header("Host", serverHost_);
request->set_header("Content-Type", "application/json");
request->set_method("POST");
std::string body;
{
std::stringstream ss;
ss << "{\"addr\":\"" << addr << "\",\"owner\":\"" << owner <<
"\",\"signature\":\"" << signedname << "\",\"publickey\":\"" << base64::encode(jami::Blob(publickey.begin(), publickey.end())) << "\"}";
body = ss.str();
}
request->set_body(body);
request->set_header("Content-Length", jami::to_string(body.size()));
auto params = std::make_shared<restbed::Settings>();
params->set_connection_timeout(std::chrono::seconds(120));
JAMI_WARN("registerName: sending request %s %s", addr.c_str(), name.c_str());
executor_->run([this, request, params, cb=std::move(cb), addr, name]{
try {
restbed::Http::async(request, [this, cb=std::move(cb), name=std::move(name), addr=std::move(addr)]
(const std::shared_ptr<restbed::Request>&,
const std::shared_ptr<restbed::Response>& reply)
{
auto code = reply->get_status_code();
JAMI_DBG("Got reply for registration of %s -> %s: code %d", name.c_str(), addr.c_str(), code);
if (code >= 200 && code < 300) {
size_t length = getContentLength(*reply);
if (length > MAX_RESPONSE_SIZE) {
cb(RegistrationResponse::error);
return;
}
restbed::Http::fetch(length, reply);
std::string body;
reply->get_body(body);
Json::Value json;
Json::CharReaderBuilder rbuilder;
auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
if (!reader->parse(&body[0], &body[body.size()], &json, nullptr)) {
cb(RegistrationResponse::error);
return;
}
auto success = json["success"].asBool();
JAMI_DBG("Got reply for registration of %s -> %s: %s", name.c_str(), addr.c_str(), success ? "success" : "failure");
if (success) {
std::lock_guard<std::mutex> l(lock_);
addrCache_.emplace(name, addr);
nameCache_.emplace(addr, name);
}
cb(success ? RegistrationResponse::success : RegistrationResponse::error);
} else if(code == 400){
cb(RegistrationResponse::incompleteRequest);
JAMI_ERR("RegistrationResponse::incompleteRequest");
} else if(code == 401){
cb(RegistrationResponse::signatureVerificationFailed);
JAMI_ERR("RegistrationResponse::signatureVerificationFailed");
} else if (code == 403) {
cb(RegistrationResponse::alreadyTaken);
JAMI_ERR("RegistrationResponse::alreadyTaken");
} else if (code == 409) {
cb(RegistrationResponse::alreadyTaken);
JAMI_ERR("RegistrationResponse::alreadyTaken");
} else if (code > 400 && code < 500) {
cb(RegistrationResponse::alreadyTaken);
JAMI_ERR("RegistrationResponse::alreadyTaken");
} else {
cb(RegistrationResponse::error);
return;
}
auto success = json["success"].asBool();
JAMI_DBG("Got reply for registration of %s -> %s: %s", name.c_str(), addr.c_str(), success ? "success" : "failure");
if (success) {
std::lock_guard<std::mutex> l(lock_);
addrCache_.emplace(name, addr);
nameCache_.emplace(addr, name);
JAMI_ERR("RegistrationResponse::error");
}
cb(success ? RegistrationResponse::success : RegistrationResponse::error);
} else if(code == 400){
cb(RegistrationResponse::incompleteRequest);
JAMI_ERR("RegistrationResponse::incompleteRequest");
} else if(code == 401){
cb(RegistrationResponse::signatureVerificationFailed);
JAMI_ERR("RegistrationResponse::signatureVerificationFailed");
} else if (code == 403) {
cb(RegistrationResponse::alreadyTaken);
JAMI_ERR("RegistrationResponse::alreadyTaken");
} else if (code == 409) {
cb(RegistrationResponse::alreadyTaken);
JAMI_ERR("RegistrationResponse::alreadyTaken");
} else if (code > 400 && code < 500) {
cb(RegistrationResponse::alreadyTaken);
JAMI_ERR("RegistrationResponse::alreadyTaken");
} else {
cb(RegistrationResponse::error);
JAMI_ERR("RegistrationResponse::error");
}
}, params).share();
// avoid blocking on future destruction
dht::ThreadPool::io().run([ret](){ ret.get(); });
} catch (const std::exception& e) {
JAMI_ERR("Error when performing name registration: %s", e.what());
cb(RegistrationResponse::error);
}
}, params);
} catch (const std::exception& e) {
JAMI_ERR("Error when performing name registration: %s", e.what());
cb(RegistrationResponse::error);
}
});
}
void
......
......@@ -23,8 +23,10 @@
#include <map>
#include <string>
#include <mutex>
#include <memory>
namespace dht {
class Executor;
namespace crypto {
struct PublicKey;
}
......@@ -78,6 +80,8 @@ private:
std::map<std::string, std::string> nameCache_ {};
std::map<std::string, std::string> addrCache_ {};
std::shared_ptr<dht::Executor> executor_;
std::string nameCache(const std::string& addr) {
std::lock_guard<std::mutex> l(lock_);
auto cacheRes = nameCache_.find(addr);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment