diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index bbec69ca04cb5ffe87b807e90b0a09b7652507b5..4bd9de70ab7c63f9d13b78fe90645f266d595a32 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -49,6 +49,8 @@ struct DhtProxyServer::SearchPuts { DhtProxyServer::DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port , const std::string& pushServer) : dht_(dht) , pushServer_(pushServer) { + if (not dht_) + throw std::invalid_argument("A DHT instance must be provided"); // NOTE in c++14, use make_unique service_ = std::unique_ptr<restbed::Service>(new restbed::Service()); @@ -187,23 +189,27 @@ DhtProxyServer::getNodeInfo(const std::shared_ptr<restbed::Session>& session) co session->fetch(content_length, [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& /*b*/) { - if (dht_) { - Json::Value result; - auto id = dht_->getId(); - if (id) - result["id"] = id.toString(); - result["node_id"] = dht_->getNodeId().toString(); - result["ipv4"] = dht_->getNodesStats(AF_INET).toJson(); - result["ipv6"] = dht_->getNodesStats(AF_INET6).toJson(); - result["public_ip"] = s->get_origin(); // [ipv6:ipv4]:port or ipv4:port - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto output = Json::writeString(wbuilder, result) + "\n"; - s->close(restbed::OK, output); + try { + if (dht_) { + Json::Value result; + auto id = dht_->getId(); + if (id) + result["id"] = id.toString(); + result["node_id"] = dht_->getNodeId().toString(); + result["ipv4"] = dht_->getNodesStats(AF_INET).toJson(); + result["ipv6"] = dht_->getNodesStats(AF_INET6).toJson(); + result["public_ip"] = s->get_origin(); // [ipv6:ipv4]:port or ipv4:port + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto output = Json::writeString(wbuilder, result) + "\n"; + s->close(restbed::OK, output); + } + else + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); + } catch (...) { + s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); } - else - s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } ); } @@ -218,31 +224,35 @@ DhtProxyServer::get(const std::shared_ptr<restbed::Session>& session) const session->fetch(content_length, [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& /*b* */) { - if (dht_) { - InfoHash infoHash(hash); - if (!infoHash) { - infoHash = InfoHash::get(hash); - } - s->yield(restbed::OK, "", [=]( const std::shared_ptr< restbed::Session > s) { - auto cacheSession = std::weak_ptr<restbed::Session>(s); - - dht_->get(infoHash, [cacheSession](std::shared_ptr<Value> value) { - auto s = cacheSession.lock(); - if (!s) return false; - // Send values as soon as we get them - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; - s->yield(output, [](const std::shared_ptr<restbed::Session> /*session*/){ }); - return true; - }, [s](bool /*ok* */) { - // Communication is finished - s->close(); + try { + if (dht_) { + InfoHash infoHash(hash); + if (!infoHash) { + infoHash = InfoHash::get(hash); + } + s->yield(restbed::OK, "", [=]( const std::shared_ptr< restbed::Session > s) { + auto cacheSession = std::weak_ptr<restbed::Session>(s); + + dht_->get(infoHash, [cacheSession](std::shared_ptr<Value> value) { + auto s = cacheSession.lock(); + if (!s) return false; + // Send values as soon as we get them + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; + s->yield(output, [](const std::shared_ptr<restbed::Session> /*session*/){ }); + return true; + }, [s](bool /*ok* */) { + // Communication is finished + s->close(); + }); }); - }); - } else { - s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); + } else { + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); + } + } catch (...) { + s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); } } ); @@ -261,40 +271,44 @@ DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session) session->fetch(content_length, [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& /*b* */) { - if (dht_) { - InfoHash infoHash(hash); - if (!infoHash) { - infoHash = InfoHash::get(hash); - } - s->yield(restbed::OK); - // Handle client deconnection - // NOTE: for now, there is no handler, so we test the session in a thread - // will be the case in restbed 5.0 - SessionToHashToken listener; - listener.session = session; - listener.hash = infoHash; - // cache the session to avoid an incrementation of the shared_ptr's counter - // else, the session->close() will not close the socket. - auto cacheSession = std::weak_ptr<restbed::Session>(s); - listener.token = dht_->listen(infoHash, [cacheSession](std::shared_ptr<Value> value) { - auto s = cacheSession.lock(); - if (!s) return false; - // Send values as soon as we get them - if (!s->is_closed()) { - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; - s->yield(output, [](const std::shared_ptr<restbed::Session>){ }); + try { + if (dht_) { + InfoHash infoHash(hash); + if (!infoHash) { + infoHash = InfoHash::get(hash); } - return !s->is_closed(); - }); - { - std::lock_guard<std::mutex> lock(lockListener_); - currentListeners_.emplace_back(std::move(listener)); + s->yield(restbed::OK); + // Handle client deconnection + // NOTE: for now, there is no handler, so we test the session in a thread + // will be the case in restbed 5.0 + SessionToHashToken listener; + listener.session = session; + listener.hash = infoHash; + // cache the session to avoid an incrementation of the shared_ptr's counter + // else, the session->close() will not close the socket. + auto cacheSession = std::weak_ptr<restbed::Session>(s); + listener.token = dht_->listen(infoHash, [cacheSession](std::shared_ptr<Value> value) { + auto s = cacheSession.lock(); + if (!s) return false; + // Send values as soon as we get them + if (!s->is_closed()) { + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; + s->yield(output, [](const std::shared_ptr<restbed::Session>){ }); + } + return !s->is_closed(); + }); + { + std::lock_guard<std::mutex> lock(lockListener_); + currentListeners_.emplace_back(std::move(listener)); + } + } else { + session->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } - } else { - session->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); + } catch (...) { + s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); } } ); @@ -399,7 +413,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session) } s->close(restbed::OK, "{\"token\": " + std::to_string(tokenFromReq) + "}\n"); } catch (...) { - // do nothing + s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); } } ); @@ -437,8 +451,9 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session) if (token == 0) return; cancelPushListen(pushToken, infoHash, token); + s->close(restbed::OK); } catch (...) { - // do nothing + s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); } } ); @@ -545,82 +560,86 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) session->fetch(content_length, [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b) { - if (dht_) { - if(b.empty()) { - std::string response("{\"err\":\"Missing parameters\"}"); - s->close(restbed::BAD_REQUEST, response); - } else { - restbed::Bytes buf(b); - std::string strJson(buf.begin(), buf.end()); - - std::string err; - Json::Value root; - Json::CharReaderBuilder rbuilder; - auto* char_data = reinterpret_cast<const char*>(&strJson[0]); - auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); - if (reader->parse(char_data, char_data + strJson.size(), &root, &err)) { - // Build the Value from json - auto value = std::make_shared<Value>(root); - auto permanent = root.isMember("permanent"); - - if (permanent) { - std::string pushToken, clientId, platform; - auto& pVal = root["permanent"]; - if (pVal.isObject()) { - pushToken = pVal["key"].asString(); - clientId = pVal["client_id"].asString(); - platform = pVal["platform"].asString(); - } - bool isAndroid = platform == "android"; - std::unique_lock<std::mutex> lock(schedulerLock_); - scheduler_.syncTime(); - auto sPuts = puts_.emplace(infoHash, SearchPuts{}).first; - auto timeout = scheduler_.time() + proxy::OP_TIMEOUT; - auto vid = value->id; - auto r = sPuts->second.puts.emplace(vid, PermanentPut{}); - if (r.second) { - r.first->second.expireJob = scheduler_.add(timeout, [this, infoHash, vid]{ - cancelPut(infoHash, vid); - }); -#if OPENDHT_PUSH_NOTIFICATIONS - if (not pushToken.empty()) - r.first->second.expireNotifyJob = scheduler_.add(timeout - proxy::OP_MARGIN, - [this, infoHash, vid, pushToken, clientId, isAndroid] - { - Json::Value json; - json["timeout"] = infoHash.toString(); - json["to"] = clientId; - json["vid"] = std::to_string(vid); - sendPushNotification(pushToken, json, isAndroid); + try { + if (dht_) { + if(b.empty()) { + std::string response("{\"err\":\"Missing parameters\"}"); + s->close(restbed::BAD_REQUEST, response); + } else { + restbed::Bytes buf(b); + std::string strJson(buf.begin(), buf.end()); + + std::string err; + Json::Value root; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast<const char*>(&strJson[0]); + auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); + if (reader->parse(char_data, char_data + strJson.size(), &root, &err)) { + // Build the Value from json + auto value = std::make_shared<Value>(root); + auto permanent = root.isMember("permanent"); + + if (permanent) { + std::string pushToken, clientId, platform; + auto& pVal = root["permanent"]; + if (pVal.isObject()) { + pushToken = pVal["key"].asString(); + clientId = pVal["client_id"].asString(); + platform = pVal["platform"].asString(); + } + bool isAndroid = platform == "android"; + std::unique_lock<std::mutex> lock(schedulerLock_); + scheduler_.syncTime(); + auto sPuts = puts_.emplace(infoHash, SearchPuts{}).first; + auto timeout = scheduler_.time() + proxy::OP_TIMEOUT; + auto vid = value->id; + auto r = sPuts->second.puts.emplace(vid, PermanentPut{}); + if (r.second) { + r.first->second.expireJob = scheduler_.add(timeout, [this, infoHash, vid]{ + cancelPut(infoHash, vid); }); +#if OPENDHT_PUSH_NOTIFICATIONS + if (not pushToken.empty()) + r.first->second.expireNotifyJob = scheduler_.add(timeout - proxy::OP_MARGIN, + [this, infoHash, vid, pushToken, clientId, isAndroid] + { + Json::Value json; + json["timeout"] = infoHash.toString(); + json["to"] = clientId; + json["vid"] = std::to_string(vid); + sendPushNotification(pushToken, json, isAndroid); + }); #endif - } else { - scheduler_.edit(r.first->second.expireJob, timeout); - if (r.first->second.expireNotifyJob) - scheduler_.edit(r.first->second.expireNotifyJob, timeout - proxy::OP_MARGIN); + } else { + scheduler_.edit(r.first->second.expireJob, timeout); + if (r.first->second.expireNotifyJob) + scheduler_.edit(r.first->second.expireNotifyJob, timeout - proxy::OP_MARGIN); + } + lock.unlock(); + schedulerCv_.notify_one(); } - lock.unlock(); - schedulerCv_.notify_one(); - } - dht_->put(infoHash, value, [s, value](bool ok) { - if (ok) { - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - if (s->is_open()) - s->close(restbed::OK, Json::writeString(wbuilder, value->toJson()) + "\n"); - } else { - if (s->is_open()) - s->close(restbed::BAD_GATEWAY, "{\"err\":\"put failed\"}"); - } - }, time_point::max(), permanent); - } else { - s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); + dht_->put(infoHash, value, [s, value](bool ok) { + if (ok) { + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + if (s->is_open()) + s->close(restbed::OK, Json::writeString(wbuilder, value->toJson()) + "\n"); + } else { + if (s->is_open()) + s->close(restbed::BAD_GATEWAY, "{\"err\":\"put failed\"}"); + } + }, time_point::max(), permanent); + } else { + s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); + } } + } else { + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } - } else { - s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); + } catch (...) { + s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); } } ); @@ -641,34 +660,38 @@ DhtProxyServer::putSigned(const std::shared_ptr<restbed::Session>& session) cons session->fetch(content_length, [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b) { - if (dht_) { - if(b.empty()) { - std::string response("{\"err\":\"Missing parameters\"}"); - s->close(restbed::BAD_REQUEST, response); - } else { - restbed::Bytes buf(b); - std::string strJson(buf.begin(), buf.end()); - - std::string err; - Json::Value root; - Json::CharReaderBuilder rbuilder; - auto* char_data = reinterpret_cast<const char*>(&strJson[0]); - auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); - if (reader->parse(char_data, char_data + strJson.size(), &root, &err)) { - auto value = std::make_shared<Value>(root); - - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; - dht_->putSigned(infoHash, value); - s->close(restbed::OK, output); + try { + if (dht_) { + if(b.empty()) { + std::string response("{\"err\":\"Missing parameters\"}"); + s->close(restbed::BAD_REQUEST, response); } else { - s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); + restbed::Bytes buf(b); + std::string strJson(buf.begin(), buf.end()); + + std::string err; + Json::Value root; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast<const char*>(&strJson[0]); + auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); + if (reader->parse(char_data, char_data + strJson.size(), &root, &err)) { + auto value = std::make_shared<Value>(root); + + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; + dht_->putSigned(infoHash, value); + s->close(restbed::OK, output); + } else { + s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); + } } + } else { + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } - } else { - s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); + } catch (...) { + s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); } } ); @@ -688,38 +711,42 @@ DhtProxyServer::putEncrypted(const std::shared_ptr<restbed::Session>& session) c session->fetch(content_length, [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& b) { - if (dht_) { - if(b.empty()) { - std::string response("{\"err\":\"Missing parameters\"}"); - s->close(restbed::BAD_REQUEST, response); - } else { - restbed::Bytes buf(b); - std::string strJson(buf.begin(), buf.end()); - - std::string err; - Json::Value root; - Json::CharReaderBuilder rbuilder; - auto* char_data = reinterpret_cast<const char*>(&strJson[0]); - auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); - bool parsingSuccessful = reader->parse(char_data, char_data + strJson.size(), &root, &err); - InfoHash to(root["to"].asString()); - if (parsingSuccessful && to) { - auto value = std::make_shared<Value>(root); - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; - dht_->putEncrypted(key, to, value); - s->close(restbed::OK, output); + try { + if (dht_) { + if(b.empty()) { + std::string response("{\"err\":\"Missing parameters\"}"); + s->close(restbed::BAD_REQUEST, response); } else { - if(!parsingSuccessful) - s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); - else - s->close(restbed::BAD_REQUEST, "{\"err\":\"No destination found\"}"); + restbed::Bytes buf(b); + std::string strJson(buf.begin(), buf.end()); + + std::string err; + Json::Value root; + Json::CharReaderBuilder rbuilder; + auto* char_data = reinterpret_cast<const char*>(&strJson[0]); + auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); + bool parsingSuccessful = reader->parse(char_data, char_data + strJson.size(), &root, &err); + InfoHash to(root["to"].asString()); + if (parsingSuccessful && to) { + auto value = std::make_shared<Value>(root); + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto output = Json::writeString(wbuilder, value->toJson()) + "\n"; + dht_->putEncrypted(key, to, value); + s->close(restbed::OK, output); + } else { + if(!parsingSuccessful) + s->close(restbed::BAD_REQUEST, "{\"err\":\"Incorrect JSON\"}"); + else + s->close(restbed::BAD_REQUEST, "{\"err\":\"No destination found\"}"); + } } + } else { + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } - } else { - s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); + } catch (...) { + s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); } } ); @@ -751,27 +778,31 @@ DhtProxyServer::getFiltered(const std::shared_ptr<restbed::Session>& session) co session->fetch(content_length, [=](const std::shared_ptr<restbed::Session> s, const restbed::Bytes& /*b* */) { - if (dht_) { - InfoHash infoHash(hash); - if (!infoHash) { - infoHash = InfoHash::get(hash); + try { + if (dht_) { + InfoHash infoHash(hash); + if (!infoHash) { + infoHash = InfoHash::get(hash); + } + s->yield(restbed::OK, "", [=]( const std::shared_ptr< restbed::Session > s) { + dht_->get(infoHash, [s](std::shared_ptr<Value> v) { + // Send values as soon as we get them + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + auto output = Json::writeString(wbuilder, v->toJson()) + "\n"; + s->yield(output, [](const std::shared_ptr<restbed::Session> /*session*/){ }); + return true; + }, [s](bool /*ok* */) { + // Communication is finished + s->close(); + }, {}, value); + }); + } else { + s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } - s->yield(restbed::OK, "", [=]( const std::shared_ptr< restbed::Session > s) { - dht_->get(infoHash, [s](std::shared_ptr<Value> v) { - // Send values as soon as we get them - Json::StreamWriterBuilder wbuilder; - wbuilder["commentStyle"] = "None"; - wbuilder["indentation"] = ""; - auto output = Json::writeString(wbuilder, v->toJson()) + "\n"; - s->yield(output, [](const std::shared_ptr<restbed::Session> /*session*/){ }); - return true; - }, [s](bool /*ok* */) { - // Communication is finished - s->close(); - }, {}, value); - }); - } else { - s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); + } catch (...) { + s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); } } );