Skip to content
Snippets Groups Projects
Commit 26cbf5d8 authored by Adrien Béraud's avatar Adrien Béraud Committed by Andreas Traczyk
Browse files

message engine: don't persist sent/failed messages


* Avoid message accumulation by not persisting the state
of failed and sent messages, which are already saved by clients.
Clients are still able to know the state of a sent/failed
message sent during the same session, and are expected to catch
and persist the message state signal to know when a message failed
or was successfully sent across daemon restarts.

* Save pending messages at every state change and don't
delete the previous file to avoid loosing the message state
in case of crash.

* Make message saving asynchronous to avoid a slow/high latency
storage device to block the UI or DHT at every message

Change-Id: I96221152a86990c8e9f1be13903a675a87e4e975
Reviewed-by: default avatarAndreas Traczyk <andreas.traczyk@savoirfairelinux.com>
parent 8e8481c2
Branches
No related tags found
No related merge requests found
...@@ -217,6 +217,14 @@ expand_path(const std::string &path) ...@@ -217,6 +217,14 @@ expand_path(const std::string &path)
#endif #endif
} }
std::map<std::string, std::mutex> fileLocks {};
std::mutex&
getFileLock(const std::string& path)
{
return fileLocks[path];
}
bool isFile (const std::string& path) { bool isFile (const std::string& path) {
struct stat s; struct stat s;
return (stat (path.c_str(), &s) == 0) and not (s.st_mode & S_IFDIR); return (stat (path.c_str(), &s) == 0) and not (s.st_mode & S_IFDIR);
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <chrono> #include <chrono>
#include <mutex>
#include <cstdio> #include <cstdio>
#ifndef RING_UWP #ifndef RING_UWP
...@@ -106,6 +107,8 @@ namespace ring { namespace fileutils { ...@@ -106,6 +107,8 @@ namespace ring { namespace fileutils {
std::vector<uint8_t> readArchive(const std::string& path, const std::string& password = {}); std::vector<uint8_t> readArchive(const std::string& path, const std::string& password = {});
void writeArchive(const std::string& data, const std::string& path, const std::string& password = {}); void writeArchive(const std::string& data, const std::string& path, const std::string& password = {});
std::mutex& getFileLock(const std::string& path);
struct FileHandle { struct FileHandle {
int fd; int fd;
const std::string name; const std::string name;
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
#include "message_engine.h" #include "message_engine.h"
#include "sip/sipaccountbase.h" #include "sip/sipaccountbase.h"
#include "manager.h" #include "manager.h"
#include "thread_pool.h"
#include "fileutils.h"
#include "client/ring_signal.h" #include "client/ring_signal.h"
#include "dring/account_const.h" #include "dring/account_const.h"
...@@ -51,8 +53,8 @@ MessageEngine::sendMessage(const std::string& to, const std::map<std::string, st ...@@ -51,8 +53,8 @@ MessageEngine::sendMessage(const std::string& to, const std::map<std::string, st
auto m = messages_.emplace(token, Message{}); auto m = messages_.emplace(token, Message{});
m.first->second.to = to; m.first->second.to = to;
m.first->second.payloads = payloads; m.first->second.payloads = payloads;
save_();
} }
save();
runOnMainThread([this]() { runOnMainThread([this]() {
retrySend(); retrySend();
}); });
...@@ -146,6 +148,7 @@ MessageEngine::onMessageSent(MessageToken token, bool ok) ...@@ -146,6 +148,7 @@ MessageEngine::onMessageSent(MessageToken token, bool ok)
token, token,
f->second.to, f->second.to,
static_cast<int>(DRing::Account::MessageStates::SENT)); static_cast<int>(DRing::Account::MessageStates::SENT));
save_();
} else if (f->second.retried >= MAX_RETRIES) { } else if (f->second.retried >= MAX_RETRIES) {
f->second.status = MessageStatus::FAILURE; f->second.status = MessageStatus::FAILURE;
RING_WARN("[message %" PRIx64 "] status changed to FAILURE", token); RING_WARN("[message %" PRIx64 "] status changed to FAILURE", token);
...@@ -153,6 +156,7 @@ MessageEngine::onMessageSent(MessageToken token, bool ok) ...@@ -153,6 +156,7 @@ MessageEngine::onMessageSent(MessageToken token, bool ok)
token, token,
f->second.to, f->second.to,
static_cast<int>(DRing::Account::MessageStates::FAILURE)); static_cast<int>(DRing::Account::MessageStates::FAILURE));
save_();
} else { } else {
f->second.status = MessageStatus::IDLE; f->second.status = MessageStatus::IDLE;
RING_DBG("[message %" PRIx64 "] status changed to IDLE", token); RING_DBG("[message %" PRIx64 "] status changed to IDLE", token);
...@@ -171,15 +175,16 @@ MessageEngine::onMessageSent(MessageToken token, bool ok) ...@@ -171,15 +175,16 @@ MessageEngine::onMessageSent(MessageToken token, bool ok)
void void
MessageEngine::load() MessageEngine::load()
{ {
std::lock_guard<std::mutex> lock(messagesMutex_);
try { try {
Json::Value root;
{
std::lock_guard<std::mutex> lock(fileutils::getFileLock(savePath_));
std::ifstream file; std::ifstream file;
file.exceptions(std::ifstream::failbit | std::ifstream::badbit); file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
file.open(savePath_); file.open(savePath_);
Json::Value root;
file >> root; file >> root;
}
std::lock_guard<std::mutex> lock(messagesMutex_);
long unsigned loaded {0}; long unsigned loaded {0};
for (auto i = root.begin(); i != root.end(); ++i) { for (auto i = root.begin(); i != root.end(); ++i) {
const auto& jmsg = *i; const auto& jmsg = *i;
...@@ -199,9 +204,6 @@ MessageEngine::load() ...@@ -199,9 +204,6 @@ MessageEngine::load()
loaded++; loaded++;
} }
RING_DBG("[Account %s] loaded %lu messages from %s", account_.getAccountID().c_str(), loaded, savePath_.c_str()); RING_DBG("[Account %s] loaded %lu messages from %s", account_.getAccountID().c_str(), loaded, savePath_.c_str());
// everything whent fine, removing the file
std::remove(savePath_.c_str());
} catch (const std::exception& e) { } catch (const std::exception& e) {
RING_ERR("[Account %s] couldn't load messages from %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what()); RING_ERR("[Account %s] couldn't load messages from %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what());
} }
...@@ -210,12 +212,20 @@ MessageEngine::load() ...@@ -210,12 +212,20 @@ MessageEngine::load()
void void
MessageEngine::save() const MessageEngine::save() const
{
std::lock_guard<std::mutex> lock(messagesMutex_);
save_();
}
void
MessageEngine::save_() const
{ {
try { try {
Json::Value root(Json::objectValue); Json::Value root(Json::objectValue);
std::unique_lock<std::mutex> lock(messagesMutex_);
for (auto& c : messages_) { for (auto& c : messages_) {
auto& v = c.second; auto& v = c.second;
if (v.status == MessageStatus::FAILURE || v.status == MessageStatus::SENT)
continue;
Json::Value msg; Json::Value msg;
std::ostringstream msgsId; std::ostringstream msgsId;
msgsId << std::hex << c.first; msgsId << std::hex << c.first;
...@@ -229,15 +239,27 @@ MessageEngine::save() const ...@@ -229,15 +239,27 @@ MessageEngine::save() const
payloads[p.first] = p.second; payloads[p.first] = p.second;
root[msgsId.str()] = std::move(msg); root[msgsId.str()] = std::move(msg);
} }
lock.unlock(); // Save asynchronously
std::ofstream file; ThreadPool::instance().run([path = savePath_,
file.exceptions(std::ifstream::failbit | std::ifstream::badbit); root = std::move(root),
file.open(savePath_, std::ios::trunc); accountID = account_.getAccountID(),
messageNum = messages_.size()]
{
std::lock_guard<std::mutex> lock(fileutils::getFileLock(path));
try {
Json::StreamWriterBuilder wbuilder; Json::StreamWriterBuilder wbuilder;
wbuilder["commentStyle"] = "None"; wbuilder["commentStyle"] = "None";
wbuilder["indentation"] = ""; wbuilder["indentation"] = "";
file << Json::writeString(wbuilder, root); const std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter());
RING_DBG("[Account %s] saved %lu messages to %s", account_.getAccountID().c_str(), messages_.size(), savePath_.c_str()); std::ofstream file;
file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
file.open(path, std::ios::trunc);
writer->write(root, &file);
} catch (const std::exception& e) {
RING_ERR("[Account %s] Couldn't save messages to %s: %s", accountID.c_str(), path.c_str(), e.what());
}
RING_DBG("[Account %s] saved %zu messages to %s", accountID.c_str(), messageNum, path.c_str());
});
} catch (const std::exception& e) { } catch (const std::exception& e) {
RING_ERR("[Account %s] couldn't save messages to %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what()); RING_ERR("[Account %s] couldn't save messages to %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what());
} }
......
...@@ -77,6 +77,7 @@ private: ...@@ -77,6 +77,7 @@ private:
clock::time_point nextEvent() const; clock::time_point nextEvent() const;
void retrySend(); void retrySend();
void reschedule(); void reschedule();
void save_() const;
struct Message { struct Message {
std::string to; std::string to;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment