Commit 41e304a4 authored by Adrien Béraud's avatar Adrien Béraud

message engine

Tuleap: #359
Change-Id: I823ac0b9b84558d3a88d85561b506d20bd5fc8d1
parent 3facadc7
......@@ -222,6 +222,27 @@
<arg type="s" name="to" direction="in"/>
<annotation name="org.qtproject.QtDBus.QtTypeName.In2" value="MapStringString"/>
<arg type="a{ss}" name="payloads" direction="in"/>
<arg type="t" name="id" direction="out">
<tp:docstring>
The message ID.
</tp:docstring>
</arg>
</method>
<method name="getMessageStatus" tp:name-for-bindings="getMessageStatus">
<arg type="t" name="id" direction="in"/>
<arg type="s" name="status" direction="out">
<tp:docstring>
The message status.
<ul>
<li>UNKNOWN: unknown message or message status</li>
<li>SENDING: message is being sent or waiting for peer confirmation.</li>
<li>SENT: message have been received from the other end.</li>
<li>READ: message have been read by the peer.</li>
<li>FAILURE: the message coudn't be delivered.</li>
</ul>
</tp:docstring>
</arg>
</method>
<signal name="incomingAccountMessage" tp:name-for-bindings="incomingAccountMessage">
......@@ -235,6 +256,19 @@
<arg type="a{ss}" name="payloads"/>
</signal>
<signal name="accountMessageStatus" tp:name-for-bindings="accountMessageStatus">
<tp:added version="2.3.0"/>
<tp:docstring>
Notify clients that a sent text message status have changed
</tp:docstring>
<arg type="t" name="id"/>
<arg type="s" name="status">
<tp:docstring>The new status of the message, see getMessageStatus for possible values.</tp:docstring>
</arg>
<annotation name="org.qtproject.QtDBus.QtTypeName.In2" value="MapStringString"/>
<arg type="a{ss}" name="payloads"/>
</signal>
<method name="setVolume" tp:name-for-bindings="setVolume">
<tp:docstring>
<p>Sets the volume using a linear scale [0,100].</p>
......
......@@ -91,10 +91,16 @@ DBusConfigurationManager::registerAllAccounts(void)
DRing::registerAllAccounts();
}
void
DBusConfigurationManager::sendTextMessage(const std::string& accountID, const std::string& to, const std::map<std::string, std::string>& payloads)
auto
DBusConfigurationManager::sendTextMessage(const std::string& accountID, const std::string& to, const std::map<std::string, std::string>& payloads) -> decltype(DRing::sendAccountTextMessage(accountID, to, payloads))
{
return DRing::sendAccountTextMessage(accountID, to, payloads);
}
auto
DBusConfigurationManager::getMessageStatus(const uint64_t& id) -> decltype(DRing::getMessageStatus(id))
{
DRing::sendAccountTextMessage(accountID, to, payloads);
return DRing::getMessageStatus(id);
}
auto
......
......@@ -66,7 +66,8 @@ class DBusConfigurationManager :
std::vector<std::string> getAccountList();
void sendRegister(const std::string& accoundID, const bool& enable);
void registerAllAccounts(void);
void sendTextMessage(const std::string& accoundID, const std::string& to, const std::map<std::string, std::string>& payloads);
uint64_t sendTextMessage(const std::string& accoundID, const std::string& to, const std::map<std::string, std::string>& payloads);
std::string getMessageStatus(const uint64_t& id);
std::map<std::string, std::string> getTlsDefaultSettings();
std::vector<std::string> getSupportedCiphers(const std::string& accountID);
std::vector<unsigned> getCodecList();
......
......@@ -30,6 +30,7 @@
#include "noncopyable.h"
#include "config/serializable.h"
#include "registration_states.h"
#include "im/message_engine.h"
#include "ip_utils.h"
#include "media_codec.h"
#include "logger.h"
......@@ -147,9 +148,17 @@ class Account : public Serializable, public std::enable_shared_from_this<Account
/**
* If supported, send a text message from this account.
* @return a token to query the message status
*/
virtual void sendTextMessage(const std::string& to UNUSED,
const std::map<std::string, std::string>& payloads UNUSED) {};
virtual uint64_t sendTextMessage(const std::string& to UNUSED,
const std::map<std::string, std::string>& payloads UNUSED) { return 0; }
/**
* Return the status corresponding to the token.
*/
virtual im::MessageStatus getMessageStatus(uint64_t id UNUSED) const {
return im::MessageStatus::UNKNOWN;
}
std::vector<std::shared_ptr<Call>> getCalls();
......@@ -281,6 +290,12 @@ class Account : public Serializable, public std::enable_shared_from_this<Account
virtual const IceTransportOptions getIceOptions() const noexcept;
/**
* Random generator engine
* Logical account state shall never rely on the state of the random generator.
*/
mutable std::mt19937_64 rand_;
private:
NON_COPYABLE(Account);
......@@ -431,12 +446,6 @@ class Account : public Serializable, public std::enable_shared_from_this<Account
*/
std::string mailBox_;
/**
* Random generator engine
* Logical account state shall never rely on the state of the random generator.
*/
mutable std::mt19937_64 rand_;
/**
* UPnP IGD controller and the mutex to access it
*/
......
......@@ -276,10 +276,16 @@ registerAllAccounts()
ring::Manager::instance().registerAccounts();
}
void
uint64_t
sendAccountTextMessage(const std::string& accountID, const std::string& to, const std::map<std::string, std::string>& payloads)
{
ring::Manager::instance().sendTextMessage(accountID, to, payloads);
return ring::Manager::instance().sendTextMessage(accountID, to, payloads);
}
std::string
getMessageStatus(uint64_t id)
{
return ring::Manager::instance().getMessageStatus(id);
}
/* contact requests */
......
......@@ -65,6 +65,7 @@ getSignalHandlers()
exported_callback<DRing::ConfigurationSignal::CertificateExpired>(),
exported_callback<DRing::ConfigurationSignal::CertificateStateChanged>(),
exported_callback<DRing::ConfigurationSignal::IncomingAccountMessage>(),
exported_callback<DRing::ConfigurationSignal::AccountMessageStatus>(),
exported_callback<DRing::ConfigurationSignal::IncomingTrustRequest>(),
exported_callback<DRing::ConfigurationSignal::MediaParametersChanged>(),
exported_callback<DRing::ConfigurationSignal::Error>(),
......
......@@ -58,6 +58,16 @@ constexpr static const char REQUEST_TIMEOUT [] = "Request Timeout";
} //namespace DRing::Account
namespace MessageStates {
constexpr static const char UNKNOWN [] = "UNKNOWN";
constexpr static const char SENDING [] = "SENDING";
constexpr static const char SENT [] = "SENT";
constexpr static const char READ [] = "READ";
constexpr static const char FAILURE [] = "FAILURE";
} //namespace DRing::MessageStates
namespace VolatileProperties {
constexpr static const char ACTIVE [] = "Account.active";
......
......@@ -49,7 +49,8 @@ void setAccountEnabled(const std::string& accountID, bool enable);
std::vector<std::string> getAccountList();
void sendRegister(const std::string& accountID, bool enable);
void registerAllAccounts(void);
void sendAccountTextMessage(const std::string& accountID, const std::string& to, const std::map<std::string, std::string>& payloads);
uint64_t sendAccountTextMessage(const std::string& accountID, const std::string& to, const std::map<std::string, std::string>& payloads);
std::string getMessageStatus(uint64_t id);
std::map<std::string, std::string> getTlsDefaultSettings();
......@@ -188,6 +189,10 @@ struct ConfigurationSignal {
constexpr static const char* name = "IncomingAccountMessage";
using cb_type = void(const std::string& /*account_id*/, const std::string& /*from*/, const std::map<std::string, std::string>& /*payloads*/);
};
struct AccountMessageStatus {
constexpr static const char* name = "AccountMessageStatus";
using cb_type = void(uint64_t /*message_id*/, const std::string& /*state*/);
};
struct IncomingTrustRequest {
constexpr static const char* name = "IncomingTrustRequest";
using cb_type = void(const std::string& /*account_id*/, const std::string& /*from*/, const std::vector<uint8_t>& payload, time_t received);
......
......@@ -2,4 +2,4 @@ include $(top_srcdir)/globals.mak
noinst_LTLIBRARIES = libim.la
libim_la_SOURCES = instant_messaging.cpp instant_messaging.h
libim_la_SOURCES = instant_messaging.cpp message_engine.cpp instant_messaging.h message_engine.h
/*
* Copyright (C) 2016 Savoir-faire Linux Inc.
*
* Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "message_engine.h"
#include "sip/sipaccountbase.h"
#include "manager.h"
#include "client/ring_signal.h"
#include "dring/account_const.h"
#include <json/json.h>
#include <fstream>
namespace ring {
namespace im {
static std::uniform_int_distribution<MessageToken> udist {1};
const std::chrono::minutes MessageEngine::RETRY_PERIOD = std::chrono::minutes(1);
MessageEngine::MessageEngine(SIPAccountBase& acc, const std::string& path) : account_(acc), savePath_(path)
{}
MessageToken
MessageEngine::sendMessage(const std::string& to, const std::map<std::string, std::string>& payloads)
{
if (payloads.empty() or to.empty())
return 0;
MessageToken token;
{
std::lock_guard<std::mutex> lock(messagesMutex_);
do {
token = udist(account_.rand_);
} while (messages_.find(token) != messages_.end());
auto m = messages_.emplace(token, Message{});
m.first->second.to = to;
m.first->second.payloads = payloads;
}
save();
runOnMainThread([this,token]() {
std::lock_guard<std::mutex> lock(messagesMutex_);
auto m = messages_.find(token);
if (m != messages_.end())
trySend(m);
});
return token;
}
void
MessageEngine::reschedule()
{
std::lock_guard<std::mutex> lock(messagesMutex_);
if (messages_.empty())
return;
std::weak_ptr<Account> w = std::static_pointer_cast<Account>(account_.shared_from_this());
auto next = nextEvent();
if (next != clock::time_point::max())
Manager::instance().scheduleTask([w,this](){
if (auto s = w.lock())
retrySend();
}, next);
}
MessageEngine::clock::time_point
MessageEngine::nextEvent() const
{
auto next = clock::time_point::max();
for (const auto& m : messages_) {
if (m.second.status == MessageStatus::UNKNOWN || m.second.status == MessageStatus::IDLE) {
auto next_op = m.second.last_op + RETRY_PERIOD;
if (next_op < next)
next = next_op;
}
}
return next;
}
void
MessageEngine::retrySend()
{
auto now = clock::now();
std::lock_guard<std::mutex> lock(messagesMutex_);
for (auto m = messages_.begin(); m != messages_.end(); ++m) {
if (m->second.status == MessageStatus::UNKNOWN || m->second.status == MessageStatus::IDLE) {
auto next_op = m->second.last_op + RETRY_PERIOD;
if (next_op <= now)
trySend(m);
}
}
}
MessageStatus
MessageEngine::getStatus(MessageToken t) const
{
std::lock_guard<std::mutex> lock(messagesMutex_);
const auto m = messages_.find(t);
return (m == messages_.end()) ? MessageStatus::UNKNOWN : m->second.status;
}
void
MessageEngine::trySend(decltype(MessageEngine::messages_)::iterator m)
{
if (m->second.status != MessageStatus::IDLE &&
m->second.status != MessageStatus::UNKNOWN) {
RING_WARN("Can't send message in status %d", (int)m->second.status);
return;
}
RING_DBG("Retrying to send message %lu", m->first);
m->second.status = MessageStatus::SENDING;
m->second.retried++;
m->second.last_op = clock::now();
emitSignal<DRing::ConfigurationSignal::AccountMessageStatus>(m->first, DRing::Account::MessageStates::SENDING);
account_.sendTextMessage(m->second.to, m->second.payloads, m->first);
}
void
MessageEngine::onMessageSent(MessageToken token, bool ok)
{
RING_WARN("Message %lu: %s", token, ok ? "success" : "failure");
std::lock_guard<std::mutex> lock(messagesMutex_);
auto f = messages_.find(token);
if (f != messages_.end()) {
if (f->second.status == MessageStatus::SENDING) {
if (ok) {
f->second.status = MessageStatus::SENT;
emitSignal<DRing::ConfigurationSignal::AccountMessageStatus>(token, DRing::Account::MessageStates::SENT);
} else if (f->second.retried == MAX_RETRIES) {
f->second.status = MessageStatus::FAILURE;
emitSignal<DRing::ConfigurationSignal::AccountMessageStatus>(token, DRing::Account::MessageStates::FAILURE);
} else {
f->second.status = MessageStatus::IDLE;
// TODO: reschedule sending
}
}
}
}
void
MessageEngine::load()
{
try {
std::ifstream file;
file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
file.open(savePath_);
Json::Value root;
Json::Reader reader;
if (!reader.parse(file, root))
throw std::runtime_error("can't parse JSON.");
std::lock_guard<std::mutex> lock(messagesMutex_);
for (auto i = root.begin(); i != root.end(); ++i) {
const auto& jmsg = *i;
MessageToken token;
std::istringstream iss(i.key().asString());
iss >> std::hex >> token;
Message msg;
msg.status = (MessageStatus)jmsg["status"].asInt();
msg.to = jmsg["to"].asString();
auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64());
msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now());
msg.retried = jmsg.get("retried", 0).asUInt();
const auto& pl = jmsg["payload"];
for (auto p = pl.begin(); p != pl.end(); ++p)
msg.payloads[p.key().asString()] = p->asString();
messages_.emplace(token, std::move(msg));
}
// everything whent fine, removing the file
std::remove(savePath_.c_str());
} catch (const std::exception& e) {
RING_ERR("Could not load messages from %s: %s", savePath_.c_str(), e.what());
}
reschedule();
}
void
MessageEngine::save() const
{
try {
Json::Value root(Json::objectValue);
std::unique_lock<std::mutex> lock(messagesMutex_);
for (auto& c : messages_) {
auto& v = c.second;
Json::Value msg;
std::ostringstream msgsId;
msgsId << std::hex << c.first;
msg["status"] = (int)(v.status == MessageStatus::SENDING ? MessageStatus::IDLE : v.status);
msg["to"] = v.to;
auto wall_time = std::chrono::system_clock::now() + std::chrono::duration_cast<std::chrono::system_clock::duration>(v.last_op - clock::now());
msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(wall_time);
msg["retried"] = v.retried;
auto& payloads = msg["payload"];
for (const auto& p : v.payloads)
payloads[p.first] = p.second;
root[msgsId.str()] = std::move(msg);
}
lock.unlock();
Json::FastWriter fastWriter;
std::ofstream file;
file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
file.open(savePath_, std::ios::trunc);
file << fastWriter.write(root);
} catch (const std::exception& e) {
RING_ERR("Could not save messages to %s: %s", savePath_.c_str(), e.what());
}
}
}}
/*
* Copyright (C) 2016 Savoir-faire Linux Inc.
*
* Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <string>
#include <map>
#include <chrono>
#include <mutex>
#include <cstdint>
namespace ring {
class SIPAccountBase;
namespace im {
using MessageToken = uint64_t;
enum class MessageStatus {
UNKNOWN = 0,
IDLE,
SENDING,
SENT,
READ,
FAILURE
};
class MessageEngine
{
public:
MessageEngine(SIPAccountBase&, const std::string& path);
MessageToken sendMessage(const std::string& to, const std::map<std::string, std::string>& payloads);
MessageStatus getStatus(MessageToken t) const;
bool isSent(MessageToken t) const {
return getStatus(t) == MessageStatus::SENT;
}
void onMessageSent(MessageToken t, bool success);
/**
* Load persisted messages
*/
void load();
/**
* Persist messages
*/
void save() const;
private:
static const constexpr unsigned MAX_RETRIES = 3;
static const std::chrono::minutes RETRY_PERIOD;
using clock = std::chrono::steady_clock;
clock::time_point nextEvent() const;
void retrySend();
void reschedule();
struct Message {
std::string to;
std::map<std::string, std::string> payloads;
MessageStatus status {MessageStatus::UNKNOWN};
unsigned retried {0};
clock::time_point last_op;
};
SIPAccountBase& account_;
const std::string savePath_;
std::map<MessageToken, Message> messages_;
mutable std::mutex messagesMutex_ {};
void trySend(decltype(messages_)::iterator);
};
}} // namespace ring::im
......@@ -76,6 +76,7 @@ using random_device = std::random_device;
#include "client/ring_signal.h"
#include "dring/call_const.h"
#include "dring/account_const.h"
#include "libav_utils.h"
#include "video/sinkclient.h"
......@@ -1390,7 +1391,24 @@ Manager::unregisterEventHandler(uintptr_t handlerId)
void
Manager::addTask(const std::function<bool()>&& task)
{
pendingTaskList_.emplace_back(task);
pendingTaskList_.emplace_back(std::move(task));
}
std::shared_ptr<Manager::Runnable>
Manager::scheduleTask(const std::function<void()>&& task, std::chrono::steady_clock::time_point when)
{
auto runnable = std::make_shared<Runnable>(std::move(task));
scheduleTask(runnable, when);
return runnable;
}
void
Manager::scheduleTask(std::shared_ptr<Runnable> task, std::chrono::steady_clock::time_point when)
{
std::lock_guard<std::mutex> lock(scheduledTasksMutex_);
scheduledTasks_.emplace(when, task);
RING_DBG("Task scheduled. Next in %lds", std::chrono::duration_cast<std::chrono::seconds>(scheduledTasks_.begin()->first - std::chrono::steady_clock::now()).count());
}
// Must be invoked periodically by a timer from the main event loop
......@@ -1417,6 +1435,22 @@ void Manager::pollEvents()
}
}
//-- Scheduled tasks
{
auto now = std::chrono::steady_clock::now();
std::lock_guard<std::mutex> lock(scheduledTasksMutex_);
while (not scheduledTasks_.empty() && scheduledTasks_.begin()->first <= now) {
auto f = scheduledTasks_.begin();
auto task = std::move(f->second->cb);
if (task)
pendingTaskList_.emplace_back([task](){
task();
return false;
});
scheduledTasks_.erase(f);
}
}
//-- Tasks
{
auto tmpList = std::move(pendingTaskList_);
......@@ -2695,20 +2729,45 @@ Manager::sendRegister(const std::string& accountID, bool enable)
acc->doUnregister();
}
void
uint64_t
Manager::sendTextMessage(const std::string& accountID, const std::string& to,
const std::map<std::string, std::string>& payloads)
{
const auto acc = getAccount(accountID);
if (!acc)
return;
return 0;
try {
acc->sendTextMessage(to, payloads);
return acc->sendTextMessage(to, payloads);
} catch (const std::exception& e) {
RING_ERR("Exception during text message sending: %s", e.what());
}
}
std::string
Manager::getMessageStatus(uint64_t id)
{
const auto& allAccounts = accountFactory_.getAllAccounts();
for (auto acc : allAccounts) {
auto status = acc->getMessageStatus(id);
if (status != im::MessageStatus::UNKNOWN) {
switch (status) {
case im::MessageStatus::IDLE:
case im::MessageStatus::SENDING:
return DRing::Account::MessageStates::SENDING;
case im::MessageStatus::SENT:
return DRing::Account::MessageStates::SENT;
case im::MessageStatus::READ:
return DRing::Account::MessageStates::READ;
case im::MessageStatus::FAILURE:
return DRing::Account::MessageStates::FAILURE;
default:
return DRing::Account::MessageStates::UNKNOWN;
}
}
}
return DRing::Account::MessageStates::UNKNOWN;
}
void
Manager::setAccountActive(const std::string& accountID, bool active)
{
......
......@@ -416,9 +416,11 @@ class Manager {
*/
void sendRegister(const std::string& accountId, bool enable);
void sendTextMessage(const std::string& accountID, const std::string& to,
uint64_t sendTextMessage(const std::string& accountID, const std::string& to,
const std::map<std::string, std::string>& payloads);
std::string getMessageStatus(uint64_t id);
/**
* Get account list
* @return std::vector<std::string> A list of accoundIDs
......@@ -976,6 +978,13 @@ class Manager {
void addTask(const std::function<bool()>&& task);
struct Runnable {
std::function<void()> cb;
Runnable(const std::function<void()>&& t) : cb(std::move(t)) {}
};
std::shared_ptr<Runnable> scheduleTask(const std::function<void()>&& task, std::chrono::steady_clock::time_point when);
void scheduleTask(std::shared_ptr<Runnable> task, std::chrono::steady_clock::time_point when);
#ifdef RING_VIDEO
std::shared_ptr<video::SinkClient> createSinkClient(const std::string& id="", bool mixer=false);
std::shared_ptr<video::SinkClient> getSinkClient(const std::string& id);
......@@ -989,6 +998,8 @@ class Manager {
decltype(eventHandlerMap_)::iterator nextEventHandler_;
std::list<std::function<bool()>> pendingTaskList_;
std::multimap<std::chrono::steady_clock::time_point, std::shared_ptr<Runnable>> scheduledTasks_;
std::mutex scheduledTasksMutex_;
/**
* Test if call is a valid call, i.e. have been created and stored in
......
......@@ -570,9 +570,7 @@ void
RingAccount::handlePendingCallList()
{
// Process pending call into a local list to not block threads depending on this list,
// as incoming call handlers. As example, current implementation of this processing depends on
// a future.get(), a blocking method.
// as incoming call handlers.
decltype(pendingCalls_) pending_calls;
{
std::lock_guard<std::mutex> lock(callsMutex_);
......@@ -953,17 +951,22 @@ RingAccount::doRegister_()
dht_.listen<dht::ImMessage>(
inboxKey,
[shared](dht::ImMessage&& v) {
[shared, inboxKey](dht::ImMessage&& v) {
auto& this_ = *shared.get();
auto res = this_.treatedMessages_.insert(v.id);
this_.saveTreatedMessages();
if (!res.second)
return true;
this_.saveTreatedMessages();
auto from = v.from.toString();
auto now = system_clock::to_time_t(system_clock::now());
std::map<std::string, std::string> payloads = {{"text/plain",
utf8_make_valid(v.msg)}};
shared->onTextMessage(from, payloads);
RING_WARN("Sending message confirmation %lu", v.id);
this_.dht_.putEncrypted(inboxKey,
v.from,
dht::ImMessage(v.id, std::string(), now));
return true;
}
);
......@@ -1395,26 +1398,65 @@ RingAccount::connectivityChanged()
}
void
RingAccount::sendTextMessage(const std::string& to,
const std::map<std::string, std::string>& payloads)