From eb1f9d64e1e3d305895ffb5d55bcc04773f3a016 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Mon, 13 Nov 2017 11:04:51 -0500
Subject: [PATCH] dhtproxyclient: add listen ability.

+ Fix LISTEN endpoint for DhtProxyServer
= DhtProxyServer use multiple worker threads
+ Add listen/cancelListen for DhtProxyClient
+ Add connectivityChanged.
---
 include/opendht/dht_proxy_client.h |  65 ++++++++--
 include/opendht/dhtrunner.h        |  27 +++++
 src/dht_proxy_client.cpp           | 188 +++++++++++++++++++++++++++--
 src/dht_proxy_server.cpp           |  48 ++++++--
 src/dhtrunner.cpp                  |  92 +++++++++++++-
 5 files changed, 386 insertions(+), 34 deletions(-)

diff --git a/include/opendht/dht_proxy_client.h b/include/opendht/dht_proxy_client.h
index 64a2881f..c6ac3093 100644
--- a/include/opendht/dht_proxy_client.h
+++ b/include/opendht/dht_proxy_client.h
@@ -131,16 +131,17 @@ public:
         put(key, std::forward<Value>(v), bindDoneCb(cb), created, permanent);
     }
 
+    /**
+     * @param  af the socket family
+     * @return node stats from the proxy
+     */
     NodeStats getNodesStats(sa_family_t af) const;
 
-    std::vector<SockAddr> getPublicAddress(sa_family_t family = 0);
-
-
     /**
-     * TODO
-     * NOTE: For now, there is no endpoint in the DhtProxyServer to do the following methods.
-     * It will come in another version.
+     * @param  family the socket family
+     * @return public address
      */
+    std::vector<SockAddr> getPublicAddress(sa_family_t family = 0);
 
     /**
      * Listen on the network for any changes involving a specified hash.
@@ -149,11 +150,18 @@ public:
      *
      * @return a token to cancel the listener later.
      */
-    virtual size_t listen(const InfoHash&, GetCallback, Value::Filter&&={}, Where&&={}) { return 0; };
+    virtual size_t listen(const InfoHash&, GetCallback, Value::Filter&&={}, Where&&={});
     virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w = {}) {
         return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w));
     }
-    virtual bool cancelListen(const InfoHash&, size_t /*token*/) { return false; }
+
+
+    /**
+     * TODO
+     * NOTE: For now, there is no endpoint in the DhtProxyServer to do the following methods.
+     * It will come in another version. (with push_notifications support)
+     */
+    virtual bool cancelListen(const InfoHash&, size_t token);
 
     /**
      * Similar to Dht::get, but sends a Query to filter data remotely.
@@ -257,6 +265,7 @@ public:
     }
 
     time_point periodic(const uint8_t*, size_t, const SockAddr&) {
+        // The DhtProxyClient doesn't use NetworkEngine, so here, we have nothing to do for now.
         scheduler.syncTime();
         return scheduler.run();
     }
@@ -265,11 +274,19 @@ public:
     }
 
 private:
+    /**
+     * Get informations from the proxy node
+     * @return the JSON returned by the proxy
+     */
     Json::Value getProxyInfos() const;
     /**
      * Initialize statusIpvX_
      */
     void getConnectivityStatus();
+    /**
+     * cancel all Listeners
+     */
+    void cancelAllListeners();
     /**
      * cancel all Operations
      */
@@ -279,6 +296,24 @@ private:
     NodeStatus statusIpv6_ {NodeStatus::Disconnected};
 
     InfoHash myid {};
+
+    /**
+     * Store listen requests.
+     */
+    struct Listener
+    {
+        size_t token;
+        std::shared_ptr<restbed::Request> req;
+        std::string key;
+        GetCallback cb;
+        Value::Filter filterChain;
+        std::unique_ptr<std::thread> thread;
+    };
+    std::vector<Listener> listeners_;
+    size_t listener_token_ {0};
+    /**
+     * Store current put and get requests.
+     */
     struct Operation
     {
         std::shared_ptr<restbed::Request> req;
@@ -287,8 +322,20 @@ private:
     std::vector<Operation> operations_;
 
     Scheduler scheduler;
-    Sp<Scheduler::Job> nextNodesConfirmation {};
+    /**
+     * Retrieve if we can connect to the proxy (update statusIpvX_)
+     */
     void confirmProxy();
+    Sp<Scheduler::Job> nextProxyConfirmation {};
+    /**
+     * Verify if we are still connected.
+     */
+    void confirmConnectivity();
+    Sp<Scheduler::Job> nextConnectivityConfirmation {};
+    /**
+     * Relaunch LISTEN requests if the client disconnect/reconnect.
+     */
+    void restartListeners();
 };
 
 }
diff --git a/include/opendht/dhtrunner.h b/include/opendht/dhtrunner.h
index d75e977f..1d4a645c 100644
--- a/include/opendht/dhtrunner.h
+++ b/include/opendht/dhtrunner.h
@@ -403,13 +403,40 @@ private:
     }
 
     std::unique_ptr<SecureDht> dht_;
+    /**
+     * reset dht clients
+     */
     void resetDht();
+    /**
+     * @return the current active DHT
+     */
     SecureDht* activeDht() const;
 #if OPENDHT_PROXY_CLIENT
+    /**
+     * true if we are currently using a proxy
+     */
     std::atomic_bool use_proxy {false};
+    /**
+     * The current proxy client
+     */
     std::unique_ptr<SecureDht> dht_via_proxy_;
     Config config_;
 #endif // OPENDHT_PROXY_CLIENT
+    /**
+     * Store current listeners and translates global tokens for each client.
+     */
+    struct Listener {
+        size_t globalToken;
+        size_t tokenClassicDht;
+        size_t tokenProxyDht;
+        GetCallback gcb;
+        InfoHash hash;
+        Value::Filter f;
+        Where w;
+    };
+    std::vector<std::unique_ptr<Listener>> listeners_ {};
+    size_t listener_token_ {1};
+
     mutable std::mutex dht_mtx {};
     std::thread dht_thread {};
     std::condition_variable cv {};
diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp
index 9f175f5b..fca754e4 100644
--- a/src/dht_proxy_client.cpp
+++ b/src/dht_proxy_client.cpp
@@ -24,21 +24,21 @@
 #include <json/json.h>
 #include <restbed>
 #include <vector>
+#include <signal.h>
 
 #include "dhtrunner.h"
 
 constexpr const char* const HTTP_PROTO {"http://"};
 
-// TODO connectivity changed
-// TODO follow listen between non proxified and proxified
-
 namespace dht {
 
 DhtProxyClient::DhtProxyClient(const std::string& serverHost)
 : serverHost_(serverHost), scheduler(DHT_LOG)
 {
-    auto confirm_nodes_time = scheduler.time() + std::chrono::seconds(5);
-    nextNodesConfirmation = scheduler.add(confirm_nodes_time, std::bind(&DhtProxyClient::confirmProxy, this));
+    auto confirm_proxy_time = scheduler.time() + std::chrono::seconds(5);
+    nextProxyConfirmation = scheduler.add(confirm_proxy_time, std::bind(&DhtProxyClient::confirmProxy, this));
+    auto confirm_connectivity = scheduler.time() + std::chrono::seconds(5);
+    nextConnectivityConfirmation = scheduler.add(confirm_connectivity, std::bind(&DhtProxyClient::confirmConnectivity, this));
 
     getConnectivityStatus();
 }
@@ -46,16 +46,30 @@ DhtProxyClient::DhtProxyClient(const std::string& serverHost)
 void
 DhtProxyClient::confirmProxy()
 {
+    // Retrieve the connectivity each hours if connected, else every 5 seconds.
+    auto disconnected_old_status =  statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected;
     getConnectivityStatus();
-    auto disconnected = statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected;
-    auto time = disconnected ? std::chrono::seconds(5) : std::chrono::seconds(600);
-    auto confirm_nodes_time = scheduler.time() + time;
-    scheduler.edit(nextNodesConfirmation, confirm_nodes_time);
+    auto disconnected_new_status = statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected;
+    auto time = disconnected_new_status ? std::chrono::seconds(5) : std::chrono::hours(1);
+    if (disconnected_old_status && !disconnected_new_status) {
+        restartListeners();
+    }
+    auto confirm_proxy_time = scheduler.time() + time;
+    scheduler.edit(nextProxyConfirmation, confirm_proxy_time);
+}
+
+void
+DhtProxyClient::confirmConnectivity()
+{
+    // The scheduler must get if the proxy is disconnected
+    auto confirm_connectivity = scheduler.time() + std::chrono::seconds(3);
+    scheduler.edit(nextConnectivityConfirmation, confirm_connectivity);
 }
 
 DhtProxyClient::~DhtProxyClient()
 {
     cancelAllOperations();
+    cancelAllListeners();
 }
 
 void
@@ -70,10 +84,23 @@ DhtProxyClient::cancelAllOperations()
     }
 }
 
+void
+DhtProxyClient::cancelAllListeners()
+{
+    for (auto& listener: listeners_) {
+        if (listener.thread && listener.thread->joinable()) {
+            // Close connection to stop listener?
+            restbed::Http::close(listener.req);
+            listener.thread->join();
+        }
+    }
+}
+
 void
 DhtProxyClient::shutdown(ShutdownCallback cb)
 {
     cancelAllOperations();
+    cancelAllListeners();
     cb();
 }
 
@@ -293,6 +320,84 @@ DhtProxyClient::getPublicAddress(sa_family_t family)
     }
 }
 
+size_t
+DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter&& filter, Where&& where)
+{
+    restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + key.toString());
+    auto req = std::make_shared<restbed::Request>(uri);
+    req->set_method("LISTEN");
+
+    Query query {{}, where};
+    auto filterChain = filter.chain(query.where.getFilter());
+
+    Listener l;
+    ++listener_token_;
+    l.key = key.toString();
+    l.token = listener_token_;
+    l.req = req;
+    l.cb = cb;
+    l.filterChain = std::move(filterChain);
+    l.thread = std::move(std::unique_ptr<std::thread>(new std::thread([=]()
+        {
+            auto settings = std::make_shared<restbed::Settings>();
+            std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
+            settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
+
+            restbed::Http::async(req,
+                [this, filterChain, cb](const std::shared_ptr<restbed::Request>& req,
+                                        const std::shared_ptr<restbed::Response>& reply) {
+                auto code = reply->get_status_code();
+
+                if (code == 200) {
+                    try {
+                        while (restbed::Http::is_open(req)) {
+                            restbed::Http::fetch("\n", reply);
+                            std::string body;
+                            reply->get_body(body);
+                            reply->set_body(""); // Reset the body for the next fetch
+
+                            Json::Value json;
+                            Json::Reader reader;
+                            if (reader.parse(body, json)) {
+                                auto value = std::make_shared<Value>(json);
+                                if (not filterChain or filterChain(*value))
+                                    cb({value});
+                            }
+                        }
+                    } catch (std::runtime_error&) {
+                        // NOTE: Http::close() can occurs here. Ignore this.
+                    }
+
+                } else {
+                    this->statusIpv4_ = NodeStatus::Disconnected;
+                    this->statusIpv6_ = NodeStatus::Disconnected;
+                }
+            }, settings).get();
+            getConnectivityStatus();
+        })
+    ));
+    listeners_.emplace_back(std::move(l));
+    return listener_token_;
+}
+
+bool
+DhtProxyClient::cancelListen(const InfoHash&, size_t token)
+{
+    for (auto it = listeners_.begin(); it != listeners_.end(); ++it) {
+        auto& listener = *it;
+        if (listener.token == token) {
+            if (listener.thread->joinable()) {
+                // Close connection to stop listener?
+                restbed::Http::close(listener.req);
+                listener.thread->join();
+                listeners_.erase(it);
+                return true;
+            }
+        }
+    }
+    return false;
+}
+
 void
 DhtProxyClient::getConnectivityStatus()
 {
@@ -303,21 +408,84 @@ DhtProxyClient::getConnectivityStatus()
         auto dubiousIpv4 = static_cast<long>(proxyInfos["ipv4"]["dubious"].asLargestUInt());
         if (goodIpv4 + dubiousIpv4 > 0) {
             statusIpv4_ = NodeStatus::Connected;
+        } else {
+            statusIpv4_ = NodeStatus::Disconnected;
         }
         auto goodIpv6 = static_cast<long>(proxyInfos["ipv6"]["good"].asLargestUInt());
         auto dubiousIpv6 = static_cast<long>(proxyInfos["ipv6"]["dubious"].asLargestUInt());
         if (goodIpv6 + dubiousIpv6 > 0) {
             statusIpv6_ = NodeStatus::Connected;
+        } else {
+            statusIpv6_ = NodeStatus::Disconnected;
         }
         myid = InfoHash(proxyInfos["node_id"].asString());
+        if (statusIpv4_ == NodeStatus::Disconnected && statusIpv6_ == NodeStatus::Disconnected) {
+            const auto& now = scheduler.time();
+            scheduler.edit(nextProxyConfirmation, now);
+        }
     } catch (...) {
         statusIpv4_ = NodeStatus::Disconnected;
         statusIpv6_ = NodeStatus::Disconnected;
+        const auto& now = scheduler.time();
+        scheduler.edit(nextProxyConfirmation, now);
     }
+}
+
+void
+DhtProxyClient::restartListeners()
+{
+    for (auto& listener: listeners_) {
+        if (listener.thread && listener.thread->joinable())
+            listener.thread->join();
+        // Redo listen
+        auto filterChain = listener.filterChain;
+        auto cb = listener.cb;
+        restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key);
+        auto req = std::make_shared<restbed::Request>(uri);
+        req->set_method("LISTEN");
+        listener.thread = std::move(std::unique_ptr<std::thread>(new std::thread([this, filterChain, cb, req]()
+            {
+                auto settings = std::make_shared<restbed::Settings>();
+                std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
+                settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
+
+                restbed::Http::async(req,
+                    [this, filterChain, cb](const std::shared_ptr<restbed::Request>& req,
+                                     const std::shared_ptr<restbed::Response>& reply) {
+                    auto code = reply->get_status_code();
+
+                    if (code == 200) {
+                        try {
+                            while (restbed::Http::is_open(req)) {
+                                restbed::Http::fetch("\n", reply);
+                                std::string body;
+                                reply->get_body(body);
+                                reply->set_body(""); // Reset the body for the next fetch
+
+                                Json::Value json;
+                                Json::Reader reader;
+                                if (reader.parse(body, json)) {
+                                    auto value = std::make_shared<Value>(json);
+                                    if (not filterChain or filterChain(*value))
+                                        cb({value});
+                                }
+                            }
+                        } catch (std::runtime_error&) {
+                            // NOTE: Http::close() can occurs here. Ignore this.
+                        }
 
-    // TODO for now, we don't handle connectivity issues. (when the proxy is down, we don't try to reconnect)
+                    } else {
+                        this->statusIpv4_ = NodeStatus::Disconnected;
+                        this->statusIpv6_ = NodeStatus::Disconnected;
+                    }
+                }, settings).get();
+                getConnectivityStatus();
+            })
+        ));
+    }
 }
 
+
 } // namespace dht
 
 #endif // OPENDHT_PROXY_CLIENT
diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp
index 57be7d75..6881025f 100644
--- a/src/dht_proxy_server.cpp
+++ b/src/dht_proxy_server.cpp
@@ -70,6 +70,8 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port)
         std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
         settings->set_connection_timeout(timeout); // there is a timeout, but really huge
         settings->set_port(port);
+        auto maxThreads = std::thread::hardware_concurrency() - 1;
+        settings->set_worker_limit(maxThreads > 1 ? maxThreads : 1);
         try {
             service_->start(settings);
         } catch(std::system_error& e) {
@@ -78,11 +80,13 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port)
     });
 
     listenThread_ = std::thread([this]() {
-        auto stop = false;
-        while (!stop) {
+        while (!service_->is_up() && !stopListeners) {
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+        }
+        while (service_->is_up()  && !stopListeners) {
             auto listener = currentListeners_.begin();
             while (listener != currentListeners_.end()) {
-                if (listener->session->is_closed() && dht_) {
+                if (dht_ && listener->session->is_closed()) {
                     dht_->cancelListen(listener->hash, std::move(listener->token));
                     // Remove listener if unused
                     listener = currentListeners_.erase(listener);
@@ -90,10 +94,18 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port)
                      ++listener;
                 }
             }
-            //NOTE: When supports restbed 5.0: service_->is_up() and remove stopListeners
-            stop = stopListeners;
-            if (!stop)
-                std::this_thread::sleep_for(std::chrono::seconds(1));
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+        }
+        // Remove last listeners
+        auto listener = currentListeners_.begin();
+        while (listener != currentListeners_.end()) {
+            if (dht_) {
+                dht_->cancelListen(listener->hash, std::move(listener->token));
+                // Remove listener if unused
+                listener = currentListeners_.erase(listener);
+            } else {
+                 ++listener;
+            }
         }
     });
 }
@@ -107,6 +119,11 @@ void
 DhtProxyServer::stop()
 {
     service_->stop();
+    auto listener = currentListeners_.begin();
+    while (listener != currentListeners_.end()) {
+        listener->session->close();
+        ++ listener;
+    }
     stopListeners = true;
     // listenThreads_ will stop because there is no more sessions
     if (listenThread_.joinable())
@@ -193,20 +210,25 @@ DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) const
                     infoHash = InfoHash::get(hash);
                 }
                 s->yield(restbed::OK);
-                    // Handle client deconnection
-                    // NOTE: for now, there is no handler, so we test the session in a thread
-                    // will be the case in restbed 5.0
+                // Handle client deconnection
+                // NOTE: for now, there is no handler, so we test the session in a thread
+                // will be the case in restbed 5.0
                 SessionToHashToken listener;
                 listener.session = session;
                 listener.hash = infoHash;
-                listener.token = dht_->listen(infoHash, [s](std::shared_ptr<Value> value) {
+                // cache the session to avoid an incrementation of the shared_ptr's counter
+                // else, the session->close() will not close the socket.
+                auto cacheSession = std::weak_ptr<restbed::Session>(s);
+                listener.token = std::move(dht_->listen(infoHash, [cacheSession](std::shared_ptr<Value> value) {
+                    auto s = cacheSession.lock();
+                    if (!s) return false;
                     // Send values as soon as we get them
                     if (!s->is_closed()) {
                         Json::FastWriter writer;
-                        s->yield(writer.write(value->toJson()), [](const std::shared_ptr<restbed::Session> /*session*/){ });
+                        s->yield(writer.write(value->toJson()), [](const std::shared_ptr<restbed::Session>){ });
                     }
                     return !s->is_closed();
-                });
+                }));
                 currentListeners_.emplace_back(std::move(listener));
             } else {
                 session->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}");
diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp
index 2e8b6dc3..53f573f4 100644
--- a/src/dhtrunner.cpp
+++ b/src/dhtrunner.cpp
@@ -122,6 +122,10 @@ DhtRunner::run(const SockAddr& local4, const SockAddr& local6, DhtRunner::Config
 
 void
 DhtRunner::shutdown(ShutdownCallback cb) {
+#if OPENDHT_PROXY_CLIENT
+    if (dht_via_proxy_)
+        dht_via_proxy_->shutdown(cb);
+#endif
     std::lock_guard<std::mutex> lck(storage_mtx);
     pending_ops_prio.emplace([=](SecureDht& dht) mutable {
         dht.shutdown(cb);
@@ -513,8 +517,29 @@ DhtRunner::listen(InfoHash hash, GetCallback vcb, Value::Filter f, Where w)
     auto ret_token = std::make_shared<std::promise<size_t>>();
     {
         std::lock_guard<std::mutex> lck(storage_mtx);
+#if OPENDHT_PROXY_CLIENT
+        pending_ops.emplace([=](SecureDht&) mutable {
+            auto tokenProxy = 0, tokenClassic = 0;
+            if (!use_proxy)
+                tokenClassic = dht_->listen(hash, vcb, std::move(f), std::move(w));
+            else if (dht_via_proxy_)
+                tokenProxy = dht_via_proxy_->listen(hash, vcb, std::move(f), std::move(w));
+#else
         pending_ops.emplace([=](SecureDht& dht) mutable {
-            ret_token->set_value(dht.listen(hash, vcb, std::move(f), std::move(w)));
+            auto tokenClassic = dht.listen(hash, vcb, std::move(f), std::move(w));
+            auto tokenProxy = 0;
+#endif
+            auto listener = std::unique_ptr<Listener>(new Listener());
+            listener->globalToken = listener_token_;
+            listener->tokenClassicDht = tokenClassic;
+            listener->tokenProxyDht = tokenProxy;
+            listener->gcb = vcb;
+            listener->hash = hash;
+            listener->f = f;
+            listener->w = w;
+            this->listeners_.emplace_back(std::move(listener));
+            ret_token->set_value(listener_token_);
+            listener_token_++;
         });
     }
     cv.notify_all();
@@ -531,9 +556,30 @@ void
 DhtRunner::cancelListen(InfoHash h, size_t token)
 {
     {
+#if OPENDHT_PROXY_CLIENT
+        auto it = listeners_.begin();
+        for (; it != listeners_.end(); ++it) {
+            auto& listener = *it;
+            if (listener->globalToken == token) {
+                break;
+            }
+        }
+        if (it == listeners_.end()) return;
+#endif // OPENDHT_PROXY_CLIENT
         std::lock_guard<std::mutex> lck(storage_mtx);
+#if OPENDHT_PROXY_CLIENT
+        pending_ops.emplace([=](SecureDht&) {
+            auto& listener = *it;
+            if (listener->tokenClassicDht != 0) {
+                dht_->cancelListen(h, listener->tokenClassicDht);
+            }
+            if (dht_via_proxy_ && listener->tokenProxyDht > 0) {
+                dht_via_proxy_->cancelListen(h, listener->tokenProxyDht);
+            }
+#else
         pending_ops.emplace([=](SecureDht& dht) {
             dht.cancelListen(h, token);
+#endif // OPENDHT_PROXY_CLIENT
         });
     }
     cv.notify_all();
@@ -702,6 +748,30 @@ DhtRunner::tryBootstrapContinuously()
     });
 }
 
+std::vector<SockAddr>
+DhtRunner::getAddrInfo(const std::string& host, const std::string& service)
+{
+    std::vector<SockAddr> ips {};
+    if (host.empty())
+        return ips;
+
+    addrinfo hints;
+    memset(&hints, 0, sizeof(hints));
+    hints.ai_socktype = SOCK_DGRAM;
+    addrinfo* info = nullptr;
+    int rc = getaddrinfo(host.c_str(), service.c_str(), &hints, &info);
+    if(rc != 0)
+        throw std::invalid_argument(std::string("Error: `") + host + ":" + service + "`: " + gai_strerror(rc));
+
+    addrinfo* infop = info;
+    while (infop) {
+        ips.emplace_back(infop->ai_addr, infop->ai_addrlen);
+        infop = infop->ai_next;
+    }
+    freeaddrinfo(info);
+    return ips;
+}
+
 void
 DhtRunner::bootstrap(const std::string& host, const std::string& service)
 {
@@ -812,15 +882,33 @@ DhtRunner::enableProxy(bool proxify) {
             new DhtProxyClient(serverHost)
         );
         dht_via_proxy_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht_via_proxy), config_.dht_config));
+        // add current listeners
+        for (auto& listener: listeners_) {
+            auto tokenProxy = dht_via_proxy_->listen(listener->hash, listener->gcb, std::move(listener->f), std::move(listener->w));
+            listener->tokenProxyDht = tokenProxy;
+        }
         // and use it
         use_proxy = proxify;
     } else {
         use_proxy = proxify;
+        loop_(); // Restart the classic DHT.
         // We doesn't need to maintain the connection with the proxy.
         // Delete it
         dht_via_proxy_.reset(nullptr);
+        // update all proxyToken for all proxyListener
+        auto it = listeners_.begin();
+        for (; it != listeners_.end(); ++it) {
+            auto& listener = *it;
+            if (listener->tokenClassicDht == 0) {
+                pending_ops.emplace([it](SecureDht& dht) mutable {
+                    auto& listener = *it;
+                    auto token = dht.listen(listener->hash, listener->gcb, std::move(listener->f), std::move(listener->w));
+                    listener->tokenClassicDht = token;
+                });
+            }
+            listener->tokenProxyDht = 0;
+        }
     }
 }
 #endif // OPENDHT_PROXY_CLIENT
-
 }
-- 
GitLab