diff --git a/include/opendht/proxy.h b/include/opendht/proxy.h index 103e131c66cf11c73d272205386a8c8e673b7532..112e555004a5db6bf0db517d2958df49afb7f4a1 100644 --- a/include/opendht/proxy.h +++ b/include/opendht/proxy.h @@ -22,8 +22,8 @@ namespace dht { namespace proxy { -constexpr const std::chrono::seconds OP_TIMEOUT {1 * 60 * 60}; // onw hour -constexpr const std::chrono::seconds OP_MARGIN {5 * 60}; // 5 minutes +constexpr const std::chrono::minutes OP_TIMEOUT {1 * 10}; // one hour +constexpr const std::chrono::minutes OP_MARGIN {5}; // 5 minutes constexpr const char* const HTTP_PROTO {"http://"}; using ListenToken = uint64_t; diff --git a/src/dht_proxy_client.cpp b/src/dht_proxy_client.cpp index 6874d6cd8683082208447d4fcffdfa159b65dbe5..f1bad0ac3e8cb99c1e31c71cf6a71155e91e829b 100644 --- a/src/dht_proxy_client.cpp +++ b/src/dht_proxy_client.cpp @@ -325,11 +325,15 @@ DhtProxyClient::put(const InfoHash& key, Sp<Value> val, DoneCallback cb, time_po auto nextRefresh = scheduler.time() + proxy::OP_TIMEOUT; search->second.puts.erase(id); search->second.puts.emplace(id, PermanentPut {val, scheduler.add(nextRefresh, [this, key, id]{ + DHT_LOG.w(key, "[search %s]: PermanentPut refresh callback vid %llu", key.to_c_str(), id); + std::lock_guard<std::mutex> lock(searchLock_); auto s = searches_.find(key); if (s != searches_.end()) { auto p = s->second.puts.find(id); if (p != s->second.puts.end()) { - doPut(key, p->second.value, {}, time_point::max(), true); + doPut(key, p->second.value, [this](bool ok, const std::vector<Sp<Node>>&) { + DHT_LOG.w("PermanentPut refresh %s", (ok ? "ok" : "failed")); + }, time_point::max(), true); scheduler.edit(p->second.refreshJob, scheduler.time() + proxy::OP_TIMEOUT); } } @@ -957,6 +961,10 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string { #if OPENDHT_PUSH_NOTIFICATIONS scheduler.syncTime(); + DHT_LOG.d("Got push notification"); + for (const auto& n : notification) { + DHT_LOG.d(" %s -> %s", n.first.c_str(), n.second.c_str()); + } try { std::lock_guard<std::mutex> lock(searchLock_); auto timeout = notification.find("timeout"); @@ -967,7 +975,8 @@ DhtProxyClient::pushNotificationReceived(const std::map<std::string, std::string if (vidIt != notification.end()) { // Refresh put auto vid = std::stoull(vidIt->second); - auto put = search.puts.at(vid); + auto& put = search.puts.at(vid); + DHT_LOG.d("refresh PermanentPut: %s %llx %p %s", key.to_c_str(), vid, put.refreshJob.get(), put.refreshJob ? (put.refreshJob->do_ ? "set":"not set") : "(null job)"); scheduler.edit(put.refreshJob, scheduler.time()); loopSignal_(); } else { diff --git a/src/dht_proxy_server.cpp b/src/dht_proxy_server.cpp index 79f997e425114e9b27077341b63b66b10cc62770..4830d2082d0df4bd9a593f3e51d1f790d7af8f8a 100644 --- a/src/dht_proxy_server.cpp +++ b/src/dht_proxy_server.cpp @@ -588,8 +588,8 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) if (reader->parse(char_data, char_data + b.size(), &root, &err)) { // Build the Value from json auto value = std::make_shared<Value>(root); - auto permanent = root.isMember("permanent"); - std::cout << "Got put " << infoHash << " " << value << " " << (permanent ? "permanent" : "") << std::endl; + bool permanent = root.isMember("permanent"); + std::cout << "Got put " << infoHash << " " << *value << " " << (permanent ? "permanent" : "") << std::endl; if (permanent) { std::string pushToken, clientId, platform; @@ -599,6 +599,7 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) clientId = pVal["client_id"].asString(); platform = pVal["platform"].asString(); } + std::cout << "permanent: pushToken " << pushToken << " clientId " << clientId << " platform " << platform << std::endl; bool isAndroid = platform == "android"; std::unique_lock<std::mutex> lock(schedulerLock_); scheduler_.syncTime(); @@ -606,28 +607,33 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) auto vid = value->id; auto sPuts = puts_.emplace(infoHash, SearchPuts{}).first; auto r = sPuts->second.puts.emplace(vid, PermanentPut{}); + auto& pput = r.first->second; if (r.second) { - r.first->second.expireJob = scheduler_.add(timeout, [this, infoHash, vid]{ - std::cout << "Permanent put expired: " << infoHash << std::endl; + std::cout << "New ppush entry" << std::endl; + pput.expireJob = scheduler_.add(timeout, [this, infoHash, vid]{ + std::cout << "Permanent put expired: " << infoHash << " " << vid << std::endl; cancelPut(infoHash, vid); }); #if OPENDHT_PUSH_NOTIFICATIONS - if (not pushToken.empty()) - r.first->second.expireNotifyJob = scheduler_.add(timeout - proxy::OP_MARGIN, + if (not pushToken.empty()) { + std::cout << "Adding refresh job" << std::endl; + pput.expireNotifyJob = scheduler_.add(timeout - proxy::OP_MARGIN, [this, infoHash, vid, pushToken, clientId, isAndroid] { - std::cout << "Permanent put refresh: " << infoHash << std::endl; + std::cout << "Permanent put refresh: " << infoHash << " " << std::to_string(vid) << " " << clientId << std::endl; Json::Value json; json["timeout"] = infoHash.toString(); json["to"] = clientId; json["vid"] = std::to_string(vid); sendPushNotification(pushToken, json, isAndroid); }); + } #endif } else { - scheduler_.edit(r.first->second.expireJob, timeout); - if (r.first->second.expireNotifyJob) - scheduler_.edit(r.first->second.expireNotifyJob, timeout - proxy::OP_MARGIN); + std::cout << "Refreshing put jobs" << std::endl; + scheduler_.edit(pput.expireJob, timeout); + if (pput.expireNotifyJob) + scheduler_.edit(pput.expireNotifyJob, timeout - proxy::OP_MARGIN); } lock.unlock(); schedulerCv_.notify_one(); @@ -652,7 +658,8 @@ DhtProxyServer::put(const std::shared_ptr<restbed::Session>& session) } else { s->close(restbed::SERVICE_UNAVAILABLE, "{\"err\":\"Incorrect DhtRunner\"}"); } - } catch (...) { + } catch (const std::exception& e) { + std::cout << "Error performing put: " << e.what() << std::endl; s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}"); } }