Skip to content
Snippets Groups Projects
Select Git revision
  • 2eadfd7224d1a60ef8bfc3f52dd151b2acc58f75
  • master default protected
  • release/202005
  • release/202001
  • release/201912
  • release/201911
  • release/releaseWindowsTestOne
  • release/windowsReleaseTest
  • release/releaseTest
  • release/releaseWindowsTest
  • release/201910
  • release/qt/201910
  • release/windows-test/201910
  • release/201908
  • release/201906
  • release/201905
  • release/201904
  • release/201903
  • release/201902
  • release/201901
  • release/201812
  • 4.0.0
  • 2.2.0
  • 2.1.0
  • 2.0.1
  • 2.0.0
  • 1.4.1
  • 1.4.0
  • 1.3.0
  • 1.2.0
  • 1.1.0
31 results

configurationpanelui.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    message_engine.cpp 10.88 KiB
    /*
     *  Copyright (C) 2004-2020 Savoir-faire Linux Inc.
     *
     *  Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program. If not, see <http://www.gnu.org/licenses/>.
     */
    
    #include "message_engine.h"
    #include "sip/sipaccountbase.h"
    #include "manager.h"
    #include "fileutils.h"
    
    #include "client/ring_signal.h"
    #include "dring/account_const.h"
    
    #include <opendht/thread_pool.h>
    #include <json/json.h>
    
    #include <fstream>
    
    namespace jami {
    namespace im {
    
    MessageEngine::MessageEngine(SIPAccountBase& acc, const std::string& path) : account_(acc), savePath_(path)
    {}
    
    MessageToken
    MessageEngine::sendMessage(const std::string& to, const std::map<std::string, std::string>& payloads)
    {
        if (payloads.empty() or to.empty())
            return 0;
        MessageToken token;
        {
            std::lock_guard<std::mutex> lock(messagesMutex_);
            auto& peerMessages = messages_[to];
            do {
                token = std::uniform_int_distribution<MessageToken>{1, DRING_ID_MAX_VAL}(account_.rand);
            } while (peerMessages.find(token) != peerMessages.end());
            auto m = peerMessages.emplace(token, Message{});
            m.first->second.to = to;
            m.first->second.payloads = payloads;
            save_();
        }
        runOnMainThread([this, to]() {
            retrySend(to);
        });
        return token;
    }
    
    void
    MessageEngine::onPeerOnline(const std::string& peer, bool retryOnTimeout)
    {
        retrySend(peer, retryOnTimeout);
    }
    
    void
    MessageEngine::retrySend(const std::string& peer, bool retryOnTimeout)
    {
        struct PendingMsg {
            MessageToken token;
            std::string to;
            std::map<std::string, std::string> payloads;
        };
        std::vector<PendingMsg> pending {};
        {
            std::lock_guard<std::mutex> lock(messagesMutex_);
            auto p = messages_.find(peer);
            if (p == messages_.end())
                return;
            auto& messages = p->second;
            for (auto m = messages.begin(); m != messages.end(); ++m) {
                if (m->second.status == MessageStatus::UNKNOWN || m->second.status == MessageStatus::IDLE) {
                    m->second.status = MessageStatus::SENDING;
                    m->second.retried++;
                    m->second.last_op = clock::now();
                    pending.emplace_back(PendingMsg {m->first, m->second.to, m->second.payloads});
                }
            }
        }
        // avoid locking while calling callback
        for (const auto& p : pending) {
            JAMI_DBG() << "[message " << p.token << "] Retry sending";
            emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
                account_.getAccountID(),
                p.token,
                p.to,
                (int)DRing::Account::MessageStates::SENDING);
            account_.sendTextMessage(p.to, p.payloads, p.token, retryOnTimeout);
        }
    }
    
    MessageStatus
    MessageEngine::getStatus(MessageToken t) const
    {
        std::lock_guard<std::mutex> lock(messagesMutex_);
        for (const auto& p : messages_) {
            const auto m = p.second.find(t);
            if (m != p.second.end())
                return m->second.status;
        }
        return MessageStatus::UNKNOWN;
    }
    
    bool
    MessageEngine::cancel(MessageToken t)
    {
        std::lock_guard<std::mutex> lock(messagesMutex_);
        for (auto& p : messages_) {
            auto m = p.second.find(t);
            if (m != p.second.end()) {
                m->second.status = MessageStatus::CANCELLED;
                emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(account_.getAccountID(),
                                                                                t,
                                                                                m->second.to,
                                                                                static_cast<int>(DRing::Account::MessageStates::CANCELLED));
                save_();
                return true;
            }
        }
        return false;
    }
    
    void
    MessageEngine::onMessageSent(const std::string& peer, MessageToken token, bool ok)
    {
        JAMI_DBG() << "[message " << token << "] Message sent: " << (ok ? "success" : "failure");
        std::lock_guard<std::mutex> lock(messagesMutex_);
        auto p = messages_.find(peer);
        if (p == messages_.end()) {
            JAMI_DBG() << "[message " << token << "] Can't find peer";
            return;
        }
        auto f = p->second.find(token);
        if (f != p->second.end()) {
            if (f->second.status == MessageStatus::SENDING) {
                if (ok) {
                    f->second.status = MessageStatus::SENT;
                    JAMI_DBG() << "[message " << token << "] Status changed to SENT";
                    emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(account_.getAccountID(),
                                                                                 token,
                                                                                 f->second.to,
                                                                                 static_cast<int>(DRing::Account::MessageStates::SENT));
                    save_();
                } else if (f->second.retried >= MAX_RETRIES) {
                    f->second.status = MessageStatus::FAILURE;
                    JAMI_DBG() << "[message " << token << "] Status changed to FAILURE";
                    emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(account_.getAccountID(),
                                                                                 token,
                                                                                 f->second.to,
                                                                                 static_cast<int>(DRing::Account::MessageStates::FAILURE));
                    save_();
                } else {
                    f->second.status = MessageStatus::IDLE;
                    JAMI_DBG() << "[message " << token << "] Status changed to IDLE";
                }
            } else {
               JAMI_DBG() << "[message " << token << "] State is not SENDING";
            }
        } else {
            JAMI_DBG() << "[message " << token << "] Can't find message";
        }
    }
    
    void
    MessageEngine::load()
    {
        try {
            Json::Value root;
            {
                std::lock_guard<std::mutex> lock(fileutils::getFileLock(savePath_));
                std::ifstream file;
                file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
                fileutils::openStream(file, savePath_);
                file >> root;
            }
            std::lock_guard<std::mutex> lock(messagesMutex_);
            long unsigned loaded {0};
            for (auto i = root.begin(); i != root.end(); ++i) {
                auto to = i.key().asString();
                auto& pmessages = *i;
                auto& p = messages_[to];
                for (auto m = pmessages.begin(); m != pmessages.end(); ++m) {
                    const auto& jmsg = *m;
                    MessageToken token;
                    std::istringstream iss(m.key().asString());
                    iss >> std::hex >> token;
                    Message msg;
                    msg.status = (MessageStatus)jmsg["status"].asInt();
                    msg.to = jmsg["to"].asString();
                    auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64());
                    msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now());
                    msg.retried = jmsg.get("retried", 0).asUInt();
                    const auto& pl = jmsg["payload"];
                    for (auto p = pl.begin(); p != pl.end(); ++p)
                        msg.payloads[p.key().asString()] = p->asString();
                    p.emplace(token, std::move(msg));
                    loaded++;
                }
            }
            JAMI_DBG("[Account %s] loaded %lu messages from %s", account_.getAccountID().c_str(), loaded, savePath_.c_str());
        } catch (const std::exception& e) {
            JAMI_DBG("[Account %s] couldn't load messages from %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what());
        }
    }
    
    void
    MessageEngine::save() const
    {
        std::lock_guard<std::mutex> lock(messagesMutex_);
        save_();
    }
    
    void
    MessageEngine::save_() const
    {
        try {
            Json::Value root(Json::objectValue);
            for (auto& c : messages_) {
                Json::Value peerRoot(Json::objectValue);
                for (auto& m : c.second) {
                    auto& v = m.second;
                    if (v.status == MessageStatus::FAILURE || v.status == MessageStatus::SENT || v.status == MessageStatus::CANCELLED)
                        continue;
                    Json::Value msg;
                    std::ostringstream msgsId;
                    msgsId << std::hex << m.first;
                    msg["status"] = (int)(v.status == MessageStatus::SENDING ? MessageStatus::IDLE : v.status);
                    msg["to"] = v.to;
                    auto wall_time = std::chrono::system_clock::now() + std::chrono::duration_cast<std::chrono::system_clock::duration>(v.last_op - clock::now());
                    msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(wall_time);
                    msg["retried"] = v.retried;
                    auto& payloads = msg["payload"];
                    for (const auto& p : v.payloads)
                        payloads[p.first] = p.second;
                    peerRoot[msgsId.str()] = std::move(msg);
                }
                root[c.first] = std::move(peerRoot);
            }
            // Save asynchronously
            dht::ThreadPool::computation().run([path = savePath_,
                                        root = std::move(root),
                                        accountID = account_.getAccountID(),
                                        messageNum = messages_.size()]
            {
                std::lock_guard<std::mutex> lock(fileutils::getFileLock(path));
                try {
                    Json::StreamWriterBuilder wbuilder;
                    wbuilder["commentStyle"] = "None";
                    wbuilder["indentation"] = "";
                    const std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter());
                    std::ofstream file;
                    file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
                    fileutils::openStream(file, path, std::ios::trunc);
                    writer->write(root, &file);
                } catch (const std::exception& e) {
                    JAMI_ERR("[Account %s] Couldn't save messages to %s: %s", accountID.c_str(), path.c_str(), e.what());
                }
                JAMI_DBG("[Account %s] saved %zu messages to %s", accountID.c_str(), messageNum, path.c_str());
            });
        } catch (const std::exception& e) {
            JAMI_ERR("[Account %s] couldn't save messages to %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what());
        }
    }
    
    }}