/*
 *  Copyright (C) 2017-2019 Savoir-faire Linux Inc.
 *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
 *          Adrien Béraud <adrien.beraud@savoirfairelinux.com>
 *          Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

#include "dht_proxy_server.h"

#include "default_types.h"
#include "dhtrunner.h"

#include <msgpack.hpp>
#include <json/json.h>

#include <chrono>
#include <functional>
#include <limits>
#include <iostream>

using namespace std::placeholders;

#ifdef OPENDHT_PROXY_HTTP_PARSER_FORK
namespace restinio {
struct custom_http_methods_t
{
    static constexpr restinio::http_method_id_t from_nodejs(int m) noexcept {
        if(m == method_listen.raw_id())
            return method_listen;
        else if(m == method_stats.raw_id())
            return method_stats;
        else if(m == method_sign.raw_id())
            return method_sign;
        else if(m == method_encrypt.raw_id())
            return method_encrypt;
        else
            return restinio::default_http_methods_t::from_nodejs(m);
    }
};
}
#endif

namespace dht {

constexpr char RESP_MSG_DESTINATION_NOT_FOUND[] = "{\"err\":\"No destination found\"}";
constexpr char RESP_MSG_NO_TOKEN[] = "{\"err\":\"No token\"}";
constexpr char RESP_MSG_JSON_NOT_ENABLED[] = "{\"err\":\"JSON not enabled on this instance\"}";
constexpr char RESP_MSG_JSON_INCORRECT[] = "{\"err:\":\"Incorrect JSON\"}";
constexpr char RESP_MSG_SERVICE_UNAVAILABLE[] = "{\"err\":\"Incorrect DhtRunner\"}";
constexpr char RESP_MSG_INTERNAL_SERVER_ERRROR[] = "{\"err\":\"Internal server error\"}";
constexpr char RESP_MSG_MISSING_PARAMS[] = "{\"err\":\"Missing parameters\"}";
constexpr char RESP_MSG_PUT_FAILED[] = "{\"err\":\"Put failed\"}";

constexpr const std::chrono::minutes PRINT_STATS_PERIOD {2};

class opendht_logger_t
{
public:
    opendht_logger_t(std::shared_ptr<dht::Logger> logger = {}){
        if (logger)
            m_logger = logger;
    }

    template <typename Builder>
    void trace(Builder && msg_builder){
        if (m_logger)
            m_logger->d("[proxy:server] %s", msg_builder().c_str());
    }

    template <typename Builder>
    void info(Builder && msg_builder){
        if (m_logger)
            m_logger->d("[proxy:server] %s", msg_builder().c_str());
    }

    template <typename Builder>
    void warn(Builder && msg_builder){
        if (m_logger)
            m_logger->w("[proxy:server] %s", msg_builder().c_str());
    }

    template <typename Builder>
    void error(Builder && msg_builder){
        if (m_logger)
            m_logger->e("[proxy:server] %s", msg_builder().c_str());
    }

private:
    std::shared_ptr<dht::Logger> m_logger;
};

// connection listener

class DhtProxyServer::ConnectionListener
{
public:
    ConnectionListener() {};
    ConnectionListener(std::shared_ptr<dht::DhtRunner> dht,
        std::shared_ptr<std::map<restinio::connection_id_t, http::ListenerSession>> listeners,
        std::shared_ptr<std::mutex> lock, std::shared_ptr<dht::Logger> logger):
        dht_(dht), lock_(lock), listeners_(listeners), logger_(logger) {};
    ~ConnectionListener() {};

    /**
     * Connection state change used to handle Listeners disconnects.
     * RESTinio >= 0.5.1 https://github.com/Stiffstream/restinio/issues/28
     */
    void state_changed(const restinio::connection_state::notice_t& notice) noexcept;

private:
    std::string to_str( restinio::connection_state::cause_t cause ) noexcept;

    std::shared_ptr<dht::DhtRunner> dht_;
    std::shared_ptr<std::mutex> lock_;
    std::shared_ptr<std::map<restinio::connection_id_t, http::ListenerSession>> listeners_;

    std::shared_ptr<dht::Logger> logger_;
};

void
DhtProxyServer::ConnectionListener::state_changed(const restinio::connection_state::notice_t& notice) noexcept
{
    std::lock_guard<std::mutex> lock(*lock_);
    auto id = notice.connection_id();
    auto cause = to_str(notice.cause());

    if (listeners_->find(id) != listeners_->end()){
        if (notice.cause() == restinio::connection_state::cause_t::closed){
            if (logger_)
                logger_->d("[proxy:server] [connection:%li] cancelling listener", id);
            dht_->cancelListen(listeners_->at(id).hash,
                               std::move(listeners_->at(id).token));
            listeners_->erase(id);
            if (logger_)
                logger_->d("[proxy:server] %li listeners are connected", listeners_->size());
        }
    }
}

std::string
DhtProxyServer::ConnectionListener::to_str(restinio::connection_state::cause_t cause) noexcept
{
    std::string result;
    switch(cause)
    {
    case restinio::connection_state::cause_t::accepted:
        result = "accepted";
        break;
    case restinio::connection_state::cause_t::closed:
        result = "closed";
        break;
    case restinio::connection_state::cause_t::upgraded_to_websocket:
        result = "upgraded";
        break;
    default:
        result = "unknown";
    }
    return result;
}

struct DhtProxyServer::RestRouterTraitsTls : public restinio::default_tls_traits_t
{
    using timer_manager_t = restinio::asio_timer_manager_t;
#ifdef OPENDHT_PROXY_HTTP_PARSER_FORK
    using http_methods_mapper_t = restinio::custom_http_methods_t;
#endif
    using logger_t = opendht_logger_t;
    using request_handler_t = RestRouter;
    using connection_state_listener_t = ConnectionListener;
};
struct DhtProxyServer::RestRouterTraits : public restinio::default_traits_t
{
    using timer_manager_t = restinio::asio_timer_manager_t;
#ifdef OPENDHT_PROXY_HTTP_PARSER_FORK
    using http_methods_mapper_t = restinio::custom_http_methods_t;
#endif
    using logger_t = opendht_logger_t;
    using request_handler_t = RestRouter;
    using connection_state_listener_t = ConnectionListener;
};

DhtProxyServer::DhtProxyServer(
    dht::crypto::Identity identity,
    std::shared_ptr<DhtRunner> dht, in_port_t port, const std::string& pushServer,
    std::shared_ptr<dht::Logger> logger
)
    :   dht_(dht), logger_(logger), lockListener_(std::make_shared<std::mutex>()),
        listeners_(std::make_shared<std::map<restinio::connection_id_t, http::ListenerSession>>()),
        connListener_(std::make_shared<ConnectionListener>(dht, listeners_, lockListener_, logger)),
        pushServer_(pushServer)
{
    if (not dht_)
        throw std::invalid_argument("A DHT instance must be provided");

    if (logger_)
        logger_->d("[proxy:server] [init] running on %i", port);
    if (not pushServer.empty()){
#ifdef OPENDHT_PUSH_NOTIFICATIONS
        if (logger_)
            logger_->d("[proxy:server] [init] using push server %s", pushServer.c_str());
#else
        if (logger_)
            logger_->e("[proxy:server] [init] opendht built without push notification support");
#endif
    }

    jsonBuilder_["commentStyle"] = "None";
    jsonBuilder_["indentation"] = "";

    if (identity.first and identity.second) {
        asio::error_code ec;
        // define tls context
        asio::ssl::context tls_context { asio::ssl::context::sslv23 };
        tls_context.set_options(asio::ssl::context::default_workarounds
                                | asio::ssl::context::no_sslv2
                                | asio::ssl::context::single_dh_use, ec);
        if (ec)
            throw std::runtime_error("Error setting tls context options: " + ec.message());
        // add more security options
#ifdef SSL_OP_NO_RENEGOTIATION
        SSL_CTX_set_options(tls_context.native_handle(), SSL_OP_NO_RENEGOTIATION); // CVE-2009-3555
#endif
        // node private key
        auto pk = identity.first->serialize();
        pk_ = std::make_unique<asio::const_buffer>(static_cast<void*>(pk.data()), (std::size_t) pk.size());
        tls_context.use_private_key(*pk_, asio::ssl::context::file_format::pem, ec);
        if (ec)
            throw std::runtime_error("Error setting node's private key: " + ec.message());
        // certificate chain
        auto cc = identity.second->toString(true/*chain*/);
        cc_ = std::make_unique<asio::const_buffer>(static_cast<const void*>(cc.data()), (std::size_t) cc.size());
        tls_context.use_certificate_chain(*cc_, ec);
        if (ec)
            throw std::runtime_error("Error setting certificate chain: " + ec.message());
        if (logger_)
            logger_->d("[proxy:server] using certificate chain for ssl:\n%s", cc.c_str());
        // build http server
        auto settings = restinio::run_on_this_thread_settings_t<RestRouterTraitsTls>();
        addServerSettings(settings);
        settings.port(port);
        settings.tls_context(std::move(tls_context));
        httpsServer_ = std::make_unique<restinio::http_server_t<RestRouterTraitsTls>>(
            restinio::own_io_context(),
            std::forward<restinio::run_on_this_thread_settings_t<RestRouterTraitsTls>>(std::move(settings))
        );
        // define http request destination
        pushHostPort_ = splitPort(pushServer_);
        // run http server
        serverThread_ = std::thread([this]{
            httpsServer_->open_async([]{/*ok*/}, [](std::exception_ptr ex){
                std::rethrow_exception(ex);
            });
            httpsServer_->io_context().run();
        });
    }
    else {
        auto settings = restinio::run_on_this_thread_settings_t<RestRouterTraits>();
        addServerSettings(settings);
        settings.port(port);
        httpServer_ = std::make_unique<restinio::http_server_t<RestRouterTraits>>(
            restinio::own_io_context(),
            std::forward<restinio::run_on_this_thread_settings_t<RestRouterTraits>>(std::move(settings))
        );
        // define http request destination
        pushHostPort_ = splitPort(pushServer_);
        // run http server
        serverThread_ = std::thread([this](){
            httpServer_->open_async([]{/*ok*/}, [](std::exception_ptr ex){
                std::rethrow_exception(ex);
            });
            httpServer_->io_context().run();
        });
    }
    dht->forwardAllMessages(true);

    printStatsTimer_ = std::make_unique<asio::steady_timer>(io_context(), PRINT_STATS_PERIOD);
    printStatsTimer_->async_wait(std::bind(&DhtProxyServer::handlePrintStats, this, std::placeholders::_1));
}


asio::io_context&
DhtProxyServer::io_context() const
{
    if (httpsServer_)
        return httpsServer_->io_context();
    else if (httpServer_)
        return httpServer_->io_context();
}

DhtProxyServer::~DhtProxyServer()
{
    if (logger_)
        logger_->d("[proxy:server] closing http server");
    if (httpServer_)
        httpServer_->io_context().stop();
    if (httpsServer_)
        httpsServer_->io_context().stop();
    if (serverThread_.joinable())
        serverThread_.join();
    if (logger_)
        logger_->d("[proxy:server] http server closed");
}

template< typename ServerSettings >
void
DhtProxyServer::addServerSettings(ServerSettings& settings, const unsigned int max_pipelined_requests)
{
    using namespace std::chrono;
    /**
     * If max_pipelined_requests is greater than 1 then RESTinio will continue
     * to read from the socket after parsing the first request.
     * In that case, RESTinio can detect the disconnection
     * and calls state listener as expected.
     * https://github.com/Stiffstream/restinio/issues/28
     */
    settings.max_pipelined_requests(max_pipelined_requests);
    // one less to detect the listener disconnect
    settings.concurrent_accepts_count(max_pipelined_requests - 1);
    settings.separate_accept_and_create_connect(true);
    settings.logger(logger_);
    settings.protocol(restinio::asio_ns::ip::tcp::v6());
    settings.request_handler(createRestRouter());
    // time limits                                              // ~ 0.8 month
    std::chrono::milliseconds timeout_request(std::numeric_limits<int>::max());
    settings.read_next_http_message_timelimit(timeout_request);
    settings.write_http_response_timelimit(60s);
    settings.handle_request_timeout(timeout_request);
    // socket options
    settings.socket_options_setter([](auto & options){
        options.set_option(asio::ip::tcp::no_delay{true});
    });
    settings.connection_state_listener(connListener_);
}

void
DhtProxyServer::updateStats() const
{
    auto now = clock::now();
    auto last = lastStatsReset_.exchange(now);
    auto count = requestNum_.exchange(0);
    auto dt = std::chrono::duration<double>(now - last);
    stats_.requestRate = count / dt.count();
#ifdef OPENDHT_PUSH_NOTIFICATIONS
    stats_.pushListenersCount = pushListeners_.size();
#endif
    stats_.putCount = puts_.size();
    stats_.listenCount = listeners_->size();
    stats_.nodeInfo = nodeInfo_;
}

void
DhtProxyServer::handlePrintStats(const asio::error_code &ec)
{
    if (ec == asio::error::operation_aborted)
        return;
    else if (ec){
        if (logger_)
            logger_->e("[proxy:server] [stats] error printing: %s", ec.message().c_str());
    }
    if (io_context().stopped())
        return;

    if (dht_){
        updateStats();
        // Refresh stats cache
        auto newInfo = dht_->getNodeInfo();
        std::lock_guard<std::mutex> lck(statsMutex_);
        nodeInfo_ = std::move(newInfo);
        auto json = nodeInfo_.toJson();
        auto str = Json::writeString(jsonBuilder_, json);
        if (logger_)
            logger_->d("[proxy:server] [stats] %s", str.c_str());
    }
    printStatsTimer_->expires_at(printStatsTimer_->expiry() + PRINT_STATS_PERIOD);
    printStatsTimer_->async_wait(std::bind(&DhtProxyServer::handlePrintStats, this, std::placeholders::_1));
}

template <typename HttpResponse>
HttpResponse DhtProxyServer::initHttpResponse(HttpResponse response) const
{
    response.append_header("Server", "RESTinio");
    response.append_header(restinio::http_field::content_type, "application/json");
    response.append_header(restinio::http_field::access_control_allow_origin, "*");
    response.connection_keep_alive();
    return response;
}

std::unique_ptr<RestRouter>
DhtProxyServer::createRestRouter()
{
    using namespace std::placeholders;
    auto router = std::make_unique<RestRouter>();

    // **************************** LEGACY ROUTES ****************************
    // node.info
    router->http_get("/", std::bind(&DhtProxyServer::getNodeInfo, this, _1, _2));
#ifdef OPENDHT_PROXY_HTTP_PARSER_FORK
    // node.stats
    router->add_handler(restinio::custom_http_methods_t::from_nodejs(restinio::method_stats.raw_id()),
                        "/", std::bind(&DhtProxyServer::getStats, this, _1, _2));
#endif
    // key.options
    router->add_handler(restinio::http_method_options(),
                        "/:hash", std::bind(&DhtProxyServer::options, this, _1, _2));
    // key.get
    router->http_get("/:hash", std::bind(&DhtProxyServer::get, this, _1, _2));
    // key.post
    router->http_post("/:hash", std::bind(&DhtProxyServer::put, this, _1, _2));
#ifdef OPENDHT_PROXY_HTTP_PARSER_FORK
    // key.listen
    router->add_handler(restinio::custom_http_methods_t::from_nodejs(restinio::method_listen.raw_id()),
                        "/:hash", std::bind(&DhtProxyServer::listen, this, _1, _2));
#endif
#ifdef OPENDHT_PUSH_NOTIFICATIONS
    // key.subscribe
    router->add_handler(restinio::http_method_subscribe(),
                        "/:hash", std::bind(&DhtProxyServer::subscribe, this, _1, _2));
    // key.unsubscribe
    router->add_handler(restinio::http_method_unsubscribe(),
                        "/:hash", std::bind(&DhtProxyServer::unsubscribe, this, _1, _2));
#endif //OPENDHT_PUSH_NOTIFICATIONS
#ifdef OPENDHT_PROXY_SERVER_IDENTITY
#ifdef OPENDHT_PROXY_HTTP_PARSER_FORK
    // key.sign
    router->add_handler(restinio::custom_http_methods_t::from_nodejs(restinio::method_sign.raw_id()),
                        "/:hash", std::bind(&DhtProxyServer::putSigned, this, _1, _2));
    // key.encrypt
    router->add_handler(restinio::custom_http_methods_t::from_nodejs(restinio::method_encrypt.raw_id()),
                        "/:hash", std::bind(&DhtProxyServer::putEncrypted, this, _1, _2));
#endif
#endif // OPENDHT_PROXY_SERVER_IDENTITY

    // **************************** NEW ROUTES ****************************
    // node.info
    router->http_get("/node/info", std::bind(&DhtProxyServer::getNodeInfo, this, _1, _2));
    // node.stats
    router->http_get("/node/stats", std::bind(&DhtProxyServer::getStats, this, _1, _2));
    // key.options
    router->http_get("/key/:hash/options", std::bind(&DhtProxyServer::options, this, _1, _2));
    // key.get
    router->http_get("/key/:hash", std::bind(&DhtProxyServer::get, this, _1, _2));
    // key.post
    router->http_post("/key/:hash", std::bind(&DhtProxyServer::put, this, _1, _2));
    // key.listen
    router->http_get("/key/:hash/listen", std::bind(&DhtProxyServer::listen, this, _1, _2));
#ifdef OPENDHT_PUSH_NOTIFICATIONS
    // key.subscribe
    router->add_handler(restinio::http_method_subscribe(),
                        "/key/:hash", std::bind(&DhtProxyServer::subscribe, this, _1, _2));
    // key.unsubscribe
    router->add_handler(restinio::http_method_unsubscribe(),
                        "/key/:hash", std::bind(&DhtProxyServer::unsubscribe, this, _1, _2));
#endif //OPENDHT_PUSH_NOTIFICATIONS
#ifdef OPENDHT_PROXY_SERVER_IDENTITY
    // key.sign
    router->http_post("/key/:hash/sign", std::bind(&DhtProxyServer::putSigned, this, _1, _2));
    // key.encrypt
    router->http_post("/key/:hash/encrypt", std::bind(&DhtProxyServer::putEncrypted, this, _1, _2));
#endif // OPENDHT_PROXY_SERVER_IDENTITY

    return router;
}

RequestStatus
DhtProxyServer::getNodeInfo(restinio::request_handle_t request,
                            restinio::router::route_params_t /*params*/) const
{
    Json::Value result;
    std::lock_guard<std::mutex> lck(statsMutex_);
    if (nodeInfo_.ipv4.good_nodes == 0 &&
        nodeInfo_.ipv6.good_nodes == 0){
        nodeInfo_ = this->dht_->getNodeInfo();
    }
    result = nodeInfo_.toJson();
    // [ipv6:ipv4]:port or ipv4:port
    result["public_ip"] = request->remote_endpoint().address().to_string();
    auto output = Json::writeString(jsonBuilder_, result) + "\n";

    auto response = this->initHttpResponse(request->create_response());
    response.append_body(output);
    return response.done();
}

RequestStatus
DhtProxyServer::getStats(restinio::request_handle_t request,
                         restinio::router::route_params_t /*params*/)
{
    requestNum_++;
    try {
        if (dht_){
#ifdef OPENDHT_JSONCPP
            auto output = Json::writeString(jsonBuilder_, stats_.toJson()) + "\n";
            auto response = this->initHttpResponse(request->create_response());
            response.append_body(output);
            response.done();
#else
            auto response = this->initHttpResponse(
                request->create_response(restinio::status_not_found()));
            response.set_body(RESP_MSG_JSON_NOT_ENABLED);
            return response.done();
#endif
        } else {
            auto response = this->initHttpResponse(
                request->create_response(restinio::status_service_unavailable()));
            response.set_body(RESP_MSG_SERVICE_UNAVAILABLE);
            return response.done();
        }
    } catch (...){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_internal_server_error()));
        response.set_body(RESP_MSG_INTERNAL_SERVER_ERRROR);
        return response.done();
    }
    return restinio::request_handling_status_t::accepted;
}

RequestStatus
DhtProxyServer::get(restinio::request_handle_t request,
                    restinio::router::route_params_t params)
{
    requestNum_++;
    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    if (!dht_){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_service_unavailable()));
        response.set_body(RESP_MSG_SERVICE_UNAVAILABLE);
        return response.done();
    }

    auto response = std::make_shared<ResponseByPartsBuilder>(
        this->initHttpResponse(request->create_response<ResponseByParts>()));
    response->flush();
    try {
        dht_->get(infoHash, [this, response](const dht::Sp<dht::Value>& value){
            auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n";
            response->append_chunk(output);
            response->flush();
            return true;
        },
        [response] (bool /*ok*/){
            response->done();
        });
    } catch (const std::exception& e){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_internal_server_error()));
        response.set_body(RESP_MSG_INTERNAL_SERVER_ERRROR);
        return response.done();
    }
    return restinio::request_handling_status_t::accepted;
}

RequestStatus
DhtProxyServer::listen(restinio::request_handle_t request,
                       restinio::router::route_params_t params)
{
    requestNum_++;
    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    if (!dht_){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_service_unavailable()));
        response.set_body(RESP_MSG_SERVICE_UNAVAILABLE);
        return response.done();
    }
    auto response = std::make_shared<ResponseByPartsBuilder>(
        this->initHttpResponse(request->create_response<ResponseByParts>()));
    response->flush();
    try {
        std::lock_guard<std::mutex> lock(*lockListener_);
        // save the listener to handle a disconnect
        auto &session = (*listeners_)[request->connection_id()];
        session.hash = infoHash;
        session.response = response;
        session.token = dht_->listen(infoHash, [this, response]
                (const std::vector<dht::Sp<dht::Value>>& values, bool expired){
            for (const auto& value: values){
                auto jsonVal = value->toJson();
                if (expired)
                    jsonVal["expired"] = true;
                auto output = Json::writeString(jsonBuilder_, jsonVal) + "\n";
                response->append_chunk(output);
                response->flush();
            }
            return true;
        });

    } catch (const std::exception& e){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_internal_server_error()));
        response.set_body(RESP_MSG_INTERNAL_SERVER_ERRROR);
        return response.done();
    }
    return restinio::request_handling_status_t::accepted;
}

#ifdef OPENDHT_PUSH_NOTIFICATIONS

RequestStatus
DhtProxyServer::subscribe(restinio::request_handle_t request,
                          restinio::router::route_params_t params)
{
    requestNum_++;

    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    if (!dht_){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_service_unavailable()));
        response.set_body(RESP_MSG_SERVICE_UNAVAILABLE);
        return response.done();
    }
    try {
        std::string err;
        Json::Value root;
        Json::CharReaderBuilder rbuilder;
        auto* char_data = reinterpret_cast<const char*>(request->body().data());
        auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
        if (!reader->parse(char_data, char_data + request->body().size(), &root, &err)){
            auto response = this->initHttpResponse(
                request->create_response(restinio::status_bad_request()));
            response.set_body(RESP_MSG_JSON_INCORRECT);
            return response.done();
        }
        auto pushToken = root["key"].asString();
        if (pushToken.empty()){
            auto response = this->initHttpResponse(
                request->create_response(restinio::status_bad_request()));
            response.set_body(RESP_MSG_NO_TOKEN);
            return response.done();
        }
        auto platform = root["platform"].asString();
        auto isAndroid = platform == "android";
        auto clientId = root.isMember("client_id") ? root["client_id"].asString() : std::string();

        if (logger_)
            logger_->d("[proxy:server] [subscribe %s] [client %s]", infoHash.toString().c_str(), clientId.c_str());
        // ================ Search for existing listener ===================
        // start the timer
        auto timeout = std::chrono::steady_clock::now() + proxy::OP_TIMEOUT;
        std::lock_guard<std::mutex> lock(lockPushListeners_);

        // Insert new or return existing push listeners of a token
        auto pushListener = pushListeners_.emplace(pushToken, PushListener{}).first;
        auto pushListeners = pushListener->second.listeners.emplace(infoHash, std::vector<Listener>{}).first;

        for (auto &listener: pushListeners->second){
            if (logger_)
                logger_->d("[proxy:server] [subscribe] found [client %s]", listener.clientId.c_str());
            // Found -> Resubscribe
            if (listener.clientId == clientId){
                // Reset timers
                listener.expireTimer->expires_at(timeout);
                listener.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN);
                // Send response header
                auto response = std::make_shared<ResponseByPartsBuilder>(
                    this->initHttpResponse(request->create_response<ResponseByParts>()));
                response->flush();
                // No Refresh
                if (!root.isMember("refresh") or !root["refresh"].asBool()){
                    dht_->get(infoHash, [this, response](const dht::Sp<dht::Value>& value){
                        auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n";
                        response->append_chunk(output);
                        response->flush();
                        return true;
                    },
                    [response] (bool){
                        response->done();
                    });
                // Refresh
                } else {
                    response->append_chunk("{}\n");
                    response->done();
                }
                return restinio::request_handling_status_t::accepted;
            }
        }
        // =========== No existing listener for an infoHash ============
        // Add new listener to list of listeners
        pushListeners->second.emplace_back(Listener{});
        auto &listener = pushListeners->second.back();
        listener.clientId = clientId;

        // Add listen on dht
        listener.internalToken = dht_->listen(infoHash,
            [this, infoHash, pushToken, isAndroid, clientId]
            (const std::vector<std::shared_ptr<Value>>& values, bool expired){
                // Build message content
                Json::Value json;
                json["key"] = infoHash.toString();
                json["to"] = clientId;
                if (expired and values.size() < 2){
                    std::stringstream ss;
                    for(size_t i = 0; i < values.size(); ++i){
                        if(i != 0) ss << ",";
                        ss << values[i]->id;
                    }
                    json["exp"] = ss.str();
                }
                sendPushNotification(pushToken, std::move(json), isAndroid);
                return true;
            }
        );
        // Launch timers
        auto &ctx = io_context();
        // expire notify
        if (!listener.expireNotifyTimer)
            listener.expireNotifyTimer = std::make_unique<asio::steady_timer>(ctx, timeout - proxy::OP_MARGIN);
        else
            listener.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN);
        Json::Value json;
        json["timeout"] = infoHash.toString();
        json["to"] = clientId;
        listener.expireNotifyTimer->async_wait(std::bind(&DhtProxyServer::handleNotifyPushListenExpire, this,
                                               std::placeholders::_1, pushToken, json, isAndroid));
        // cancel push listen
        if (!listener.expireTimer)
            listener.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout);
        else
            listener.expireTimer->expires_at(timeout);
        listener.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPushListen, this,
                                         std::placeholders::_1, pushToken, infoHash, clientId));
        auto response = this->initHttpResponse(request->create_response());
        response.set_body("{}\n");
        return response.done();
    }
    catch (...) {
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_internal_server_error()));
        response.set_body(RESP_MSG_INTERNAL_SERVER_ERRROR);
        return response.done();
    }
    return restinio::request_handling_status_t::accepted;
}

RequestStatus
DhtProxyServer::unsubscribe(restinio::request_handle_t request,
                            restinio::router::route_params_t params)
{
    requestNum_++;

    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    if (!dht_){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_service_unavailable()));
        response.set_body(RESP_MSG_SERVICE_UNAVAILABLE);
        return response.done();
    }
    if (logger_)
        logger_->d("[proxy:server] [unsubscribe %s]", infoHash.toString().c_str());

    try {
        std::string err;
        Json::Value root;
        Json::CharReaderBuilder rbuilder;
        auto* char_data = reinterpret_cast<const char*>(request->body().data());
        auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());

        if (!reader->parse(char_data, char_data + request->body().size(), &root, &err)){
            auto response = this->initHttpResponse(
                request->create_response(restinio::status_bad_request()));
            response.set_body(RESP_MSG_JSON_INCORRECT);
            return response.done();
        }
        auto pushToken = root["key"].asString();
        if (pushToken.empty())
            return restinio::request_handling_status_t::rejected;
        auto clientId = root["client_id"].asString();

        handleCancelPushListen(asio::error_code() /*success*/, pushToken, infoHash, clientId);
        auto response = this->initHttpResponse(request->create_response());
        return response.done();
    }
    catch (...) {
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_internal_server_error()));
        response.set_body(RESP_MSG_INTERNAL_SERVER_ERRROR);
        return response.done();
    }
}

void
DhtProxyServer::handleNotifyPushListenExpire(const asio::error_code &ec, const std::string pushToken,
                                             Json::Value json, const bool isAndroid)
{
    if (ec == asio::error::operation_aborted)
        return;
    else if (ec){
        if (logger_)
            logger_->e("[proxy:server] [subscribe] error sending put refresh: %s", ec.message().c_str());
    }
    if (logger_)
        logger_->d("[proxy:server] [subscribe] sending put refresh to %s token", pushToken.c_str());
    sendPushNotification(pushToken, std::move(json), isAndroid);
}

void
DhtProxyServer::handleCancelPushListen(const asio::error_code &ec, const std::string pushToken,
                                       const dht::InfoHash key, const std::string clientId)
{
    if (ec == asio::error::operation_aborted)
        return;
    else if (ec){
        if (logger_)
            logger_->e("[proxy:server] [listen:push %s] error cancel: %s",
                        key.toString().c_str(), ec.message().c_str());
    }
    if (logger_)
        logger_->d("[proxy:server] [listen:push %s] cancelled for %s",
                   key.toString().c_str(), clientId.c_str());
    std::lock_guard<std::mutex> lock(*lockListener_);

    auto pushListener = pushListeners_.find(pushToken);
    if (pushListener == pushListeners_.end())
        return;
    auto listeners = pushListener->second.listeners.find(key);
    if (listeners == pushListener->second.listeners.end())
        return;

    for (auto listener = listeners->second.begin(); listener != listeners->second.end();){
        if (listener->clientId == clientId){
            if (dht_)
                dht_->cancelListen(key, std::move(listener->internalToken));
            listener = listeners->second.erase(listener);
        } else {
            ++listener;
        }
    }
    if (listeners->second.empty())
        pushListener->second.listeners.erase(listeners);
    if (pushListener->second.listeners.empty())
        pushListeners_.erase(pushListener);
}

void
DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& json, bool isAndroid)
{
    if (pushServer_.empty())
        return;

    auto request = std::make_shared<http::Request>(io_context(), pushHostPort_.first, pushHostPort_.second,
                                                                 httpsServer_ ? true : false, logger_);
    auto reqid = request->id();
    try {
        request->set_target("/api/push");
        request->set_method(restinio::http_method_post());

        request->set_header_field(restinio::http_field_t::host, pushServer_.c_str());
        request->set_header_field(restinio::http_field_t::user_agent, "RESTinio client");
        request->set_header_field(restinio::http_field_t::accept, "*/*");
        request->set_header_field(restinio::http_field_t::content_type, "application/json");

        // NOTE: see https://github.com/appleboy/gorush
        Json::Value notification(Json::objectValue);
        Json::Value tokens(Json::arrayValue);
        tokens[0] = token;
        notification["tokens"] = std::move(tokens);
        notification["platform"] = isAndroid ? 2 : 1;
        notification["data"] = std::move(json);
        notification["priority"] = "high";
        notification["time_to_live"] = 600;

        Json::Value notifications(Json::arrayValue);
        notifications[0] = notification;

        Json::Value content;
        content["notifications"] = std::move(notifications);

        Json::StreamWriterBuilder wbuilder;
        wbuilder["commentStyle"] = "None";
        wbuilder["indentation"] = "";

        auto body = Json::writeString(wbuilder, content);
        request->set_body(body);

        request->add_on_state_change_callback([this, reqid]
                                              (const http::Request::State state, const http::Response response){
            if (state == http::Request::State::DONE){
                if (logger_ and response.status_code != 200)
                    logger_->e("[proxy:server] [notification] push failed: %i", response.status_code);
                requests_.erase(reqid);
            }
        });
        request->send();
        requests_[reqid] = request;
    }
    catch (const std::exception &e){
        if (logger_)
            logger_->e("[proxy:server] [notification] error send push: %i", e.what());
        requests_.erase(reqid);
    }
}

