Skip to content
Snippets Groups Projects
Select Git revision
  • 6c0a0aaf6a9a3649bdba46efb815d707bbcbea5b
  • master default protected
  • release/202005
  • release/202001
  • release/201912
  • release/201911
  • release/releaseWindowsTestOne
  • release/windowsReleaseTest
  • release/releaseTest
  • release/releaseWindowsTest
  • release/201910
  • release/qt/201910
  • release/windows-test/201910
  • release/201908
  • release/201906
  • release/201905
  • release/201904
  • release/201903
  • release/201902
  • release/201901
  • release/201812
  • 4.0.0
  • 2.2.0
  • 2.1.0
  • 2.0.1
  • 2.0.0
  • 1.4.1
  • 1.4.0
  • 1.3.0
  • 1.2.0
  • 1.1.0
31 results

message_engine.cpp

Blame
    • Sébastien Blin's avatar
      6c0a0aaf
      jamiaccount: extract conversation's related code from class · 6c0a0aaf
      Sébastien Blin authored
      This patch introduces two new concepts in order to reduce the code
      of JamiAccount.
      
      ChannelHandlers, to manage protocol logic. The idea of this class
      is to handle channels per protocol, and to accept or reject them.
      AccountModule, to be able to separate logic between call management,
      data transfer, config and conversation, while giving an interface to
      detect when some events should occur.
      
      Change-Id: I34ff07852c06d7266411f1ffb32b71a1834aba4f
      GitLab: #603
      6c0a0aaf
      History
      jamiaccount: extract conversation's related code from class
      Sébastien Blin authored
      This patch introduces two new concepts in order to reduce the code
      of JamiAccount.
      
      ChannelHandlers, to manage protocol logic. The idea of this class
      is to handle channels per protocol, and to accept or reject them.
      AccountModule, to be able to separate logic between call management,
      data transfer, config and conversation, while giving an interface to
      detect when some events should occur.
      
      Change-Id: I34ff07852c06d7266411f1ffb32b71a1834aba4f
      GitLab: #603
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    message_engine.cpp 12.28 KiB
    /*
     *  Copyright (C) 2004-2021 Savoir-faire Linux Inc.
     *
     *  Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program. If not, see <http://www.gnu.org/licenses/>.
     */
    
    #include "message_engine.h"
    #include "sip/sipaccountbase.h"
    #include "manager.h"
    #include "fileutils.h"
    
    #include "client/ring_signal.h"
    #include "jami/account_const.h"
    
    #include <opendht/thread_pool.h>
    #include <json/json.h>
    
    #include <fstream>
    
    namespace jami {
    namespace im {
    
    /**
     * Build a message engine bound to an account and to a persistence file.
     *
     * The directory component of @p path (everything before the last
     * DIR_SEPARATOR_CH) is handed to fileutils::check_dir() -- presumably to
     * make sure it exists before load()/save_() touch the file; confirm against
     * fileutils.
     *
     * @param acc   Account kept by reference in account_.
     * @param path  Path of the file used by load() and save_(); copied into savePath_.
     */
    MessageEngine::MessageEngine(SIPAccountBase& acc, const std::string& path)
        : account_(acc)
        , savePath_(path)
    {
        const auto found = savePath_.find_last_of(DIR_SEPARATOR_CH);
        // A path without any separator has no directory component. In that case
        // substr(0, npos) would return the whole file path, and check_dir()
        // would be run on the file name itself, so skip the check.
        if (found != std::string::npos) {
            const auto dir = savePath_.substr(0, found);
            fileutils::check_dir(dir.c_str());
        }
    }
    
    /**
     * Queue a message for a peer and schedule a send attempt.
     *
     * A token that is not yet used for this peer is drawn at random in
     * [1, JAMI_ID_MAX_VAL], the message is stored under it, the queue is
     * persisted with save_(), and retrySend() is scheduled through
     * runOnMainThread().
     *
     * @param to        Recipient; used as the key in messages_.
     * @param payloads  Payloads to send (copied into the stored message).
     * @return The token of the queued message, or 0 if @p to or @p payloads is empty.
     */
    MessageToken
    MessageEngine::sendMessage(const std::string& to, const std::map<std::string, std::string>& payloads)
    {
        // Nothing can be queued without both a recipient and some content.
        if (to.empty() or payloads.empty())
            return 0;
        MessageToken token;
        {
            std::lock_guard<std::mutex> guard(messagesMutex_);
            auto& queue = messages_[to];
            std::uniform_int_distribution<MessageToken> pick {1, JAMI_ID_MAX_VAL};
            // Draw again as long as the token collides with a message already
            // stored for this peer.
            do {
                token = pick(account_.rand);
            } while (queue.count(token) != 0);
            auto& stored = queue.emplace(token, Message {}).first->second;
            stored.to = to;
            stored.payloads = payloads;
            save_();
        }
        // NOTE(review): the scheduled lambda captures `this`; confirm the engine
        // is guaranteed to outlive the scheduled task.
        runOnMainThread([this, to]() { retrySend(to); });
        return token;
    }
    
    /**
     * Notification that a peer is online: forwards to retrySend() with the
     * same arguments.
     *
     * @param peer            Peer whose queued messages should be (re)sent.
     * @param retryOnTimeout  Passed through unchanged to retrySend().
     */
    void
    MessageEngine::onPeerOnline(const std::string& peer, bool retryOnTimeout)
    {
        retrySend(peer, retryOnTimeout);
    }
    
    /**
     * Send every message queued for @p peer whose status is UNKNOWN or IDLE.
     *
     * Does nothing unless the account is REGISTERED. Under messagesMutex_, each
     * eligible message is switched to SENDING, its retry counter is incremented
     * and its last_op timestamp refreshed; a copy is collected. The copies are
     * then sent with the mutex released.
     *
     * @param peer            Key into messages_.
     * @param retryOnTimeout  Forwarded to SIPAccountBase::sendTextMessage().
     */
    void
    MessageEngine::retrySend(const std::string& peer, bool retryOnTimeout)
    {
        if (account_.getRegistrationState() != RegistrationState::REGISTERED)
            return;

        // Snapshot of what must be sent, so nothing below needs the mutex.
        struct PendingMsg
        {
            MessageToken token;
            std::string to;
            std::map<std::string, std::string> payloads;
        };
        std::vector<PendingMsg> toSend;
        {
            std::lock_guard<std::mutex> guard(messagesMutex_);
            auto peerIt = messages_.find(peer);
            if (peerIt == messages_.end())
                return;
            for (auto& entry : peerIt->second) {
                auto& msg = entry.second;
                const bool waiting = msg.status == MessageStatus::UNKNOWN
                                     || msg.status == MessageStatus::IDLE;
                if (not waiting)
                    continue;
                msg.status = MessageStatus::SENDING;
                ++msg.retried;
                msg.last_op = clock::now();
                toSend.emplace_back(PendingMsg {entry.first, msg.to, msg.payloads});
            }
        }

        // The lock is released here: callbacks must not run while holding it.
        for (const auto& item : toSend) {
            JAMI_DBG() << "[message " << item.token << "] Retry sending";
            // Messages carrying a git message id do not trigger the status signal.
            const bool hasGitMessageId = item.payloads.count("application/im-gitmessage-id") != 0;
            if (not hasGitMessageId) {
                emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
                    account_.getAccountID(),
                    "",
                    item.to,
                    std::to_string(item.token),
                    static_cast<int>(DRing::Account::MessageStates::SENDING));
            }
            account_.sendTextMessage(item.to, item.payloads, item.token, retryOnTimeout);
        }
    }
    
    /**
     * Look up the status of a message by token across all peers.
     *
     * @param t  Token to search for.
     * @return The status of the first message found with this token, or
     *         MessageStatus::UNKNOWN when no peer has such a message.
     */
    MessageStatus
    MessageEngine::getStatus(MessageToken t) const
    {
        std::lock_guard<std::mutex> guard(messagesMutex_);
        for (const auto& peerEntry : messages_) {
            const auto& peerMessages = peerEntry.second;
            const auto found = peerMessages.find(t);
            if (found == peerMessages.end())
                continue;
            return found->second.status;
        }
        return MessageStatus::UNKNOWN;
    }
    
    /**
     * Mark the message identified by @p t as CANCELLED.
     *
     * The first peer queue containing the token is used. Unless the message
     * carries an "application/im-gitmessage-id" payload, the
     * AccountMessageStatusChanged signal is emitted with the CANCELLED state.
     * The queue is then persisted with save_().
     *
     * @param t  Token of the message to cancel.
     * @return true if a message with this token was found, false otherwise.
     */
    bool
    MessageEngine::cancel(MessageToken t)
    {
        std::lock_guard<std::mutex> guard(messagesMutex_);
        for (auto& peerEntry : messages_) {
            auto& peerMessages = peerEntry.second;
            auto found = peerMessages.find(t);
            if (found == peerMessages.end())
                continue;

            auto& msg = found->second;
            const bool hasGitMessageId = msg.payloads.count("application/im-gitmessage-id") != 0;
            msg.status = MessageStatus::CANCELLED;
            if (not hasGitMessageId) {
                emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
                    account_.getAccountID(),
                    "",
                    msg.to,
                    std::to_string(t),
                    static_cast<int>(DRing::Account::MessageStates::CANCELLED));
            }
            save_();
            return true;
        }
        return false;
    }
    
    /**
     * Handle the result of a send attempt.
     *
     * Only messages currently in the SENDING state are updated:
     *  - success: status becomes SENT;
     *  - failure with retried >= MAX_RETRIES: status becomes FAILURE;
     *  - any other failure: status goes back to IDLE so a later retrySend()
     *    picks it up again.
     * For SENT and FAILURE the queue is persisted and, unless the message
     * carries an "application/im-gitmessage-id" payload, the
     * AccountMessageStatusChanged signal is emitted.
     *
     * @param peer   Key into messages_.
     * @param token  Token of the message the result refers to.
     * @param ok     true when the send succeeded.
     */
    void
    MessageEngine::onMessageSent(const std::string& peer, MessageToken token, bool ok)
    {
        JAMI_DBG() << "[message " << token << "] Message sent: " << (ok ? "success" : "failure");
        std::lock_guard<std::mutex> guard(messagesMutex_);

        auto peerIt = messages_.find(peer);
        if (peerIt == messages_.end()) {
            JAMI_DBG() << "[message " << token << "] Can't find peer";
            return;
        }
        auto msgIt = peerIt->second.find(token);
        if (msgIt == peerIt->second.end()) {
            JAMI_DBG() << "[message " << token << "] Can't find message";
            return;
        }
        auto& msg = msgIt->second;
        if (msg.status != MessageStatus::SENDING) {
            JAMI_DBG() << "[message " << token << "] State is not SENDING";
            return;
        }

        // Failed, but retries remain: park the message until the next attempt.
        if (not ok and msg.retried < MAX_RETRIES) {
            msg.status = MessageStatus::IDLE;
            JAMI_DBG() << "[message " << token << "] Status changed to IDLE";
            return;
        }

        // Messages carrying a git message id do not trigger the status signal.
        const bool notify = msg.payloads.count("application/im-gitmessage-id") == 0;
        if (ok) {
            msg.status = MessageStatus::SENT;
            JAMI_DBG() << "[message " << token << "] Status changed to SENT";
            if (notify) {
                emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
                    account_.getAccountID(),
                    "",
                    msg.to,
                    std::to_string(token),
                    static_cast<int>(DRing::Account::MessageStates::SENT));
            }
        } else {
            msg.status = MessageStatus::FAILURE;
            JAMI_DBG() << "[message " << token << "] Status changed to FAILURE";
            if (notify) {
                emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
                    account_.getAccountID(),
                    "",
                    msg.to,
                    std::to_string(token),
                    static_cast<int>(DRing::Account::MessageStates::FAILURE));
            }
        }
        save_();
    }
    
    /**
     * Report that a peer displayed a message.
     *
     * When @p displayed is true, logs the event and emits
     * AccountMessageStatusChanged with the DISPLAYED state. The stored message
     * queue is not read or modified here.
     *
     * @param peer       Peer passed along in the signal.
     * @param token      Token of the displayed message.
     * @param displayed  Nothing happens when false.
     */
    void
    MessageEngine::onMessageDisplayed(const std::string& peer, MessageToken token, bool displayed)
    {
        if (displayed) {
            JAMI_DBG() << "[message " << token << "] Displayed by peer";
            emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
                account_.getAccountID(),
                "", /* No related conversation */
                peer,
                std::to_string(token),
                static_cast<int>(DRing::Account::MessageStates::DISPLAYED));
        }
    }
    
    void
    MessageEngine::load()
    {
        try {
            Json::Value root;
            {
                std::lock_guard<std::mutex> lock(fileutils::getFileLock(savePath_));
                std::ifstream file;
                file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
                fileutils::openStream(file, savePath_);
                if (file.is_open())
                    file >> root;
            }
            std::lock_guard<std::mutex> lock(messagesMutex_);
            long unsigned loaded {0};
            for (auto i = root.begin(); i != root.end(); ++i) {
                auto to = i.key().asString();
                auto& pmessages = *i;
                auto& p = messages_[to];
                for (auto m = pmessages.begin(); m != pmessages.end(); ++m) {
                    const auto& jmsg = *m;
                    MessageToken token = from_hex_string(m.key().asString());
                    Message msg;
                    msg.status = (MessageStatus) jmsg["status"].asInt();
                    msg.to = jmsg["to"].asString();
                    auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64());
                    msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now());
                    msg.retried = jmsg.get("retried", 0).asUInt();
                    const auto& pl = jmsg["payload"];
                    for (auto p = pl.begin(); p != pl.end(); ++p)
                        msg.payloads[p.key().asString()] = p->asString();
                    p.emplace(token, std::move(msg));
                    loaded++;
                }
            }
            if (loaded > 0) {
                JAMI_DBG("[Account %s] loaded %lu messages from %s",
                         account_.getAccountID().c_str(),
                         loaded,
                         savePath_.c_str());
            }
        } catch (const std::exception& e) {
            JAMI_DBG("[Account %s] couldn't load messages from %s: %s",
                     account_.getAccountID().c_str(),
                     savePath_.c_str(),
                     e.what());
        }
    }
    
    void
    MessageEngine::save() const
    {
        std::lock_guard<std::mutex> lock(messagesMutex_);
        save_();
    }
    
    void
    MessageEngine::save_() const
    {
        try {
            Json::Value root(Json::objectValue);
            for (auto& c : messages_) {
                Json::Value peerRoot(Json::objectValue);
                for (auto& m : c.second) {
                    auto& v = m.second;
                    if (v.status == MessageStatus::FAILURE || v.status == MessageStatus::SENT
                        || v.status == MessageStatus::CANCELLED)
                        continue;
                    Json::Value msg;
                    msg["status"] = (int) (v.status == MessageStatus::SENDING ? MessageStatus::IDLE
                                                                              : v.status);
                    msg["to"] = v.to;
                    auto wall_time = std::chrono::system_clock::now()
                                     + std::chrono::duration_cast<std::chrono::system_clock::duration>(
                                         v.last_op - clock::now());
                    msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(
                        wall_time);
                    msg["retried"] = v.retried;
                    auto& payloads = msg["payload"];
                    for (const auto& p : v.payloads)
                        payloads[p.first] = p.second;
                    peerRoot[to_hex_string(m.first)] = std::move(msg);
                }
                root[c.first] = std::move(peerRoot);
            }
            // Save asynchronously
            dht::ThreadPool::computation().run([path = savePath_,
                                                root = std::move(root),
                                                accountID = account_.getAccountID(),
                                                messageNum = messages_.size()] {
                std::lock_guard<std::mutex> lock(fileutils::getFileLock(path));
                try {
                    Json::StreamWriterBuilder wbuilder;
                    wbuilder["commentStyle"] = "None";
                    wbuilder["indentation"] = "";
                    const std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter());
                    std::ofstream file;
                    file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
                    fileutils::openStream(file, path, std::ios::trunc);
                    if (file.is_open())
                        writer->write(root, &file);
                } catch (const std::exception& e) {
                    JAMI_ERR("[Account %s] Couldn't save messages to %s: %s",
                             accountID.c_str(),
                             path.c_str(),
                             e.what());
                }
                JAMI_DBG("[Account %s] saved %zu messages to %s",
                         accountID.c_str(),
                         messageNum,
                         path.c_str());
            });
        } catch (const std::exception& e) {
            JAMI_ERR("[Account %s] couldn't save messages to %s: %s",
                     account_.getAccountID().c_str(),
                     savePath_.c_str(),
                     e.what());
        }
    }
    
    } // namespace im
    } // namespace jami