Commit 044ac764 authored by Adrien Béraud's avatar Adrien Béraud

message engine: retry on peer online

Change-Id: Ic769a3928f84efc3bc83188766292e720b3061df
parent be41645a
......@@ -34,7 +34,6 @@ namespace ring {
namespace im {
static std::uniform_int_distribution<MessageToken> udist {1};
const std::chrono::minutes MessageEngine::RETRY_PERIOD = std::chrono::minutes(1);
MessageEngine::MessageEngine(SIPAccountBase& acc, const std::string& path) : account_(acc), savePath_(path)
{}
......@@ -47,50 +46,29 @@ MessageEngine::sendMessage(const std::string& to, const std::map<std::string, st
MessageToken token;
{
std::lock_guard<std::mutex> lock(messagesMutex_);
auto& peerMessages = messages_[to];
do {
token = udist(account_.rand);
} while (messages_.find(token) != messages_.end());
auto m = messages_.emplace(token, Message{});
} while (peerMessages.find(token) != peerMessages.end());
auto m = peerMessages.emplace(token, Message{});
m.first->second.to = to;
m.first->second.payloads = payloads;
save_();
}
runOnMainThread([this]() {
retrySend();
runOnMainThread([this, to]() {
retrySend(to);
});
return token;
}
void
MessageEngine::reschedule()
MessageEngine::onPeerOnline(const std::string& peer)
{
if (messages_.empty())
return;
std::weak_ptr<Account> w = std::static_pointer_cast<Account>(account_.shared_from_this());
auto next = nextEvent();
if (next != clock::time_point::max())
Manager::instance().scheduleTask([w,this](){
if (auto s = w.lock())
retrySend();
}, next);
}
MessageEngine::clock::time_point
MessageEngine::nextEvent() const
{
auto next = clock::time_point::max();
for (const auto& m : messages_) {
if (m.second.status == MessageStatus::UNKNOWN || m.second.status == MessageStatus::IDLE) {
auto next_op = m.second.last_op + RETRY_PERIOD;
if (next_op < next)
next = next_op;
}
}
return next;
retrySend(peer);
}
void
MessageEngine::retrySend()
MessageEngine::retrySend(const std::string& peer)
{
struct PendingMsg {
MessageToken token;
......@@ -101,20 +79,21 @@ MessageEngine::retrySend()
{
std::lock_guard<std::mutex> lock(messagesMutex_);
auto now = clock::now();
for (auto m = messages_.begin(); m != messages_.end(); ++m) {
auto p = messages_.find(peer);
if (p == messages_.end())
return;
auto& messages = p->second;
for (auto m = messages.begin(); m != messages.end(); ++m) {
if (m->second.status == MessageStatus::UNKNOWN || m->second.status == MessageStatus::IDLE) {
auto next_op = m->second.last_op + RETRY_PERIOD;
if (next_op <= now) {
m->second.status = MessageStatus::SENDING;
m->second.retried++;
m->second.last_op = clock::now();
pending.emplace_back(PendingMsg {m->first, m->second.to, m->second.payloads});
}
m->second.status = MessageStatus::SENDING;
m->second.retried++;
m->second.last_op = clock::now();
pending.emplace_back(PendingMsg {m->first, m->second.to, m->second.payloads});
}
}
}
// avoid locking while calling callback
for (auto& p : pending) {
for (const auto& p : pending) {
RING_DBG() << "[message " << p.token << "] Retry sending";
emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
account_.getAccountID(),
......@@ -129,17 +108,26 @@ MessageStatus
MessageEngine::getStatus(MessageToken t) const
{
std::lock_guard<std::mutex> lock(messagesMutex_);
const auto m = messages_.find(t);
return (m == messages_.end()) ? MessageStatus::UNKNOWN : m->second.status;
for (const auto& p : messages_) {
const auto m = p.second.find(t);
if (m != p.second.end())
return m->second.status;
}
return MessageStatus::UNKNOWN;
}
void
MessageEngine::onMessageSent(MessageToken token, bool ok)
MessageEngine::onMessageSent(const std::string& peer, MessageToken token, bool ok)
{
RING_DBG() << "[message " << token << "] Message sent: " << (ok ? "success" : "failure");
std::lock_guard<std::mutex> lock(messagesMutex_);
auto f = messages_.find(token);
if (f != messages_.end()) {
auto p = messages_.find(peer);
if (p == messages_.end()) {
RING_DBG() << "[message " << token << "] Can't find peer";
return;
}
auto f = p->second.find(token);
if (f != p->second.end()) {
if (f->second.status == MessageStatus::SENDING) {
if (ok) {
f->second.status = MessageStatus::SENT;
......@@ -160,7 +148,6 @@ MessageEngine::onMessageSent(MessageToken token, bool ok)
} else {
f->second.status = MessageStatus::IDLE;
RING_DBG() << "[message " << token << "] Status changed to IDLE";
reschedule();
}
} else {
RING_DBG() << "[message " << token << "] State is not SENDING";
......@@ -185,27 +172,31 @@ MessageEngine::load()
std::lock_guard<std::mutex> lock(messagesMutex_);
long unsigned loaded {0};
for (auto i = root.begin(); i != root.end(); ++i) {
const auto& jmsg = *i;
MessageToken token;
std::istringstream iss(i.key().asString());
iss >> std::hex >> token;
Message msg;
msg.status = (MessageStatus)jmsg["status"].asInt();
msg.to = jmsg["to"].asString();
auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64());
msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now());
msg.retried = jmsg.get("retried", 0).asUInt();
const auto& pl = jmsg["payload"];
for (auto p = pl.begin(); p != pl.end(); ++p)
msg.payloads[p.key().asString()] = p->asString();
messages_.emplace(token, std::move(msg));
loaded++;
auto to = i.key().asString();
auto& pmessages = *i;
auto& p = messages_[to];
for (auto m = pmessages.begin(); m != pmessages.end(); ++m) {
const auto& jmsg = *m;
MessageToken token;
std::istringstream iss(i.key().asString());
iss >> std::hex >> token;
Message msg;
msg.status = (MessageStatus)jmsg["status"].asInt();
msg.to = jmsg["to"].asString();
auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64());
msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now());
msg.retried = jmsg.get("retried", 0).asUInt();
const auto& pl = jmsg["payload"];
for (auto p = pl.begin(); p != pl.end(); ++p)
msg.payloads[p.key().asString()] = p->asString();
p.emplace(token, std::move(msg));
loaded++;
}
}
RING_DBG("[Account %s] loaded %lu messages from %s", account_.getAccountID().c_str(), loaded, savePath_.c_str());
} catch (const std::exception& e) {
RING_ERR("[Account %s] couldn't load messages from %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what());
}
reschedule();
}
void
......@@ -221,21 +212,25 @@ MessageEngine::save_() const
try {
Json::Value root(Json::objectValue);
for (auto& c : messages_) {
auto& v = c.second;
if (v.status == MessageStatus::FAILURE || v.status == MessageStatus::SENT)
continue;
Json::Value msg;
std::ostringstream msgsId;
msgsId << std::hex << c.first;
msg["status"] = (int)(v.status == MessageStatus::SENDING ? MessageStatus::IDLE : v.status);
msg["to"] = v.to;
auto wall_time = std::chrono::system_clock::now() + std::chrono::duration_cast<std::chrono::system_clock::duration>(v.last_op - clock::now());
msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(wall_time);
msg["retried"] = v.retried;
auto& payloads = msg["payload"];
for (const auto& p : v.payloads)
payloads[p.first] = p.second;
root[msgsId.str()] = std::move(msg);
Json::Value peerRoot(Json::objectValue);
for (auto& m : c.second) {
auto& v = m.second;
if (v.status == MessageStatus::FAILURE || v.status == MessageStatus::SENT)
continue;
Json::Value msg;
std::ostringstream msgsId;
msgsId << std::hex << c.first;
msg["status"] = (int)(v.status == MessageStatus::SENDING ? MessageStatus::IDLE : v.status);
msg["to"] = v.to;
auto wall_time = std::chrono::system_clock::now() + std::chrono::duration_cast<std::chrono::system_clock::duration>(v.last_op - clock::now());
msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(wall_time);
msg["retried"] = v.retried;
auto& payloads = msg["payload"];
for (const auto& p : v.payloads)
payloads[p.first] = p.second;
peerRoot[msgsId.str()] = std::move(msg);
}
root[c.first] = std::move(peerRoot);
}
// Save asynchronously
ThreadPool::instance().run([path = savePath_,
......
......@@ -21,6 +21,7 @@
#include <string>
#include <map>
#include <set>
#include <chrono>
#include <mutex>
#include <cstdint>
......@@ -56,7 +57,8 @@ public:
return getStatus(t) == MessageStatus::SENT;
}
void onMessageSent(MessageToken t, bool success);
void onMessageSent(const std::string& peer, MessageToken t, bool success);
void onPeerOnline(const std::string& peer);
/**
* Load persisted messages
......@@ -70,13 +72,11 @@ public:
private:
static const constexpr unsigned MAX_RETRIES = 1;
static const constexpr unsigned MAX_RETRIES = 20;
static const std::chrono::minutes RETRY_PERIOD;
using clock = std::chrono::steady_clock;
clock::time_point nextEvent() const;
void retrySend();
void reschedule();
void retrySend(const std::string& peer);
void save_() const;
struct Message {
......@@ -90,7 +90,9 @@ private:
SIPAccountBase& account_;
const std::string savePath_;
std::map<MessageToken, Message> messages_;
std::map<std::string, std::map<MessageToken, Message>> messages_;
std::set<MessageToken> sentMessages_;
mutable std::mutex messagesMutex_ {};
};
......
......@@ -2043,7 +2043,9 @@ void
RingAccount::onTrackedBuddyOnline(const dht::InfoHash& contactId)
{
RING_DBG("Buddy %s online", contactId.toString().c_str());
emitSignal<DRing::PresenceSignal::NewBuddyNotification>(getAccountID(), contactId.toString(), 1, "");
std::string id(contactId.toString());
emitSignal<DRing::PresenceSignal::NewBuddyNotification>(getAccountID(), id, 1, "");
messageEngine_.onPeerOnline(id);
}
void
......@@ -3360,29 +3362,39 @@ RingAccount::forEachDevice(const dht::InfoHash& to,
});
}
void
RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t token)
uint64_t
RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads)
{
if (to.empty() or payloads.empty()) {
messageEngine_.onMessageSent(token, false);
return;
std::string toUri;
try {
toUri = parseRingUri(to);
} catch (...) {
RING_ERR("Failed to send a text message due to an invalid URI %s", to.c_str());
return 0;
}
if (payloads.size() != 1) {
// Multi-part message
// TODO: not supported yet
RING_ERR("Multi-part im is not supported yet by RingAccount");
messageEngine_.onMessageSent(token, false);
return;
return 0;
}
return SIPAccountBase::sendTextMessage(toUri, payloads);
}
void
RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t token)
{
std::string toUri;
try {
toUri = parseRingUri(to);
}
catch (...) {
} catch (...) {
RING_ERR("Failed to send a text message due to an invalid URI %s", to.c_str());
messageEngine_.onMessageSent(token, false);
messageEngine_.onMessageSent(to, token, false);
return;
}
if (payloads.size() != 1) {
// Multi-part message
// TODO: not supported yet
RING_ERR("Multi-part im is not supported yet by RingAccount");
messageEngine_.onMessageSent(toUri, token, false);
return;
}
......@@ -3396,7 +3408,7 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string,
auto confirm = std::make_shared<PendingConfirmation>();
// Find listening devices for this account
forEachDevice(toH, [confirm,token,payloads,now](const std::shared_ptr<RingAccount>& this_, const dht::InfoHash& dev)
forEachDevice(toH, [confirm,to,token,payloads,now](const std::shared_ptr<RingAccount>& this_, const dht::InfoHash& dev)
{
{
std::lock_guard<std::mutex> lock(this_->messageMutex_);
......@@ -3406,7 +3418,7 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string,
auto h = dht::InfoHash::get("inbox:"+dev.toString());
std::weak_ptr<RingAccount> w = this_;
auto list_token = this_->dht_.listen<dht::ImMessage>(h, [w,token,confirm](dht::ImMessage&& msg) {
auto list_token = this_->dht_.listen<dht::ImMessage>(h, [to, w, token, confirm](dht::ImMessage&& msg) {
if (auto sthis = w.lock()) {
auto& this_ = *sthis;
// check expected message confirmation
......@@ -3435,14 +3447,14 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string,
this_.dht_.cancelListen(t.first, t.second.get());
confirm->listenTokens.clear();
confirm->replied = true;
this_.messageEngine_.onMessageSent(token, true);
this_.messageEngine_.onMessageSent(to, token, true);
}
return false;
});
confirm->listenTokens.emplace(h, std::move(list_token));
this_->dht_.putEncrypted(h, dev,
dht::ImMessage(token, std::string(payloads.begin()->first), std::string(payloads.begin()->second), now),
[w,token,confirm,h](bool ok) {
[w,to,token,confirm,h](bool ok) {
if (auto this_ = w.lock()) {
RING_DBG() << "[Account " << this_->getAccountID() << "] [message " << token << "] Put encrypted " << (ok ? "ok" : "failed");
if (not ok) {
......@@ -3452,20 +3464,20 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string,
confirm->listenTokens.erase(lt);
}
if (confirm->listenTokens.empty() and not confirm->replied)
this_->messageEngine_.onMessageSent(token, false);
this_->messageEngine_.onMessageSent(to, token, false);
}
}
});
RING_DBG() << "[Account " << this_->getAccountID() << "] [message " << token << "] Sending message for device " << dev.toString();
}, [token](const std::shared_ptr<RingAccount>& shared, bool ok) {
}, [to, token](const std::shared_ptr<RingAccount>& shared, bool ok) {
if (not ok) {
shared->messageEngine_.onMessageSent(token, false);
shared->messageEngine_.onMessageSent(to, token, false);
}
});
// Timeout cleanup
Manager::instance().scheduleTask([w=weak(), confirm, token]() {
Manager::instance().scheduleTask([w=weak(), confirm, to, token]() {
if (not confirm->replied) {
if (auto this_ = w.lock()) {
RING_DBG() << "[Account " << this_->getAccountID() << "] [message " << token << "] Timeout";
......@@ -3473,7 +3485,7 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string,
this_->dht_.cancelListen(t.first, t.second.get());
confirm->listenTokens.clear();
confirm->replied = true;
this_->messageEngine_.onMessageSent(token, false);
this_->messageEngine_.onMessageSent(to, token, false);
}
}
}, std::chrono::steady_clock::now() + std::chrono::minutes(1));
......
......@@ -310,6 +310,7 @@ class RingAccount : public SIPAccountBase {
void sendTrustRequest(const std::string& to, const std::vector<uint8_t>& payload);
void sendTrustRequestConfirm(const dht::InfoHash& to);
virtual void sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t id) override;
virtual uint64_t sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads) override;
/* Devices */
void addDevice(const std::string& password);
......
......@@ -2068,7 +2068,7 @@ SIPAccount::sendTextMessage(const std::string& to, const std::map<std::string, s
{
if (to.empty() or payloads.empty()) {
RING_WARN("No sender or payload");
messageEngine_.onMessageSent(id, false);
messageEngine_.onMessageSent(to, id, false);
return;
}
......@@ -2086,7 +2086,7 @@ SIPAccount::sendTextMessage(const std::string& to, const std::map<std::string, s
nullptr, &tdata);
if (status != PJ_SUCCESS) {
RING_ERR("Unable to create request: %s", sip_utils::sip_strerror(status).c_str());
messageEngine_.onMessageSent(id, false);
messageEngine_.onMessageSent(to, id, false);
return;
}
......@@ -2097,17 +2097,19 @@ SIPAccount::sendTextMessage(const std::string& to, const std::map<std::string, s
struct ctx {
std::weak_ptr<SIPAccount> acc;
std::string to;
uint64_t id;
};
ctx* t = new ctx;
t->acc = shared();
t->to = to;
t->id = id;
status = pjsip_endpt_send_request(link_->getEndpoint(), tdata, -1, t, [](void *token, pjsip_event *e) {
auto c = (ctx*) token;
try {
if (auto acc = c->acc.lock()) {
acc->messageEngine_.onMessageSent(c->id, e
acc->messageEngine_.onMessageSent(c->to, c->id, e
&& e->body.tsx_state.tsx
&& e->body.tsx_state.tsx->status_code == PJSIP_SC_OK);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment