/*
 *  Copyright (C) 2016-2019 Savoir-faire Linux Inc.
 *
 *  Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "message_engine.h"
#include "sip/sipaccountbase.h"
#include "manager.h"
23
#include "fileutils.h"
Adrien Béraud's avatar
Adrien Béraud committed
24 25 26 27

#include "client/ring_signal.h"
#include "dring/account_const.h"

Adrien Béraud's avatar
Adrien Béraud committed
28
#include <opendht/thread_pool.h>
Adrien Béraud's avatar
Adrien Béraud committed
29 30 31 32
#include <json/json.h>

#include <fstream>

Adrien Béraud's avatar
Adrien Béraud committed
33
namespace jami {
Adrien Béraud's avatar
Adrien Béraud committed
34 35 36 37 38 39 40 41 42 43 44 45 46
namespace im {

/**
 * Build a message engine bound to an account.
 *
 * @param acc   Account this engine sends through; kept by reference.
 * @param path  File path used by load() and save() to persist messages.
 */
MessageEngine::MessageEngine(SIPAccountBase& acc, const std::string& path)
    : account_(acc)
    , savePath_(path)
{
}

/**
 * Queue a new outgoing message for a peer and request a send attempt.
 *
 * @param to        Identifier of the recipient; must not be empty.
 * @param payloads  Message content as a key/value map; must not be empty.
 * @return The token chosen for the queued message, or 0 when either
 *         argument is empty and nothing was queued.
 */
MessageToken
MessageEngine::sendMessage(const std::string& to, const std::map<std::string, std::string>& payloads)
{
    // Nothing to queue without both a recipient and some content.
    if (to.empty() or payloads.empty())
        return 0;

    MessageToken token;
    {
        std::lock_guard<std::mutex> guard(messagesMutex_);
        auto& queue = messages_[to];

        // Draw random tokens until one is not already in use for this peer.
        for (;;) {
            token = std::uniform_int_distribution<MessageToken>{1, DRING_ID_MAX_VAL}(account_.rand);
            if (queue.find(token) == queue.end())
                break;
        }

        Message& entry = queue.emplace(token, Message{}).first->second;
        entry.to = to;
        entry.payloads = payloads;

        // Persist the queue (still under the messages lock, as save_() expects).
        save_();
    }

    // The actual send is deferred through runOnMainThread.
    runOnMainThread([this, to]() { retrySend(to); });
    return token;
}

void
63
MessageEngine::onPeerOnline(const std::string& peer)
Adrien Béraud's avatar
Adrien Béraud committed
64
{
65
    retrySend(peer);
Adrien Béraud's avatar
Adrien Béraud committed
66 67 68
}

void
69
MessageEngine::retrySend(const std::string& peer)
Adrien Béraud's avatar
Adrien Béraud committed
70
{
Adrien Béraud's avatar
Adrien Béraud committed
71 72 73 74 75 76 77 78
    struct PendingMsg {
        MessageToken token;
        std::string to;
        std::map<std::string, std::string> payloads;
    };
    std::vector<PendingMsg> pending {};
    {
        std::lock_guard<std::mutex> lock(messagesMutex_);
79 80 81 82 83
        auto p = messages_.find(peer);
        if (p == messages_.end())
            return;
        auto& messages = p->second;
        for (auto m = messages.begin(); m != messages.end(); ++m) {
Adrien Béraud's avatar
Adrien Béraud committed
84
            if (m->second.status == MessageStatus::UNKNOWN || m->second.status == MessageStatus::IDLE) {
85 86 87 88
                m->second.status = MessageStatus::SENDING;
                m->second.retried++;
                m->second.last_op = clock::now();
                pending.emplace_back(PendingMsg {m->first, m->second.to, m->second.payloads});
Adrien Béraud's avatar
Adrien Béraud committed
89
            }
Adrien Béraud's avatar
Adrien Béraud committed
90 91
        }
    }
Adrien Béraud's avatar
Adrien Béraud committed
92
    // avoid locking while calling callback
93
    for (const auto& p : pending) {
Adrien Béraud's avatar
Adrien Béraud committed
94
        JAMI_DBG() << "[message " << p.token << "] Retry sending";
Adrien Béraud's avatar
Adrien Béraud committed
95 96 97 98 99 100 101
        emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
            account_.getAccountID(),
            p.token,
            p.to,
            (int)DRing::Account::MessageStates::SENDING);
        account_.sendTextMessage(p.to, p.payloads, p.token);
    }
Adrien Béraud's avatar
Adrien Béraud committed
102 103 104 105 106 107
}

/**
 * Look up the status of a message across all peers.
 *
 * @param t  Token of the message.
 * @return The stored status of the first message found with this token, or
 *         MessageStatus::UNKNOWN when no peer queue contains it.
 */
MessageStatus
MessageEngine::getStatus(MessageToken t) const
{
    std::lock_guard<std::mutex> guard(messagesMutex_);
    for (auto peerIt = messages_.begin(); peerIt != messages_.end(); ++peerIt) {
        const auto& queue = peerIt->second;
        const auto hit = queue.find(t);
        if (hit == queue.end())
            continue;
        return hit->second.status;
    }
    return MessageStatus::UNKNOWN;
}

Adrien Béraud's avatar
Adrien Béraud committed
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
/**
 * Mark a message as cancelled.
 *
 * Searches every peer queue for the token. On the first match the message
 * status becomes CANCELLED, the status-changed signal is emitted and the
 * queue is persisted, all while the messages lock is held.
 *
 * @param t  Token of the message to cancel.
 * @return true when a message with this token was found, false otherwise.
 */
bool
MessageEngine::cancel(MessageToken t)
{
    std::lock_guard<std::mutex> guard(messagesMutex_);
    for (auto& peerEntry : messages_) {
        auto& queue = peerEntry.second;
        const auto hit = queue.find(t);
        if (hit == queue.end())
            continue;

        auto& msg = hit->second;
        msg.status = MessageStatus::CANCELLED;
        emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(
            account_.getAccountID(),
            t,
            msg.to,
            static_cast<int>(DRing::Account::MessageStates::CANCELLED));
        save_();
        return true;
    }
    return false;
}

Adrien Béraud's avatar
Adrien Béraud committed
135
void
136
MessageEngine::onMessageSent(const std::string& peer, MessageToken token, bool ok)
Adrien Béraud's avatar
Adrien Béraud committed
137
{
Adrien Béraud's avatar
Adrien Béraud committed
138
    JAMI_DBG() << "[message " << token << "] Message sent: " << (ok ? "success" : "failure");
Adrien Béraud's avatar
Adrien Béraud committed
139
    std::lock_guard<std::mutex> lock(messagesMutex_);
140 141
    auto p = messages_.find(peer);
    if (p == messages_.end()) {
Adrien Béraud's avatar
Adrien Béraud committed
142
        JAMI_DBG() << "[message " << token << "] Can't find peer";
143 144 145 146
        return;
    }
    auto f = p->second.find(token);
    if (f != p->second.end()) {
Adrien Béraud's avatar
Adrien Béraud committed
147 148 149
        if (f->second.status == MessageStatus::SENDING) {
            if (ok) {
                f->second.status = MessageStatus::SENT;
Adrien Béraud's avatar
Adrien Béraud committed
150
                JAMI_DBG() << "[message " << token << "] Status changed to SENT";
Edric Milaret's avatar
Edric Milaret committed
151 152 153 154
                emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(account_.getAccountID(),
                                                                             token,
                                                                             f->second.to,
                                                                             static_cast<int>(DRing::Account::MessageStates::SENT));
155
                save_();
156
            } else if (f->second.retried >= MAX_RETRIES) {
Adrien Béraud's avatar
Adrien Béraud committed
157
                f->second.status = MessageStatus::FAILURE;
Adrien Béraud's avatar
Adrien Béraud committed
158
                JAMI_DBG() << "[message " << token << "] Status changed to FAILURE";
Edric Milaret's avatar
Edric Milaret committed
159 160 161 162
                emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(account_.getAccountID(),
                                                                             token,
                                                                             f->second.to,
                                                                             static_cast<int>(DRing::Account::MessageStates::FAILURE));
163
                save_();
Adrien Béraud's avatar
Adrien Béraud committed
164 165
            } else {
                f->second.status = MessageStatus::IDLE;
Adrien Béraud's avatar
Adrien Béraud committed
166
                JAMI_DBG() << "[message " << token << "] Status changed to IDLE";
Adrien Béraud's avatar
Adrien Béraud committed
167
            }
168
        } else {
Adrien Béraud's avatar
Adrien Béraud committed
169
           JAMI_DBG() << "[message " << token << "] State is not SENDING";
Adrien Béraud's avatar
Adrien Béraud committed
170
        }
171
    } else {
Adrien Béraud's avatar
Adrien Béraud committed
172
        JAMI_DBG() << "[message " << token << "] Can't find message";
Adrien Béraud's avatar
Adrien Béraud committed
173 174 175 176 177 178 179 180
    }
}