#endif //OPENDHT_PUSH_NOTIFICATIONS

void
DhtProxyServer::handleCancelPermamentPut(const asio::error_code &ec, const InfoHash& key, Value::Id vid)
{
    if (ec == asio::error::operation_aborted)
        return;
    else if (ec){
        if (logger_)
            logger_->e("[proxy:server] [put:permament] error sending put refresh: %s", ec.message().c_str());
    }
    if (logger_)
        logger_->d("[proxy:server] [put %s] cancel permament put %i", key.toString().c_str(), vid);
    auto sPuts = puts_.find(key);
    if (sPuts == puts_.end())
        return;
    auto& sPutsMap = sPuts->second.puts;
    auto put = sPutsMap.find(vid);
    if (put == sPutsMap.end())
        return;
    if (dht_)
        dht_->cancelPut(key, vid);
    if (put->second.expireNotifyTimer)
        put->second.expireNotifyTimer->cancel();
    sPutsMap.erase(put);
    if (sPutsMap.empty())
        puts_.erase(sPuts);
}

RequestStatus
DhtProxyServer::put(restinio::request_handle_t request,
                    restinio::router::route_params_t params)
{
    requestNum_++;
    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    if (!dht_){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_service_unavailable()));
        response.set_body(RESP_MSG_SERVICE_UNAVAILABLE);
        return response.done();
    }
    else if (request->body().empty()){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_bad_request()));
        response.set_body(RESP_MSG_MISSING_PARAMS);
        return response.done();
    }

    try {
        std::string err;
        Json::Value root;
        Json::CharReaderBuilder rbuilder;
        auto* char_data = reinterpret_cast<const char*>(request->body().data());
        auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());

        if (reader->parse(char_data, char_data + request->body().size(), &root, &err)){
            auto value = std::make_shared<dht::Value>(root);
            bool permanent = root.isMember("permanent");
            if (logger_)
                logger_->d("[proxy:server] [put %s] %s %s", infoHash.toString().c_str(),
                          value->toString().c_str(), (permanent ? "permanent" : ""));
            if (permanent){
                std::string pushToken, clientId, platform;
                auto& pVal = root["permanent"];
                if (pVal.isObject()){
                    pushToken = pVal["key"].asString();
                    clientId = pVal["client_id"].asString();
                    platform = pVal["platform"].asString();
                }
                std::unique_lock<std::mutex> lock(lockSearchPuts_);
                auto timeout = std::chrono::steady_clock::now() + proxy::OP_TIMEOUT;
                auto vid = value->id;
                auto sPuts = puts_.emplace(infoHash, SearchPuts{}).first;
                auto r = sPuts->second.puts.emplace(vid, PermanentPut{});
                auto& pput = r.first->second;
                if (r.second){
                    auto &ctx = io_context();
                    // cancel permanent put
                    if (!pput.expireTimer)
                        pput.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout);
                    else
                        pput.expireTimer->expires_at(timeout);
                    pput.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this,
                                                 std::placeholders::_1, infoHash, vid));
#ifdef OPENDHT_PUSH_NOTIFICATIONS
                    if (not pushToken.empty()){
                        // notify push listen expire
                        bool isAndroid = platform == "android";
                        Json::Value json;
                        json["timeout"] = infoHash.toString();
                        json["to"] = clientId;
                        json["vid"] = std::to_string(vid);
                        if (!pput.expireNotifyTimer)
                            pput.expireNotifyTimer = std::make_unique<asio::steady_timer>(ctx,
                                                     timeout - proxy::OP_MARGIN);
                        else
                            pput.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN);
                        pput.expireNotifyTimer->async_wait(std::bind(
                            &DhtProxyServer::handleNotifyPushListenExpire, this,
                            std::placeholders::_1, pushToken, json, isAndroid));
                    }
#endif
                } else {
                    pput.expireTimer->expires_at(timeout);
                    if (pput.expireNotifyTimer)
                        pput.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN);
                }
                lock.unlock();
            }
            dht_->put(infoHash, value, [this, request, value](bool ok){
                if (ok){
                    auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n";
                    auto response = this->initHttpResponse(request->create_response());
                    response.append_body(output);
                    response.done();
                } else {
                    auto response = this->initHttpResponse(request->create_response(
                        restinio::status_bad_gateway()));
                    response.set_body(RESP_MSG_PUT_FAILED);
                    response.done();
                }
            }, dht::time_point::max(), permanent);
        } else {
            auto response = this->initHttpResponse(
                request->create_response(restinio::status_bad_request()));
            response.set_body(RESP_MSG_JSON_INCORRECT);
            return response.done();
        }
    } catch (const std::exception& e){
        if (logger_)
            logger_->d("[proxy:server] error in put: %s", e.what());
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_internal_server_error()));
        response.set_body(RESP_MSG_INTERNAL_SERVER_ERRROR);
        return response.done();
    }
    return restinio::request_handling_status_t::accepted;
}

#ifdef OPENDHT_PROXY_SERVER_IDENTITY

