diff --git a/src/im/message_engine.cpp b/src/im/message_engine.cpp index 27ddfd19757f8d3de3bef1b50847f90a91625ae6..7822cd370f8a44ccb73dabc15afd205404db6c33 100644 --- a/src/im/message_engine.cpp +++ b/src/im/message_engine.cpp @@ -34,7 +34,6 @@ namespace ring { namespace im { static std::uniform_int_distribution<MessageToken> udist {1}; -const std::chrono::minutes MessageEngine::RETRY_PERIOD = std::chrono::minutes(1); MessageEngine::MessageEngine(SIPAccountBase& acc, const std::string& path) : account_(acc), savePath_(path) {} @@ -47,50 +46,29 @@ MessageEngine::sendMessage(const std::string& to, const std::map<std::string, st MessageToken token; { std::lock_guard<std::mutex> lock(messagesMutex_); + auto& peerMessages = messages_[to]; do { token = udist(account_.rand); - } while (messages_.find(token) != messages_.end()); - auto m = messages_.emplace(token, Message{}); + } while (peerMessages.find(token) != peerMessages.end()); + auto m = peerMessages.emplace(token, Message{}); m.first->second.to = to; m.first->second.payloads = payloads; save_(); } - runOnMainThread([this]() { - retrySend(); + runOnMainThread([this, to]() { + retrySend(to); }); return token; } void -MessageEngine::reschedule() +MessageEngine::onPeerOnline(const std::string& peer) { - if (messages_.empty()) - return; - std::weak_ptr<Account> w = std::static_pointer_cast<Account>(account_.shared_from_this()); - auto next = nextEvent(); - if (next != clock::time_point::max()) - Manager::instance().scheduleTask([w,this](){ - if (auto s = w.lock()) - retrySend(); - }, next); -} - -MessageEngine::clock::time_point -MessageEngine::nextEvent() const -{ - auto next = clock::time_point::max(); - for (const auto& m : messages_) { - if (m.second.status == MessageStatus::UNKNOWN || m.second.status == MessageStatus::IDLE) { - auto next_op = m.second.last_op + RETRY_PERIOD; - if (next_op < next) - next = next_op; - } - } - return next; + retrySend(peer); } void -MessageEngine::retrySend() +MessageEngine::retrySend(const std::string& peer) { struct PendingMsg { MessageToken token; @@ -101,20 +79,21 @@ MessageEngine::retrySend() { std::lock_guard<std::mutex> lock(messagesMutex_); auto now = clock::now(); - for (auto m = messages_.begin(); m != messages_.end(); ++m) { + auto p = messages_.find(peer); + if (p == messages_.end()) + return; + auto& messages = p->second; + for (auto m = messages.begin(); m != messages.end(); ++m) { if (m->second.status == MessageStatus::UNKNOWN || m->second.status == MessageStatus::IDLE) { - auto next_op = m->second.last_op + RETRY_PERIOD; - if (next_op <= now) { - m->second.status = MessageStatus::SENDING; - m->second.retried++; - m->second.last_op = clock::now(); - pending.emplace_back(PendingMsg {m->first, m->second.to, m->second.payloads}); - } + m->second.status = MessageStatus::SENDING; + m->second.retried++; + m->second.last_op = clock::now(); + pending.emplace_back(PendingMsg {m->first, m->second.to, m->second.payloads}); } } } // avoid locking while calling callback - for (auto& p : pending) { + for (const auto& p : pending) { RING_DBG() << "[message " << p.token << "] Retry sending"; emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>( account_.getAccountID(), @@ -129,17 +108,26 @@ MessageStatus MessageEngine::getStatus(MessageToken t) const { std::lock_guard<std::mutex> lock(messagesMutex_); - const auto m = messages_.find(t); - return (m == messages_.end()) ? MessageStatus::UNKNOWN : m->second.status; + for (const auto& p : messages_) { + const auto m = p.second.find(t); + if (m != p.second.end()) + return m->second.status; + } + return MessageStatus::UNKNOWN; } void -MessageEngine::onMessageSent(MessageToken token, bool ok) +MessageEngine::onMessageSent(const std::string& peer, MessageToken token, bool ok) { RING_DBG() << "[message " << token << "] Message sent: " << (ok ? "success" : "failure"); std::lock_guard<std::mutex> lock(messagesMutex_); - auto f = messages_.find(token); - if (f != messages_.end()) { + auto p = messages_.find(peer); + if (p == messages_.end()) { + RING_DBG() << "[message " << token << "] Can't find peer"; + return; + } + auto f = p->second.find(token); + if (f != p->second.end()) { if (f->second.status == MessageStatus::SENDING) { if (ok) { f->second.status = MessageStatus::SENT; @@ -160,7 +148,6 @@ MessageEngine::onMessageSent(MessageToken token, bool ok) } else { f->second.status = MessageStatus::IDLE; RING_DBG() << "[message " << token << "] Status changed to IDLE"; - reschedule(); } } else { RING_DBG() << "[message " << token << "] State is not SENDING"; @@ -185,27 +172,31 @@ MessageEngine::load() std::lock_guard<std::mutex> lock(messagesMutex_); long unsigned loaded {0}; for (auto i = root.begin(); i != root.end(); ++i) { - const auto& jmsg = *i; - MessageToken token; - std::istringstream iss(i.key().asString()); - iss >> std::hex >> token; - Message msg; - msg.status = (MessageStatus)jmsg["status"].asInt(); - msg.to = jmsg["to"].asString(); - auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64()); - msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now()); - msg.retried = jmsg.get("retried", 0).asUInt(); - const auto& pl = jmsg["payload"]; - for (auto p = pl.begin(); p != pl.end(); ++p) - msg.payloads[p.key().asString()] = p->asString(); - messages_.emplace(token, std::move(msg)); - loaded++; + auto to = i.key().asString(); + auto& pmessages = *i; + auto& p = messages_[to]; + for (auto m = pmessages.begin(); m != pmessages.end(); ++m) { + const auto& jmsg = *m; + MessageToken token; + std::istringstream iss(i.key().asString()); + iss >> std::hex >> token; + Message msg; + msg.status = (MessageStatus)jmsg["status"].asInt(); + msg.to = jmsg["to"].asString(); + auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64()); + msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now()); + msg.retried = jmsg.get("retried", 0).asUInt(); + const auto& pl = jmsg["payload"]; + for (auto p = pl.begin(); p != pl.end(); ++p) + msg.payloads[p.key().asString()] = p->asString(); + p.emplace(token, std::move(msg)); + loaded++; + } } RING_DBG("[Account %s] loaded %lu messages from %s", account_.getAccountID().c_str(), loaded, savePath_.c_str()); } catch (const std::exception& e) { RING_ERR("[Account %s] couldn't load messages from %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what()); } - reschedule(); } void @@ -221,21 +212,25 @@ MessageEngine::save_() const try { Json::Value root(Json::objectValue); for (auto& c : messages_) { - auto& v = c.second; - if (v.status == MessageStatus::FAILURE || v.status == MessageStatus::SENT) - continue; - Json::Value msg; - std::ostringstream msgsId; - msgsId << std::hex << c.first; - msg["status"] = (int)(v.status == MessageStatus::SENDING ? MessageStatus::IDLE : v.status); - msg["to"] = v.to; - auto wall_time = std::chrono::system_clock::now() + std::chrono::duration_cast<std::chrono::system_clock::duration>(v.last_op - clock::now()); - msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(wall_time); - msg["retried"] = v.retried; - auto& payloads = msg["payload"]; - for (const auto& p : v.payloads) - payloads[p.first] = p.second; - root[msgsId.str()] = std::move(msg); + Json::Value peerRoot(Json::objectValue); + for (auto& m : c.second) { + auto& v = m.second; + if (v.status == MessageStatus::FAILURE || v.status == MessageStatus::SENT) + continue; + Json::Value msg; + std::ostringstream msgsId; + msgsId << std::hex << c.first; + msg["status"] = (int)(v.status == MessageStatus::SENDING ? MessageStatus::IDLE : v.status); + msg["to"] = v.to; + auto wall_time = std::chrono::system_clock::now() + std::chrono::duration_cast<std::chrono::system_clock::duration>(v.last_op - clock::now()); + msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(wall_time); + msg["retried"] = v.retried; + auto& payloads = msg["payload"]; + for (const auto& p : v.payloads) + payloads[p.first] = p.second; + peerRoot[msgsId.str()] = std::move(msg); + } + root[c.first] = std::move(peerRoot); } // Save asynchronously ThreadPool::instance().run([path = savePath_, diff --git a/src/im/message_engine.h b/src/im/message_engine.h index 13b581b9d9c24a51160f47bbf6b4e1af490552e6..353c5ad59a29c7ccb4cf0789cc134a4328189ada 100644 --- a/src/im/message_engine.h +++ b/src/im/message_engine.h @@ -21,6 +21,7 @@ #include <string> #include <map> +#include <set> #include <chrono> #include <mutex> #include <cstdint> @@ -56,7 +57,8 @@ public: return getStatus(t) == MessageStatus::SENT; } - void onMessageSent(MessageToken t, bool success); + void onMessageSent(const std::string& peer, MessageToken t, bool success); + void onPeerOnline(const std::string& peer); /** * Load persisted messages @@ -70,13 +72,11 @@ public: private: - static const constexpr unsigned MAX_RETRIES = 1; + static const constexpr unsigned MAX_RETRIES = 20; static const std::chrono::minutes RETRY_PERIOD; using clock = std::chrono::steady_clock; - clock::time_point nextEvent() const; - void retrySend(); - void reschedule(); + void retrySend(const std::string& peer); void save_() const; struct Message { @@ -90,7 +90,9 @@ private: SIPAccountBase& account_; const std::string savePath_; - std::map<MessageToken, Message> messages_; + std::map<std::string, std::map<MessageToken, Message>> messages_; + std::set<MessageToken> sentMessages_; + mutable std::mutex messagesMutex_ {}; }; diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp index 319b2213dbbf3f2c3df8968b31a5240a63d2bf07..d61217d16fb174c99b0f20f38da8c8c45b6dc550 100644 --- a/src/ringdht/ringaccount.cpp +++ b/src/ringdht/ringaccount.cpp @@ -2043,7 +2043,9 @@ void RingAccount::onTrackedBuddyOnline(const dht::InfoHash& contactId) { RING_DBG("Buddy %s online", contactId.toString().c_str()); - emitSignal<DRing::PresenceSignal::NewBuddyNotification>(getAccountID(), contactId.toString(), 1, ""); + std::string id(contactId.toString()); + emitSignal<DRing::PresenceSignal::NewBuddyNotification>(getAccountID(), id, 1, ""); + messageEngine_.onPeerOnline(id); } void @@ -3360,29 +3362,39 @@ RingAccount::forEachDevice(const dht::InfoHash& to, }); } -void -RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t token) +uint64_t +RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads) { - if (to.empty() or payloads.empty()) { - messageEngine_.onMessageSent(token, false); - return; + std::string toUri; + try { + toUri = parseRingUri(to); + } catch (...) { + RING_ERR("Failed to send a text message due to an invalid URI %s", to.c_str()); + return 0; } if (payloads.size() != 1) { - // Multi-part message - // TODO: not supported yet RING_ERR("Multi-part im is not supported yet by RingAccount"); - messageEngine_.onMessageSent(token, false); - return; + return 0; } + return SIPAccountBase::sendTextMessage(toUri, payloads); +} +void +RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t token) +{ std::string toUri; - try { toUri = parseRingUri(to); - } - catch (...) { + } catch (...) { RING_ERR("Failed to send a text message due to an invalid URI %s", to.c_str()); - messageEngine_.onMessageSent(token, false); + messageEngine_.onMessageSent(to, token, false); + return; + } + if (payloads.size() != 1) { + // Multi-part message + // TODO: not supported yet + RING_ERR("Multi-part im is not supported yet by RingAccount"); + messageEngine_.onMessageSent(toUri, token, false); return; } @@ -3396,7 +3408,7 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, auto confirm = std::make_shared<PendingConfirmation>(); // Find listening devices for this account - forEachDevice(toH, [confirm,token,payloads,now](const std::shared_ptr<RingAccount>& this_, const dht::InfoHash& dev) + forEachDevice(toH, [confirm,to,token,payloads,now](const std::shared_ptr<RingAccount>& this_, const dht::InfoHash& dev) { { std::lock_guard<std::mutex> lock(this_->messageMutex_); @@ -3406,7 +3418,7 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, auto h = dht::InfoHash::get("inbox:"+dev.toString()); std::weak_ptr<RingAccount> w = this_; - auto list_token = this_->dht_.listen<dht::ImMessage>(h, [w,token,confirm](dht::ImMessage&& msg) { + auto list_token = this_->dht_.listen<dht::ImMessage>(h, [to, w, token, confirm](dht::ImMessage&& msg) { if (auto sthis = w.lock()) { auto& this_ = *sthis; // check expected message confirmation @@ -3435,14 +3447,14 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, this_.dht_.cancelListen(t.first, t.second.get()); confirm->listenTokens.clear(); confirm->replied = true; - this_.messageEngine_.onMessageSent(token, true); + this_.messageEngine_.onMessageSent(to, token, true); } return false; }); confirm->listenTokens.emplace(h, std::move(list_token)); this_->dht_.putEncrypted(h, dev, dht::ImMessage(token, std::string(payloads.begin()->first), std::string(payloads.begin()->second), now), - [w,token,confirm,h](bool ok) { + [w,to,token,confirm,h](bool ok) { if (auto this_ = w.lock()) { RING_DBG() << "[Account " << this_->getAccountID() << "] [message " << token << "] Put encrypted " << (ok ? "ok" : "failed"); if (not ok) { @@ -3452,20 +3464,20 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, confirm->listenTokens.erase(lt); } if (confirm->listenTokens.empty() and not confirm->replied) - this_->messageEngine_.onMessageSent(token, false); + this_->messageEngine_.onMessageSent(to, token, false); } } }); RING_DBG() << "[Account " << this_->getAccountID() << "] [message " << token << "] Sending message for device " << dev.toString(); - }, [token](const std::shared_ptr<RingAccount>& shared, bool ok) { + }, [to, token](const std::shared_ptr<RingAccount>& shared, bool ok) { if (not ok) { - shared->messageEngine_.onMessageSent(token, false); + shared->messageEngine_.onMessageSent(to, token, false); } }); // Timeout cleanup - Manager::instance().scheduleTask([w=weak(), confirm, token]() { + Manager::instance().scheduleTask([w=weak(), confirm, to, token]() { if (not confirm->replied) { if (auto this_ = w.lock()) { RING_DBG() << "[Account " << this_->getAccountID() << "] [message " << token << "] Timeout"; @@ -3473,7 +3485,7 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, this_->dht_.cancelListen(t.first, t.second.get()); confirm->listenTokens.clear(); confirm->replied = true; - this_->messageEngine_.onMessageSent(token, false); + this_->messageEngine_.onMessageSent(to, token, false); } } }, std::chrono::steady_clock::now() + std::chrono::minutes(1)); diff --git a/src/ringdht/ringaccount.h b/src/ringdht/ringaccount.h index 454e4a4af23b99cbdad387d2755f5008895ef759..7ca8543458c31c653746018e8c0fb6e49f757b2f 100644 --- a/src/ringdht/ringaccount.h +++ b/src/ringdht/ringaccount.h @@ -310,6 +310,7 @@ class RingAccount : public SIPAccountBase { void sendTrustRequest(const std::string& to, const std::vector<uint8_t>& payload); void sendTrustRequestConfirm(const dht::InfoHash& to); virtual void sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t id) override; + virtual uint64_t sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads) override; /* Devices */ void addDevice(const std::string& password); diff --git a/src/sip/sipaccount.cpp b/src/sip/sipaccount.cpp index 8a3adecfaa0a4c20738c055c3f49e03e96e3be7f..f7818adcdd38281f4638e45e08b9e3971dbf4584 100644 --- a/src/sip/sipaccount.cpp +++ b/src/sip/sipaccount.cpp @@ -2068,7 +2068,7 @@ SIPAccount::sendTextMessage(const std::string& to, const std::map<std::string, s { if (to.empty() or payloads.empty()) { RING_WARN("No sender or payload"); - messageEngine_.onMessageSent(id, false); + messageEngine_.onMessageSent(to, id, false); return; } @@ -2086,7 +2086,7 @@ SIPAccount::sendTextMessage(const std::string& to, const std::map<std::string, s nullptr, &tdata); if (status != PJ_SUCCESS) { RING_ERR("Unable to create request: %s", sip_utils::sip_strerror(status).c_str()); - messageEngine_.onMessageSent(id, false); + messageEngine_.onMessageSent(to, id, false); return; } @@ -2097,17 +2097,19 @@ SIPAccount::sendTextMessage(const std::string& to, const std::map<std::string, s struct ctx { std::weak_ptr<SIPAccount> acc; + std::string to; uint64_t id; }; ctx* t = new ctx; t->acc = shared(); + t->to = to; t->id = id; status = pjsip_endpt_send_request(link_->getEndpoint(), tdata, -1, t, [](void *token, pjsip_event *e) { auto c = (ctx*) token; try { if (auto acc = c->acc.lock()) { - acc->messageEngine_.onMessageSent(c->id, e + acc->messageEngine_.onMessageSent(c->to, c->id, e && e->body.tsx_state.tsx && e->body.tsx_state.tsx->status_code == PJSIP_SC_OK); }