Select Git revision
message_engine.cpp
-
Sébastien Blin authored
This patch introduces two new concepts in order to reduce the code of JamiAccount: ChannelHandlers, to manage protocol logic (the idea of this class is to handle channels per protocol, and to accept or reject them), and AccountModule, to separate the logic between call management, data transfer, config and conversation while providing an interface to detect when some events should occur. Change-Id: I34ff07852c06d7266411f1ffb32b71a1834aba4f GitLab: #603
Sébastien Blin authoredThis patches introduces two new concepts in order to reduce the code of JamiAccount. ChannelHandlers to manages protocols logic. The idea of this class is to handle channels per protocol, accept and reject it. AccountModule, to be able to separate logic between call managements datatransfer, config and conversation but give an interface to detect when some events should occurs. Change-Id: I34ff07852c06d7266411f1ffb32b71a1834aba4f GitLab: #603
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
message_engine.cpp 12.28 KiB
/*
* Copyright (C) 2004-2021 Savoir-faire Linux Inc.
*
* Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "message_engine.h"
#include "sip/sipaccountbase.h"
#include "manager.h"
#include "fileutils.h"
#include "client/ring_signal.h"
#include "jami/account_const.h"
#include <opendht/thread_pool.h>
#include <json/json.h>
#include <fstream>
namespace jami {
namespace im {
/**
 * Build a message engine bound to an account.
 *
 * @param acc   Account used to send the messages and to label emitted signals.
 * @param path  File in which pending messages are persisted.
 *
 * The directory part of @p path (everything before the last
 * DIR_SEPARATOR_CH) is handed to fileutils::check_dir().
 * NOTE(review): check_dir presumably creates the directory when it is
 * missing -- confirm against fileutils.
 */
MessageEngine::MessageEngine(SIPAccountBase& acc, const std::string& path)
    : account_(acc)
    , savePath_(path)
{
    const auto found = savePath_.find_last_of(DIR_SEPARATOR_CH);
    // Without any separator the path has no directory component:
    // substr(0, npos) would return the whole file path and check_dir()
    // would then be run on the save file itself. Nothing to check here.
    if (found == std::string::npos)
        return;
    const auto dir = savePath_.substr(0, found);
    fileutils::check_dir(dir.c_str());
}
/**
 * Queue a message for a peer and schedule a send attempt.
 *
 * @param to        Identifier of the recipient; must not be empty.
 * @param payloads  MIME type -> content map; must not be empty.
 * @return The token identifying the queued message, or 0 when either
 *         argument is empty (nothing is queued in that case).
 *
 * The token is drawn at random in [1, JAMI_ID_MAX_VAL] and redrawn until it
 * is unused among the messages already queued for @p to. The queue is
 * persisted through save_() before the send attempt is posted with
 * runOnMainThread().
 */
MessageToken
MessageEngine::sendMessage(const std::string& to, const std::map<std::string, std::string>& payloads)
{
    if (to.empty() || payloads.empty())
        return 0;

    MessageToken newToken;
    {
        std::lock_guard<std::mutex> guard(messagesMutex_);
        auto& queue = messages_[to];

        // Draw until we get a token not already used for this peer.
        for (;;) {
            newToken = std::uniform_int_distribution<MessageToken> {1, JAMI_ID_MAX_VAL}(
                account_.rand);
            if (queue.find(newToken) == queue.end())
                break;
        }

        auto& entry = queue.emplace(newToken, Message {}).first->second;
        entry.to = to;
        entry.payloads = payloads;
        save_();
    }

    // The actual send happens later, outside of the lock.
    runOnMainThread([this, peer = to] { retrySend(peer); });
    return newToken;
}
void
MessageEngine::onPeerOnline(const std::string& peer, bool retryOnTimeout)
{
retrySend(peer, retryOnTimeout);
}
void
MessageEngine::retrySend(const std::string& peer, bool retryOnTimeout)
{
if (account_.getRegistrationState() != RegistrationState::REGISTERED) {
return;
}
struct PendingMsg
{
MessageToken token;
std::string to;
std::map<std::string, std::string> payloads;
};
std::vector<PendingMsg> pending {};
{
std::lock_guard<std::mutex> lock(messagesMutex_);
auto p = messages_.find(peer);
if (p == messages_.end())
return;
auto& messages = p->second;
for (auto m = messages.begin(); m != messages.end(); ++m) {
if (m->second.status == MessageStatus::UNKNOWN
|| m->second.status == MessageStatus::IDLE) {
m->second.status = MessageStatus::SENDING;
m->second.retried++;
m->second.last_op = clock::now();
pending.emplace_back(PendingMsg {m->first, m->second.to, m->second.payloads});
}
}
}
// avoid locking while calling callback
for (const auto& p : pending) {
JAMI_DBG() << "[message " << p.token << "] Retry sending";
if (p.payloads.find("application/im-gitmessage-id") == p.payloads.end())
emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
account_.getAccountID(),
"",
p.to,
std::to_string(p.token),
(int) DRing::Account::MessageStates::SENDING);
account_.sendTextMessage(p.to, p.payloads, p.token, retryOnTimeout);
}
}
/**
 * Look up the status of a message by token across all peers.
 *
 * @param t  Token returned by sendMessage().
 * @return The status of the first message found with that token, or
 *         MessageStatus::UNKNOWN when no peer queue contains it.
 */
MessageStatus
MessageEngine::getStatus(MessageToken t) const
{
    std::lock_guard<std::mutex> guard(messagesMutex_);
    for (const auto& peerEntry : messages_) {
        const auto& queue = peerEntry.second;
        const auto it = queue.find(t);
        if (it == queue.end())
            continue;
        return it->second.status;
    }
    return MessageStatus::UNKNOWN;
}
/**
 * Mark a message as cancelled.
 *
 * @param t  Token of the message to cancel.
 * @return true when a message with that token was found (in any peer
 *         queue) and marked CANCELLED, false otherwise.
 *
 * Unless the message carries an "application/im-gitmessage-id" payload, a
 * CANCELLED status signal is emitted. The queue is then persisted with
 * save_(). Everything happens with messagesMutex_ held.
 */
bool
MessageEngine::cancel(MessageToken t)
{
    std::lock_guard<std::mutex> guard(messagesMutex_);
    for (auto& peerEntry : messages_) {
        auto& queue = peerEntry.second;
        const auto it = queue.find(t);
        if (it == queue.end())
            continue;

        auto& msg = it->second;
        const bool notify = msg.payloads.find("application/im-gitmessage-id")
                            == msg.payloads.end();
        msg.status = MessageStatus::CANCELLED;
        if (notify) {
            emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
                account_.getAccountID(),
                "",
                msg.to,
                std::to_string(t),
                static_cast<int>(DRing::Account::MessageStates::CANCELLED));
        }
        save_();
        return true;
    }
    return false;
}
/**
 * Handle the result of a send attempt.
 *
 * @param peer   Peer the message was addressed to.
 * @param token  Token of the message.
 * @param ok     Whether the send attempt succeeded.
 *
 * Only messages currently in the SENDING state are updated:
 *  - success: status becomes SENT, signal emitted, queue saved;
 *  - failure with retried >= MAX_RETRIES: status becomes FAILURE, signal
 *    emitted, queue saved;
 *  - other failures: status goes back to IDLE (no signal, no save).
 * Signals are skipped for messages carrying an
 * "application/im-gitmessage-id" payload.
 */
void
MessageEngine::onMessageSent(const std::string& peer, MessageToken token, bool ok)
{
    JAMI_DBG() << "[message " << token << "] Message sent: " << (ok ? "success" : "failure");
    std::lock_guard<std::mutex> guard(messagesMutex_);

    const auto peerIt = messages_.find(peer);
    if (peerIt == messages_.end()) {
        JAMI_DBG() << "[message " << token << "] Can't find peer";
        return;
    }

    auto& queue = peerIt->second;
    const auto msgIt = queue.find(token);
    if (msgIt == queue.end()) {
        JAMI_DBG() << "[message " << token << "] Can't find message";
        return;
    }

    auto& msg = msgIt->second;
    if (msg.status != MessageStatus::SENDING) {
        JAMI_DBG() << "[message " << token << "] State is not SENDING";
        return;
    }

    const bool notify = msg.payloads.find("application/im-gitmessage-id") == msg.payloads.end();

    if (ok) {
        msg.status = MessageStatus::SENT;
        JAMI_DBG() << "[message " << token << "] Status changed to SENT";
        if (notify) {
            emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
                account_.getAccountID(),
                "",
                msg.to,
                std::to_string(token),
                static_cast<int>(DRing::Account::MessageStates::SENT));
        }
        save_();
        return;
    }

    if (msg.retried >= MAX_RETRIES) {
        // Out of retries: give up on this message.
        msg.status = MessageStatus::FAILURE;
        JAMI_DBG() << "[message " << token << "] Status changed to FAILURE";
        if (notify) {
            emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
                account_.getAccountID(),
                "",
                msg.to,
                std::to_string(token),
                static_cast<int>(DRing::Account::MessageStates::FAILURE));
        }
        save_();
        return;
    }

    // Retries left: make the message eligible for the next retrySend().
    msg.status = MessageStatus::IDLE;
    JAMI_DBG() << "[message " << token << "] Status changed to IDLE";
}
/**
 * Relay a "displayed" notification from a peer as a status signal.
 *
 * @param peer       Peer that reported the display state.
 * @param token      Token of the message concerned.
 * @param displayed  Nothing is emitted when false.
 *
 * The stored message state is not touched; only the DISPLAYED signal is
 * emitted.
 */
void
MessageEngine::onMessageDisplayed(const std::string& peer, MessageToken token, bool displayed)
{
    if (displayed) {
        JAMI_DBG() << "[message " << token << "] Displayed by peer";
        const auto state = static_cast<int>(DRing::Account::MessageStates::DISPLAYED);
        emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
            account_.getAccountID(),
            "", /* No related conversation */
            peer,
            std::to_string(token),
            state);
    }
}
void
MessageEngine::load()
{
try {
Json::Value root;
{
std::lock_guard<std::mutex> lock(fileutils::getFileLock(savePath_));
std::ifstream file;
file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
fileutils::openStream(file, savePath_);
if (file.is_open())
file >> root;
}
std::lock_guard<std::mutex> lock(messagesMutex_);
long unsigned loaded {0};
for (auto i = root.begin(); i != root.end(); ++i) {
auto to = i.key().asString();
auto& pmessages = *i;
auto& p = messages_[to];
for (auto m = pmessages.begin(); m != pmessages.end(); ++m) {
const auto& jmsg = *m;
MessageToken token = from_hex_string(m.key().asString());
Message msg;
msg.status = (MessageStatus) jmsg["status"].asInt();
msg.to = jmsg["to"].asString();
auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64());
msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now());
msg.retried = jmsg.get("retried", 0).asUInt();
const auto& pl = jmsg["payload"];
for (auto p = pl.begin(); p != pl.end(); ++p)
msg.payloads[p.key().asString()] = p->asString();
p.emplace(token, std::move(msg));
loaded++;
}
}
if (loaded > 0) {
JAMI_DBG("[Account %s] loaded %lu messages from %s",
account_.getAccountID().c_str(),
loaded,
savePath_.c_str());
}
} catch (const std::exception& e) {
JAMI_DBG("[Account %s] couldn't load messages from %s: %s",
account_.getAccountID().c_str(),
savePath_.c_str(),
e.what());
}
}
void
MessageEngine::save() const
{
std::lock_guard<std::mutex> lock(messagesMutex_);
save_();
}
/**
 * Serialize the pending messages and write them to savePath_ asynchronously.
 *
 * Every caller in this file invokes it with messagesMutex_ already held;
 * the function itself does not take that lock.
 *
 * Messages in a terminal state (FAILURE, SENT, CANCELLED) are not written.
 * A message that is SENDING is stored as IDLE. "last_op" is converted from
 * the engine clock to wall-clock seconds.
 *
 * The JSON document is built synchronously, then the file write is posted
 * to dht::ThreadPool::computation() and runs under the per-file lock.
 * Errors are logged, never propagated.
 */
void
MessageEngine::save_() const
{
    try {
        Json::Value root(Json::objectValue);
        // Number of messages actually serialized (terminal ones are skipped),
        // so that the log below reports messages rather than peers.
        std::size_t savedCount {0};
        for (const auto& c : messages_) {
            Json::Value peerRoot(Json::objectValue);
            for (const auto& m : c.second) {
                const auto& v = m.second;
                if (v.status == MessageStatus::FAILURE || v.status == MessageStatus::SENT
                    || v.status == MessageStatus::CANCELLED)
                    continue;
                Json::Value msg;
                msg["status"] = static_cast<int>(
                    v.status == MessageStatus::SENDING ? MessageStatus::IDLE : v.status);
                msg["to"] = v.to;
                auto wall_time = std::chrono::system_clock::now()
                                 + std::chrono::duration_cast<std::chrono::system_clock::duration>(
                                     v.last_op - clock::now());
                msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(
                    wall_time);
                msg["retried"] = v.retried;
                auto& payloads = msg["payload"];
                for (const auto& p : v.payloads)
                    payloads[p.first] = p.second;
                peerRoot[to_hex_string(m.first)] = std::move(msg);
                ++savedCount;
            }
            root[c.first] = std::move(peerRoot);
        }

        // Save asynchronously
        dht::ThreadPool::computation().run([path = savePath_,
                                            root = std::move(root),
                                            accountID = account_.getAccountID(),
                                            messageNum = savedCount] {
            std::lock_guard<std::mutex> lock(fileutils::getFileLock(path));
            try {
                Json::StreamWriterBuilder wbuilder;
                wbuilder["commentStyle"] = "None";
                wbuilder["indentation"] = "";
                const std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter());
                std::ofstream file;
                file.exceptions(std::ofstream::failbit | std::ofstream::badbit);
                fileutils::openStream(file, path, std::ios::trunc);
                if (file.is_open()) {
                    writer->write(root, &file);
                    // Only report success when the write went through without
                    // throwing; failures are reported by the catch below.
                    JAMI_DBG("[Account %s] saved %zu messages to %s",
                             accountID.c_str(),
                             messageNum,
                             path.c_str());
                }
            } catch (const std::exception& e) {
                JAMI_ERR("[Account %s] Couldn't save messages to %s: %s",
                         accountID.c_str(),
                         path.c_str(),
                         e.what());
            }
        });
    } catch (const std::exception& e) {
        JAMI_ERR("[Account %s] couldn't save messages to %s: %s",
                 account_.getAccountID().c_str(),
                 savePath_.c_str(),
                 e.what());
    }
}
} // namespace im
} // namespace jami