Project 'savoirfairelinux/ring-client-uwp' was moved to 'savoirfairelinux/jami-client-uwp'. Please update any links and bookmarks that may still have the old path.
Select Git revision
mydisplay.cpp
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
dht_proxy_server.cpp 58.13 KiB
/*
* Copyright (C) 2014-2020 Savoir-faire Linux Inc.
* Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
* Adrien Béraud <adrien.beraud@savoirfairelinux.com>
* Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "dht_proxy_server.h"
#include "default_types.h"
#include "dhtrunner.h"
#include <msgpack.hpp>
#include <json/json.h>
#include <chrono>
#include <functional>
#include <limits>
#include <iostream>
#include <fstream>
using namespace std::placeholders;
using namespace std::chrono_literals;
#ifdef OPENDHT_PROXY_HTTP_PARSER_FORK
namespace restinio {
struct custom_http_methods_t
{
static constexpr restinio::http_method_id_t from_nodejs(int m) noexcept {
if(m == method_listen.raw_id())
return method_listen;
else if(m == method_stats.raw_id())
return method_stats;
else if(m == method_sign.raw_id())
return method_sign;
else if(m == method_encrypt.raw_id())
return method_encrypt;
else
return restinio::default_http_methods_t::from_nodejs(m);
}
};
}
#endif
namespace dht {
constexpr char RESP_MSG_JSON_INCORRECT[] = "{\"err:\":\"Incorrect JSON\"}";
constexpr char RESP_MSG_SERVICE_UNAVAILABLE[] = "{\"err\":\"Incorrect DhtRunner\"}";
constexpr char RESP_MSG_INTERNAL_SERVER_ERRROR[] = "{\"err\":\"Internal server error\"}";
constexpr char RESP_MSG_MISSING_PARAMS[] = "{\"err\":\"Missing parameters\"}";
constexpr char RESP_MSG_PUT_FAILED[] = "{\"err\":\"Put failed\"}";
#ifdef OPENDHT_PROXY_SERVER_IDENTITY
constexpr char RESP_MSG_DESTINATION_NOT_FOUND[] = "{\"err\":\"No destination found\"}";
#endif
#ifdef OPENDHT_PUSH_NOTIFICATIONS
constexpr char RESP_MSG_NO_TOKEN[] = "{\"err\":\"No token\"}";
#endif
constexpr const std::chrono::minutes PRINT_STATS_PERIOD {2};
using ResponseByParts = restinio::chunked_output_t;
using ResponseByPartsBuilder = restinio::response_builder_t<ResponseByParts>;
class opendht_logger_t
{
public:
opendht_logger_t(std::shared_ptr<Logger> logger = {}) : m_logger(std::move(logger)) {}
template <typename Builder>
void trace(Builder&& /* msg_builder */) {
/* if (m_logger) m_logger->d("[proxy:server] %s", msg_builder().c_str()); */
}
template <typename Builder>
void info(Builder&& msg_builder) {
if (m_logger) m_logger->d("[proxy:server] %s", msg_builder().c_str());
}
template <typename Builder>
void warn(Builder&& msg_builder) {
if (m_logger) m_logger->w("[proxy:server] %s", msg_builder().c_str());
}
template <typename Builder>
void error(Builder&& msg_builder) {
if (m_logger) m_logger->e("[proxy:server] %s", msg_builder().c_str());
}
private:
std::shared_ptr<Logger> m_logger;
};
restinio::request_handling_status_t
DhtProxyServer::serverError(restinio::request_t& request) {
auto response = initHttpResponse(request.create_response(restinio::status_internal_server_error()));
response.set_body(RESP_MSG_INTERNAL_SERVER_ERRROR);
return response.done();
}
// connection listener
class DhtProxyServer::ConnectionListener
{
public:
ConnectionListener() {};
ConnectionListener(std::function<void(restinio::connection_id_t)> onClosed) : onClosed_(std::move(onClosed)) {};
~ConnectionListener() {};
/**
* Connection state change used to handle Listeners disconnects.
* RESTinio >= 0.5.1 https://github.com/Stiffstream/restinio/issues/28
*/
void state_changed(const restinio::connection_state::notice_t& notice) noexcept;
private:
std::function<void(restinio::connection_id_t)> onClosed_;
};
void
DhtProxyServer::ConnectionListener::state_changed(const restinio::connection_state::notice_t& notice) noexcept
{
if (restinio::holds_alternative<restinio::connection_state::closed_t>(notice.cause())) {
onClosed_(notice.connection_id());
}
}
void
DhtProxyServer::onConnectionClosed(restinio::connection_id_t id)
{
std::lock_guard<std::mutex> lock(lockListener_);
auto it = listeners_.find(id);
if (it != listeners_.end()) {
dht_->cancelListen(it->second.hash, std::move(it->second.token));
listeners_.erase(it);
if (logger_)
logger_->d("[proxy:server] [connection:%li] listener cancelled, %li still connected", id, listeners_.size());
}
}
struct DhtProxyServer::RestRouterTraitsTls : public restinio::default_tls_traits_t
{
using timer_manager_t = restinio::asio_timer_manager_t;
#ifdef OPENDHT_PROXY_HTTP_PARSER_FORK
using http_methods_mapper_t = restinio::custom_http_methods_t;
#endif
using logger_t = opendht_logger_t;
using request_handler_t = RestRouter;
using connection_state_listener_t = ConnectionListener;
};
struct DhtProxyServer::RestRouterTraits : public restinio::default_traits_t
{
using timer_manager_t = restinio::asio_timer_manager_t;
#ifdef OPENDHT_PROXY_HTTP_PARSER_FORK
using http_methods_mapper_t = restinio::custom_http_methods_t;
#endif
using logger_t = opendht_logger_t;
using request_handler_t = RestRouter;
using connection_state_listener_t = ConnectionListener;
};
void
DhtProxyServer::PermanentPut::msgpack_unpack(const msgpack::object& o)
{
if (auto cid = findMapValue(o, "cid")) {
clientId = cid->as<std::string>();
}
if (auto exp = findMapValue(o, "exp")) {
expiration = from_time_t(exp->as<time_t>());
}
if (auto token = findMapValue(o, "token")) {
pushToken = token->as<std::string>();
}
if (auto sid = findMapValue(o, "sid")) {
if (not sessionCtx)
sessionCtx = std::make_shared<PushSessionContext>(sid->as<std::string>());
else
sessionCtx->sessionId = sid->as<std::string>();
}
if (auto t = findMapValue(o, "t")) {
type = t->as<PushType>();
}
if (auto val = findMapValue(o, "value")) {
value = std::make_shared<dht::Value>(*val);
}
}
#ifdef OPENDHT_PUSH_NOTIFICATIONS
void
DhtProxyServer::Listener::msgpack_unpack(const msgpack::object& o)
{
if (auto cid = findMapValue(o, "cid")) {
clientId = cid->as<std::string>();
}
if (auto exp = findMapValue(o, "exp")) {
expiration = from_time_t(exp->as<time_t>());
}
if (auto sid = findMapValue(o, "sid")) {
if (not sessionCtx)
sessionCtx = std::make_shared<PushSessionContext>(sid->as<std::string>());
else
sessionCtx->sessionId = sid->as<std::string>();
}
if (auto t = findMapValue(o, "t")) {
type = t->as<PushType>();
}
}
#endif
DhtProxyServer::DhtProxyServer(const std::shared_ptr<DhtRunner>& dht,
const ProxyServerConfig& config,
const std::shared_ptr<dht::Logger>& logger
)
: ioContext_(std::make_shared<asio::io_context>()),
dht_(dht), persistPath_(config.persistStatePath), logger_(logger),
printStatsTimer_(std::make_unique<asio::steady_timer>(*ioContext_, 3s)),
connListener_(std::make_shared<ConnectionListener>(std::bind(&DhtProxyServer::onConnectionClosed, this, std::placeholders::_1))),
pushServer_(config.pushServer)
{
if (not dht_)
throw std::invalid_argument("A DHT instance must be provided");
if (logger_)
logger_->d("[proxy:server] [init] running on %i", config.port);
if (not pushServer_.empty()){
#ifdef OPENDHT_PUSH_NOTIFICATIONS
if (logger_)
logger_->d("[proxy:server] [init] using push server %s", pushServer_.c_str());
#else
if (logger_)
logger_->e("[proxy:server] [init] opendht built without push notification support");
#endif
}
jsonBuilder_["commentStyle"] = "None";
jsonBuilder_["indentation"] = "";
if (!pushServer_.empty()){
// no host delim, assume port only
if (pushServer_.find(":") == std::string::npos)
pushServer_ = "localhost:" + pushServer_;
// define http request destination for push notifications
pushHostPort_ = splitPort(pushServer_);
if (logger_)
logger_->d("Using push server for notifications: %s:%s", pushHostPort_.first.c_str(),
pushHostPort_.second.c_str());
}
if (config.identity.first and config.identity.second) {
asio::error_code ec;
// define tls context
asio::ssl::context tls_context { asio::ssl::context::sslv23 };
tls_context.set_options(asio::ssl::context::default_workarounds
| asio::ssl::context::no_sslv2
| asio::ssl::context::single_dh_use, ec);
if (ec)
throw std::runtime_error("Error setting tls context options: " + ec.message());
// add more security options
#ifdef SSL_OP_NO_RENEGOTIATION
SSL_CTX_set_options(tls_context.native_handle(), SSL_OP_NO_RENEGOTIATION); // CVE-2009-3555
#endif
// node private key
auto key = config.identity.first->serialize();
tls_context.use_private_key(asio::const_buffer{key.data(), key.size()},
asio::ssl::context::file_format::pem, ec);
if (ec)
throw std::runtime_error("Error setting node's private key: " + ec.message());
// certificate chain
auto certchain = config.identity.second->toString(true/*chain*/);
tls_context.use_certificate_chain(asio::const_buffer{certchain.data(), certchain.size()}, ec);
if (ec)
throw std::runtime_error("Error setting certificate chain: " + ec.message());
if (logger_)
logger_->d("[proxy:server] using certificate chain for ssl:\n%s", certchain.c_str());
// build http server
auto settings = restinio::run_on_this_thread_settings_t<RestRouterTraitsTls>();
addServerSettings(settings);
settings.port(config.port);
settings.tls_context(std::move(tls_context));
httpsServer_ = std::make_unique<restinio::http_server_t<RestRouterTraitsTls>>(
ioContext_,
std::forward<restinio::run_on_this_thread_settings_t<RestRouterTraitsTls>>(std::move(settings))
);
// run http server
serverThread_ = std::thread([this]{
httpsServer_->open_async([]{/*ok*/}, [](std::exception_ptr ex){
std::rethrow_exception(ex);
});
httpsServer_->io_context().run();
});
}
else {
auto settings = restinio::run_on_this_thread_settings_t<RestRouterTraits>();
addServerSettings(settings);
settings.port(config.port);
httpServer_ = std::make_unique<restinio::http_server_t<RestRouterTraits>>(
ioContext_,
std::forward<restinio::run_on_this_thread_settings_t<RestRouterTraits>>(std::move(settings))
);
// run http server
serverThread_ = std::thread([this](){
httpServer_->open_async([]{/*ok*/}, [](std::exception_ptr ex){
std::rethrow_exception(ex);
});
httpServer_->io_context().run();
});
}
dht->forwardAllMessages(true);
updateStats();
printStatsTimer_->async_wait(std::bind(&DhtProxyServer::handlePrintStats, this, std::placeholders::_1));
if (not persistPath_.empty()) {
try {
std::ifstream stateFile(persistPath_, std::ios::binary | std::ios::ate);
if (stateFile) {
std::streamsize size = stateFile.tellg();
stateFile.seekg(0, std::ios::beg);
if (logger_)
logger_->d("Loading proxy state from %.*s (%td bytes)", (int)persistPath_.size(), persistPath_.c_str(), size);
loadState(stateFile, size);
}
} catch (const std::exception& e) {
if (logger_)
logger_->e("Error loading state from file: %s", e.what());
}
}
}
template <typename Os>
void
DhtProxyServer::saveState(Os& stream) {
msgpack::packer<Os> pk(&stream);
pk.pack_map(2);
{
std::lock_guard<std::mutex> lock(lockSearchPuts_);
pk.pack("puts");
pk.pack(puts_);
}
{
std::lock_guard<std::mutex> lock(lockListener_);
pk.pack("pushListeners");
pk.pack(pushListeners_);
}
}
template <typename Is>
void
DhtProxyServer::loadState(Is& is, size_t size) {
msgpack::unpacker pac;
pac.reserve_buffer(size);
if (is.read(pac.buffer(), size)) {
pac.buffer_consumed(size);
msgpack::object_handle oh;
while (pac.next(oh)) {
if (oh.get().type != msgpack::type::MAP)
continue;
if (auto puts = findMapValue(oh.get(), "puts")) {
std::lock_guard<std::mutex> lock(lockSearchPuts_);
puts_ = puts->as<decltype(puts_)>();
if (logger_)
logger_->d("Loading %zu persistent puts", puts_.size());
for (auto& put : puts_) {
for (auto& pput : put.second.puts) {
pput.second.expireTimer = std::make_unique<asio::steady_timer>(io_context(), pput.second.expiration);
pput.second.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this,
std::placeholders::_1, put.first, pput.first));
#ifdef OPENDHT_PUSH_NOTIFICATIONS
if (not pput.second.pushToken.empty()) {
auto jsonProvider = [infoHash=put.first.toString(), clientId=pput.second.clientId, vid = pput.first, sessionCtx = pput.second.sessionCtx](){
Json::Value json;
json["timeout"] = infoHash;
json["to"] = clientId;
json["vid"] = std::to_string(vid);
if (sessionCtx) {
std::lock_guard<std::mutex> l(sessionCtx->lock);
json["s"] = sessionCtx->sessionId;
}
return json;
};
pput.second.expireNotifyTimer = std::make_unique<asio::steady_timer>(io_context(), pput.second.expiration - proxy::OP_MARGIN);
pput.second.expireNotifyTimer->async_wait(std::bind(
&DhtProxyServer::handleNotifyPushListenExpire, this,
std::placeholders::_1, pput.second.pushToken, std::move(jsonProvider), pput.second.type));
}
#endif
dht_->put(put.first, pput.second.value, DoneCallbackSimple{}, time_point::max(), true);
}
}
} else {
if (logger_)
logger_->d("No persistent puts in state");
}
#ifdef OPENDHT_PUSH_NOTIFICATIONS
if (auto listeners = findMapValue(oh.get(), "pushListeners")) {
std::lock_guard<std::mutex> lock(lockListener_);
pushListeners_ = listeners->as<decltype(pushListeners_)>();
if (logger_)
logger_->d("Loading %zu push listeners", pushListeners_.size());
for (auto& pushListener : pushListeners_) {
for (auto& listeners : pushListener.second.listeners) {
for (auto& listener : listeners.second) {
listener.internalToken = dht_->listen(listeners.first,
[this, infoHash=listeners.first, pushToken=pushListener.first, type=listener.type, clientId=listener.clientId, sessionCtx = listener.sessionCtx]
(const std::vector<std::shared_ptr<Value>>& values, bool expired) {
// Build message content
Json::Value json;
json["key"] = infoHash.toString();
json["to"] = clientId;
json["t"] = Json::Value::Int64(std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now().time_since_epoch()).count());
{
std::lock_guard<std::mutex> l(sessionCtx->lock);
json["s"] = sessionCtx->sessionId;
}
if (expired and values.size() < 2){
std::stringstream ss;
for(size_t i = 0; i < values.size(); ++i){
if(i != 0) ss << ",";
ss << values[i]->id;
}
json["exp"] = ss.str();
}
auto maxPrio = 1000u;
for (const auto& v : values)
maxPrio = std::min(maxPrio, v->priority);
sendPushNotification(pushToken, std::move(json), type, !expired and maxPrio == 0);
return true;
}
);
// expire notify
listener.expireNotifyTimer = std::make_unique<asio::steady_timer>(io_context(), listener.expiration - proxy::OP_MARGIN);
auto jsonProvider = [infoHash = listeners.first.toString(), clientId = listener.clientId, sessionCtx = listener.sessionCtx](){
Json::Value json;
json["timeout"] = infoHash;
json["to"] = clientId;
std::lock_guard<std::mutex> l(sessionCtx->lock);
json["s"] = sessionCtx->sessionId;
return json;
};
listener.expireNotifyTimer->async_wait(std::bind(&DhtProxyServer::handleNotifyPushListenExpire, this,
std::placeholders::_1, pushListener.first, std::move(jsonProvider), listener.type));
// cancel push listen
listener.expireTimer = std::make_unique<asio::steady_timer>(io_context(), listener.expiration);
listener.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPushListen, this,
std::placeholders::_1, pushListener.first, listeners.first, listener.clientId));
}
}
}
} else {
if (logger_)
logger_->d("No push listeners in state");
}
#endif
}
if (logger_)
logger_->d("loading ended");
}
}
asio::io_context&
DhtProxyServer::io_context() const
{
return *ioContext_;
}
DhtProxyServer::~DhtProxyServer()
{
if (not persistPath_.empty()) {
if (logger_)
logger_->d("Saving proxy state to %.*s", (int)persistPath_.size(), persistPath_.c_str());
std::ofstream stateFile(persistPath_, std::ios::binary);
saveState(stateFile);
}
if (dht_) {
std::lock_guard<std::mutex> lock(lockListener_);
for (auto& l : listeners_) {
dht_->cancelListen(l.second.hash, std::move(l.second.token));
if (l.second.response)
l.second.response->done();
}
#ifdef OPENDHT_PUSH_NOTIFICATIONS
for (auto& lm: pushListeners_) {
for (auto& ls: lm.second.listeners)
for (auto& l : ls.second) {
if (l.expireNotifyTimer)
l.expireNotifyTimer->cancel();
if (l.expireTimer)
l.expireTimer->cancel();
dht_->cancelListen(ls.first, std::move(l.internalToken));
}
}
pushListeners_.clear();
#endif
}
if (logger_)
logger_->d("[proxy:server] closing http server");
ioContext_->stop();
if (serverThread_.joinable())
serverThread_.join();
if (logger_)
logger_->d("[proxy:server] http server closed");
}
template< typename ServerSettings >
void
DhtProxyServer::addServerSettings(ServerSettings& settings, const unsigned int max_pipelined_requests)
{
using namespace std::chrono;
/**
* If max_pipelined_requests is greater than 1 then RESTinio will continue
* to read from the socket after parsing the first request.
* In that case, RESTinio can detect the disconnection
* and calls state listener as expected.
* https://github.com/Stiffstream/restinio/issues/28
*/
settings.max_pipelined_requests(max_pipelined_requests);
// one less to detect the listener disconnect
settings.concurrent_accepts_count(max_pipelined_requests - 1);
settings.separate_accept_and_create_connect(true);
settings.logger(logger_);
settings.protocol(restinio::asio_ns::ip::tcp::v6());
settings.request_handler(createRestRouter());
// time limits // ~ 0.8 month
std::chrono::milliseconds timeout_request(std::numeric_limits<int>::max());
settings.read_next_http_message_timelimit(timeout_request);
settings.write_http_response_timelimit(60s);
settings.handle_request_timeout(timeout_request);
// socket options
settings.socket_options_setter([](auto & options){
options.set_option(asio::ip::tcp::no_delay{true});
options.set_option(asio::socket_base::keep_alive{true});
});
settings.connection_state_listener(connListener_);
}
std::shared_ptr<DhtProxyServer::ServerStats>
DhtProxyServer::updateStats(std::shared_ptr<NodeInfo> info) const
{
auto now = clock::now();
auto last = lastStatsReset_.exchange(now);
auto count = requestNum_.exchange(0);
auto dt = std::chrono::duration<double>(now - last);
auto sstats = std::make_shared<ServerStats>();
auto& stats = *sstats;
stats.requestRate = count / dt.count();
#ifdef OPENDHT_PUSH_NOTIFICATIONS
stats.pushListenersCount = pushListeners_.size();
#endif
stats.totalPermanentPuts = 0;
std::for_each(puts_.begin(), puts_.end(), [&stats](const auto& put) {
stats.totalPermanentPuts += put.second.puts.size();
});
stats.putCount = puts_.size();
stats.listenCount = listeners_.size();
stats.nodeInfo = std::move(info);
return sstats;
}
void
DhtProxyServer::updateStats() {
dht_->getNodeInfo([this](std::shared_ptr<NodeInfo> newInfo){
stats_ = updateStats(newInfo);
nodeInfo_ = newInfo;
if (logger_) {
auto str = Json::writeString(jsonBuilder_, newInfo->toJson());
logger_->d("[proxy:server] [stats] %s", str.c_str());
}
});
}
void
DhtProxyServer::handlePrintStats(const asio::error_code &ec)
{
if (ec == asio::error::operation_aborted)
return;
updateStats();
printStatsTimer_->expires_at(printStatsTimer_->expiry() + PRINT_STATS_PERIOD);
printStatsTimer_->async_wait(std::bind(&DhtProxyServer::handlePrintStats, this, std::placeholders::_1));
}
template <typename HttpResponse>
HttpResponse DhtProxyServer::initHttpResponse(HttpResponse response)
{
response.append_header("Server", "RESTinio");
response.append_header(restinio::http_field::content_type, "application/json");
response.append_header(restinio::http_field::access_control_allow_origin, "*");
return response;
}
std::unique_ptr<RestRouter>
DhtProxyServer::createRestRouter()
{
using namespace std::placeholders;
auto router = std::make_unique<RestRouter>();
// **************************** LEGACY ROUTES ****************************
// node.info
router->http_get("/", std::bind(&DhtProxyServer::getNodeInfo, this, _1, _2));
#ifdef OPENDHT_PROXY_HTTP_PARSER_FORK
// node.stats
router->add_handler(restinio::custom_http_methods_t::from_nodejs(restinio::method_stats.raw_id()),
"/", std::bind(&DhtProxyServer::getStats, this, _1, _2));
#endif
// key.options
router->add_handler(restinio::http_method_options(),
"/:hash", std::bind(&DhtProxyServer::options, this, _1, _2));
// key.get
router->http_get("/:hash", std::bind(&DhtProxyServer::get, this, _1, _2));
// key.post
router->http_post("/:hash", std::bind(&DhtProxyServer::put, this, _1, _2));
#ifdef OPENDHT_PROXY_HTTP_PARSER_FORK
// key.listen
router->add_handler(restinio::custom_http_methods_t::from_nodejs(restinio::method_listen.raw_id()),
"/:hash", std::bind(&DhtProxyServer::listen, this, _1, _2));
#endif
#ifdef OPENDHT_PUSH_NOTIFICATIONS
// key.subscribe
router->add_handler(restinio::http_method_subscribe(),
"/:hash", std::bind(&DhtProxyServer::subscribe, this, _1, _2));
// key.unsubscribe
router->add_handler(restinio::http_method_unsubscribe(),
"/:hash", std::bind(&DhtProxyServer::unsubscribe, this, _1, _2));
#endif //OPENDHT_PUSH_NOTIFICATIONS
#ifdef OPENDHT_PROXY_SERVER_IDENTITY
#ifdef OPENDHT_PROXY_HTTP_PARSER_FORK
// key.sign
router->add_handler(restinio::custom_http_methods_t::from_nodejs(restinio::method_sign.raw_id()),
"/:hash", std::bind(&DhtProxyServer::putSigned, this, _1, _2));
// key.encrypt
router->add_handler(restinio::custom_http_methods_t::from_nodejs(restinio::method_encrypt.raw_id()),
"/:hash", std::bind(&DhtProxyServer::putEncrypted, this, _1, _2));
#endif
#endif // OPENDHT_PROXY_SERVER_IDENTITY
// **************************** NEW ROUTES ****************************
// node.info
router->http_get("/node/info", std::bind(&DhtProxyServer::getNodeInfo, this, _1, _2));
// node.stats
router->http_get("/node/stats", std::bind(&DhtProxyServer::getStats, this, _1, _2));
// key.options
router->http_get("/key/:hash/options", std::bind(&DhtProxyServer::options, this, _1, _2));
// key.get
router->http_get("/key/:hash", std::bind(&DhtProxyServer::get, this, _1, _2));
// key.post
router->http_post("/key/:hash", std::bind(&DhtProxyServer::put, this, _1, _2));
// key.listen
router->http_get("/key/:hash/listen", std::bind(&DhtProxyServer::listen, this, _1, _2));
#ifdef OPENDHT_PUSH_NOTIFICATIONS
// key.subscribe
router->add_handler(restinio::http_method_subscribe(),
"/key/:hash", std::bind(&DhtProxyServer::subscribe, this, _1, _2));
// key.unsubscribe
router->add_handler(restinio::http_method_unsubscribe(),
"/key/:hash", std::bind(&DhtProxyServer::unsubscribe, this, _1, _2));
#endif //OPENDHT_PUSH_NOTIFICATIONS
#ifdef OPENDHT_PROXY_SERVER_IDENTITY
// key.sign
router->http_post("/key/:hash/sign", std::bind(&DhtProxyServer::putSigned, this, _1, _2));
// key.encrypt
router->http_post("/key/:hash/encrypt", std::bind(&DhtProxyServer::putEncrypted, this, _1, _2));
#endif // OPENDHT_PROXY_SERVER_IDENTITY
return router;
}
RequestStatus
DhtProxyServer::getNodeInfo(restinio::request_handle_t request,
restinio::router::route_params_t /*params*/) const
{
try {
if (auto nodeInfo = nodeInfo_) {
auto result = nodeInfo->toJson();
// [ipv6:ipv4]:port or ipv4:port
result["public_ip"] = request->remote_endpoint().address().to_string();
auto response = initHttpResponse(request->create_response());
response.append_body(Json::writeString(jsonBuilder_, result) + "\n");
return response.done();
}
auto response = initHttpResponse(request->create_response(restinio::status_service_unavailable()));
response.set_body(RESP_MSG_SERVICE_UNAVAILABLE);
return response.done();
} catch (...) {
return serverError(*request);
}
}
RequestStatus
DhtProxyServer::getStats(restinio::request_handle_t request,
restinio::router::route_params_t /*params*/)
{
requestNum_++;
try {
if (auto stats = stats_) {
auto response = initHttpResponse(request->create_response());
response.append_body(Json::writeString(jsonBuilder_, stats->toJson()) + "\n");
return response.done();
} else {
auto response = initHttpResponse(request->create_response(restinio::status_service_unavailable()));
response.set_body(RESP_MSG_SERVICE_UNAVAILABLE);
return response.done();
}
} catch (...){
return serverError(*request);
}
}
RequestStatus
DhtProxyServer::get(restinio::request_handle_t request,
restinio::router::route_params_t params)
{
requestNum_++;
try {
InfoHash infoHash(params["hash"].to_string());
if (!infoHash)
infoHash = InfoHash::get(params["hash"].to_string());
auto response = std::make_shared<ResponseByPartsBuilder>(
initHttpResponse(request->create_response<ResponseByParts>()));
response->flush();
dht_->get(infoHash, [this, response](const std::vector<Sp<Value>>& values) {
std::stringstream output;
for (const auto& value : values) {
output << Json::writeString(jsonBuilder_, value->toJson()) << "\n";
}
response->append_chunk(output.str());
response->flush();
return true;
},
[response] (bool /*ok*/){
response->done();
});
return restinio::request_handling_status_t::accepted;
} catch (const std::exception& e){
return serverError(*request);
}
}
RequestStatus
DhtProxyServer::listen(restinio::request_handle_t request,
restinio::router::route_params_t params)
{
requestNum_++;
try {
InfoHash infoHash(params["hash"].to_string());
if (!infoHash)
infoHash = InfoHash::get(params["hash"].to_string());
auto response = std::make_shared<ResponseByPartsBuilder>(
initHttpResponse(request->create_response<ResponseByParts>()));
response->flush();
std::lock_guard<std::mutex> lock(lockListener_);
// save the listener to handle a disconnect
auto &session = listeners_[request->connection_id()];
session.hash = infoHash;
session.response = response;
session.token = dht_->listen(infoHash, [this, response]
(const std::vector<Sp<Value>>& values, bool expired){
for (const auto& value: values){
auto jsonVal = value->toJson();
if (expired)
jsonVal["expired"] = true;
response->append_chunk(Json::writeString(jsonBuilder_, jsonVal) + "\n");
}
response->flush();
return true;
});
return restinio::request_handling_status_t::accepted;
} catch (const std::exception& e){
return serverError(*request);
}
}
#ifdef OPENDHT_PUSH_NOTIFICATIONS
RequestStatus
DhtProxyServer::subscribe(restinio::request_handle_t request,
restinio::router::route_params_t params)
{
requestNum_++;
try {
InfoHash infoHash(params["hash"].to_string());
if (!infoHash)
infoHash = InfoHash::get(params["hash"].to_string());
std::string err;
Json::Value r;
auto* char_data = reinterpret_cast<const char*>(request->body().data());
auto reader = std::unique_ptr<Json::CharReader>(jsonReaderBuilder_.newCharReader());
if (!reader->parse(char_data, char_data + request->body().size(), &r, &err)){
auto response = initHttpResponse(request->create_response(restinio::status_bad_request()));
response.set_body(RESP_MSG_JSON_INCORRECT);
return response.done();
}
const Json::Value& root(r); // parse using const Json so [] never creates element
auto pushToken = root["key"].asString();
if (pushToken.empty()){
auto response = initHttpResponse(request->create_response(restinio::status_bad_request()));
response.set_body(RESP_MSG_NO_TOKEN);
return response.done();
}
auto type = root["platform"].asString() == "android" ? PushType::Android : PushType::iOS;
auto clientId = root["client_id"].asString();
auto sessionId = root["session_id"].asString();
if (logger_)
logger_->d("[proxy:server] [subscribe %s] [client %s] [session %s]", infoHash.toString().c_str(), clientId.c_str(), sessionId.c_str());
// Insert new or return existing push listeners of a token
std::lock_guard<std::mutex> lock(lockPushListeners_);
auto& pushListener = pushListeners_[pushToken];
auto& pushListeners = pushListener.listeners[infoHash];
auto listIt = std::find_if(pushListeners.begin(), pushListeners.end(), [&](const Listener& l) {
return l.clientId == clientId;
});
bool newListener = listIt == pushListeners.end();
if (newListener) {
pushListeners.emplace_back(Listener{});
listIt = std::prev(pushListeners.end());
listIt->clientId = clientId;
listIt->sessionCtx = std::make_shared<PushSessionContext>(sessionId);
} else {
std::lock_guard<std::mutex> l(listIt->sessionCtx->lock);
listIt->sessionCtx->sessionId = sessionId;
}
auto& listener = *listIt;
// Expiration
auto timeout = std::chrono::steady_clock::now() + proxy::OP_TIMEOUT;
listener.expiration = timeout;
listener.type = type;
if (listener.expireNotifyTimer)
listener.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN);
else
listener.expireNotifyTimer = std::make_unique<asio::steady_timer>(io_context(), timeout - proxy::OP_MARGIN);
auto jsonProvider = [h=infoHash.toString(), clientId, sessionCtx = listener.sessionCtx](){
Json::Value json;
json["timeout"] = h;
json["to"] = clientId;
std::lock_guard<std::mutex> l(sessionCtx->lock);
json["s"] = sessionCtx->sessionId;
return json;
};
listener.expireNotifyTimer->async_wait(std::bind(&DhtProxyServer::handleNotifyPushListenExpire, this,
std::placeholders::_1, pushToken, std::move(jsonProvider), listener.type));
if (!listener.expireTimer)
listener.expireTimer = std::make_unique<asio::steady_timer>(io_context(), timeout);
else
listener.expireTimer->expires_at(timeout);
listener.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPushListen, this,
std::placeholders::_1, pushToken, infoHash, clientId));
// Send response
if (not newListener) {
if (logger_)
logger_->d("[proxy:server] [subscribe] found [client %s]", listener.clientId.c_str());
// Send response header
auto response = std::make_shared<ResponseByPartsBuilder>(initHttpResponse(request->create_response<ResponseByParts>()));
response->flush();
if (!root["refresh"].asBool()) {
// No Refresh
dht_->get(infoHash, [this, response](const Sp<Value>& value){
auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n";
response->append_chunk(output);
response->flush();
return true;
},
[response] (bool){
response->done();
});
} else {
// Refresh
response->append_chunk("{}\n");
return response->done();
}
} else {
// =========== No existing listener for an infoHash ============
// Add listen on dht
listener.internalToken = dht_->listen(infoHash,
[this, infoHash, pushToken, type, clientId, sessionCtx = listener.sessionCtx]
(const std::vector<std::shared_ptr<Value>>& values, bool expired){
// Build message content
Json::Value json;
json["key"] = infoHash.toString();
json["to"] = clientId;
json["t"] = Json::Value::Int64(std::chrono::duration_cast<std::chrono::milliseconds>(system_clock::now().time_since_epoch()).count());
{
std::lock_guard<std::mutex> l(sessionCtx->lock);
json["s"] = sessionCtx->sessionId;
}
if (expired and values.size() < 2){
std::stringstream ss;
for(size_t i = 0; i < values.size(); ++i){
if(i != 0) ss << ",";
ss << values[i]->id;
}
json["exp"] = ss.str();
}
auto maxPrio = 1000u;
for (const auto& v : values)
maxPrio = std::min(maxPrio, v->priority);
sendPushNotification(pushToken, std::move(json), type, !expired and maxPrio == 0);
return true;
}
);
auto response = initHttpResponse(request->create_response());
response.set_body("{}\n");
return response.done();
}
}
catch (...) {
return serverError(*request);
}
return restinio::request_handling_status_t::accepted;
}
RequestStatus
DhtProxyServer::unsubscribe(restinio::request_handle_t request,
restinio::router::route_params_t params)
{
requestNum_++;
InfoHash infoHash(params["hash"].to_string());
if (!infoHash)
infoHash = InfoHash::get(params["hash"].to_string());
if (logger_)
logger_->d("[proxy:server] [unsubscribe %s]", infoHash.toString().c_str());
try {
std::string err;
Json::Value root;
auto* char_data = reinterpret_cast<const char*>(request->body().data());
auto reader = std::unique_ptr<Json::CharReader>(jsonReaderBuilder_.newCharReader());
if (!reader->parse(char_data, char_data + request->body().size(), &root, &err)){
auto response = initHttpResponse(
request->create_response(restinio::status_bad_request()));
response.set_body(RESP_MSG_JSON_INCORRECT);
return response.done();
}
auto pushToken = root["key"].asString();
if (pushToken.empty())
return restinio::request_handling_status_t::rejected;
auto clientId = root["client_id"].asString();
handleCancelPushListen(asio::error_code() /*success*/, pushToken, infoHash, clientId);
auto response = initHttpResponse(request->create_response());
return response.done();
}
catch (...) {
return serverError(*request);
}
}
void
DhtProxyServer::handleNotifyPushListenExpire(const asio::error_code &ec, const std::string pushToken,
std::function<Json::Value()> jsonProvider, PushType type)
{
if (ec == asio::error::operation_aborted)
return;
else if (ec) {
if (logger_)
logger_->e("[proxy:server] [subscribe] error sending put refresh: %s", ec.message().c_str());
}
if (logger_)
logger_->d("[proxy:server] [subscribe] sending put refresh to %s token", pushToken.c_str());
sendPushNotification(pushToken, jsonProvider(), type, false);
}
void
DhtProxyServer::handleCancelPushListen(const asio::error_code &ec, const std::string pushToken,
const InfoHash key, const std::string clientId)
{
if (ec == asio::error::operation_aborted)
return;
else if (ec){
if (logger_)
logger_->e("[proxy:server] [listen:push %s] error cancel: %s",
key.toString().c_str(), ec.message().c_str());
}
if (logger_)
logger_->d("[proxy:server] [listen:push %s] cancelled for %s",
key.toString().c_str(), clientId.c_str());
std::lock_guard<std::mutex> lock(lockListener_);
auto pushListener = pushListeners_.find(pushToken);
if (pushListener == pushListeners_.end())
return;
auto listeners = pushListener->second.listeners.find(key);
if (listeners == pushListener->second.listeners.end())
return;
for (auto listener = listeners->second.begin(); listener != listeners->second.end();){
if (listener->clientId == clientId){
if (dht_)
dht_->cancelListen(key, std::move(listener->internalToken));
listener = listeners->second.erase(listener);
} else {
++listener;
}
}
if (listeners->second.empty())
pushListener->second.listeners.erase(listeners);
if (pushListener->second.listeners.empty())
pushListeners_.erase(pushListener);
}
void
DhtProxyServer::sendPushNotification(const std::string& token, Json::Value&& json, PushType type, bool highPriority)
{
if (pushServer_.empty())
return;
unsigned reqid = 0;
try {
auto request = std::make_shared<http::Request>(io_context(), pushHostPort_.first, pushHostPort_.second,
httpsServer_ ? true : false, logger_);
reqid = request->id();
request->set_target("/api/push");
request->set_method(restinio::http_method_post());
request->set_header_field(restinio::http_field_t::host, pushServer_.c_str());
request->set_header_field(restinio::http_field_t::user_agent, "RESTinio client");
request->set_header_field(restinio::http_field_t::accept, "*/*");
request->set_header_field(restinio::http_field_t::content_type, "application/json");
// NOTE: see https://github.com/appleboy/gorush
Json::Value notification(Json::objectValue);
Json::Value tokens(Json::arrayValue);
tokens[0] = token;
notification["tokens"] = std::move(tokens);
notification["platform"] = type == PushType::Android ? 2 : 1;
notification["data"] = std::move(json);
notification["priority"] = highPriority ? "high" : "normal";
notification["time_to_live"] = 600;
Json::Value notifications(Json::arrayValue);
notifications[0] = notification;
Json::Value content;
content["notifications"] = std::move(notifications);
request->set_body(Json::writeString(jsonBuilder_, content));
request->add_on_state_change_callback([this, reqid]
(http::Request::State state, const http::Response& response){
if (state == http::Request::State::DONE){
if (logger_ and response.status_code != 200)
logger_->e("[proxy:server] [notification] push failed: %i", response.status_code);
std::lock_guard<std::mutex> l(requestLock_);
requests_.erase(reqid);
}
});
{
std::lock_guard<std::mutex> l(requestLock_);
requests_[reqid] = request;
}
request->send();
}
catch (const std::exception &e){
if (logger_)
logger_->e("[proxy:server] [notification] error send push: %i", e.what());
if (reqid) {
std::lock_guard<std::mutex> l(requestLock_);
requests_.erase(reqid);
}
}
}
#endif //OPENDHT_PUSH_NOTIFICATIONS
void
DhtProxyServer::handleCancelPermamentPut(const asio::error_code &ec, const InfoHash& key, Value::Id vid)
{
if (ec == asio::error::operation_aborted)
return;
else if (ec){
if (logger_)
logger_->e("[proxy:server] [put:permament] error sending put refresh: %s", ec.message().c_str());
}
if (logger_)
logger_->d("[proxy:server] [put %s] cancel permament put %i", key.toString().c_str(), vid);
std::lock_guard<std::mutex> lock(lockSearchPuts_);
auto sPuts = puts_.find(key);
if (sPuts == puts_.end())
return;
auto& sPutsMap = sPuts->second.puts;
auto put = sPutsMap.find(vid);
if (put == sPutsMap.end())
return;
if (dht_)
dht_->cancelPut(key, vid);
if (put->second.expireTimer)
put->second.expireTimer->cancel();
if (put->second.expireNotifyTimer)
put->second.expireNotifyTimer->cancel();
sPutsMap.erase(put);
if (sPutsMap.empty())
puts_.erase(sPuts);
}
RequestStatus
DhtProxyServer::put(restinio::request_handle_t request,
restinio::router::route_params_t params)
{
requestNum_++;
InfoHash infoHash(params["hash"].to_string());
if (!infoHash)
infoHash = InfoHash::get(params["hash"].to_string());
if (request->body().empty()){
auto response = initHttpResponse(request->create_response(restinio::status_bad_request()));
response.set_body(RESP_MSG_MISSING_PARAMS);
return response.done();
}
try {
std::string err;
Json::Value root;
auto* char_data = reinterpret_cast<const char*>(request->body().data());
auto reader = std::unique_ptr<Json::CharReader>(jsonReaderBuilder_.newCharReader());
if (reader->parse(char_data, char_data + request->body().size(), &root, &err)){
auto value = std::make_shared<Value>(root);
bool permanent = root.isMember("permanent");
if (logger_)
logger_->d("[proxy:server] [put %s] %s %s", infoHash.toString().c_str(),
value->toString().c_str(), (permanent ? "permanent" : ""));
if (permanent) {
std::string pushToken, clientId, sessionId, platform;
auto& pVal = root["permanent"];
if (pVal.isObject()){
pushToken = pVal["key"].asString();
clientId = pVal["client_id"].asString();
platform = pVal["platform"].asString();
sessionId = pVal["session_id"].asString();
}
std::lock_guard<std::mutex> lock(lockSearchPuts_);
auto timeout = std::chrono::steady_clock::now() + proxy::OP_TIMEOUT;
auto& sPuts = puts_[infoHash];
if (value->id == Value::INVALID_ID) {
for (auto& pp : sPuts.puts) {
if (pp.second.pushToken == pushToken
and pp.second.clientId == clientId
and pp.second.value->contentEquals(*value))
{
pp.second.expireTimer->expires_at(timeout);
pp.second.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this,
std::placeholders::_1, infoHash, pp.second.value->id));
if (not sessionId.empty()) {
if (not pp.second.sessionCtx)
pp.second.sessionCtx = std::make_shared<PushSessionContext>(sessionId);
else {
std::lock_guard<std::mutex> l(pp.second.sessionCtx->lock);
pp.second.sessionCtx->sessionId = sessionId;
}
}
auto response = initHttpResponse(request->create_response());
response.append_body(Json::writeString(jsonBuilder_, value->toJson()) + "\n");
return response.done();
}
}
value->id = std::uniform_int_distribution<Value::Id>{1}(rd);
}
auto vid = value->id;
auto& pput = sPuts.puts[vid];
pput.value = value;
pput.expiration = timeout;
if (not pput.expireTimer) {
auto &ctx = io_context();
// cancel permanent put
pput.expireTimer = std::make_unique<asio::steady_timer>(ctx, timeout);
#ifdef OPENDHT_PUSH_NOTIFICATIONS
if (not pushToken.empty()){
bool isAndroid = platform == "android";
pput.pushToken = pushToken;
pput.clientId = clientId;
pput.type = isAndroid ? PushType::Android : PushType::iOS;
pput.sessionCtx = std::make_shared<PushSessionContext>(sessionId);
// notify push listen expire
auto jsonProvider = [infoHash, clientId, vid, sessionCtx = pput.sessionCtx](){
Json::Value json;
json["timeout"] = infoHash.toString();
json["to"] = clientId;
json["vid"] = std::to_string(vid);
std::lock_guard<std::mutex> l(sessionCtx->lock);
json["s"] = sessionCtx->sessionId;
return json;
};
if (!pput.expireNotifyTimer)
pput.expireNotifyTimer = std::make_unique<asio::steady_timer>(ctx,
timeout - proxy::OP_MARGIN);
else
pput.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN);
pput.expireNotifyTimer->async_wait(std::bind(
&DhtProxyServer::handleNotifyPushListenExpire, this,
std::placeholders::_1, pushToken, std::move(jsonProvider), pput.type));
}
#endif
} else {
if (not sessionId.empty()) {
if (not pput.sessionCtx)
pput.sessionCtx = std::make_shared<PushSessionContext>(sessionId);
else {
std::lock_guard<std::mutex> l(pput.sessionCtx->lock);
pput.sessionCtx->sessionId = sessionId;
}
}
pput.expireTimer->expires_at(timeout);
if (pput.expireNotifyTimer)
pput.expireNotifyTimer->expires_at(timeout - proxy::OP_MARGIN);
}
pput.expireTimer->async_wait(std::bind(&DhtProxyServer::handleCancelPermamentPut, this,
std::placeholders::_1, infoHash, vid));
}
dht_->put(infoHash, value, [this, request, value](bool ok){
if (ok){
auto response = initHttpResponse(request->create_response());
response.append_body(Json::writeString(jsonBuilder_, value->toJson()) + "\n");
response.done();
} else {
auto response = initHttpResponse(request->create_response(restinio::status_bad_gateway()));
response.set_body(RESP_MSG_PUT_FAILED);
response.done();
}
}, time_point::max(), permanent);
return restinio::request_handling_status_t::accepted;
} else {
auto response = initHttpResponse(request->create_response(restinio::status_bad_request()));
response.set_body(RESP_MSG_JSON_INCORRECT);
return response.done();
}
} catch (const std::exception& e){
if (logger_)
logger_->d("[proxy:server] error in put: %s", e.what());
return serverError(*request);
}
}
#ifdef OPENDHT_PROXY_SERVER_IDENTITY
RequestStatus
DhtProxyServer::putSigned(restinio::request_handle_t request,
restinio::router::route_params_t params) const
{
requestNum_++;
InfoHash infoHash(params["hash"].to_string());
if (!infoHash)
infoHash = InfoHash::get(params["hash"].to_string());
if (request->body().empty()){
auto response = initHttpResponse(request->create_response(restinio::status_bad_request()));
response.set_body(RESP_MSG_MISSING_PARAMS);
return response.done();
}
try {
std::string err;
Json::Value root;
auto* char_data = reinterpret_cast<const char*>(request->body().data());
auto reader = std::unique_ptr<Json::CharReader>(jsonReaderBuilder_.newCharReader());
if (reader->parse(char_data, char_data + request->body().size(), &root, &err)){
auto value = std::make_shared<Value>(root);
dht_->putSigned(infoHash, value, [this, request, value](bool ok){
if (ok){
auto output = Json::writeString(jsonBuilder_, value->toJson()) + "\n";
auto response = initHttpResponse(request->create_response());
response.append_body(output);
response.done();
} else {
auto response = initHttpResponse(request->create_response(restinio::status_bad_gateway()));
response.set_body(RESP_MSG_PUT_FAILED);
response.done();
}
});
return restinio::request_handling_status_t::accepted;
} else {
auto response = initHttpResponse(request->create_response(restinio::status_bad_request()));
response.set_body(RESP_MSG_JSON_INCORRECT);
return response.done();
}
} catch (const std::exception& e){
if (logger_)
logger_->d("[proxy:server] error in putSigned: %s", e.what());
return serverError(*request);
}
}
RequestStatus
DhtProxyServer::putEncrypted(restinio::request_handle_t request,
restinio::router::route_params_t params)
{
requestNum_++;
InfoHash infoHash(params["hash"].to_string());
if (!infoHash)
infoHash = InfoHash::get(params["hash"].to_string());
if (request->body().empty()){
auto response = initHttpResponse(request->create_response(restinio::status_bad_request()));
response.set_body(RESP_MSG_MISSING_PARAMS);
return response.done();
}
try {
std::string err;
Json::Value root;
auto* char_data = reinterpret_cast<const char*>(request->body().data());
auto reader = std::unique_ptr<Json::CharReader>(jsonReaderBuilder_.newCharReader());
if (reader->parse(char_data, char_data + request->body().size(), &root, &err)){
InfoHash to(root["to"].asString());
if (!to){
auto response = initHttpResponse(request->create_response(restinio::status_bad_request()));
response.set_body(RESP_MSG_DESTINATION_NOT_FOUND);
return response.done();
}
auto value = std::make_shared<Value>(root);
dht_->putEncrypted(infoHash, to, value, [this, request, value](bool ok){
if (ok){
auto response = initHttpResponse(request->create_response());
response.append_body(Json::writeString(jsonBuilder_, value->toJson()) + "\n");
response.done();
} else {
auto response = initHttpResponse(request->create_response(restinio::status_bad_gateway()));
response.set_body(RESP_MSG_PUT_FAILED);
response.done();
}
});
return restinio::request_handling_status_t::accepted;
} else {
auto response = initHttpResponse(request->create_response(restinio::status_bad_request()));
response.set_body(RESP_MSG_JSON_INCORRECT);
return response.done();
}
} catch (const std::exception& e){
if (logger_)
logger_->d("[proxy:server] error in put: %s", e.what());
return serverError(*request);
}
}
#endif // OPENDHT_PROXY_SERVER_IDENTITY
RequestStatus
DhtProxyServer::options(restinio::request_handle_t request,
restinio::router::route_params_t /*params*/)
{
requestNum_++;
#ifdef OPENDHT_PROXY_SERVER_IDENTITY
const auto methods = "OPTIONS, GET, POST, LISTEN, SIGN, ENCRYPT";
#else
const auto methods = "OPTIONS, GET, POST, LISTEN";
#endif
auto response = initHttpResponse(request->create_response());
response.append_header(restinio::http_field::access_control_allow_methods, methods);
response.append_header(restinio::http_field::access_control_allow_headers, "content-type");
response.append_header(restinio::http_field::access_control_max_age, "86400");
return response.done();
}
RequestStatus
DhtProxyServer::getFiltered(restinio::request_handle_t request,
restinio::router::route_params_t params)
{
requestNum_++;
auto value = params["value"].to_string();
InfoHash infoHash(params["hash"].to_string());
if (!infoHash)
infoHash = InfoHash::get(params["hash"].to_string());
try {
auto response = std::make_shared<ResponseByPartsBuilder>(
initHttpResponse(request->create_response<ResponseByParts>()));
response->flush();
dht_->get(infoHash,
[this, response](const Sp<Value>& value) {
response->append_chunk(Json::writeString(jsonBuilder_, value->toJson()) + "\n");
response->flush();
return true;
},
[response] (bool /*ok*/){
response->done();
},
{}, value);
return restinio::request_handling_status_t::accepted;
} catch (const std::exception& e){
return serverError(*request);
}
}
}