void
MessageEngine::load()
{
    try {
        Json::Value root;
181 182 183 184 185 186 187 188
        {
            std::lock_guard<std::mutex> lock(fileutils::getFileLock(savePath_));
            std::ifstream file;
            file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
            file.open(savePath_);
            file >> root;
        }
        std::lock_guard<std::mutex> lock(messagesMutex_);
189
        long unsigned loaded {0};
Adrien Béraud's avatar
Adrien Béraud committed
190
        for (auto i = root.begin(); i != root.end(); ++i) {
191 192 193 194 195 196
            auto to = i.key().asString();
            auto& pmessages = *i;
            auto& p = messages_[to];
            for (auto m = pmessages.begin(); m != pmessages.end(); ++m) {
                const auto& jmsg = *m;
                MessageToken token;
Kateryna Kostiuk's avatar
Kateryna Kostiuk committed
197
                std::istringstream iss(m.key().asString());
198 199 200 201 202 203 204 205 206 207 208 209 210
                iss >> std::hex >> token;
                Message msg;
                msg.status = (MessageStatus)jmsg["status"].asInt();
                msg.to = jmsg["to"].asString();
                auto wall_time = std::chrono::system_clock::from_time_t(jmsg["last_op"].asInt64());
                msg.last_op = clock::now() + (wall_time - std::chrono::system_clock::now());
                msg.retried = jmsg.get("retried", 0).asUInt();
                const auto& pl = jmsg["payload"];
                for (auto p = pl.begin(); p != pl.end(); ++p)
                    msg.payloads[p.key().asString()] = p->asString();
                p.emplace(token, std::move(msg));
                loaded++;
            }
Adrien Béraud's avatar
Adrien Béraud committed
211
        }
Adrien Béraud's avatar
Adrien Béraud committed
212
        JAMI_DBG("[Account %s] loaded %lu messages from %s", account_.getAccountID().c_str(), loaded, savePath_.c_str());
Adrien Béraud's avatar
Adrien Béraud committed
213
    } catch (const std::exception& e) {
Adrien Béraud's avatar
Adrien Béraud committed
214
        JAMI_ERR("[Account %s] couldn't load messages from %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what());
Adrien Béraud's avatar
Adrien Béraud committed
215 216 217 218 219
    }
}

void
MessageEngine::save() const
220 221 222 223 224 225 226
{
    std::lock_guard<std::mutex> lock(messagesMutex_);
    save_();
}

void
MessageEngine::save_() const
Adrien Béraud's avatar
Adrien Béraud committed
227 228 229 230
{
    try {
        Json::Value root(Json::objectValue);
        for (auto& c : messages_) {
231 232 233
            Json::Value peerRoot(Json::objectValue);
            for (auto& m : c.second) {
                auto& v = m.second;
Adrien Béraud's avatar
Adrien Béraud committed
234
                if (v.status == MessageStatus::FAILURE || v.status == MessageStatus::SENT || v.status == MessageStatus::CANCELLED)
235 236 237
                    continue;
                Json::Value msg;
                std::ostringstream msgsId;
Kateryna Kostiuk's avatar
Kateryna Kostiuk committed
238
                msgsId << std::hex << m.first;
239 240 241 242 243 244 245 246 247 248 249
                msg["status"] = (int)(v.status == MessageStatus::SENDING ? MessageStatus::IDLE : v.status);
                msg["to"] = v.to;
                auto wall_time = std::chrono::system_clock::now() + std::chrono::duration_cast<std::chrono::system_clock::duration>(v.last_op - clock::now());
                msg["last_op"] = (Json::Value::Int64) std::chrono::system_clock::to_time_t(wall_time);
                msg["retried"] = v.retried;
                auto& payloads = msg["payload"];
                for (const auto& p : v.payloads)
                    payloads[p.first] = p.second;
                peerRoot[msgsId.str()] = std::move(msg);
            }
            root[c.first] = std::move(peerRoot);
Adrien Béraud's avatar
Adrien Béraud committed
250
        }
251
        // Save asynchronously
Adrien Béraud's avatar
Adrien Béraud committed
252
        dht::ThreadPool::computation().run([path = savePath_,
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
                                    root = std::move(root),
                                    accountID = account_.getAccountID(),
                                    messageNum = messages_.size()]
        {
            std::lock_guard<std::mutex> lock(fileutils::getFileLock(path));
            try {
                Json::StreamWriterBuilder wbuilder;
                wbuilder["commentStyle"] = "None";
                wbuilder["indentation"] = "";
                const std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter());
                std::ofstream file;
                file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
                file.open(path, std::ios::trunc);
                writer->write(root, &file);
            } catch (const std::exception& e) {
Adrien Béraud's avatar
Adrien Béraud committed
268
                JAMI_ERR("[Account %s] Couldn't save messages to %s: %s", accountID.c_str(), path.c_str(), e.what());
269
            }
Adrien Béraud's avatar
Adrien Béraud committed
270
            JAMI_DBG("[Account %s] saved %zu messages to %s", accountID.c_str(), messageNum, path.c_str());
271
        });
Adrien Béraud's avatar
Adrien Béraud committed
272
    } catch (const std::exception& e) {
Adrien Béraud's avatar
Adrien Béraud committed
273
        JAMI_ERR("[Account %s] couldn't save messages to %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what());
Adrien Béraud's avatar
Adrien Béraud committed
274 275 276 277
    }
}

}}