Skip to content
Snippets Groups Projects
Unverified Commit 5b71af3c authored by Sébastien Blin's avatar Sébastien Blin
Browse files

proxy: remove useless callback_id

callback_id was used for token problems which are not present anymore.
Now token is sufficient and we can remove callback_id.
parent fcf71118
Branches
Tags
No related merge requests found
......@@ -354,12 +354,11 @@ private:
* NOTE: empty by default to avoid to use services like FCM or APN.
*/
std::string deviceKey_ {};
std::atomic_uint callbackId_ {0};
const std::function<void()> loopSignal_;
#if OPENDHT_PUSH_NOTIFICATIONS
void fillBodyToGetToken(std::shared_ptr<restbed::Request> request, unsigned callbackId);
void fillBodyToGetToken(std::shared_ptr<restbed::Request> request, unsigned token = 0);
#endif // OPENDHT_PUSH_NOTIFICATIONS
bool isDestroying_ {false};
......
......@@ -183,24 +183,18 @@ private:
/**
* Subscribe to push notifications for an iOS or Android device.
* Method: SUBSCRIBE "/{InfoHash: .*}"
* Body: {"key": "device_key", (optional)"callback_id":y,
* (optional)"isAndroid":false (default true)}"
* Body: {"key": "device_key", (optional)"isAndroid":false (default true)}"
* Return: {"token": x}" where x if a token to save
* @note: the listen will timeout after six hours (and send a push notification).
* so you need to refresh the operation each six hours.
* @note: callback_id is used to add the possibility to have multiple listen
* on same hash for same device and must be > 0
* @param session
*/
void subscribe(const std::shared_ptr<restbed::Session>& session);
/**
* Unsubscribe to push notifications for an iOS or Android device.
* Method: UNSUBSCRIBE "/{InfoHash: .*}"
* Body: {"key": "device_key", "token": x, (optional)"callback_id":y"
* where x if the token to cancel
* Body: {"key": "device_key", "token": x} where x if the token to cancel
* Return: nothing
* @note: callback id is used to add the possibility to have multiple listen
* on same hash for same device
* @param session
*/
void unsubscribe(const std::shared_ptr<restbed::Session>& session);
......@@ -247,7 +241,7 @@ private:
std::map<std::string, PushListener> pushListeners_;
unsigned tokenPushNotif_ {0};
void cancelPushListen(const std::string& pushToken, const InfoHash& key, unsigned token, unsigned callbackId);
void cancelPushListen(const std::string& pushToken, const InfoHash& key, unsigned token);
#endif //OPENDHT_PUSH_NOTIFICATIONS
const std::string pushServer_;
......
......@@ -48,8 +48,8 @@ struct DhtProxyClient::Listener
Sp<bool> isCanceledViaClose;
Sp<unsigned> pushNotifToken; // NOTE: unused if not using push notifications
Sp<Scheduler::Job> refreshJob;
Listener(ValueCache&& c, const Sp<restbed::Request>& r, Value::Filter&& f, unsigned cid)
: cache(std::move(c)), filter(std::move(f)), req(r), callbackId(cid), isCanceledViaClose(std::make_shared<bool>(false))
Listener(ValueCache&& c, const Sp<restbed::Request>& r, Value::Filter&& f)
: cache(std::move(c)), filter(std::move(f)), req(r), isCanceledViaClose(std::make_shared<bool>(false))
{}
};
......@@ -599,9 +599,8 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi
}
auto token = ++listener_token_;
auto callbackId = ++callbackId_;
auto l = search->second.listeners.emplace(token, Listener{
ValueCache(cb), req, std::move(filter), callbackId
ValueCache(cb), req, std::move(filter)
}).first;
if (not l->second.cacheExpirationJob) {
l->second.cacheExpirationJob = scheduler.add(time_point::max(), [this, key, token]{
......@@ -645,7 +644,7 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi
}
#if OPENDHT_PUSH_NOTIFICATIONS
else
fillBodyToGetToken(req, callbackId);
fillBodyToGetToken(req);
#endif
struct State {
......@@ -746,7 +745,6 @@ DhtProxyClient::doCancelListen(const InfoHash& key, size_t ltoken)
body["key"] = deviceKey_;
body["client_id"] = pushClientId_;
body["token"] = std::to_string(ltoken);
body["callback_id"] = listener.callbackId;
Json::StreamWriterBuilder wbuilder;
wbuilder["commentStyle"] = "None";
wbuilder["indentation"] = "";
......@@ -914,10 +912,9 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener)
listener.thread.join();
listener.req = req;
listener.pushNotifToken = pushNotifToken;
auto callbackId = listener.callbackId;
listener.thread = std::thread([=]()
{
fillBodyToGetToken(req, callbackId);
fillBodyToGetToken(req);
auto settings = std::make_shared<restbed::Settings>();
auto ok = std::make_shared<std::atomic_bool>(true);
restbed::Http::async(req,
......@@ -953,17 +950,18 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener)
#if OPENDHT_PUSH_NOTIFICATIONS
void
DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req, unsigned callbackId)
DhtProxyClient::fillBodyToGetToken(std::shared_ptr<restbed::Request> req, unsigned token)
{
// Fill body with
// {
// "key":"device_key",
// "callback_id": xxx
// "token": xxx
// }
Json::Value body;
body["key"] = deviceKey_;
body["client_id"] = pushClientId_;
body["callback_id"] = callbackId;
if (token > 0)
body["token"] = token;
#ifdef __ANDROID__
body["platform"] = "android";
#endif
......
......@@ -305,7 +305,6 @@ DhtProxyServer::listen(const std::shared_ptr<restbed::Session>& session)
struct DhtProxyServer::Listener {
unsigned token;
unsigned callbackId {0};
std::future<size_t> internalToken;
Sp<Scheduler::Job> expireJob;
};
......@@ -342,7 +341,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
}
auto pushToken = root["key"].asString();
if (pushToken.empty()) return;
auto callbackId = root.isMember("callback_id") ? root["callback_id"].asLargestUInt() : 0;
auto tokenFromReq = root.isMember("token") ? root["token"].asLargestUInt() : 0;
auto platform = root["platform"].asString();
auto isAndroid = platform == "android";
auto clientId = root.isMember("client_id") ? root["client_id"].asString() : std::string();
......@@ -351,10 +350,11 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
{
std::lock_guard<std::mutex> lock(lockListener_);
// Check if listener is already present and refresh timeout if launched
auto pushListener = pushListeners_.emplace(pushToken, PushListener{}).first;
auto listeners = pushListener->second.listeners.emplace(infoHash, std::vector<Listener>{}).first;
for (auto& listener: listeners->second) {
if (listener.callbackId == callbackId) {
if (listener.token == tokenFromReq) {
{
std::lock_guard<std::mutex> l(schedulerLock_);
scheduler_.edit(listener.expireJob, scheduler_.time() + OP_TIMEOUT);
......@@ -371,14 +371,10 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
token = tokenPushNotif_;
Listener listener;
listener.token = token;
listener.callbackId = callbackId;
listener.internalToken = dht_->listen(infoHash,
[this, pushToken, callbackId, token, isAndroid, clientId](std::vector<std::shared_ptr<Value>> /*value*/) {
[this, pushToken, token, isAndroid, clientId](std::vector<std::shared_ptr<Value>> /*value*/) {
// Build message content.
Json::Value json;
if (callbackId > 0) {
json["callback_id"] = callbackId;
}
json["to"] = clientId;
json["token"] = token;
sendPushNotification(pushToken, json, isAndroid);
......@@ -387,14 +383,11 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
);
std::lock_guard<std::mutex> l(schedulerLock_);
listener.expireJob = scheduler_.add(scheduler_.time() + OP_TIMEOUT,
[this, pushToken, infoHash, token, callbackId, clientId, isAndroid] {
cancelPushListen(pushToken, infoHash, token, callbackId);
[this, pushToken, infoHash, token, clientId, isAndroid] {
cancelPushListen(pushToken, infoHash, token);
// Send a push notification to inform the client that this listen has timeout
Json::Value json;
json["timeout"] = infoHash.toString();
if (callbackId > 0) {
json["callback_id"] = callbackId;
}
json["to"] = clientId;
json["token"] = token;
sendPushNotification(pushToken, json, isAndroid);
......@@ -413,6 +406,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
void
DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session)
{
requestNum_++;
const auto request = session->get_request();
int content_length = std::stoi(request->get_header("Content-Length", "0"));
......@@ -439,9 +433,8 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session)
if (pushToken.empty()) return;
auto token = std::stoull(root["token"].asString());
if (token == 0) return;
auto callbackId = root.isMember("callback_id") ? root["callback_id"].asLargestUInt() : 0;
cancelPushListen(pushToken, infoHash, token, callbackId);
cancelPushListen(pushToken, infoHash, token);
} catch (...) {
// do nothing
}
......@@ -450,7 +443,7 @@ DhtProxyServer::unsubscribe(const std::shared_ptr<restbed::Session>& session)
}
void
DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHash& key, unsigned token, unsigned callbackId)
DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHash& key, unsigned token)
{
std::lock_guard<std::mutex> lock(lockListener_);
auto pushListener = pushListeners_.find(pushToken);
......@@ -460,8 +453,7 @@ DhtProxyServer::cancelPushListen(const std::string& pushToken, const dht::InfoHa
if (listeners == pushListener->second.listeners.end())
return;
for (auto listener = listeners->second.begin(); listener != listeners->second.end();) {
if (listener->token == token && listener->callbackId == callbackId) {
std::cout << "cancelPushListen " << key << " token:" << token << " cid:" << callbackId << std::endl;
if (listener->token == token) {
if (dht_)
dht_->cancelListen(key, std::move(listener->internalToken));
listener = listeners->second.erase(listener);
......@@ -510,6 +502,7 @@ DhtProxyServer::sendPushNotification(const std::string& token, const Json::Value
req->set_header("Host", pushServer_);
req->set_header("Content-Length", std::to_string(valueStr.length()));
req->set_body(valueStr);
// Send request.
restbed::Http::async(req, {});
}
......
......@@ -73,7 +73,7 @@ void print_help() {
#if OPENDHT_PUSH_NOTIFICATIONS
<< " stt [server_address] <device_key> Start the proxy client." << std::endl
<< " rs [token] Resubscribe to opendht." << std::endl
<< " rp [push_notification] Inject a push notification in Opendht." << std::endl
<< " rp [token] Inject a push notification in Opendht." << std::endl
#else
<< " stt [server_address] Start the proxy client." << std::endl
#endif // OPENDHT_PUSH_NOTIFICATIONS
......@@ -237,7 +237,7 @@ void cmd_loop(std::shared_ptr<DhtRunner>& dht, dht_params& params
#if OPENDHT_PUSH_NOTIFICATIONS
else if (op == "rp") {
iss >> value;
dht->pushNotificationReceived({{"token", value}});
dht->pushNotificationReceived({{"to", "dhtnode"}, {"token", value}});
continue;
}
#endif // OPENDHT_PUSH_NOTIFICATIONS
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment