Project 'savoirfairelinux/ring-daemon' was moved to 'savoirfairelinux/jami-daemon'. Please update any links and bookmarks that may still have the old path.
Select Git revision
sflphonegui-uml.xmi
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
message_engine.cpp 10.88 KiB
/*
* Copyright (C) 2004-2020 Savoir-faire Linux Inc.
*
* Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "message_engine.h"
#include "sip/sipaccountbase.h"
#include "manager.h"
#include "fileutils.h"
#include "client/ring_signal.h"
#include "dring/account_const.h"
#include <opendht/thread_pool.h>
#include <json/json.h>
#include <fstream>
namespace jami {
namespace im {
/**
 * Builds a message engine for the account `acc`.
 *
 * `path` is kept in savePath_, the file that load() reads the queued
 * messages from and that save_() writes them to.
 */
MessageEngine::MessageEngine(SIPAccountBase& acc, const std::string& path) : account_(acc), savePath_(path)
{}
/**
 * Queues a message for `to` and requests a send attempt.
 *
 * The message is stored in the per-peer queue under a freshly drawn random
 * token, the queue is persisted through save_(), and a retrySend(to) call is
 * handed to runOnMainThread().
 *
 * @param to        recipient; also the key of the per-peer queue
 * @param payloads  payload map, copied into the queued message
 * @return the token identifying the message, or 0 if `to` or `payloads` is
 *         empty (in which case nothing is queued)
 */
MessageToken
MessageEngine::sendMessage(const std::string& to, const std::map<std::string, std::string>& payloads)
{
    if (to.empty() || payloads.empty())
        return 0;

    MessageToken token;
    {
        std::lock_guard<std::mutex> lock(messagesMutex_);
        auto& queue = messages_[to];

        // Keep drawing until the token is not already used in this peer's queue.
        std::uniform_int_distribution<MessageToken> tokenDist {1, DRING_ID_MAX_VAL};
        do {
            token = tokenDist(account_.rand);
        } while (queue.count(token) != 0);

        auto& entry = queue.emplace(token, Message{}).first->second;
        entry.to = to;
        entry.payloads = payloads;
        save_();
    }

    // The actual send is triggered after messagesMutex_ has been released.
    runOnMainThread([this, to] { retrySend(to); });
    return token;
}
/**
 * Forwards to retrySend() with the same arguments, i.e. starts a send attempt
 * for every message of `peer` that retrySend() considers sendable.
 *
 * NOTE(review): by its name this is the hook invoked when `peer` becomes
 * reachable; the callers are outside this file.
 */
void
MessageEngine::onPeerOnline(const std::string& peer, bool retryOnTimeout)
{
retrySend(peer, retryOnTimeout);
}
void
MessageEngine::retrySend(const std::string& peer, bool retryOnTimeout)
{
struct PendingMsg {
MessageToken token;
std::string to;
std::map<std::string, std::string> payloads;
};
std::vector<PendingMsg> pending {};
{
std::lock_guard<std::mutex> lock(messagesMutex_);
auto p = messages_.find(peer);
if (p == messages_.end())
return;
auto& messages = p->second;
for (auto m = messages.begin(); m != messages.end(); ++m) {
if (m->second.status == MessageStatus::UNKNOWN || m->second.status == MessageStatus::IDLE) {
m->second.status = MessageStatus::SENDING;
m->second.retried++;
m->second.last_op = clock::now();
pending.emplace_back(PendingMsg {m->first, m->second.to, m->second.payloads});
}
}
}
// avoid locking while calling callback
for (const auto& p : pending) {
JAMI_DBG() << "[message " << p.token << "] Retry sending";
emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
account_.getAccountID(),
p.token,
p.to,
(int)DRing::Account::MessageStates::SENDING);
account_.sendTextMessage(p.to, p.payloads, p.token, retryOnTimeout);
}
}
/**
 * Looks up the status of the message identified by `t`.
 *
 * Every per-peer queue is searched under messagesMutex_; the status of the
 * first match is returned.
 *
 * @return the stored status, or MessageStatus::UNKNOWN if no queue holds `t`
 */
MessageStatus
MessageEngine::getStatus(MessageToken t) const
{
    std::lock_guard<std::mutex> lock(messagesMutex_);
    for (const auto& peerEntry : messages_) {
        const auto& queue = peerEntry.second;
        const auto hit = queue.find(t);
        if (hit == queue.end())
            continue;
        return hit->second.status;
    }
    return MessageStatus::UNKNOWN;
}
/**
 * Marks the message identified by `t` as CANCELLED.
 *
 * On a match the status is overwritten, the AccountMessageStatusChanged
 * signal is emitted with the CANCELLED state and the queue is persisted via
 * save_(), all while messagesMutex_ is held.
 *
 * @return true if a message with token `t` was found, false otherwise
 */
bool
MessageEngine::cancel(MessageToken t)
{
    std::lock_guard<std::mutex> lock(messagesMutex_);
    for (auto& peerEntry : messages_) {
        auto& queue = peerEntry.second;
        const auto hit = queue.find(t);
        if (hit == queue.end())
            continue;

        auto& msg = hit->second;
        msg.status = MessageStatus::CANCELLED;
        emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
            account_.getAccountID(),
            t,
            msg.to,
            static_cast<int>(DRing::Account::MessageStates::CANCELLED));
        save_();
        return true;
    }
    return false;
}
/**
 * Records the outcome of a send attempt for message `token` of `peer`.
 *
 * Only a message currently in the SENDING state is updated:
 *  - ok                       -> SENT, signal emitted, queue saved
 *  - !ok, retries exhausted   -> FAILURE, signal emitted, queue saved
 *  - !ok, retries remaining   -> IDLE (eligible for the next retrySend())
 * An unknown peer, an unknown token or a non-SENDING message is only logged.
 *
 * @param peer   key of the per-peer queue the message was stored under
 * @param token  token returned by sendMessage()
 * @param ok     whether the send attempt succeeded
 */
void
MessageEngine::onMessageSent(const std::string& peer, MessageToken token, bool ok)
{
    JAMI_DBG() << "[message " << token << "] Message sent: " << (ok ? "success" : "failure");
    std::lock_guard<std::mutex> lock(messagesMutex_);

    const auto peerIt = messages_.find(peer);
    if (peerIt == messages_.end()) {
        JAMI_DBG() << "[message " << token << "] Can't find peer";
        return;
    }

    auto& queue = peerIt->second;
    const auto msgIt = queue.find(token);
    if (msgIt == queue.end()) {
        JAMI_DBG() << "[message " << token << "] Can't find message";
        return;
    }

    auto& msg = msgIt->second;
    if (msg.status != MessageStatus::SENDING) {
        JAMI_DBG() << "[message " << token << "] State is not SENDING";
        return;
    }

    if (ok) {
        msg.status = MessageStatus::SENT;
        JAMI_DBG() << "[message " << token << "] Status changed to SENT";
        emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
            account_.getAccountID(),
            token,
            msg.to,
            static_cast<int>(DRing::Account::MessageStates::SENT));
        save_();
        return;
    }

    if (msg.retried >= MAX_RETRIES) {
        msg.status = MessageStatus::FAILURE;
        JAMI_DBG() << "[message " << token << "] Status changed to FAILURE";
        emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
            account_.getAccountID(),
            token,
            msg.to,
            static_cast<int>(DRing::Account::MessageStates::FAILURE));
        save_();
        return;
    }

    // Failed but retries remain: park the message until the next retrySend().
    msg.status = MessageStatus::IDLE;
    JAMI_DBG() << "[message " << token << "] Status changed to IDLE";
}
void
MessageEngine::load()
{
try {
Json::Value root;
{
std::lock_guard<std::mutex> lock(fileutils::getFileLock(savePath_));
std::ifstream file;
file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
fileutils::openStream(file, savePath_);
file >> root;
}
std::lock_guard<std::mutex> lock(messagesMutex_);
long unsigned loaded {0};
for (auto i = root.begin(); i != root.end(); ++i) {
auto to = i.key().asString();
auto& pmessages = *i;
auto& p = messages_[to];
for (auto m = pmessages.begin(); m != pmessages.end(); ++m) {
const auto& jmsg = *m;
MessageToken token;
std::istringstream iss(m.key().asString());
iss >> std::hex >> token;
Message msg;
msg.status = (MessageStatus)jmsg["status"].asInt();
msg.to = jmsg["to"].asString();
auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64());
msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now());
msg.retried = jmsg.get("retried", 0).asUInt();
const auto& pl = jmsg["payload"];
for (auto p = pl.begin(); p != pl.end(); ++p)
msg.payloads[p.key().asString()] = p->asString();
p.emplace(token, std::move(msg));
loaded++;
}
}
JAMI_DBG("[Account %s] loaded %lu messages from %s", account_.getAccountID().c_str(), loaded, savePath_.c_str());
} catch (const std::exception& e) {
JAMI_DBG("[Account %s] couldn't load messages from %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what());
}
}
/**
 * Persists the message queues: locks messagesMutex_ and then calls save_().
 *
 * save_() itself reads messages_ without taking the mutex, so this is the
 * entry point to use when the caller does not already hold it.
 */
void
MessageEngine::save() const
{
std::lock_guard<std::mutex> lock(messagesMutex_);
save_();
}
void
MessageEngine::save_() const
{
try {
Json::Value root(Json::objectValue);
for (auto& c : messages_) {
Json::Value peerRoot(Json::objectValue);
for (auto& m : c.second) {
auto& v = m.second;
if (v.status == MessageStatus::FAILURE || v.status == MessageStatus::SENT || v.status == MessageStatus::CANCELLED)
continue;
Json::Value msg;
std::ostringstream msgsId;
msgsId << std::hex << m.first;
msg["status"] = (int)(v.status == MessageStatus::SENDING ? MessageStatus::IDLE : v.status);
msg["to"] = v.to;
auto wall_time = std::chrono::system_clock::now() + std::chrono::duration_cast<std::chrono::system_clock::duration>(v.last_op - clock::now());
msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(wall_time);
msg["retried"] = v.retried;
auto& payloads = msg["payload"];
for (const auto& p : v.payloads)
payloads[p.first] = p.second;
peerRoot[msgsId.str()] = std::move(msg);
}
root[c.first] = std::move(peerRoot);
}
// Save asynchronously
dht::ThreadPool::computation().run([path = savePath_,
root = std::move(root),
accountID = account_.getAccountID(),
messageNum = messages_.size()]
{
std::lock_guard<std::mutex> lock(fileutils::getFileLock(path));
try {
Json::StreamWriterBuilder wbuilder;
wbuilder["commentStyle"] = "None";
wbuilder["indentation"] = "";
const std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter());
std::ofstream file;
file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
fileutils::openStream(file, path, std::ios::trunc);
writer->write(root, &file);
} catch (const std::exception& e) {
JAMI_ERR("[Account %s] Couldn't save messages to %s: %s", accountID.c_str(), path.c_str(), e.what());
}
JAMI_DBG("[Account %s] saved %zu messages to %s", accountID.c_str(), messageNum, path.c_str());
});
} catch (const std::exception& e) {
JAMI_ERR("[Account %s] couldn't save messages to %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what());
}
}
}}