From 40cae3dba857042a8c52b1d796e6edeabde63899 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Adrien=20B=C3=A9raud?= <adrien.beraud@savoirfairelinux.com>
Date: Wed, 1 May 2019 10:34:27 -0400
Subject: [PATCH] namedirectory: use dht executor

Change-Id: I232a001d3e62b7470e38adc1aa74eabadcc8b210
---
 src/jamidht/namedirectory.cpp | 461 +++++++++++++++++-----------------
 src/jamidht/namedirectory.h   |   4 +
 2 files changed, 236 insertions(+), 229 deletions(-)

diff --git a/src/jamidht/namedirectory.cpp b/src/jamidht/namedirectory.cpp
index e392e46954..05ba01c9b7 100644
--- a/src/jamidht/namedirectory.cpp
+++ b/src/jamidht/namedirectory.cpp
@@ -73,7 +73,8 @@ NameDirectory::lookupUri(const std::string& uri, const std::string& default_serv
 
 NameDirectory::NameDirectory(const std::string& s)
    : serverHost_(s),
-     cachePath_(fileutils::get_cache_dir()+DIR_SEPARATOR_STR+CACHE_DIRECTORY+DIR_SEPARATOR_STR+serverHost_)
+     cachePath_(fileutils::get_cache_dir()+DIR_SEPARATOR_STR+CACHE_DIRECTORY+DIR_SEPARATOR_STR+serverHost_),
+     executor_(std::make_shared<dht::Executor>(dht::ThreadPool::io(), 8))
 {}
 
 void
@@ -110,67 +111,68 @@ size_t getContentLength(restbed::Response& reply)
 
 void NameDirectory::lookupAddress(const std::string& addr, LookupCallback cb)
 {
-    try {
-        std::string cacheResult = nameCache(addr);
-        if (not cacheResult.empty()) {
-            cb(cacheResult, Response::found);
-            return;
-        }
-
-        restbed::Uri uri(HTTPS_PROTO + serverHost_ + QUERY_ADDR + addr);
-        auto req = std::make_shared<restbed::Request>(uri);
-        req->set_header("Accept", "*/*");
-        req->set_header("Host", serverHost_);
-
-        JAMI_DBG("Address lookup for %s: %s", addr.c_str(), uri.to_string().c_str());
+    std::string cacheResult = nameCache(addr);
+    if (not cacheResult.empty()) {
+        cb(cacheResult, Response::found);
+        return;
+    }
 
-        auto ret = restbed::Http::async(req, [this,cb,addr](const std::shared_ptr<restbed::Request>&,
-                                                 const std::shared_ptr<restbed::Response>& reply) {
-            auto code = reply->get_status_code();
-            if (code == 200) {
-                size_t length = getContentLength(*reply);
-                if (length > MAX_RESPONSE_SIZE) {
-                    cb("", Response::error);
-                    return;
-                }
-                restbed::Http::fetch(length, reply);
-                std::string body;
-                reply->get_body(body);
-
-                Json::Value json;
-                Json::CharReaderBuilder rbuilder;
-                auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
-                if (!reader->parse(&body[0], &body[body.size()], &json, nullptr)) {
-                    JAMI_ERR("Address lookup for %s: can't parse server response: %s", addr.c_str(), body.c_str());
-                    cb("", Response::error);
-                    return;
-                }
-                auto name = json["name"].asString();
-                if (not name.empty()) {
-                    JAMI_DBG("Found name for %s: %s", addr.c_str(), name.c_str());
-                    {
-                        std::lock_guard<std::mutex> l(lock_);
-                        addrCache_.emplace(name, addr);
-                        nameCache_.emplace(addr, name);
+    restbed::Uri uri(HTTPS_PROTO + serverHost_ + QUERY_ADDR + addr);
+    auto req = std::make_shared<restbed::Request>(uri);
+    req->set_header("Accept", "*/*");
+    req->set_header("Host", serverHost_);
+
+    JAMI_DBG("Address lookup for %s: %s", addr.c_str(), uri.to_string().c_str());
+
+    executor_->run([this, req, cb=std::move(cb), addr] {
+        try {
+            restbed::Http::async(req, [this, cb=std::move(cb), addr=std::move(addr)]
+                (const std::shared_ptr<restbed::Request>&,
+                 const std::shared_ptr<restbed::Response>& reply)
+            {
+                auto code = reply->get_status_code();
+                if (code == 200) {
+                    size_t length = getContentLength(*reply);
+                    if (length > MAX_RESPONSE_SIZE) {
+                        cb("", Response::error);
+                        return;
                     }
-                    cb(name, Response::found);
-                    saveCache();
-                } else {
+                    restbed::Http::fetch(length, reply);
+                    std::string body;
+                    reply->get_body(body);
+
+                    Json::Value json;
+                    Json::CharReaderBuilder rbuilder;
+                    auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
+                    if (!reader->parse(&body[0], &body[body.size()], &json, nullptr)) {
+                        JAMI_ERR("Address lookup for %s: can't parse server response: %s", addr.c_str(), body.c_str());
+                        cb("", Response::error);
+                        return;
+                    }
+                    auto name = json["name"].asString();
+                    if (not name.empty()) {
+                        JAMI_DBG("Found name for %s: %s", addr.c_str(), name.c_str());
+                        {
+                            std::lock_guard<std::mutex> l(lock_);
+                            addrCache_.emplace(name, addr);
+                            nameCache_.emplace(addr, name);
+                        }
+                        cb(name, Response::found);
+                        saveCache();
+                    } else {
+                        cb("", Response::notFound);
+                    }
+                } else if (code >= 400 && code < 500) {
                     cb("", Response::notFound);
+                } else {
+                    cb("", Response::error);
                 }
-            } else if (code >= 400 && code < 500) {
-                cb("", Response::notFound);
-            } else {
-                cb("", Response::error);
-            }
-        }).share();
-
-        // avoid blocking on future destruction
-        dht::ThreadPool::io().run([ret](){ ret.get(); });
-    } catch (const std::exception& e) {
-        JAMI_ERR("Error when performing address lookup: %s", e.what());
-        cb("", Response::error);
-    }
+            });
+        } catch (const std::exception& e) {
+            JAMI_ERR("Error when performing address lookup: %s", e.what());
+            cb("", Response::error);
+        }
+    });
 }
 
 bool
@@ -181,95 +183,96 @@ NameDirectory::verify(const std::string& name, const dht::crypto::PublicKey& pk,
 
 void NameDirectory::lookupName(const std::string& n, LookupCallback cb)
 {
-    try {
-        std::string name {n};
-        if (not validateName(name)) {
-            cb(name, Response::invalidResponse);
-            return;
-        }
-        toLower(name);
-
-        std::string cacheResult = addrCache(name);
-        if (not cacheResult.empty()) {
-            cb(cacheResult, Response::found);
-            return;
-        }
+    std::string name {n};
+    if (not validateName(name)) {
+        cb(name, Response::invalidResponse);
+        return;
+    }
+    toLower(name);
 
-        restbed::Uri uri(HTTPS_PROTO + serverHost_ + QUERY_NAME + name);
-        JAMI_DBG("Name lookup for %s: %s", name.c_str(), uri.to_string().c_str());
-
-        auto request = std::make_shared<restbed::Request>(std::move(uri));
-        request->set_header("Accept", "*/*");
-        request->set_header("Host", serverHost_);
-
-        auto ret = restbed::Http::async(request, [this,cb,name](const std::shared_ptr<restbed::Request>&,
-                                                     const std::shared_ptr<restbed::Response>& reply) {
-            auto code = reply->get_status_code();
-            if (code != 200)
-                JAMI_DBG("Name lookup for %s: got reply code %d", name.c_str(), code);
-            if (code >= 200 && code < 300) {
-                size_t length = getContentLength(*reply);
-                if (length > MAX_RESPONSE_SIZE) {
-                    cb("", Response::error);
-                    return;
-                }
-                restbed::Http::fetch(length, reply);
-                std::string body;
-                reply->get_body(body);
-
-                Json::Value json;
-                Json::CharReaderBuilder rbuilder;
-                auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
-                if (!reader->parse(&body[0], &body[body.size()], &json, nullptr)) {
-                    JAMI_ERR("Name lookup for %s: can't parse server response: %s", name.c_str(), body.c_str());
-                    cb("", Response::error);
-                    return;
-                }
-                auto addr = json["addr"].asString();
-                auto publickey = json["publickey"].asString();
-                auto signature = json["signature"].asString();
+    std::string cacheResult = addrCache(name);
+    if (not cacheResult.empty()) {
+        cb(cacheResult, Response::found);
+        return;
+    }
 
-                if (!addr.compare(0, HEX_PREFIX.size(), HEX_PREFIX))
-                    addr = addr.substr(HEX_PREFIX.size());
-                if (addr.empty()) {
-                    cb("", Response::notFound);
-                    return;
-                }
+    restbed::Uri uri(HTTPS_PROTO + serverHost_ + QUERY_NAME + name);
+    JAMI_DBG("Name lookup for %s: %s", name.c_str(), uri.to_string().c_str());
+
+    auto request = std::make_shared<restbed::Request>(std::move(uri));
+    request->set_header("Accept", "*/*");
+    request->set_header("Host", serverHost_);
+
+    executor_->run([this, request, cb=std::move(cb), name]{
+        try {
+            restbed::Http::async(request, [this, cb=std::move(cb), name=std::move(name)]
+                (const std::shared_ptr<restbed::Request>&,
+                 const std::shared_ptr<restbed::Response>& reply)
+            {
+                auto code = reply->get_status_code();
+                if (code != 200)
+                    JAMI_DBG("Name lookup for %s: got reply code %d", name.c_str(), code);
+                if (code >= 200 && code < 300) {
+                    size_t length = getContentLength(*reply);
+                    if (length > MAX_RESPONSE_SIZE) {
+                        cb("", Response::error);
+                        return;
+                    }
+                    restbed::Http::fetch(length, reply);
+                    std::string body;
+                    reply->get_body(body);
+
+                    Json::Value json;
+                    Json::CharReaderBuilder rbuilder;
+                    auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
+                    if (!reader->parse(&body[0], &body[body.size()], &json, nullptr)) {
+                        JAMI_ERR("Name lookup for %s: can't parse server response: %s", name.c_str(), body.c_str());
+                        cb("", Response::error);
+                        return;
+                    }
+                    auto addr = json["addr"].asString();
+                    auto publickey = json["publickey"].asString();
+                    auto signature = json["signature"].asString();
+
+                    if (!addr.compare(0, HEX_PREFIX.size(), HEX_PREFIX))
+                        addr = addr.substr(HEX_PREFIX.size());
+                    if (addr.empty()) {
+                        cb("", Response::notFound);
+                        return;
+                    }
 
-                if (not publickey.empty() and not signature.empty()) {
-                    try {
-                        auto pk = dht::crypto::PublicKey(base64::decode(publickey));
-                        if(pk.getId().toString() != addr or not verify(name, pk, signature)) {
+                    if (not publickey.empty() and not signature.empty()) {
+                        try {
+                            auto pk = dht::crypto::PublicKey(base64::decode(publickey));
+                            if(pk.getId().toString() != addr or not verify(name, pk, signature)) {
+                                cb("", Response::invalidResponse);
+                                return;
+                            }
+                        } catch (const std::exception& e) {
                             cb("", Response::invalidResponse);
                             return;
                         }
-                    } catch (const std::exception& e) {
-                        cb("", Response::invalidResponse);
-                        return;
                     }
-                }
 
-                JAMI_DBG("Found address for %s: %s", name.c_str(), addr.c_str());
-                {
-                    std::lock_guard<std::mutex> l(lock_);
-                    addrCache_.emplace(name, addr);
-                    nameCache_.emplace(addr, name);
+                    JAMI_DBG("Found address for %s: %s", name.c_str(), addr.c_str());
+                    {
+                        std::lock_guard<std::mutex> l(lock_);
+                        addrCache_.emplace(name, addr);
+                        nameCache_.emplace(addr, name);
+                    }
+                    cb(addr, Response::found);
+                    saveCache();
+                } else if (code >= 400 && code < 500) {
+                    cb("", Response::notFound);
+                } else {
+                    cb("", Response::error);
                 }
-                cb(addr, Response::found);
-                saveCache();
-            } else if (code >= 400 && code < 500) {
-                cb("", Response::notFound);
-            } else {
-                cb("", Response::error);
-            }
-        }).share();
-
-        // avoid blocking on future destruction
-        dht::ThreadPool::io().run([ret](){ ret.get(); });
-    } catch (const std::exception& e) {
-        JAMI_ERR("Error when performing name lookup: %s", e.what());
-        cb("", Response::error);
-    }
+            });
+        } catch (const std::exception& e) {
+            JAMI_ERR("Error when performing name lookup: %s", e.what());
+            cb("", Response::error);
+        }
+    });
 }
 
 bool NameDirectory::validateName(const std::string& name) const
@@ -279,100 +282,100 @@ bool NameDirectory::validateName(const std::string& name) const
 using Blob = std::vector<uint8_t>;
 void NameDirectory::registerName(const std::string& addr, const std::string& n, const std::string& owner, RegistrationCallback cb, const std::string& signedname, const std::string& publickey)
 {
-    try {
-        std::string name {n};
-        if (not validateName(name)) {
-            cb(RegistrationResponse::invalidName);
-            return;
-        }
-        toLower(name);
-        auto cacheResult = addrCache(name);
-        if (not cacheResult.empty()) {
-            if (cacheResult == addr)
-                cb(RegistrationResponse::success);
-            else
-                cb(RegistrationResponse::alreadyTaken);
-            return;
-        }
+    std::string name {n};
+    if (not validateName(name)) {
+        cb(RegistrationResponse::invalidName);
+        return;
+    }
+    toLower(name);
+    auto cacheResult = addrCache(name);
+    if (not cacheResult.empty()) {
+        if (cacheResult == addr)
+            cb(RegistrationResponse::success);
+        else
+            cb(RegistrationResponse::alreadyTaken);
+        return;
+    }
 
-        auto request = std::make_shared<restbed::Request>(restbed::Uri(HTTPS_PROTO + serverHost_ + QUERY_NAME + name));
-        request->set_header("Accept", "*/*");
-        request->set_header("Host", serverHost_);
-        request->set_header("Content-Type", "application/json");
-        request->set_method("POST");
-        std::string body;
-        {
-            std::stringstream ss;
-            ss << "{\"addr\":\"" << addr << "\",\"owner\":\"" << owner <<
-                "\",\"signature\":\"" << signedname << "\",\"publickey\":\"" << base64::encode(jami::Blob(publickey.begin(), publickey.end()))  << "\"}";
-
-            body = ss.str();
-        }
-        request->set_body(body);
-        request->set_header("Content-Length", jami::to_string(body.size()));
-
-        auto params = std::make_shared<restbed::Settings>();
-        params->set_connection_timeout(std::chrono::seconds(120));
-
-        JAMI_WARN("registerName: sending request %s %s", addr.c_str(), name.c_str());
-        auto ret = restbed::Http::async(request,
-         [this,cb,addr,name](const std::shared_ptr<restbed::Request>&,
-             const std::shared_ptr<restbed::Response>& reply)
-         {
-            auto code = reply->get_status_code();
-            JAMI_DBG("Got reply for registration of %s -> %s: code %d", name.c_str(), addr.c_str(), code);
-            if (code >= 200 && code < 300) {
-                size_t length = getContentLength(*reply);
-                if (length > MAX_RESPONSE_SIZE) {
-                    cb(RegistrationResponse::error);
-                    return;
-                }
-                restbed::Http::fetch(length, reply);
-                std::string body;
-                reply->get_body(body);
-
-                Json::Value json;
-                Json::CharReaderBuilder rbuilder;
-                auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
-                if (!reader->parse(&body[0], &body[body.size()], &json, nullptr)) {
+    auto request = std::make_shared<restbed::Request>(restbed::Uri(HTTPS_PROTO + serverHost_ + QUERY_NAME + name));
+    request->set_header("Accept", "*/*");
+    request->set_header("Host", serverHost_);
+    request->set_header("Content-Type", "application/json");
+    request->set_method("POST");
+    std::string body;
+    {
+        std::stringstream ss;
+        ss << "{\"addr\":\"" << addr << "\",\"owner\":\"" << owner <<
+            "\",\"signature\":\"" << signedname << "\",\"publickey\":\"" << base64::encode(jami::Blob(publickey.begin(), publickey.end()))  << "\"}";
+
+        body = ss.str();
+    }
+    request->set_body(body);
+    request->set_header("Content-Length", jami::to_string(body.size()));
+
+    auto params = std::make_shared<restbed::Settings>();
+    params->set_connection_timeout(std::chrono::seconds(120));
+
+    JAMI_WARN("registerName: sending request %s %s", addr.c_str(), name.c_str());
+
+    executor_->run([this, request, params, cb=std::move(cb), addr, name]{
+        try {
+            restbed::Http::async(request, [this, cb=std::move(cb), name=std::move(name), addr=std::move(addr)]
+                (const std::shared_ptr<restbed::Request>&,
+                 const std::shared_ptr<restbed::Response>& reply)
+            {
+                auto code = reply->get_status_code();
+                JAMI_DBG("Got reply for registration of %s -> %s: code %d", name.c_str(), addr.c_str(), code);
+                if (code >= 200 && code < 300) {
+                    size_t length = getContentLength(*reply);
+                    if (length > MAX_RESPONSE_SIZE) {
+                        cb(RegistrationResponse::error);
+                        return;
+                    }
+                    restbed::Http::fetch(length, reply);
+                    std::string body;
+                    reply->get_body(body);
+
+                    Json::Value json;
+                    Json::CharReaderBuilder rbuilder;
+                    auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
+                    if (!reader->parse(&body[0], &body[body.size()], &json, nullptr)) {
+                        cb(RegistrationResponse::error);
+                        return;
+                    }
+                    auto success = json["success"].asBool();
+                    JAMI_DBG("Got reply for registration of %s -> %s: %s", name.c_str(), addr.c_str(), success ? "success" : "failure");
+                    if (success) {
+                        std::lock_guard<std::mutex> l(lock_);
+                        addrCache_.emplace(name, addr);
+                        nameCache_.emplace(addr, name);
+                    }
+                    cb(success ? RegistrationResponse::success : RegistrationResponse::error);
+                } else if(code == 400){
+                    cb(RegistrationResponse::incompleteRequest);
+                    JAMI_ERR("RegistrationResponse::incompleteRequest");
+                } else if(code == 401){
+                    cb(RegistrationResponse::signatureVerificationFailed);
+                    JAMI_ERR("RegistrationResponse::signatureVerificationFailed");
+                } else if (code == 403) {
+                    cb(RegistrationResponse::alreadyTaken);
+                    JAMI_ERR("RegistrationResponse::alreadyTaken");
+                } else if (code == 409) {
+                    cb(RegistrationResponse::alreadyTaken);
+                    JAMI_ERR("RegistrationResponse::alreadyTaken");
+                } else if (code > 400 && code < 500) {
+                    cb(RegistrationResponse::alreadyTaken);
+                    JAMI_ERR("RegistrationResponse::alreadyTaken");
+                } else {
                     cb(RegistrationResponse::error);
-                    return;
-                }
-                auto success = json["success"].asBool();
-                JAMI_DBG("Got reply for registration of %s -> %s: %s", name.c_str(), addr.c_str(), success ? "success" : "failure");
-                if (success) {
-                    std::lock_guard<std::mutex> l(lock_);
-                    addrCache_.emplace(name, addr);
-                    nameCache_.emplace(addr, name);
+                    JAMI_ERR("RegistrationResponse::error");
                 }
-                cb(success ? RegistrationResponse::success : RegistrationResponse::error);
-            } else if(code == 400){
-                cb(RegistrationResponse::incompleteRequest);
-                JAMI_ERR("RegistrationResponse::incompleteRequest");
-            } else if(code == 401){
-                cb(RegistrationResponse::signatureVerificationFailed);
-                JAMI_ERR("RegistrationResponse::signatureVerificationFailed");
-            } else if (code == 403) {
-                cb(RegistrationResponse::alreadyTaken);
-                JAMI_ERR("RegistrationResponse::alreadyTaken");
-            } else if (code == 409) {
-                cb(RegistrationResponse::alreadyTaken);
-                JAMI_ERR("RegistrationResponse::alreadyTaken");
-            } else if (code > 400 && code < 500) {
-                cb(RegistrationResponse::alreadyTaken);
-                JAMI_ERR("RegistrationResponse::alreadyTaken");
-            } else {
-                cb(RegistrationResponse::error);
-                JAMI_ERR("RegistrationResponse::error");
-            }
-        }, params).share();
-
-        // avoid blocking on future destruction
-        dht::ThreadPool::io().run([ret](){ ret.get(); });
-    } catch (const std::exception& e) {
-        JAMI_ERR("Error when performing name registration: %s", e.what());
-        cb(RegistrationResponse::error);
-    }
+            }, params);
+        } catch (const std::exception& e) {
+            JAMI_ERR("Error when performing name registration: %s", e.what());
+            cb(RegistrationResponse::error);
+        }
+    });
 }
 
 void
diff --git a/src/jamidht/namedirectory.h b/src/jamidht/namedirectory.h
index 09c872df42..be65dd98d8 100644
--- a/src/jamidht/namedirectory.h
+++ b/src/jamidht/namedirectory.h
@@ -23,8 +23,10 @@
 #include <map>
 #include <string>
 #include <mutex>
+#include <memory>
 
 namespace dht {
+class Executor;
 namespace crypto {
 struct PublicKey;
 }
@@ -78,6 +80,8 @@ private:
     std::map<std::string, std::string> nameCache_ {};
     std::map<std::string, std::string> addrCache_ {};
 
+    std::shared_ptr<dht::Executor> executor_;
+
     std::string nameCache(const std::string& addr) {
         std::lock_guard<std::mutex> l(lock_);
         auto cacheRes = nameCache_.find(addr);
-- 
GitLab