diff --git a/include/opendht/callbacks.h b/include/opendht/callbacks.h index 21f2dd0c0562d00e8306ffaccea394eca7c1acec..e354ba2184c9ca38f8fd2c887e8da7a6f9964e49 100644 --- a/include/opendht/callbacks.h +++ b/include/opendht/callbacks.h @@ -26,6 +26,10 @@ #include <memory> #include <functional> +#if OPENDHT_PROXY_SERVER +#include <json/json.h> +#endif //OPENDHT_PROXY_SERVER + namespace dht { struct Node; @@ -47,6 +51,12 @@ struct OPENDHT_PUBLIC NodeStats { unsigned table_depth; unsigned getKnownNodes() const { return good_nodes + dubious_nodes; } std::string toString() const; +#if OPENDHT_PROXY_SERVER + /** + * Build a json object from a NodeStats + */ + Json::Value toJson() const; +#endif //OPENDHT_PROXY_SERVER }; /** diff --git a/include/opendht/dht_proxy_server.h b/include/opendht/dht_proxy_server.h index e2b74525cee5c4f1f9511f52a253555db5d8a200..5ac64c4e5d7554515306896fc7e03ee19ebf45aa 100644 --- a/include/opendht/dht_proxy_server.h +++ b/include/opendht/dht_proxy_server.h @@ -31,7 +31,6 @@ namespace dht { class DhtRunner; -class InfoHash; /** * Describes the REST API @@ -64,7 +63,7 @@ private: * Return the PublicKey id, the node id and node stats * Method: GET "/" * Result: HTTP 200, body: Value in JSON format (one part = one value) - * On error: HTTP 404, body: {"err":"xxxx"} + * On error: HTTP 503, body: {"err":"xxxx"} * @param session */ void getNodeInfo(const std::shared_ptr<restbed::Session>& session) const; @@ -72,12 +71,11 @@ private: /** * Return Values of a InfoHash * Method: GET "/{InfoHash: .*}" - * Return: Multiple JSON object in parts. For 2 values, you will have 3 parts: - * Value in JSON format (HTTP/1.1 200 OK Content-Type: application/json) - * Value in JSON format (HTTP/1.1 200 OK Content-Type: application/json) - * {"ok": 1} + * Return: Multiple JSON object in parts. Example: + * Value in JSON format\n + * Value in JSON format * - * On error: HTTP 404, body: {"err":"xxxx"} + * On error: HTTP 503, body: {"err":"xxxx"} * @param session */ void get(const std::shared_ptr<restbed::Session>& session) const; @@ -85,11 +83,11 @@ private: /** * Listen incoming Values of a InfoHash. * Method: LISTEN "/{InfoHash: .*}" - * Return: Multiple JSON object in parts. For 2 values, you will have 2 parts: - * Value in JSON format (HTTP/1.1 200 OK Content-Type: application/json) - * Value in JSON format (HTTP/1.1 200 OK Content-Type: application/json) + * Return: Multiple JSON object in parts. Example: + * Value in JSON format\n + * Value in JSON format * - * On error: HTTP 404, body: {"err":"xxxx"} + * On error: HTTP 503, body: {"err":"xxxx"} * @param session */ void listen(const std::shared_ptr<restbed::Session>& session) const; @@ -98,8 +96,8 @@ private: * Put a value on the DHT * Method: POST "/{InfoHash: .*}" * body = Value to put in JSON - * Return: {"ok":"1"} - * On error: HTTP 404, body: {"err":"xxxx"} if no dht + * Return: HTTP 200 if success and the value put in JSON + * On error: HTTP 503, body: {"err":"xxxx"} if no dht * HTTP 400, body: {"err":"xxxx"} if bad json * @param session */ @@ -110,8 +108,8 @@ private: * Put a value to sign by the proxy on the DHT * Method: SIGN "/{InfoHash: .*}" * body = Value to put in JSON - * Return: {"ok":"1"} - * On error: HTTP 404, body: {"err":"xxxx"} if no dht + * Return: HTTP 200 if success and the value put in JSON + * On error: HTTP 503, body: {"err":"xxxx"} if no dht * HTTP 400, body: {"err":"xxxx"} if bad json * @param session */ @@ -121,8 +119,8 @@ private: * Put a value to encrypt by the proxy on the DHT * Method: ENCRYPT "/{hash: .*}" * body = Value to put in JSON + "to":"infoHash" - * Return: {"ok":"1"} - * On error: HTTP 404, body: {"err":"xxxx"} if no dht + * Return: HTTP 200 if success and the value put in JSON + * On error: HTTP 503, body: {"err":"xxxx"} if no dht * HTTP 400, body: {"err":"xxxx"} if bad json * @param session */ @@ -132,12 +130,11 @@ private: /** * Return Values of a InfoHash filtered by a value id * Method: GET "/{InfoHash: .*}/{ValueId: .*}" - * Return: Multiple JSON object in parts. For 2 values, you will have 3 parts: - * Value in JSON format (HTTP/1.1 200 OK Content-Type: application/json) - * Value in JSON format (HTTP/1.1 200 OK Content-Type: application/json) - * {"ok": 1} + * Return: Multiple JSON object in parts. Example: + * Value in JSON format\n + * Value in JSON format * - * On error: HTTP 404, body: {"err":"xxxx"} + * On error: HTTP 503, body: {"err":"xxxx"} * @param session */ void getFiltered(const std::shared_ptr<restbed::Session>& session) const; diff --git a/src/callbacks.cpp b/src/callbacks.cpp index 8908a2dff2405223e1c2715e6c2b2b1262cbbfaf..89719ef1113a98a593ae745b37646f52f401f75f 100644 --- a/src/callbacks.cpp +++ b/src/callbacks.cpp @@ -68,4 +68,24 @@ NodeStats::toString() const return ss.str(); } +#if OPENDHT_PROXY_SERVER +/** + * Build a json object from a NodeStats + */ +Json::Value +NodeStats::toJson() const +{ + Json::Value val; + val["good"] = static_cast<Json::LargestUInt>(good_nodes); + val["dubious"] = static_cast<Json::LargestUInt>(dubious_nodes); + val["incoming"] = static_cast<Json::LargestUInt>(incoming_nodes); + if (table_depth > 1) { + val["table_depth"] = static_cast<Json::LargestUInt>(table_depth); + unsigned long tot_nodes = 8 * std::exp2(table_depth); + val["network_size_estimation"] = static_cast<Json::LargestUInt>(tot_nodes); + } + return val; +} +#endif //OPENDHT_PROXY_SERVER + } diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 9f70e2bf4cbc0fe0e76831a332ea45e7e1b7a6df..f813f2390c42219898f3529a42cb557e66358659 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -19,13 +19,12 @@ #if OPENDHT_PROXY_SERVER #include "dht_proxy_server.h" -#include <chrono> -#include <functional> - #include "default_types.h" #include "dhtrunner.h" #include "msgpack.hpp" +#include <chrono> +#include <functional> #include <json/json.h> #include <limits> @@ -96,13 +95,14 @@ DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port) // Start server auto settings = std::make_shared<restbed::Settings>(); settings->set_default_header("Content-Type", "application/json"); + settings->set_default_header("Connection", "keep-alive"); std::chrono::milliseconds timeout(std::numeric_limits<int>::max()); settings->set_connection_timeout(timeout); // there is a timeout, but really huge settings->set_port(port); try { service_->start(settings); } catch(std::system_error& e) { - // Fail silently for now. + std::cerr << "Error running server on port " << port << ": " << e.what() << std::endl; } }); @@ -150,20 +150,21 @@ DhtProxyServer::getNodeInfo(const std::shared_ptr<restbed::Session>& session) co const auto request = session->get_request(); int content_length = std::stoi(request->get_header("Content-Length", "0")); session->fetch(content_length, - [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b) + [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& /*b*/) { - (void)b; if (dht_) { Json::Value result; - result["id"] = dht_->getId().toString(); + auto id = dht_->getId(); + if (id) + result["id"] = id.toString(); result["node_id"] = dht_->getNodeId().toString(); - result["ipv4"] = dht_->getNodesStats(AF_INET).toString(); - result["ipv6"] = dht_->getNodesStats(AF_INET6).toString(); + result["ipv4"] = dht_->getNodesStats(AF_INET).toJson(); + result["ipv6"] = dht_->getNodesStats(AF_INET6).toJson(); Json::FastWriter writer; s->close(restbed::OK, writer.write(result)); } else - s->close(restbed::NOT_FOUND, "{\"err\":\"Incorrect DhtRunner\"}"); + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } ); } @@ -175,35 +176,36 @@ DhtProxyServer::get(const std::shared_ptr<restbed::Session>& session) const int content_length = std::stoi(request->get_header("Content-Length", "0")); auto hash = request->get_path_parameter("hash"); session->fetch(content_length, - [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b) + [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& /*b* */) { - (void)b; if (dht_) { InfoHash infoHash(hash); if (!infoHash) { infoHash = InfoHash::get(hash); } - Json::FastWriter writer; - dht_->get(infoHash, [s, &writer](std::shared_ptr<Value> value) { - // Send values as soon as we get them - Json::Value result; - s->yield(restbed::OK, writer.write(value->toJson())); - return true; - }, [s, &writer](bool ok) { - // Communication is finished - auto response = std::to_string(ok); - s->close(restbed::OK, "{\"ok\": " + response + "}"); + s->yield(restbed::OK, "", [=]( const std::shared_ptr< restbed::Session > s) { + dht_->get(infoHash, [s](std::shared_ptr<Value> value) { + // Send values as soon as we get them + Json::FastWriter writer; + s->yield(writer.write(value->toJson()), [](const std::shared_ptr<restbed::Session> /*session*/){ }); + return true; + }, [s](bool /*ok* */) { + // Communication is finished + s->close(); + }); }); } else { - s->close(restbed::NOT_FOUND, "{\"err\":\"Incorrect DhtRunner\"}"); + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } } ); + } void DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) const { + const auto request = session->get_request(); int content_length = std::stoi(request->get_header("Content-Length", "0")); auto hash = request->get_path_parameter("hash"); @@ -211,28 +213,33 @@ DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) const if (!infoHash) infoHash = InfoHash::get(hash); session->fetch(content_length, - [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b) + [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& /*b* */) { - (void)b; if (dht_) { - s->yield(restbed::OK, ""); // Open the connection - Json::FastWriter writer; - size_t token = dht_->listen(infoHash, [s, &writer](std::shared_ptr<Value> value) { - // Send values as soon as we get them - if (!s->is_closed()) - s->yield(restbed::OK, writer.write(value->toJson())); - return !s->is_closed(); - }).get(); - // Handle client deconnection - // NOTE: for now, there is no handler, so we test the session in a thread - // will be the case in restbed 5.0 - SessionToHashToken listener; - listener.session = s; - listener.hash = infoHash; - listener.token = token; - currentListeners_.emplace_back(listener); + InfoHash infoHash(hash); + if (!infoHash) { + infoHash = InfoHash::get(hash); + } + s->yield(restbed::OK, "", [=]( const std::shared_ptr< restbed::Session > s) { + size_t token = dht_->listen(infoHash, [s](std::shared_ptr<Value> value) { + // Send values as soon as we get them + if (!s->is_closed()) { + Json::FastWriter writer; + s->yield(writer.write(value->toJson()), [](const std::shared_ptr<restbed::Session> /*session*/){ }); + } + return !s->is_closed(); + }).get(); + // Handle client deconnection + // NOTE: for now, there is no handler, so we test the session in a thread + // will be the case in restbed 5.0 + SessionToHashToken listener; + listener.session = s; + listener.hash = infoHash; + listener.token = token; + currentListeners_.emplace_back(listener); + }); } else { - s->close(restbed::NOT_FOUND, "{\"err\":\"Incorrect DhtRunner\"}"); + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } } ); @@ -267,15 +274,15 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) const // Build the Value from json auto value = std::make_shared<Value>(root); - auto response = value->toString(); + Json::FastWriter writer; dht_->put(infoHash, value); - s->close(restbed::OK, response); + s->close(restbed::OK, writer.write(value->toJson())); } else { s->close(restbed::BAD_REQUEST, "Incorrect JSON"); } } } else { - s->close(restbed::NOT_FOUND, ""); + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } } ); @@ -308,15 +315,15 @@ DhtProxyServer::putSigned(const std::shared_ptr<restbed::Session>& session) cons if (parsingSuccessful) { auto value = std::make_shared<Value>(root); - auto response = value->toString(); + Json::FastWriter writer; dht_->putSigned(infoHash, value); - s->close(restbed::OK, response); + s->close(restbed::OK, writer.write(value->toJson())); } else { s->close(restbed::BAD_REQUEST, "Incorrect JSON" + strJson); } } } else { - s->close(restbed::NOT_FOUND, ""); + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } } ); @@ -353,9 +360,9 @@ DhtProxyServer::putEncrypted(const std::shared_ptr<restbed::Session>& session) c if (toinfoHash) toInfoHash = InfoHash::get(toHash); - auto response = value->toString(); + Json::FastWriter writer; dht_->putEncrypted(infoHash, toInfoHash, value); - s->close(restbed::OK, response); + s->close(restbed::OK, writer.write(value->toJson())); } else { if(!parsingSuccessful) s->close(restbed::BAD_REQUEST, "Incorrect JSON"); @@ -364,7 +371,7 @@ DhtProxyServer::putEncrypted(const std::shared_ptr<restbed::Session>& session) c } } } else { - s->close(restbed::NOT_FOUND, ""); + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } } ); @@ -379,25 +386,26 @@ DhtProxyServer::getFiltered(const std::shared_ptr<restbed::Session>& session) co auto hash = request->get_path_parameter("hash"); auto value = request->get_path_parameter("value"); session->fetch(content_length, - [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b) + [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& /*b* */) { - (void)b; if (dht_) { InfoHash infoHash(hash); if (!infoHash) { infoHash = InfoHash::get(hash); } - Json::FastWriter writer; - dht_->get(infoHash, [s, &writer](std::shared_ptr<Value> value) { - Json::Value result; - s->yield(restbed::OK, writer.write(value->toJson())); - return true; - }, [s, &writer](bool ok) { - auto response = std::to_string(ok); - s->close(restbed::OK, "{\"ok\": " + response + "}"); - }, {}, value); + s->yield(restbed::OK, "", [=]( const std::shared_ptr< restbed::Session > s) { + dht_->get(infoHash, [s](std::shared_ptr<Value> v) { + // Send values as soon as we get them + Json::FastWriter writer; + s->yield(writer.write(v->toJson()), [](const std::shared_ptr<restbed::Session> /*session*/){ }); + return true; + }, [s](bool /*ok* */) { + // Communication is finished + s->close(); + }, {}, value); + }); } else { - s->close(restbed::NOT_FOUND, "{\"err\":\"Incorrect DhtRunner\"}"); + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } } );