diff --git a/bin/dbus/configurationmanager-introspec.xml b/bin/dbus/configurationmanager-introspec.xml index 3cf3e73d4c883112ef4108ec716c75013fb600f8..431023769b9a647acab1b2da6a392b653c3e00f7 100644 --- a/bin/dbus/configurationmanager-introspec.xml +++ b/bin/dbus/configurationmanager-introspec.xml @@ -222,6 +222,27 @@ <arg type="s" name="to" direction="in"/> <annotation name="org.qtproject.QtDBus.QtTypeName.In2" value="MapStringString"/> <arg type="a{ss}" name="payloads" direction="in"/> + <arg type="t" name="id" direction="out"> + <tp:docstring> + The message ID. + </tp:docstring> + </arg> + </method> + + <method name="getMessageStatus" tp:name-for-bindings="getMessageStatus"> + <arg type="t" name="id" direction="in"/> + <arg type="s" name="status" direction="out"> + <tp:docstring> + The message status. + <ul> + <li>UNKNOWN: unknown message or message status</li> + <li>SENDING: message is being sent or waiting for peer confirmation.</li> + <li>SENT: message have been received from the other end.</li> + <li>READ: message have been read by the peer.</li> + <li>FAILURE: the message coudn't be delivered.</li> + </ul> + </tp:docstring> + </arg> </method> <signal name="incomingAccountMessage" tp:name-for-bindings="incomingAccountMessage"> @@ -235,6 +256,19 @@ <arg type="a{ss}" name="payloads"/> </signal> + <signal name="accountMessageStatus" tp:name-for-bindings="accountMessageStatus"> + <tp:added version="2.3.0"/> + <tp:docstring> + Notify clients that a sent text message status have changed + </tp:docstring> + <arg type="t" name="id"/> + <arg type="s" name="status"> + <tp:docstring>The new status of the message, see getMessageStatus for possible values.</tp:docstring> + </arg> + <annotation name="org.qtproject.QtDBus.QtTypeName.In2" value="MapStringString"/> + <arg type="a{ss}" name="payloads"/> + </signal> + <method name="setVolume" tp:name-for-bindings="setVolume"> <tp:docstring> <p>Sets the volume using a linear scale [0,100].</p> diff --git a/bin/dbus/dbusconfigurationmanager.cpp b/bin/dbus/dbusconfigurationmanager.cpp index 5bd6cf6681f883e6793efcb8147096b76356c09d..9cd02e5f2044998d07feaaf147062dc6ecf26d72 100644 --- a/bin/dbus/dbusconfigurationmanager.cpp +++ b/bin/dbus/dbusconfigurationmanager.cpp @@ -91,10 +91,16 @@ DBusConfigurationManager::registerAllAccounts(void) DRing::registerAllAccounts(); } -void -DBusConfigurationManager::sendTextMessage(const std::string& accountID, const std::string& to, const std::map<std::string, std::string>& payloads) +auto +DBusConfigurationManager::sendTextMessage(const std::string& accountID, const std::string& to, const std::map<std::string, std::string>& payloads) -> decltype(DRing::sendAccountTextMessage(accountID, to, payloads)) +{ + return DRing::sendAccountTextMessage(accountID, to, payloads); +} + +auto +DBusConfigurationManager::getMessageStatus(const uint64_t& id) -> decltype(DRing::getMessageStatus(id)) { - DRing::sendAccountTextMessage(accountID, to, payloads); + return DRing::getMessageStatus(id); } auto diff --git a/bin/dbus/dbusconfigurationmanager.h b/bin/dbus/dbusconfigurationmanager.h index 04e3b25735d50634cf00a34690c41ae5cc315fb6..931ebdf593b519d63918af1fe02bb3529acf0dd7 100644 --- a/bin/dbus/dbusconfigurationmanager.h +++ b/bin/dbus/dbusconfigurationmanager.h @@ -66,7 +66,8 @@ class DBusConfigurationManager : std::vector<std::string> getAccountList(); void sendRegister(const std::string& accoundID, const bool& enable); void registerAllAccounts(void); - void sendTextMessage(const std::string& accoundID, const std::string& to, const std::map<std::string, std::string>& payloads); + uint64_t sendTextMessage(const std::string& accoundID, const std::string& to, const std::map<std::string, std::string>& payloads); + std::string getMessageStatus(const uint64_t& id); std::map<std::string, std::string> getTlsDefaultSettings(); std::vector<std::string> getSupportedCiphers(const std::string& accountID); std::vector<unsigned> getCodecList(); diff --git a/src/account.h b/src/account.h index d9e88beebc82d71a5b97b835a950d4c060339474..052d2a48e76737d80181ec1267b27e694e4dd6e5 100644 --- a/src/account.h +++ b/src/account.h @@ -30,6 +30,7 @@ #include "noncopyable.h" #include "config/serializable.h" #include "registration_states.h" +#include "im/message_engine.h" #include "ip_utils.h" #include "media_codec.h" #include "logger.h" @@ -147,9 +148,17 @@ class Account : public Serializable, public std::enable_shared_from_this<Account /** * If supported, send a text message from this account. + * @return a token to query the message status */ - virtual void sendTextMessage(const std::string& to UNUSED, - const std::map<std::string, std::string>& payloads UNUSED) {}; + virtual uint64_t sendTextMessage(const std::string& to UNUSED, + const std::map<std::string, std::string>& payloads UNUSED) { return 0; } + + /** + * Return the status corresponding to the token. + */ + virtual im::MessageStatus getMessageStatus(uint64_t id UNUSED) const { + return im::MessageStatus::UNKNOWN; + } std::vector<std::shared_ptr<Call>> getCalls(); @@ -281,6 +290,12 @@ class Account : public Serializable, public std::enable_shared_from_this<Account virtual const IceTransportOptions getIceOptions() const noexcept; + /** + * Random generator engine + * Logical account state shall never rely on the state of the random generator. + */ + mutable std::mt19937_64 rand_; + private: NON_COPYABLE(Account); @@ -431,12 +446,6 @@ class Account : public Serializable, public std::enable_shared_from_this<Account */ std::string mailBox_; - /** - * Random generator engine - * Logical account state shall never rely on the state of the random generator. - */ - mutable std::mt19937_64 rand_; - /** * UPnP IGD controller and the mutex to access it */ diff --git a/src/client/configurationmanager.cpp b/src/client/configurationmanager.cpp index 0a83ed1209c7443b3799e6c7432f1e69a66490a0..d8fb9441afdff80bf41771a4ae27139d7ef5ab48 100644 --- a/src/client/configurationmanager.cpp +++ b/src/client/configurationmanager.cpp @@ -276,10 +276,16 @@ registerAllAccounts() ring::Manager::instance().registerAccounts(); } -void +uint64_t sendAccountTextMessage(const std::string& accountID, const std::string& to, const std::map<std::string, std::string>& payloads) { - ring::Manager::instance().sendTextMessage(accountID, to, payloads); + return ring::Manager::instance().sendTextMessage(accountID, to, payloads); +} + +std::string +getMessageStatus(uint64_t id) +{ + return ring::Manager::instance().getMessageStatus(id); } /* contact requests */ diff --git a/src/client/ring_signal.cpp b/src/client/ring_signal.cpp index 113c3f75cb998ccb4ab6fa1d58cd197f68066e09..214a7d5e63919ea2104fec519bfd71e7fd2f6849 100644 --- a/src/client/ring_signal.cpp +++ b/src/client/ring_signal.cpp @@ -65,6 +65,7 @@ getSignalHandlers() exported_callback<DRing::ConfigurationSignal::CertificateExpired>(), exported_callback<DRing::ConfigurationSignal::CertificateStateChanged>(), exported_callback<DRing::ConfigurationSignal::IncomingAccountMessage>(), + exported_callback<DRing::ConfigurationSignal::AccountMessageStatus>(), exported_callback<DRing::ConfigurationSignal::IncomingTrustRequest>(), exported_callback<DRing::ConfigurationSignal::MediaParametersChanged>(), exported_callback<DRing::ConfigurationSignal::Error>(), diff --git a/src/dring/account_const.h b/src/dring/account_const.h index b526a76188dcd6397b7af34fd5611017ecc6605f..2e0158ec40d98ba207d6f384280838917e69e791 100644 --- a/src/dring/account_const.h +++ b/src/dring/account_const.h @@ -58,6 +58,16 @@ constexpr static const char REQUEST_TIMEOUT [] = "Request Timeout"; } //namespace DRing::Account +namespace MessageStates { + +constexpr static const char UNKNOWN [] = "UNKNOWN"; +constexpr static const char SENDING [] = "SENDING"; +constexpr static const char SENT [] = "SENT"; +constexpr static const char READ [] = "READ"; +constexpr static const char FAILURE [] = "FAILURE"; + +} //namespace DRing::MessageStates + namespace VolatileProperties { constexpr static const char ACTIVE [] = "Account.active"; diff --git a/src/dring/configurationmanager_interface.h b/src/dring/configurationmanager_interface.h index 7789041b43e3e9d4777d494e7cc4d70ffa691fa4..8836b81717cb5b7b8c9a0dbca096c09a587247e8 100644 --- a/src/dring/configurationmanager_interface.h +++ b/src/dring/configurationmanager_interface.h @@ -49,7 +49,8 @@ void setAccountEnabled(const std::string& accountID, bool enable); std::vector<std::string> getAccountList(); void sendRegister(const std::string& accountID, bool enable); void registerAllAccounts(void); -void sendAccountTextMessage(const std::string& accountID, const std::string& to, const std::map<std::string, std::string>& payloads); +uint64_t sendAccountTextMessage(const std::string& accountID, const std::string& to, const std::map<std::string, std::string>& payloads); +std::string getMessageStatus(uint64_t id); std::map<std::string, std::string> getTlsDefaultSettings(); @@ -188,6 +189,10 @@ struct ConfigurationSignal { constexpr static const char* name = "IncomingAccountMessage"; using cb_type = void(const std::string& /*account_id*/, const std::string& /*from*/, const std::map<std::string, std::string>& /*payloads*/); }; + struct AccountMessageStatus { + constexpr static const char* name = "AccountMessageStatus"; + using cb_type = void(uint64_t /*message_id*/, const std::string& /*state*/); + }; struct IncomingTrustRequest { constexpr static const char* name = "IncomingTrustRequest"; using cb_type = void(const std::string& /*account_id*/, const std::string& /*from*/, const std::vector<uint8_t>& payload, time_t received); diff --git a/src/im/Makefile.am b/src/im/Makefile.am index d3315a25f7468539473547c871c21aa8e72448cb..f066238f416a480da8dd7da340292cbde6d4b03e 100644 --- a/src/im/Makefile.am +++ b/src/im/Makefile.am @@ -2,4 +2,4 @@ include $(top_srcdir)/globals.mak noinst_LTLIBRARIES = libim.la -libim_la_SOURCES = instant_messaging.cpp instant_messaging.h +libim_la_SOURCES = instant_messaging.cpp message_engine.cpp instant_messaging.h message_engine.h diff --git a/src/im/message_engine.cpp b/src/im/message_engine.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1e1eb228661441156b05dcae78a5d5f047060945 --- /dev/null +++ b/src/im/message_engine.cpp @@ -0,0 +1,226 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * + * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "message_engine.h" +#include "sip/sipaccountbase.h" +#include "manager.h" + +#include "client/ring_signal.h" +#include "dring/account_const.h" + +#include <json/json.h> + +#include <fstream> + +namespace ring { +namespace im { + +static std::uniform_int_distribution<MessageToken> udist {1}; +const std::chrono::minutes MessageEngine::RETRY_PERIOD = std::chrono::minutes(1); + +MessageEngine::MessageEngine(SIPAccountBase& acc, const std::string& path) : account_(acc), savePath_(path) +{} + +MessageToken +MessageEngine::sendMessage(const std::string& to, const std::map<std::string, std::string>& payloads) +{ + if (payloads.empty() or to.empty()) + return 0; + MessageToken token; + { + std::lock_guard<std::mutex> lock(messagesMutex_); + do { + token = udist(account_.rand_); + } while (messages_.find(token) != messages_.end()); + auto m = messages_.emplace(token, Message{}); + m.first->second.to = to; + m.first->second.payloads = payloads; + } + save(); + runOnMainThread([this,token]() { + std::lock_guard<std::mutex> lock(messagesMutex_); + auto m = messages_.find(token); + if (m != messages_.end()) + trySend(m); + }); + return token; +} + +void +MessageEngine::reschedule() +{ + std::lock_guard<std::mutex> lock(messagesMutex_); + if (messages_.empty()) + return; + std::weak_ptr<Account> w = std::static_pointer_cast<Account>(account_.shared_from_this()); + auto next = nextEvent(); + if (next != clock::time_point::max()) + Manager::instance().scheduleTask([w,this](){ + if (auto s = w.lock()) + retrySend(); + }, next); +} + +MessageEngine::clock::time_point +MessageEngine::nextEvent() const +{ + auto next = clock::time_point::max(); + for (const auto& m : messages_) { + if (m.second.status == MessageStatus::UNKNOWN || m.second.status == MessageStatus::IDLE) { + auto next_op = m.second.last_op + RETRY_PERIOD; + if (next_op < next) + next = next_op; + } + } + return next; +} + +void +MessageEngine::retrySend() +{ + auto now = clock::now(); + std::lock_guard<std::mutex> lock(messagesMutex_); + for (auto m = messages_.begin(); m != messages_.end(); ++m) { + if (m->second.status == MessageStatus::UNKNOWN || m->second.status == MessageStatus::IDLE) { + auto next_op = m->second.last_op + RETRY_PERIOD; + if (next_op <= now) + trySend(m); + } + } +} + +MessageStatus +MessageEngine::getStatus(MessageToken t) const +{ + std::lock_guard<std::mutex> lock(messagesMutex_); + const auto m = messages_.find(t); + return (m == messages_.end()) ? MessageStatus::UNKNOWN : m->second.status; +} + +void +MessageEngine::trySend(decltype(MessageEngine::messages_)::iterator m) +{ + if (m->second.status != MessageStatus::IDLE && + m->second.status != MessageStatus::UNKNOWN) { + RING_WARN("Can't send message in status %d", (int)m->second.status); + return; + } + RING_DBG("Retrying to send message %lu", m->first); + m->second.status = MessageStatus::SENDING; + m->second.retried++; + m->second.last_op = clock::now(); + emitSignal<DRing::ConfigurationSignal::AccountMessageStatus>(m->first, DRing::Account::MessageStates::SENDING); + account_.sendTextMessage(m->second.to, m->second.payloads, m->first); +} + +void +MessageEngine::onMessageSent(MessageToken token, bool ok) +{ + RING_WARN("Message %lu: %s", token, ok ? "success" : "failure"); + std::lock_guard<std::mutex> lock(messagesMutex_); + auto f = messages_.find(token); + if (f != messages_.end()) { + if (f->second.status == MessageStatus::SENDING) { + if (ok) { + f->second.status = MessageStatus::SENT; + emitSignal<DRing::ConfigurationSignal::AccountMessageStatus>(token, DRing::Account::MessageStates::SENT); + } else if (f->second.retried == MAX_RETRIES) { + f->second.status = MessageStatus::FAILURE; + emitSignal<DRing::ConfigurationSignal::AccountMessageStatus>(token, DRing::Account::MessageStates::FAILURE); + } else { + f->second.status = MessageStatus::IDLE; + // TODO: reschedule sending + } + } + } +} + +void +MessageEngine::load() +{ + try { + std::ifstream file; + file.exceptions(std::ifstream::failbit | std::ifstream::badbit); + file.open(savePath_); + + Json::Value root; + Json::Reader reader; + if (!reader.parse(file, root)) + throw std::runtime_error("can't parse JSON."); + + std::lock_guard<std::mutex> lock(messagesMutex_); + + for (auto i = root.begin(); i != root.end(); ++i) { + const auto& jmsg = *i; + MessageToken token; + std::istringstream iss(i.key().asString()); + iss >> std::hex >> token; + Message msg; + msg.status = (MessageStatus)jmsg["status"].asInt(); + msg.to = jmsg["to"].asString(); + auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64()); + msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now()); + msg.retried = jmsg.get("retried", 0).asUInt(); + const auto& pl = jmsg["payload"]; + for (auto p = pl.begin(); p != pl.end(); ++p) + msg.payloads[p.key().asString()] = p->asString(); + messages_.emplace(token, std::move(msg)); + } + + // everything whent fine, removing the file + std::remove(savePath_.c_str()); + } catch (const std::exception& e) { + RING_ERR("Could not load messages from %s: %s", savePath_.c_str(), e.what()); + } + reschedule(); +} + +void +MessageEngine::save() const +{ + try { + Json::Value root(Json::objectValue); + std::unique_lock<std::mutex> lock(messagesMutex_); + for (auto& c : messages_) { + auto& v = c.second; + Json::Value msg; + std::ostringstream msgsId; + msgsId << std::hex << c.first; + msg["status"] = (int)(v.status == MessageStatus::SENDING ? MessageStatus::IDLE : v.status); + msg["to"] = v.to; + auto wall_time = std::chrono::system_clock::now() + std::chrono::duration_cast<std::chrono::system_clock::duration>(v.last_op - clock::now()); + msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(wall_time); + msg["retried"] = v.retried; + auto& payloads = msg["payload"]; + for (const auto& p : v.payloads) + payloads[p.first] = p.second; + root[msgsId.str()] = std::move(msg); + } + lock.unlock(); + Json::FastWriter fastWriter; + std::ofstream file; + file.exceptions(std::ifstream::failbit | std::ifstream::badbit); + file.open(savePath_, std::ios::trunc); + file << fastWriter.write(root); + } catch (const std::exception& e) { + RING_ERR("Could not save messages to %s: %s", savePath_.c_str(), e.what()); + } +} + +}} diff --git a/src/im/message_engine.h b/src/im/message_engine.h new file mode 100644 index 0000000000000000000000000000000000000000..8f737654c7a546d1190451478519f2a340909552 --- /dev/null +++ b/src/im/message_engine.h @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2016 Savoir-faire Linux Inc. + * + * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <string> +#include <map> +#include <chrono> +#include <mutex> +#include <cstdint> + +namespace ring { + +class SIPAccountBase; + +namespace im { + +using MessageToken = uint64_t; + +enum class MessageStatus { + UNKNOWN = 0, + IDLE, + SENDING, + SENT, + READ, + FAILURE +}; + +class MessageEngine +{ +public: + + MessageEngine(SIPAccountBase&, const std::string& path); + + MessageToken sendMessage(const std::string& to, const std::map<std::string, std::string>& payloads); + + MessageStatus getStatus(MessageToken t) const; + + bool isSent(MessageToken t) const { + return getStatus(t) == MessageStatus::SENT; + } + + void onMessageSent(MessageToken t, bool success); + + /** + * Load persisted messages + */ + void load(); + + /** + * Persist messages + */ + void save() const; + +private: + + static const constexpr unsigned MAX_RETRIES = 3; + static const std::chrono::minutes RETRY_PERIOD; + using clock = std::chrono::steady_clock; + + clock::time_point nextEvent() const; + void retrySend(); + void reschedule(); + + struct Message { + std::string to; + std::map<std::string, std::string> payloads; + MessageStatus status {MessageStatus::UNKNOWN}; + unsigned retried {0}; + clock::time_point last_op; + }; + + SIPAccountBase& account_; + const std::string savePath_; + + std::map<MessageToken, Message> messages_; + mutable std::mutex messagesMutex_ {}; + + void trySend(decltype(messages_)::iterator); +}; + +}} // namespace ring::im diff --git a/src/manager.cpp b/src/manager.cpp index f389070faf1363bd0976ece3bf93764028b156f1..54a91831073e3011e6993bdebf9909189dc7ea4c 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -76,6 +76,7 @@ using random_device = std::random_device; #include "client/ring_signal.h" #include "dring/call_const.h" +#include "dring/account_const.h" #include "libav_utils.h" #include "video/sinkclient.h" @@ -1390,7 +1391,24 @@ Manager::unregisterEventHandler(uintptr_t handlerId) void Manager::addTask(const std::function<bool()>&& task) { - pendingTaskList_.emplace_back(task); + pendingTaskList_.emplace_back(std::move(task)); +} + +std::shared_ptr<Manager::Runnable> +Manager::scheduleTask(const std::function<void()>&& task, std::chrono::steady_clock::time_point when) +{ + auto runnable = std::make_shared<Runnable>(std::move(task)); + scheduleTask(runnable, when); + return runnable; +} + + +void +Manager::scheduleTask(std::shared_ptr<Runnable> task, std::chrono::steady_clock::time_point when) +{ + std::lock_guard<std::mutex> lock(scheduledTasksMutex_); + scheduledTasks_.emplace(when, task); + RING_DBG("Task scheduled. Next in %lds", std::chrono::duration_cast<std::chrono::seconds>(scheduledTasks_.begin()->first - std::chrono::steady_clock::now()).count()); } // Must be invoked periodically by a timer from the main event loop @@ -1417,6 +1435,22 @@ void Manager::pollEvents() } } + //-- Scheduled tasks + { + auto now = std::chrono::steady_clock::now(); + std::lock_guard<std::mutex> lock(scheduledTasksMutex_); + while (not scheduledTasks_.empty() && scheduledTasks_.begin()->first <= now) { + auto f = scheduledTasks_.begin(); + auto task = std::move(f->second->cb); + if (task) + pendingTaskList_.emplace_back([task](){ + task(); + return false; + }); + scheduledTasks_.erase(f); + } + } + //-- Tasks { auto tmpList = std::move(pendingTaskList_); @@ -2695,20 +2729,45 @@ Manager::sendRegister(const std::string& accountID, bool enable) acc->doUnregister(); } -void +uint64_t Manager::sendTextMessage(const std::string& accountID, const std::string& to, const std::map<std::string, std::string>& payloads) { const auto acc = getAccount(accountID); if (!acc) - return; + return 0; try { - acc->sendTextMessage(to, payloads); + return acc->sendTextMessage(to, payloads); } catch (const std::exception& e) { RING_ERR("Exception during text message sending: %s", e.what()); } } +std::string +Manager::getMessageStatus(uint64_t id) +{ + const auto& allAccounts = accountFactory_.getAllAccounts(); + for (auto acc : allAccounts) { + auto status = acc->getMessageStatus(id); + if (status != im::MessageStatus::UNKNOWN) { + switch (status) { + case im::MessageStatus::IDLE: + case im::MessageStatus::SENDING: + return DRing::Account::MessageStates::SENDING; + case im::MessageStatus::SENT: + return DRing::Account::MessageStates::SENT; + case im::MessageStatus::READ: + return DRing::Account::MessageStates::READ; + case im::MessageStatus::FAILURE: + return DRing::Account::MessageStates::FAILURE; + default: + return DRing::Account::MessageStates::UNKNOWN; + } + } + } + return DRing::Account::MessageStates::UNKNOWN; +} + void Manager::setAccountActive(const std::string& accountID, bool active) { diff --git a/src/manager.h b/src/manager.h index a792490c7634da2bee7c3c044b636e8393908d05..476e239ef21be8c5786506476264473578ec4c63 100644 --- a/src/manager.h +++ b/src/manager.h @@ -416,9 +416,11 @@ class Manager { */ void sendRegister(const std::string& accountId, bool enable); - void sendTextMessage(const std::string& accountID, const std::string& to, + uint64_t sendTextMessage(const std::string& accountID, const std::string& to, const std::map<std::string, std::string>& payloads); + std::string getMessageStatus(uint64_t id); + /** * Get account list * @return std::vector<std::string> A list of accoundIDs @@ -976,6 +978,13 @@ class Manager { void addTask(const std::function<bool()>&& task); + struct Runnable { + std::function<void()> cb; + Runnable(const std::function<void()>&& t) : cb(std::move(t)) {} + }; + std::shared_ptr<Runnable> scheduleTask(const std::function<void()>&& task, std::chrono::steady_clock::time_point when); + void scheduleTask(std::shared_ptr<Runnable> task, std::chrono::steady_clock::time_point when); + #ifdef RING_VIDEO std::shared_ptr<video::SinkClient> createSinkClient(const std::string& id="", bool mixer=false); std::shared_ptr<video::SinkClient> getSinkClient(const std::string& id); @@ -989,6 +998,8 @@ class Manager { decltype(eventHandlerMap_)::iterator nextEventHandler_; std::list<std::function<bool()>> pendingTaskList_; + std::multimap<std::chrono::steady_clock::time_point, std::shared_ptr<Runnable>> scheduledTasks_; + std::mutex scheduledTasksMutex_; /** * Test if call is a valid call, i.e. have been created and stored in diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp index b1edb6099ae6ea0d8282e96b79341545fbd90313..15658d2bb503376a6eccc89d800cdfd5d6612202 100644 --- a/src/ringdht/ringaccount.cpp +++ b/src/ringdht/ringaccount.cpp @@ -570,9 +570,7 @@ void RingAccount::handlePendingCallList() { // Process pending call into a local list to not block threads depending on this list, - // as incoming call handlers. As example, current implementation of this processing depends on - // a future.get(), a blocking method. - + // as incoming call handlers. decltype(pendingCalls_) pending_calls; { std::lock_guard<std::mutex> lock(callsMutex_); @@ -953,17 +951,22 @@ RingAccount::doRegister_() dht_.listen<dht::ImMessage>( inboxKey, - [shared](dht::ImMessage&& v) { + [shared, inboxKey](dht::ImMessage&& v) { auto& this_ = *shared.get(); auto res = this_.treatedMessages_.insert(v.id); - this_.saveTreatedMessages(); if (!res.second) return true; + this_.saveTreatedMessages(); auto from = v.from.toString(); + auto now = system_clock::to_time_t(system_clock::now()); std::map<std::string, std::string> payloads = {{"text/plain", utf8_make_valid(v.msg)}}; shared->onTextMessage(from, payloads); + RING_WARN("Sending message confirmation %lu", v.id); + this_.dht_.putEncrypted(inboxKey, + v.from, + dht::ImMessage(v.id, std::string(), now)); return true; } ); @@ -1395,26 +1398,65 @@ RingAccount::connectivityChanged() } void -RingAccount::sendTextMessage(const std::string& to, - const std::map<std::string, std::string>& payloads) +RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t token) { - if (to.empty() or payloads.empty()) + if (to.empty() or payloads.empty()) { + messageEngine_.onMessageSent(token, false); return; + } - const auto& toUri = parseRingUri(to); - auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + auto toUri = parseRingUri(to); + auto toH = dht::InfoHash(toUri); + auto now = system_clock::to_time_t(system_clock::now()); // Single-part message if (payloads.size() == 1) { - dht_.putEncrypted(dht::InfoHash::get("inbox:"+toUri), - dht::InfoHash(toUri), - dht::ImMessage(udist(rand_), std::string(payloads.begin()->second), now)); + auto e = sentMessages_.emplace(token, PendingMessage {}); + e.first->second.to = toH; + + auto h = dht::InfoHash::get("inbox:"+toUri); + std::weak_ptr<RingAccount> wshared = std::static_pointer_cast<RingAccount>(shared_from_this()); + + dht_.listen<dht::ImMessage>(h, [wshared,token](dht::ImMessage&& msg) { + if (auto this_ = wshared.lock()) { + // check expected message confirmation + auto e = this_->sentMessages_.find(msg.id); + if (e == this_->sentMessages_.end() || e->second.to != msg.from) { + RING_WARN("Unrelated text message reply"); + return true; + } + this_->sentMessages_.erase(e); + RING_WARN("Relevent text message reply for %lu", token); + + // add treated message + auto res = this_->treatedMessages_.insert(msg.id); + if (!res.second) + return true; + + this_->saveTreatedMessages(); + + // report message as confirmed received + this_->messageEngine_.onMessageSent(token, true); + } + return false; + }); + + dht_.putEncrypted(h, + toH, + dht::ImMessage(token, std::string(payloads.begin()->second), now), + [wshared,token](bool ok) { + if (auto this_ = wshared.lock()) { + if (not ok) + this_->messageEngine_.onMessageSent(token, false); + } + }); return; } // Multi-part message // TODO: not supported yet RING_ERR("Multi-part im is not supported yet by RingAccount"); + messageEngine_.onMessageSent(token, false); } } // namespace ring diff --git a/src/ringdht/ringaccount.h b/src/ringdht/ringaccount.h index 0d899cf15386628df5e1d625fce64f2412a9b63b..6fda37811ed7c721f833b3179d775995947af7bd 100644 --- a/src/ringdht/ringaccount.h +++ b/src/ringdht/ringaccount.h @@ -252,7 +252,7 @@ class RingAccount : public SIPAccountBase { bool discardTrustRequest(const std::string& from); void sendTrustRequest(const std::string& to, const std::vector<uint8_t>& payload); - virtual void sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads) override; + virtual void sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t id) override; void connectivityChanged(); @@ -313,9 +313,16 @@ class RingAccount : public SIPAccountBase { */ std::list<PendingCall> pendingSipCalls_ {}; std::set<dht::Value::Id> treatedCalls_ {}; - std::set<dht::Value::Id> treatedMessages_ {}; mutable std::mutex callsMutex_ {}; + struct PendingMessage { + dht::InfoHash to; + std::chrono::steady_clock::time_point received; + }; + + std::map<dht::Value::Id, PendingMessage> sentMessages_ {}; + std::set<dht::Value::Id> treatedMessages_ {}; + std::string idPath_ {}; std::string cachePath_ {}; std::string dataPath_ {}; diff --git a/src/sip/sipaccount.cpp b/src/sip/sipaccount.cpp index ead4223ad97daaf48f173deacf0855aef7e57fcd..5e21a848eeb2ab3fd516e198442bca6d1599a133 100644 --- a/src/sip/sipaccount.cpp +++ b/src/sip/sipaccount.cpp @@ -1658,7 +1658,7 @@ SIPAccount::setRegistrationState(RegistrationState state, unsigned details_code, if (description) details_str = {description->ptr, (size_t)description->slen}; setRegistrationStateDetailed({details_code, details_str}); - Account::setRegistrationState(state, details_code, details_str); + SIPAccountBase::setRegistrationState(state, details_code, details_str); } std::string SIPAccount::getUserAgentName() const @@ -2070,11 +2070,13 @@ static pjsip_accept_hdr* im_create_accept(pj_pool_t *pool) #endif void -SIPAccount::sendTextMessage(const std::string& to, - const std::map<std::string, std::string>& payloads) +SIPAccount::sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t id) { - if (to.empty() or payloads.empty()) + if (to.empty() or payloads.empty()) { + RING_WARN("No sender or payload"); + messageEngine_.onMessageSent(id, false); return; + } std::string toUri; if (to.find("sip:") != std::string::npos or @@ -2095,6 +2097,7 @@ SIPAccount::sendTextMessage(const std::string& to, nullptr, &tdata); if (status != PJ_SUCCESS) { RING_ERR("Unable to create request: %s", sip_utils::sip_strerror(status).c_str()); + messageEngine_.onMessageSent(id, false); return; } @@ -2103,9 +2106,31 @@ SIPAccount::sendTextMessage(const std::string& to, im::fillPJSIPMessageBody(*tdata, payloads); - status = pjsip_endpt_send_request(link_->getEndpoint(), tdata, -1, nullptr, nullptr); + struct ctx { + std::weak_ptr<SIPAccount> acc; + uint64_t id; + }; + ctx* t = new ctx; + t->acc = std::static_pointer_cast<SIPAccount>(shared_from_this()); + t->id = id; + + status = pjsip_endpt_send_request(link_->getEndpoint(), tdata, -1, t, [](void *token, pjsip_event *e) { + auto c = (ctx*) token; + try { + if (auto acc = c->acc.lock()) { + acc->messageEngine_.onMessageSent(c->id, e + && e->body.tsx_state.tsx + && e->body.tsx_state.tsx->status_code == PJSIP_SC_OK); + } + } catch (const std::exception& e) { + RING_ERR("Error calling message callback: %s", e.what()); + } + delete c; + }); + if (status != PJ_SUCCESS) { RING_ERR("Unable to send request: %s", sip_utils::sip_strerror(status).c_str()); + delete t; return; } } diff --git a/src/sip/sipaccount.h b/src/sip/sipaccount.h index 837b928bb0b803ca2c20f9a3892f3671c8d93f4c..7a63bf4dcb01fca5751450763aeb1f11f3d96543 100644 --- a/src/sip/sipaccount.h +++ b/src/sip/sipaccount.h @@ -490,7 +490,8 @@ class SIPAccount : public SIPAccountBase { void onRegister(pjsip_regc_cbparam *param); virtual void sendTextMessage(const std::string& to, - const std::map<std::string, std::string>& payloads) override; + const std::map<std::string, std::string>& payloads, + uint64_t id) override; private: void doRegister1_(); diff --git a/src/sip/sipaccountbase.cpp b/src/sip/sipaccountbase.cpp index 762d45f7009cfba28b76b7f499bfeefaeb50648a..e1db0a592383b2c2259fa4b2c79ad4e566a94cb8 100644 --- a/src/sip/sipaccountbase.cpp +++ b/src/sip/sipaccountbase.cpp @@ -39,13 +39,14 @@ #include "client/ring_signal.h" #include "string_utils.h" +#include "fileutils.h" #include <type_traits> namespace ring { SIPAccountBase::SIPAccountBase(const std::string& accountID) - : Account(accountID), link_(getSIPVoIPLink()) + : Account(accountID), link_(getSIPVoIPLink()), messageEngine_(*this, fileutils::get_cache_dir()+DIR_SEPARATOR_STR+getAccountID()+DIR_SEPARATOR_STR "messages") {} SIPAccountBase::~SIPAccountBase() {} @@ -260,6 +261,17 @@ SIPAccountBase::getVolatileAccountDetails() const return a; } + +void +SIPAccountBase::setRegistrationState(RegistrationState state, unsigned details_code, const std::string& details_str) +{ + if (state == RegistrationState::REGISTERED && registrationState_ != RegistrationState::REGISTERED) + messageEngine_.load(); + else if (state != RegistrationState::REGISTERED && registrationState_ == RegistrationState::REGISTERED) + messageEngine_.save(); + Account::setRegistrationState(state, details_code, details_str); +} + auto SIPAccountBase::getPortsReservation() noexcept -> decltype(getPortsReservation()) { diff --git a/src/sip/sipaccountbase.h b/src/sip/sipaccountbase.h index 395da307fe77134b45dc28d1dc11534b475c5dec..a4fd8284257a356bfc36f31d667c37e963ff110d 100644 --- a/src/sip/sipaccountbase.h +++ b/src/sip/sipaccountbase.h @@ -31,6 +31,7 @@ #include "ip_utils.h" #include "noncopyable.h" #include "security/certstore.h" +#include "im/message_engine.h" #include <pjsip/sip_types.h> #include <opendht/value.h> @@ -233,6 +234,17 @@ public: const IceTransportOptions getIceOptions() const noexcept override; + virtual void sendTextMessage(const std::string& to, const std::map<std::string, std::string>& payloads, uint64_t id) = 0; + + virtual uint64_t sendTextMessage(const std::string& to, + const std::map<std::string, std::string>& payloads) override { + return messageEngine_.sendMessage(to, payloads); + } + + virtual im::MessageStatus getMessageStatus(uint64_t id) const override { + return messageEngine_.getStatus(id); + } + void onTextMessage(const std::string& from, const std::map<std::string, std::string>& payloads); protected: @@ -250,6 +262,10 @@ protected: */ virtual std::map<std::string, std::string> getVolatileAccountDetails() const override; + virtual void setRegistrationState(RegistrationState state, unsigned code=0, const std::string& detail_str={}) override; + + im::MessageEngine messageEngine_; + /** * Voice over IP Link contains a listener thread and calls */