RequestStatus DhtProxyServer::putSigned(restinio::request_handle_t request,
                                        restinio::router::route_params_t params) const
{
    requestNum_++;
    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    if (!dht_){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_service_unavailable()));
        response.set_body(RESP_MSG_SERVICE_UNAVAILABLE);
        return response.done();
    }
    else if (request->body().empty()){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_bad_request()));
        response.set_body(RESP_MSG_MISSING_PARAMS);
        return response.done();
    }

    try {
        std::string err;
        Json::Value root;
        Json::CharReaderBuilder rbuilder;
        auto* char_data = reinterpret_cast<const char*>(request->body().data());
        auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());

        if (reader->parse(char_data, char_data + request->body().size(), &root, &err)){

            auto value = std::make_shared<Value>(root);

            dht_->putSigned(infoHash, value, [this, request, value](bool ok){
                if (ok){
                    auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n";
                    auto response = this->initHttpResponse(request->create_response());
                    response.append_body(output);
                    response.done();
                } else {
                    auto response = this->initHttpResponse(request->create_response(
                        restinio::status_bad_gateway()));
                    response.set_body(RESP_MSG_PUT_FAILED);
                    response.done();
                }
            });
        } else {
            auto response = this->initHttpResponse(
                request->create_response(restinio::status_bad_request()));
            response.set_body(RESP_MSG_JSON_INCORRECT);
            return response.done();
        }
    } catch (const std::exception& e){
        if (logger_)
            logger_->d("[proxy:server] error in put: %s", e.what());
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_internal_server_error()));
        response.set_body(RESP_MSG_INTERNAL_SERVER_ERRROR);
        return response.done();
    }
    return restinio::request_handling_status_t::accepted;
}

RequestStatus
DhtProxyServer::putEncrypted(restinio::request_handle_t request,
                             restinio::router::route_params_t params)
{
    requestNum_++;
    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    if (!dht_){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_service_unavailable()));
        response.set_body(RESP_MSG_SERVICE_UNAVAILABLE);
        return response.done();
    }
    else if (request->body().empty()){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_bad_request()));
        response.set_body(RESP_MSG_MISSING_PARAMS);
        return response.done();
    }

    try {
        std::string err;
        Json::Value root;
        Json::CharReaderBuilder rbuilder;
        auto* char_data = reinterpret_cast<const char*>(request->body().data());
        auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());

        if (reader->parse(char_data, char_data + request->body().size(), &root, &err)){
            InfoHash to(root["to"].asString());
            if (!to){
                auto response = this->initHttpResponse(
                    request->create_response(restinio::status_bad_request()));
                response.set_body(RESP_MSG_DESTINATION_NOT_FOUND);
                return response.done();
            }
            auto value = std::make_shared<Value>(root);
            dht_->putEncrypted(infoHash, to, value, [this, request, value](bool ok){
                if (ok){
                    auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n";
                    auto response = this->initHttpResponse(request->create_response());
                    response.append_body(output);
                    response.done();
                } else {
                    auto response = this->initHttpResponse(request->create_response(
                        restinio::status_bad_gateway()));
                    response.set_body(RESP_MSG_PUT_FAILED);
                    response.done();
                }
            });
        } else {
            auto response = this->initHttpResponse(
                request->create_response(restinio::status_bad_request()));
            response.set_body(RESP_MSG_JSON_INCORRECT);
            return response.done();
        }
    } catch (const std::exception& e){
        if (logger_)
            logger_->d("[proxy:server] error in put: %s", e.what());
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_internal_server_error()));
        response.set_body(RESP_MSG_INTERNAL_SERVER_ERRROR);
        return response.done();
    }
    return restinio::request_handling_status_t::accepted;
}

#endif // OPENDHT_PROXY_SERVER_IDENTITY

RequestStatus
DhtProxyServer::options(restinio::request_handle_t request,
                        restinio::router::route_params_t /*params*/)
{
    this->requestNum_++;
#ifdef OPENDHT_PROXY_SERVER_IDENTITY
    const auto methods = "OPTIONS, GET, POST, LISTEN, SIGN, ENCRYPT";
#else
    const auto methods = "OPTIONS, GET, POST, LISTEN";
#endif
    auto response = initHttpResponse(request->create_response());
    response.append_header(restinio::http_field::access_control_allow_methods, methods);
    response.append_header(restinio::http_field::access_control_allow_headers, "content-type");
    response.append_header(restinio::http_field::access_control_max_age, "86400");
    return response.done();
}

RequestStatus
DhtProxyServer::getFiltered(restinio::request_handle_t request,
                            restinio::router::route_params_t params)
{
    requestNum_++;
    auto value = params["value"].to_string();
    dht::InfoHash infoHash(params["hash"].to_string());
    if (!infoHash)
        infoHash = dht::InfoHash::get(params["hash"].to_string());

    if (!dht_){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_service_unavailable()));
        response.set_body(RESP_MSG_SERVICE_UNAVAILABLE);
        return response.done();
    }

    auto response = std::make_shared<ResponseByPartsBuilder>(
        this->initHttpResponse(request->create_response<ResponseByParts>()));
    response->flush();
    try {
        dht_->get(infoHash, [this, response](const dht::Sp<dht::Value>& value){
            auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n";
            response->append_chunk(output);
            response->flush();
            return true;
        },
        [response] (bool /*ok*/){
            response->done();
        },
            {}, value
        );
    } catch (const std::exception& e){
        auto response = this->initHttpResponse(
            request->create_response(restinio::status_internal_server_error()));
        response.set_body(RESP_MSG_INTERNAL_SERVER_ERRROR);
        return response.done();
    }
    return restinio::request_handling_status_t::accepted;
}

}