Skip to content
Snippets Groups Projects
Commit bd32edc6 authored by Seva's avatar Seva
Browse files

dhtproxy: uniform client logging

parent d0d3ef73
No related branches found
No related tags found
No related merge requests found
...@@ -86,16 +86,16 @@ DhtProxyClient::DhtProxyClient(std::function<void()> signal, const std::string & ...@@ -86,16 +86,16 @@ DhtProxyClient::DhtProxyClient(std::function<void()> signal, const std::string &
httpClientThread_ = std::thread([this](){ httpClientThread_ = std::thread([this](){
try { try {
if (logger_) if (logger_)
logger_->d("Starting the HTTP Client io_context"); logger_->d("[proxy:client] starting io context");
// Ensures the httpContext_ won't run out of work // Ensures the httpContext_ won't run out of work
auto work = asio::make_work_guard(httpContext_); auto work = asio::make_work_guard(httpContext_);
httpContext_.run(); httpContext_.run();
if (logger_) if (logger_)
logger_->d("HTTP Client io_context stopped"); logger_->d("[proxy:client] http client io context stopped");
} }
catch(const std::exception &ex){ catch(const std::exception &ex){
if (logger_) if (logger_)
logger_->e("Error starting the HTTP Client io_context"); logger_->e("[proxy:client] error starting io context");
} }
}); });
...@@ -121,7 +121,7 @@ DhtProxyClient::startProxy() ...@@ -121,7 +121,7 @@ DhtProxyClient::startProxy()
return; return;
if (logger_) if (logger_)
logger_->w("Staring proxy client to %s", serverHost_.c_str()); logger_->d("[proxy:client] staring proxy with %s", serverHost_.c_str());
nextProxyConfirmationTimer_ = std::make_shared<asio::steady_timer>( nextProxyConfirmationTimer_ = std::make_shared<asio::steady_timer>(
httpContext_, std::chrono::steady_clock::now()); httpContext_, std::chrono::steady_clock::now());
...@@ -176,7 +176,7 @@ DhtProxyClient::cancelAllListeners() ...@@ -176,7 +176,7 @@ DhtProxyClient::cancelAllListeners()
{ {
std::lock_guard<std::mutex> lock(searchLock_); std::lock_guard<std::mutex> lock(searchLock_);
if (logger_) if (logger_)
logger_->w("[proxy:client] [listeners:cancel:all] [%zu searches]", searches_.size()); logger_->d("[proxy:client] [listeners:cancel:all] [%zu searches]", searches_.size());
for (auto& s: searches_) { for (auto& s: searches_) {
s.second.ops.cancelAll([&](size_t token){ s.second.ops.cancelAll([&](size_t token){
auto l = s.second.listeners.find(token); auto l = s.second.listeners.find(token);
...@@ -188,7 +188,7 @@ DhtProxyClient::cancelAllListeners() ...@@ -188,7 +188,7 @@ DhtProxyClient::cancelAllListeners()
httpClient_->close_connection(l->second.connId); httpClient_->close_connection(l->second.connId);
} catch (const std::exception& e) { } catch (const std::exception& e) {
if (logger_) if (logger_)
logger_->w("[proxy:client] [listeners:cancel:all] error closing socket: %s", e.what()); logger_->e("[proxy:client] [listeners:cancel:all] error closing socket: %s", e.what());
} }
l->second.connId = 0; l->second.connId = 0;
} }
...@@ -267,7 +267,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, ...@@ -267,7 +267,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
Value::Filter&& f, Where&& w) Value::Filter&& f, Where&& w)
{ {
if (logger_) if (logger_)
logger_->d(key, "[search %s]: get", key.to_c_str()); logger_->d("[proxy:client] [get] [search %s]", key.to_c_str());
restinio::http_request_header_t header; restinio::http_request_header_t header;
header.request_target("/" + key.toString()); header.request_target("/" + key.toString());
header.method(restinio::http_method_get()); header.method(restinio::http_method_get());
...@@ -324,7 +324,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, ...@@ -324,7 +324,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
auto context = static_cast<GetContext*>(parser->data); auto context = static_cast<GetContext*>(parser->data);
if (parser->status_code != 200){ if (parser->status_code != 200){
if (context->logger) if (context->logger)
context->logger->e("[proxy:client] get status error: %i", parser->status_code); context->logger->e("[proxy:client] [get] status error: %i", parser->status_code);
context->ok = true; context->ok = true;
} }
return 0; return 0;
...@@ -348,7 +348,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, ...@@ -348,7 +348,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
} }
} catch(const std::exception& e) { } catch(const std::exception& e) {
if (context->logger) if (context->logger)
context->logger->e("[proxy:client] get body parsing error: %s", e.what()); context->logger->e("[proxy:client] [get] body parsing error: %s", e.what());
context->ok = false; context->ok = false;
return 1; return 1;
} }
...@@ -361,7 +361,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb, ...@@ -361,7 +361,7 @@ DhtProxyClient::get(const InfoHash& key, GetCallback cb, DoneCallback donecb,
context->donecb(context->ok); context->donecb(context->ok);
} catch(const std::exception& e) { } catch(const std::exception& e) {
if (context->logger) if (context->logger)
context->logger->e("[proxy:client] get message complete parsing error: %i", context->logger->e("[proxy:client] [get] message complete parsing error: %i",
parser->status_code); parser->status_code);
return 1; return 1;
} }
...@@ -375,7 +375,7 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, ...@@ -375,7 +375,7 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb,
time_point created, bool permanent) time_point created, bool permanent)
{ {
if (logger_) if (logger_)
logger_->d(key, "[search %s]: put", key.to_c_str()); logger_->d("[proxy:client] [put] [search %s]", key.to_c_str());
if (not val){ if (not val){
if (cb) if (cb)
cb(false, {}); cb(false, {});
...@@ -397,7 +397,7 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, ...@@ -397,7 +397,7 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb,
refreshTimer->async_wait([this, key, id, ok](const asio::error_code &ec){ refreshTimer->async_wait([this, key, id, ok](const asio::error_code &ec){
if (ec){ if (ec){
if (logger_) if (logger_)
logger_->d("[listener:refresh] error key=%s", key.toString().c_str()); logger_->e("[proxy:client] [listener:refresh] error key=%s", key.toString().c_str());
return; return;
} }
std::lock_guard<std::mutex> lock(searchLock_); std::lock_guard<std::mutex> lock(searchLock_);
...@@ -426,7 +426,7 @@ void ...@@ -426,7 +426,7 @@ void
DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point /*created*/, bool permanent) DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_point /*created*/, bool permanent)
{ {
if (logger_) if (logger_)
logger_->d(key, "[search %s] performing put of %s", key.to_c_str(), val->toString().c_str()); logger_->d("[proxy:client] [put] [search %s] executing for %s", key.to_c_str(), val->toString().c_str());
restinio::http_request_header_t header; restinio::http_request_header_t header;
header.request_target("/" + key.toString()); header.request_target("/" + key.toString());
header.method(restinio::http_method_post()); header.method(restinio::http_method_post());
...@@ -486,7 +486,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ ...@@ -486,7 +486,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_
context->ok = true; context->ok = true;
} else { } else {
if (context->logger) if (context->logger)
context->logger->e("[proxy:client] put status error: %i", parser->status_code); context->logger->e("[proxy:client] [put] status error: %i", parser->status_code);
} }
return 0; return 0;
}; };
...@@ -497,7 +497,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_ ...@@ -497,7 +497,7 @@ DhtProxyClient::doPut(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_
context->donecb(context->ok); context->donecb(context->ok);
} catch(const std::exception& e) { } catch(const std::exception& e) {
if (context->logger) if (context->logger)
context->logger->e("[proxy:client] put message complete error: %s", e.what()); context->logger->e("[proxy:client] [put] message complete error: %s", e.what());
return 1; return 1;
} }
return 0; return 0;
...@@ -545,7 +545,7 @@ DhtProxyClient::cancelPut(const InfoHash& key, const Value::Id& id) ...@@ -545,7 +545,7 @@ DhtProxyClient::cancelPut(const InfoHash& key, const Value::Id& id)
if (search == searches_.end()) if (search == searches_.end())
return false; return false;
if (logger_) if (logger_)
logger_->d(key, "[search %s] cancel put", key.to_c_str()); logger_->d("[proxy:client] [put:cancel] [search %s]", key.to_c_str());
return search->second.puts.erase(id) > 0; return search->second.puts.erase(id) > 0;
} }
...@@ -559,7 +559,7 @@ void ...@@ -559,7 +559,7 @@ void
DhtProxyClient::getProxyInfos() DhtProxyClient::getProxyInfos()
{ {
if (logger_) if (logger_)
logger_->d("Requesting proxy server node information"); logger_->d("[proxy:client] [info] requesting proxy server node information");
std::lock_guard<std::mutex> l(statusLock_); std::lock_guard<std::mutex> l(statusLock_);
auto infoState = std::make_shared<InfoState>(); auto infoState = std::make_shared<InfoState>();
...@@ -730,7 +730,7 @@ DhtProxyClient::onProxyInfos(const Json::Value& proxyInfos, sa_family_t family) ...@@ -730,7 +730,7 @@ DhtProxyClient::onProxyInfos(const Json::Value& proxyInfos, sa_family_t family)
publicAddressV6_ = publicIp; publicAddressV6_ = publicIp;
} catch (const std::exception& e) { } catch (const std::exception& e) {
if (logger_) if (logger_)
logger_->w("[proxy:client] [info] error processing: %s", e.what()); logger_->e("[proxy:client] [info] error processing: %s", e.what());
} }
} }
auto newStatus = std::max(statusIpv4_, statusIpv6_); auto newStatus = std::max(statusIpv4_, statusIpv6_);
...@@ -771,7 +771,7 @@ DhtProxyClient::getPublicAddress(sa_family_t family) ...@@ -771,7 +771,7 @@ DhtProxyClient::getPublicAddress(sa_family_t family)
size_t size_t
DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filter, Where where) { DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filter, Where where) {
if (logger_) if (logger_)
logger_->d(key, "[search %s]: listen", key.to_c_str()); logger_->d("[proxy:client] [listen] [search %s]", key.to_c_str());
auto& search = searches_[key]; auto& search = searches_[key];
auto query = std::make_shared<Query>(Select{}, where); auto query = std::make_shared<Query>(Select{}, where);
...@@ -781,11 +781,11 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt ...@@ -781,11 +781,11 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt
auto search = searches_.find(key); auto search = searches_.find(key);
if (search == searches_.end()) { if (search == searches_.end()) {
if (logger_) if (logger_)
logger_->e(key, "[search %s] listen: search not found", key.to_c_str()); logger_->e("[proxy:client] [listen] [search %s] search not found", key.to_c_str());
return 0; return 0;
} }
if (logger_) if (logger_)
logger_->d("[search %s] sending %s", key.to_c_str(), logger_->d("[proxy:client] [listen] [search %s] sending %s", key.to_c_str(),
deviceKey_.empty() ? "listen" : "subscribe"); deviceKey_.empty() ? "listen" : "subscribe");
auto token = ++listenerToken_; auto token = ++listenerToken_;
...@@ -832,7 +832,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt ...@@ -832,7 +832,7 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt
{ {
if (ec){ if (ec){
if (logger_) if (logger_)
logger_->d("[listener:refresh] error key=%s", key.toString().c_str()); logger_->d("[proxy:client] [listen] refresh error key=%s", key.toString().c_str());
return; return;
} }
if (state->cancel) if (state->cancel)
...@@ -951,7 +951,7 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t header, ...@@ -951,7 +951,7 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t header,
auto context = static_cast<ListenContext*>(parser->data); auto context = static_cast<ListenContext*>(parser->data);
if (parser->status_code != 200){ if (parser->status_code != 200){
if (context->logger) if (context->logger)
context->logger->e("[proxy:client] listen status error: %i", parser->status_code); context->logger->e("[proxy:client] [listen] status error: %i", parser->status_code);
context->state->ok = false; context->state->ok = false;
} }
return 0; return 0;
...@@ -979,7 +979,7 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t header, ...@@ -979,7 +979,7 @@ DhtProxyClient::sendListen(const restinio::http_request_header_t header,
} }
} catch(const std::exception& e) { } catch(const std::exception& e) {
if (context->logger) if (context->logger)
context->logger->e("Error in listen parsing: %s", e.what()); context->logger->e("[proxy:client] [listen] error in parsing: %s", e.what());
context->state->ok = false; context->state->ok = false;
return 1; return 1;
} }
...@@ -1003,7 +1003,7 @@ DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken) ...@@ -1003,7 +1003,7 @@ DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken)
return false; return false;
if (logger_) if (logger_)
logger_->d(key, "[search %s] cancel listen", key.to_c_str()); logger_->d("[proxy:client] [listen:cancel] [search %s]", key.to_c_str());
auto& listener = it->second; auto& listener = it->second;
listener.state->cancel = true; listener.state->cancel = true;
...@@ -1026,7 +1026,7 @@ DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken) ...@@ -1026,7 +1026,7 @@ DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken)
auto request = httpClient_->create_request(header, header_fields, auto request = httpClient_->create_request(header, header_fields,
restinio::http_connection_header_t::keep_alive, content); restinio::http_connection_header_t::keep_alive, content);
if (logger_) if (logger_)
logger_->w(request.c_str()); logger_->d(request.c_str());
// define context // define context
struct UnsubscribeContext { struct UnsubscribeContext {
InfoHash key; InfoHash key;
...@@ -1061,13 +1061,13 @@ DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken) ...@@ -1061,13 +1061,13 @@ DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken)
} }
catch (const std::exception& e){ catch (const std::exception& e){
if (logger_) if (logger_)
logger_->w("Error closing socket: %s", e.what()); logger_->e("[proxy:client] [listen:cancel] error closing socket: %s", e.what());
} }
} }
} }
search->second.listeners.erase(it); search->second.listeners.erase(it);
if (logger_) if (logger_)
logger_->d(key, "[search %s] cancelListen: %zu listener remaining", logger_->d("[proxy:client] [listen:cancel] [search %s] %zu listener remaining",
key.to_c_str(), search->second.listeners.size()); key.to_c_str(), search->second.listeners.size());
if (search->second.listeners.empty()){ if (search->second.listeners.empty()){
searches_.erase(search); searches_.erase(search);
...@@ -1079,7 +1079,7 @@ void ...@@ -1079,7 +1079,7 @@ void
DhtProxyClient::opFailed() DhtProxyClient::opFailed()
{ {
if (logger_) if (logger_)
logger_->e("Proxy request failed"); logger_->e("[proxy:client] proxy request failed");
{ {
std::lock_guard<std::mutex> l(lockCurrentProxyInfos_); std::lock_guard<std::mutex> l(lockCurrentProxyInfos_);
statusIpv4_ = NodeStatus::Disconnected; statusIpv4_ = NodeStatus::Disconnected;
...@@ -1102,7 +1102,7 @@ DhtProxyClient::restartListeners() ...@@ -1102,7 +1102,7 @@ DhtProxyClient::restartListeners()
if (isDestroying_) return; if (isDestroying_) return;
std::lock_guard<std::mutex> lock(searchLock_); std::lock_guard<std::mutex> lock(searchLock_);
if (logger_) if (logger_)
logger_->d("Refresh permanent puts"); logger_->d("[proxy:client] [listeners:restart] refresh permanent puts");
for (auto& search : searches_) { for (auto& search : searches_) {
for (auto& put : search.second.puts) { for (auto& put : search.second.puts) {
if (!*put.second.ok) { if (!*put.second.ok) {
...@@ -1122,7 +1122,7 @@ DhtProxyClient::restartListeners() ...@@ -1122,7 +1122,7 @@ DhtProxyClient::restartListeners()
} }
if (not deviceKey_.empty()) { if (not deviceKey_.empty()) {
if (logger_) if (logger_)
logger_->d("resubscribe due to a connectivity change"); logger_->d("[proxy:client] [listeners:restart] resubscribe due to a connectivity change");
// Connectivity changed, refresh all subscribe // Connectivity changed, refresh all subscribe
for (auto& search : searches_) for (auto& search : searches_)
for (auto& listener : search.second.listeners) for (auto& listener : search.second.listeners)
...@@ -1131,7 +1131,7 @@ DhtProxyClient::restartListeners() ...@@ -1131,7 +1131,7 @@ DhtProxyClient::restartListeners()
return; return;
} }
if (logger_) if (logger_)
logger_->d("Restarting listeners"); logger_->d("[proxy:client] [listeners:restart] restarting listeners");
for (auto& search: searches_) { for (auto& search: searches_) {
for (auto& l: search.second.listeners) { for (auto& l: search.second.listeners) {
auto& listener = l.second; auto& listener = l.second;
...@@ -1142,7 +1142,7 @@ DhtProxyClient::restartListeners() ...@@ -1142,7 +1142,7 @@ DhtProxyClient::restartListeners()
httpClient_->close_connection(listener.connId); httpClient_->close_connection(listener.connId);
} catch (const std::exception& e) { } catch (const std::exception& e) {
if (logger_) if (logger_)
logger_->w("Error closing socket: %s", e.what()); logger_->e("[proxy:client] [listeners:restart] error closing socket: %s", e.what());
} }
l.second.connId = 0; l.second.connId = 0;
} }
...@@ -1206,7 +1206,7 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string ...@@ -1206,7 +1206,7 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string
if (list.second.state->cancel) if (list.second.state->cancel)
continue; continue;
if (logger_) if (logger_)
logger_->d(key, "[search %s] handling push notification", key.to_c_str()); logger_->d("[proxy:client] [push:received] [search %s] handling", key.to_c_str());
auto expired = notification.find("exp"); auto expired = notification.find("exp");
auto token = list.first; auto token = list.first;
auto state = list.second.state; auto state = list.second.state;
...@@ -1251,7 +1251,7 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string ...@@ -1251,7 +1251,7 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string
} }
} catch (const std::exception& e) { } catch (const std::exception& e) {
if (logger_) if (logger_)
logger_->e("Error handling push notification: %s", e.what()); logger_->e("[proxy:client] [push:received] error handling: %s", e.what());
} }
#else #else
(void) notification; (void) notification;
...@@ -1265,7 +1265,7 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener) ...@@ -1265,7 +1265,7 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener)
if (deviceKey_.empty()) if (deviceKey_.empty())
return; return;
if (logger_) if (logger_)
logger_->d(key, "[search %s] resubscribe push listener", key.to_c_str()); logger_->d("[proxy:client] [resubscribe] [search %s] resubscribe push listener", key.to_c_str());
// Subscribe // Subscribe
auto state = listener.state; auto state = listener.state;
state->cancel = true; state->cancel = true;
...@@ -1274,7 +1274,7 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener) ...@@ -1274,7 +1274,7 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener)
httpClient_->close_connection(listener.connId); httpClient_->close_connection(listener.connId);
} catch (const std::exception& e) { } catch (const std::exception& e) {
if (logger_) if (logger_)
logger_->w("[resubscribe] error closing socket: %s", e.what()); logger_->e("[proxy:client] [resubscribe] error closing socket: %s", e.what());
} }
} }
state->cancel = false; state->cancel = false;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment