Skip to content
Snippets Groups Projects
Unverified Commit f2b692c7 authored by Adrien Béraud's avatar Adrien Béraud Committed by GitHub
Browse files

Merge pull request #245 from AmarOk1412/fixCancelListen

dhtproxy: fix cancelListen with pushNotifications
parents 2af70e6d 22ef53ac
No related branches found
No related tags found
No related merge requests found
...@@ -296,6 +296,7 @@ private: ...@@ -296,6 +296,7 @@ private:
GetCallback cb; GetCallback cb;
Value::Filter filterChain; Value::Filter filterChain;
std::thread thread; std::thread thread;
unsigned callbackId;
std::shared_ptr<unsigned> pushNotifToken; // NOTE: unused if not using push notifications std::shared_ptr<unsigned> pushNotifToken; // NOTE: unused if not using push notifications
}; };
std::vector<Listener> listeners_; std::vector<Listener> listeners_;
...@@ -344,7 +345,7 @@ private: ...@@ -344,7 +345,7 @@ private:
const std::function<void()> loopSignal_; const std::function<void()> loopSignal_;
#if OPENDHT_PUSH_NOTIFICATIONS #if OPENDHT_PUSH_NOTIFICATIONS
void fillBodyToGetToken(std::shared_ptr<restbed::Request> request); void fillBodyToGetToken(std::shared_ptr<restbed::Request> request, unsigned callbackId);
#endif // OPENDHT_PUSH_NOTIFICATIONS #endif // OPENDHT_PUSH_NOTIFICATIONS
}; };
......
...@@ -436,12 +436,20 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter filter ...@@ -436,12 +436,20 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter filter
auto filterChain = filter.chain(query.where.getFilter()); auto filterChain = filter.chain(query.where.getFilter());
auto pushNotifToken = std::make_shared<unsigned>(0); auto pushNotifToken = std::make_shared<unsigned>(0);
unsigned callbackId = 0;
{
std::lock_guard<std::mutex> lock(lockCallback_);
callbackId_ += 1;
callbackId = callbackId_;
}
Listener l; Listener l;
++listener_token_; ++listener_token_;
l.key = key.toString(); l.key = key.toString();
l.token = listener_token_; l.token = listener_token_;
l.req = req; l.req = req;
l.cb = cb; l.cb = cb;
l.callbackId = callbackId;
l.pushNotifToken = pushNotifToken; l.pushNotifToken = pushNotifToken;
l.filterChain = std::move(filterChain); l.filterChain = std::move(filterChain);
l.thread = std::thread([=]() l.thread = std::thread([=]()
...@@ -453,7 +461,7 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter filter ...@@ -453,7 +461,7 @@ DhtProxyClient::listen(const InfoHash& key, GetCallback cb, Value::Filter filter
} }
#if OPENDHT_PUSH_NOTIFICATIONS #if OPENDHT_PUSH_NOTIFICATIONS
else else
fillBodyToGetToken(req); fillBodyToGetToken(req, callbackId);
#endif #endif
struct State { struct State {
...@@ -546,6 +554,20 @@ DhtProxyClient::cancelListen(const InfoHash&, size_t token) ...@@ -546,6 +554,20 @@ DhtProxyClient::cancelListen(const InfoHash&, size_t token)
restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key); restbed::Uri uri(HTTP_PROTO + serverHost_ + "/" + listener.key);
auto req = std::make_shared<restbed::Request>(uri); auto req = std::make_shared<restbed::Request>(uri);
req->set_method("UNSUBSCRIBE"); req->set_method("UNSUBSCRIBE");
// fill request body
Json::Value body;
body["key"] = deviceKey_;
body["client_id"] = pushClientId_;
body["token"] = std::to_string(token);
body["callback_id"] = listener.callbackId;
Json::StreamWriterBuilder wbuilder;
wbuilder["commentStyle"] = "None";
wbuilder["indentation"] = "";
auto content = Json::writeString(wbuilder, body) + "\n";
std::replace(content.begin(), content.end(), '\n', ' ');
req->set_body(content);
req->set_header("Content-Length", std::to_string(content.size()));
restbed::Http::async(req, restbed::Http::async(req,
[](const std::shared_ptr<restbed::Request>&, [](const std::shared_ptr<restbed::Request>&,
const std::shared_ptr<restbed::Response>&){} const std::shared_ptr<restbed::Response>&){}
...@@ -701,9 +723,10 @@ DhtProxyClient::resubscribe(const unsigned token) ...@@ -701,9 +723,10 @@ DhtProxyClient::resubscribe(const unsigned token)
listener.thread.join(); listener.thread.join();
listener.req = req; listener.req = req;
listener.pushNotifToken = pushNotifToken; listener.pushNotifToken = pushNotifToken;
auto callbackId = listener.callbackId;
listener.thread = std::thread([=]() listener.thread = std::thread([=]()
{ {
fillBodyToGetToken(req); fillBodyToGetToken(req, callbackId);
auto settings = std::make_shared<restbed::Settings>(); auto settings = std::make_shared<restbed::Settings>();
auto ok = std::make_shared<std::atomic_bool>(true); auto ok = std::make_shared<std::atomic_bool>(true);
restbed::Http::async(req, restbed::Http::async(req,
...@@ -741,7 +764,7 @@ DhtProxyClient::resubscribe(const unsigned token) ...@@ -741,7 +764,7 @@ DhtProxyClient::resubscribe(const unsigned token)
#if OPENDHT_PUSH_NOTIFICATIONS #if OPENDHT_PUSH_NOTIFICATIONS
void void
DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req) DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req, unsigned callbackId)
{ {
// Fill body with // Fill body with
// { // {
...@@ -751,11 +774,7 @@ DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req) ...@@ -751,11 +774,7 @@ DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req)
Json::Value body; Json::Value body;
body["key"] = deviceKey_; body["key"] = deviceKey_;
body["client_id"] = pushClientId_; body["client_id"] = pushClientId_;
{ body["callback_id"] = callbackId;
std::lock_guard<std::mutex> lock(lockCallback_);
callbackId_ += 1;
body["callback_id"] = callbackId_;
}
#ifdef __ANDROID__ #ifdef __ANDROID__
body["platform"] = "android"; body["platform"] = "android";
#endif #endif
......
...@@ -274,9 +274,9 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params ...@@ -274,9 +274,9 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params
} catch (std::invalid_argument& e) { std::cout << e.what() << std::endl; } } catch (std::invalid_argument& e) { std::cout << e.what() << std::endl; }
} }
} else if (op == "cl") { } else if (op == "cl") {
std::string rem; std::string hash, rem;
iss >> rem; iss >> hash >> rem;
dht->cancelListen(id, std::stoul(rem)); dht->cancelListen(dht::InfoHash(hash), std::stoul(rem));
} }
else { else {
// Dht syntax // Dht syntax
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment