Skip to content
Snippets Groups Projects
Select Git revision
  • master default protected
  • release/202005
  • release/202001
  • release/201912
  • release/201911
  • release/releaseWindowsTestOne
  • release/windowsReleaseTest
  • release/releaseTest
  • release/releaseWindowsTest
  • release/201910
  • release/qt/201910
  • release/windows-test/201910
  • release/201908
  • release/201906
  • release/201905
  • release/201904
  • release/201903
  • release/201902
  • release/201901
  • release/201812
  • 4.0.0
  • 2.2.0
  • 2.1.0
  • 2.0.1
  • 2.0.0
  • 1.4.1
  • 1.4.0
  • 1.3.0
  • 1.2.0
  • 1.1.0
30 results

conversation.cpp

  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    conversation.cpp 99.61 KiB
    /*
     *  Copyright (C) 2014-2024 Savoir-faire Linux Inc.
     *
     *  Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
     *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
     *
     *  This program is free software; you can redistribute it and/or modify
     *  it under the terms of the GNU General Public License as published by
     *  the Free Software Foundation; either version 3 of the License, or
     *  (at your option) any later version.
     *
     *  This program is distributed in the hope that it will be useful,
     *  but WITHOUT ANY WARRANTY; without even the implied warranty of
     *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     *  GNU General Public License for more details.
     *
     *  You should have received a copy of the GNU General Public License
     *  along with this program. If not, see <https://www.gnu.org/licenses/>.
     */
    #include "conversation.h"
    
    #include "account_const.h"
    #include "fileutils.h"
    #include "jamiaccount.h"
    #include "client/ring_signal.h"
    
    #include <charconv>
    #include <json/json.h>
    #include <string_view>
    #include <opendht/thread_pool.h>
    #include <tuple>
    #include <optional>
    #include "swarm/swarm_manager.h"
    #ifdef ENABLE_PLUGIN
    #include "manager.h"
    #include "plugin/jamipluginmanager.h"
    #include "plugin/streamdata.h"
    #endif
    #include "jami/conversation_interface.h"
    
    namespace jami {
    
    static const char* const LAST_MODIFIED = "lastModified";
    static const auto jsonBuilder = [] {
        Json::StreamWriterBuilder wbuilder;
        wbuilder["commentStyle"] = "None";
        wbuilder["indentation"] = "";
        return wbuilder;
    }();
    
    ConvInfo::ConvInfo(const Json::Value& json)
    {
        id = json[ConversationMapKeys::ID].asString();
        created = json[ConversationMapKeys::CREATED].asLargestUInt();
        removed = json[ConversationMapKeys::REMOVED].asLargestUInt();
        erased = json[ConversationMapKeys::ERASED].asLargestUInt();
        for (const auto& v : json[ConversationMapKeys::MEMBERS]) {
            members.emplace(v["uri"].asString());
        }
        lastDisplayed = json[ConversationMapKeys::LAST_DISPLAYED].asString();
    }
    
    Json::Value
    ConvInfo::toJson() const
    {
        Json::Value json;
        json[ConversationMapKeys::ID] = id;
        json[ConversationMapKeys::CREATED] = Json::Int64(created);
        if (removed) {
            json[ConversationMapKeys::REMOVED] = Json::Int64(removed);
        }
        if (erased) {
            json[ConversationMapKeys::ERASED] = Json::Int64(erased);
        }
        for (const auto& m : members) {
            Json::Value member;
            member["uri"] = m;
            json[ConversationMapKeys::MEMBERS].append(member);
        }
        json[ConversationMapKeys::LAST_DISPLAYED] = lastDisplayed;
        return json;
    }
    
    // ConversationRequest
    ConversationRequest::ConversationRequest(const Json::Value& json)
    {
        received = json[ConversationMapKeys::RECEIVED].asLargestUInt();
        declined = json[ConversationMapKeys::DECLINED].asLargestUInt();
        from = json[ConversationMapKeys::FROM].asString();
        conversationId = json[ConversationMapKeys::CONVERSATIONID].asString();
        auto& md = json[ConversationMapKeys::METADATAS];
        for (const auto& member : md.getMemberNames()) {
            metadatas.emplace(member, md[member].asString());
        }
    }
    
    Json::Value
    ConversationRequest::toJson() const
    {
        Json::Value json;
        json[ConversationMapKeys::CONVERSATIONID] = conversationId;
        json[ConversationMapKeys::FROM] = from;
        json[ConversationMapKeys::RECEIVED] = static_cast<uint32_t>(received);
        if (declined)
            json[ConversationMapKeys::DECLINED] = static_cast<uint32_t>(declined);
        for (const auto& [key, value] : metadatas) {
            json[ConversationMapKeys::METADATAS][key] = value;
        }
        return json;
    }
    
    std::map<std::string, std::string>
    ConversationRequest::toMap() const
    {
        auto result = metadatas;
        result[ConversationMapKeys::ID] = conversationId;
        result[ConversationMapKeys::FROM] = from;
        if (declined)
            result[ConversationMapKeys::DECLINED] = std::to_string(declined);
        result[ConversationMapKeys::RECEIVED] = std::to_string(received);
        return result;
    }
    
    using MessageList = std::list<std::shared_ptr<libjami::SwarmMessage>>;
    
    struct History
    {
        MessageList messageList {};
        std::map<std::string, std::shared_ptr<libjami::SwarmMessage>> quickAccess {};
        std::map<std::string, std::list<std::shared_ptr<libjami::SwarmMessage>>> pendingEditions {};
        std::map<std::string, std::list<std::map<std::string, std::string>>> pendingReactions {};
    };
    
    class Conversation::Impl
    {
    public:
        Impl(const std::shared_ptr<JamiAccount>& account,
             ConversationMode mode,
             const std::string& otherMember = "")
            : repository_(ConversationRepository::createConversation(account, mode, otherMember))
            , account_(account)
        {
            if (!repository_) {
                throw std::logic_error("Couldn't create repository");
            }
            init();
        }
    
        Impl(const std::shared_ptr<JamiAccount>& account, const std::string& conversationId)
            : account_(account)
        {
            repository_ = std::make_unique<ConversationRepository>(account, conversationId);
            if (!repository_) {
                throw std::logic_error("Couldn't create repository");
            }
            init();
        }
    
        Impl(const std::shared_ptr<JamiAccount>& account,
             const std::string& remoteDevice,
             const std::string& conversationId)
            : account_(account)
        {
            std::vector<ConversationCommit> commits;
            repository_ = ConversationRepository::cloneConversation(account,
                                                                    remoteDevice,
                                                                    conversationId,
                                                                    [&](auto c) {
                                                                        commits = std::move(c);
                                                                    });
            if (!repository_) {
                emitSignal<libjami::ConversationSignal::OnConversationError>(
                    account->getAccountID(), conversationId, EFETCH, "Couldn't clone repository");
                throw std::logic_error("Couldn't clone repository");
            }
            // To detect current active calls, we need to check history
            conversationDataPath_ = fileutils::get_data_dir() / account->getAccountID()
                                    / "conversation_data" / conversationId;
            activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
            for (const auto& c : repository_->convCommitsToMap(commits))
                updateActiveCalls(c);
            init();
        }
    
        void init()
        {
            if (auto shared = account_.lock()) {
                ioContext_ = Manager::instance().ioContext();
                fallbackTimer_ = std::make_unique<asio::steady_timer>(*ioContext_);
                swarmManager_
                    = std::make_shared<SwarmManager>(NodeId(shared->currentDeviceId()),
                                                     Manager::instance().getSeededRandomEngine(),
                                                     [account = account_](const DeviceId& deviceId) {
                                                         if (auto acc = account.lock()) {
                                                             return acc->isConnectedWith(deviceId);
                                                         }
                                                         return false;
                                                     });
                swarmManager_->setMobility(shared->isMobile());
                accountId_ = shared->getAccountID();
                transferManager_
                    = std::make_shared<TransferManager>(shared->getAccountID(),
                                                        "",
                                                        repository_->id(),
                                                        Manager::instance().getSeededRandomEngine());
                conversationDataPath_ = fileutils::get_data_dir() / shared->getAccountID()
                                        / "conversation_data" / repository_->id();
                fetchedPath_ = conversationDataPath_ / "fetched";
                statusPath_ = conversationDataPath_ / "status";
                sendingPath_ = conversationDataPath_ / "sending";
                preferencesPath_ = conversationDataPath_ / ConversationMapKeys::PREFERENCES;
                activeCallsPath_ = conversationDataPath_ / ConversationMapKeys::ACTIVE_CALLS;
                hostedCallsPath_ = conversationDataPath_ / ConversationMapKeys::HOSTED_CALLS;
                loadActiveCalls();
                loadStatus();
                typers_ = std::make_shared<Typers>(shared, repository_->id());
            }
        }
    
        const std::string& toString() const
        {
            if (fmtStr_.empty()) {
                if (repository_->mode() == ConversationMode::ONE_TO_ONE) {
                    if (auto acc = account_.lock()) {
                        auto peer = acc->getUsername();
                        for (const auto& member : repository_->getInitialMembers()) {
                            if (member != acc->getUsername()) {
                                peer = member;
                            }
                        }
                        fmtStr_ = fmt::format("[Conversation (1:1) {}]", peer);
                    }
                } else {
                    fmtStr_ = fmt::format("[Conversation {}]", repository_->id());
                }
            }
            return fmtStr_;
        }
        mutable std::string fmtStr_;
    
        ~Impl()
        {
            try {
                if (fallbackTimer_)
                    fallbackTimer_->cancel();
            } catch (const std::exception& e) {
                JAMI_ERROR("[Conversation {:s}] {:s}", toString(), e.what());
            }
        }
    
        /**
         * If, for whatever reason, the daemon is stopped while hosting a conference,
         * we need to announce the end of this call when restarting.
         * To avoid to keep active calls forever.
         */
        std::vector<std::string> commitsEndedCalls();
        bool isAdmin() const;
        std::filesystem::path repoPath() const;
    
        void announce(const std::string& commitId, bool commitFromSelf = false) const
        {
            std::vector<std::string> vec;
            if (!commitId.empty())
                vec.emplace_back(commitId);
            announce(vec, commitFromSelf);
        }
    
        void announce(const std::vector<std::string>& commits, bool commitFromSelf = false) const
        {
            std::vector<ConversationCommit> convcommits;
            convcommits.reserve(commits.size());
            for (const auto& cid : commits) {
                auto commit = repository_->getCommit(cid);
                if (commit != std::nullopt) {
                    convcommits.emplace_back(*commit);
                }
            }
            announce(repository_->convCommitsToMap(convcommits), commitFromSelf);
        }
    
        /**
         * Update activeCalls_ via announced commits (in load or via new commits)
         * @param commit        Commit to check
         * @param eraseOnly     If we want to ignore added commits
         * @param emitSig    If we want to emit to client
         * @note eraseOnly is used by loadMessages. This is a fail-safe, this SHOULD NOT happen
         */
        void updateActiveCalls(const std::map<std::string, std::string>& commit,
                               bool eraseOnly = false,
                               bool emitSig = true) const
        {
            if (!repository_)
                return;
            if (commit.at("type") == "member") {
                // In this case, we need to check if we are not removing a hosting member or device
                std::lock_guard lk(activeCallsMtx_);
                auto it = activeCalls_.begin();
                auto updateActives = false;
                while (it != activeCalls_.end()) {
                    if (it->at("uri") == commit.at("uri") || it->at("device") == commit.at("uri")) {
                        JAMI_DEBUG("Removing {:s} from the active calls, because {:s} left",
                                   it->at("id"),
                                   commit.at("uri"));
                        it = activeCalls_.erase(it);
                        updateActives = true;
                    } else {
                        ++it;
                    }
                }
                if (updateActives) {
                    saveActiveCalls();
                    if (emitSig)
                        emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
                                                                                     repository_->id(),
                                                                                     activeCalls_);
                }
                return;
            }
            // Else, it's a call information
            if (commit.find("confId") != commit.end() && commit.find("uri") != commit.end()
                && commit.find("device") != commit.end()) {
                auto convId = repository_->id();
                auto confId = commit.at("confId");
                auto uri = commit.at("uri");
                auto device = commit.at("device");
                std::lock_guard lk(activeCallsMtx_);
                auto itActive = std::find_if(activeCalls_.begin(),
                                             activeCalls_.end(),
                                             [&](const auto& value) {
                                                 return value.at("id") == confId
                                                        && value.at("uri") == uri
                                                        && value.at("device") == device;
                                             });
                if (commit.find("duration") == commit.end()) {
                    if (itActive == activeCalls_.end() && !eraseOnly) {
                        JAMI_DEBUG(
                            "swarm:{:s} new current call detected: {:s} on device {:s}, account {:s}",
                            convId,
                            confId,
                            device,
                            uri);
                        std::map<std::string, std::string> activeCall;
                        activeCall["id"] = confId;
                        activeCall["uri"] = uri;
                        activeCall["device"] = device;
                        activeCalls_.emplace_back(activeCall);
                        saveActiveCalls();
                        if (emitSig)
                            emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
                                                                                         repository_
                                                                                             ->id(),
                                                                                         activeCalls_);
                    }
                } else {
                    if (itActive != activeCalls_.end()) {
                        itActive = activeCalls_.erase(itActive);
                        // Unlikely, but we must ensure that no duplicate exists
                        while (itActive != activeCalls_.end()) {
                            itActive = std::find_if(itActive, activeCalls_.end(), [&](const auto& value) {
                                return value.at("id") == confId && value.at("uri") == uri
                                       && value.at("device") == device;
                            });
                            if (itActive != activeCalls_.end()) {
                                JAMI_ERROR("Duplicate call found. (This is a bug)");
                                itActive = activeCalls_.erase(itActive);
                            }
                        }
    
                        if (eraseOnly) {
                            JAMI_WARNING("previous swarm:{:s} call finished detected: {:s} on device "
                                         "{:s}, account {:s}",
                                         convId,
                                         confId,
                                         device,
                                         uri);
                        } else {
                            JAMI_DEBUG("swarm:{:s} call finished: {:s} on device {:s}, account {:s}",
                                       convId,
                                       confId,
                                       device,
                                       uri);
                        }
                    }
                    saveActiveCalls();
                    if (emitSig)
                        emitSignal<libjami::ConfigurationSignal::ActiveCallsChanged>(accountId_,
                                                                                     repository_->id(),
                                                                                     activeCalls_);
                }
            }
        }
    
        void announce(const std::vector<std::map<std::string, std::string>>& commits, bool commitFromSelf = false) const
        {
            auto shared = account_.lock();
            if (!shared or !repository_)
                return;
            auto convId = repository_->id();
            auto ok = !commits.empty();
            auto lastId = ok ? commits.rbegin()->at(ConversationMapKeys::ID) : "";
            addToHistory(commits, true, commitFromSelf);
            if (ok) {
                bool announceMember = false;
                for (const auto& c : commits) {
                    // Announce member events
                    if (c.at("type") == "member") {
                        if (c.find("uri") != c.end() && c.find("action") != c.end()) {
                            const auto& uri = c.at("uri");
                            const auto& actionStr = c.at("action");
                            auto action = -1;
                            if (actionStr == "add")
                                action = 0;
                            else if (actionStr == "join")
                                action = 1;
                            else if (actionStr == "remove")
                                action = 2;
                            else if (actionStr == "ban")
                                action = 3;
                            else if (actionStr == "unban")
                                action = 4;
                            if (actionStr == "ban" || actionStr == "remove") {
                                // In this case, a potential host was removed during a call.
                                updateActiveCalls(c);
                                typers_->removeTyper(uri);
                            }
                            if (action != -1) {
                                announceMember = true;
                                emitSignal<libjami::ConversationSignal::ConversationMemberEvent>(
                                    accountId_, convId, uri, action);
                            }
                        }
                    } else if (c.at("type") == "application/call-history+json") {
                        updateActiveCalls(c);
                    }
    #ifdef ENABLE_PLUGIN
                    auto& pluginChatManager
                        = Manager::instance().getJamiPluginManager().getChatServicesManager();
                    if (pluginChatManager.hasHandlers()) {
                        auto cm = std::make_shared<JamiMessage>(accountId_,
                                                                convId,
                                                                c.at("author") != shared->getUsername(),
                                                                c,
                                                                false);
                        cm->isSwarm = true;
                        pluginChatManager.publishMessage(std::move(cm));
                    }
    #endif
                    // announce message
                    emitSignal<libjami::ConversationSignal::MessageReceived>(accountId_, convId, c);
                }
    
                if (announceMember && onMembersChanged_) {
                    onMembersChanged_(repository_->memberUris("", {}));
                }
            }
        }
    
        void loadStatus()
        {
            try {
                // read file
                auto file = fileutils::loadFile(statusPath_);
                // load values
                msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
                std::lock_guard lk {messageStatusMtx_};
                oh.get().convert(messagesStatus_);
            } catch (const std::exception& e) {
            }
        }
        void saveStatus()
        {
            std::ofstream file(statusPath_, std::ios::trunc | std::ios::binary);
            msgpack::pack(file, messagesStatus_);
        }
    
        void loadActiveCalls() const
        {
            try {
                // read file
                auto file = fileutils::loadFile(activeCallsPath_);
                // load values
                msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
                std::lock_guard lk {activeCallsMtx_};
                oh.get().convert(activeCalls_);
            } catch (const std::exception& e) {
                return;
            }
        }
    
        void saveActiveCalls() const
        {
            std::ofstream file(activeCallsPath_, std::ios::trunc | std::ios::binary);
            msgpack::pack(file, activeCalls_);
        }
    
        void loadHostedCalls() const
        {
            try {
                // read file
                auto file = fileutils::loadFile(hostedCallsPath_);
                // load values
                msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
                std::lock_guard lk {activeCallsMtx_};
                oh.get().convert(hostedCalls_);
            } catch (const std::exception& e) {
                return;
            }
        }
    
        void saveHostedCalls() const
        {
            std::ofstream file(hostedCallsPath_, std::ios::trunc | std::ios::binary);
            msgpack::pack(file, hostedCalls_);
        }
    
        void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb);
    
        std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
                                                                   bool includeLeft,
                                                                   bool includeBanned) const;
    
        std::string_view bannedType(const std::string& uri) const
        {
            auto crt = fmt::format("{}.crt", uri);
            auto bannedMember = repoPath() / "banned" / "members" / crt;
            if (std::filesystem::is_regular_file(bannedMember))
                return "members"sv;
            auto bannedAdmin = repoPath() / "banned" / "admins" / crt;
            if (std::filesystem::is_regular_file(bannedAdmin))
                return "admins"sv;
            auto bannedInvited = repoPath() / "banned" / "invited" / uri;
            if (std::filesystem::is_regular_file(bannedInvited))
                return "invited"sv;
            auto bannedDevice = repoPath() / "banned" / "devices" / crt;
            if (std::filesystem::is_regular_file(bannedDevice))
                return "devices"sv;
            return {};
        }
    
        std::shared_ptr<dhtnet::ChannelSocket> gitSocket(const DeviceId& deviceId) const
        {
            auto deviceSockets = gitSocketList_.find(deviceId);
            return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr;
        }
    
        void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<dhtnet::ChannelSocket>& socket)
        {
            gitSocketList_[deviceId] = socket;
        }
        void removeGitSocket(const DeviceId& deviceId)
        {
            auto deviceSockets = gitSocketList_.find(deviceId);
            if (deviceSockets != gitSocketList_.end())
                gitSocketList_.erase(deviceSockets);
        }
    
        std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited,
                                                                   bool includeLeft) const;
    
        std::set<std::string> checkedMembers_; // Store members we tried
        std::function<void()> bootstrapCb_;
    #ifdef LIBJAMI_TESTABLE
        std::function<void(std::string, BootstrapStatus)> bootstrapCbTest_;
    #endif
    
        std::mutex writeMtx_ {};
        std::unique_ptr<ConversationRepository> repository_;
        std::shared_ptr<SwarmManager> swarmManager_;
        std::weak_ptr<JamiAccount> account_;
        std::atomic_bool isRemoving_ {false};
        std::vector<std::map<std::string, std::string>> loadMessages(const LogOptions& options);
        std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options,
                                                         History* optHistory = nullptr);
        void pull();
        std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri);
    
        // Avoid multiple fetch/merges at the same time.
        std::mutex pullcbsMtx_ {};
        std::set<std::string> fetchingRemotes_ {}; // store current remote in fetch
        std::deque<std::tuple<std::string, std::string, OnPullCb>> pullcbs_ {};
        std::shared_ptr<TransferManager> transferManager_ {};
        std::filesystem::path conversationDataPath_ {};
        std::filesystem::path fetchedPath_ {};
    
        // Manage last message displayed and status
        std::filesystem::path sendingPath_ {};
        // Manage last message displayed
        std::string accountId_ {};
        std::filesystem::path preferencesPath_ {};
        OnMembersChanged onMembersChanged_ {};
    
        // Manage hosted calls on this device
        std::filesystem::path hostedCallsPath_ {};
        mutable std::map<std::string, uint64_t /* start time */> hostedCalls_ {};
        // Manage active calls for this conversation (can be hosted by other devices)
        std::filesystem::path activeCallsPath_ {};
        mutable std::mutex activeCallsMtx_ {};
        mutable std::vector<std::map<std::string, std::string>> activeCalls_ {};
    
        GitSocketList gitSocketList_ {};
    
        // Bootstrap
        std::shared_ptr<asio::io_context> ioContext_;
        std::unique_ptr<asio::steady_timer> fallbackTimer_;
    
        bool isMobile {false};
    
        /**
         * Loaded history represents the linearized history to show for clients
         */
        mutable History loadedHistory_ {};
        std::vector<std::shared_ptr<libjami::SwarmMessage>> addToHistory(
            const std::vector<std::map<std::string, std::string>>& commits,
            bool messageReceived = false,
            bool commitFromSelf = false,
            History* history = nullptr) const;
        // While loading the history, we need to avoid:
        // - reloading history (can just be ignored)
        // - adding new commits (should wait for history to be loaded)
        bool isLoadingHistory_ {false};
        mutable std::mutex historyMtx_ {};
        mutable std::condition_variable historyCv_ {};
    
        void handleReaction(History& history,
                            const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const;
        void handleEdition(History& history,
                           const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
                           bool messageReceived) const;
        bool handleMessage(History& history,
                           const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
                           bool messageReceived) const;
    
        /**
         * {uri, {
         *          {"fetch", "commitId"},
         *          {"fetched_ts", "timestamp"},
         *          {"read", "commitId"},
         *          {"read_ts", "timestamp"}
         *       }
         * }
         */
        mutable std::mutex messageStatusMtx_;
        std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)> messageStatusCb_ {};
        std::filesystem::path statusPath_ {};
        mutable std::map<std::string, std::map<std::string, std::string>> messagesStatus_ {};
        /**
         * Status: 0 = commited, 1 = fetched, 2 = read
         * This cache the curent status to add in the messages
         */
        // Note: only store int32_t cause it's easy to pass to dbus this way
        // memberToStatus serves as a cache for loading messages
        mutable std::map<std::string, int32_t> memberToStatus;
        // futureStatus is used to store the status for receiving messages
        // (because we're not sure to fetch the commit before receiving a status change for this)
        mutable std::map<std::string, std::map<std::string, int32_t>> futureStatus;
        // Update internal structures regarding status
        void updateStatus(const std::string& uri,
                          libjami::Account::MessageStates status,
                          const std::string& commitId,
                          const std::string& ts,
                          bool emit = false);
    
    
        std::shared_ptr<Typers> typers_;
    };
    
    bool
    Conversation::Impl::isAdmin() const
    {
        auto shared = account_.lock();
        if (!shared)
            return false;
    
        auto adminsPath = repoPath() / "admins";
        auto cert = shared->identity().second;
        if (!cert->issuer)
            return false;
        auto uri = cert->issuer->getId().toString();
        return std::filesystem::is_regular_file(fileutils::getFullPath(adminsPath, uri + ".crt"));
    }
    
    std::vector<std::map<std::string, std::string>>
    Conversation::Impl::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
    {
        std::vector<std::map<std::string, std::string>> result;
        auto members = repository_->members();
        std::lock_guard lk(messageStatusMtx_);
        for (const auto& member : members) {
            if (member.role == MemberRole::BANNED && !includeBanned) {
                continue;
            }
            if (member.role == MemberRole::INVITED && !includeInvited)
                continue;
            if (member.role == MemberRole::LEFT && !includeLeft)
                continue;
            auto mm = member.map();
            mm[ConversationMapKeys::LAST_DISPLAYED] = messagesStatus_[member.uri]["read"];
            result.emplace_back(std::move(mm));
        }
        return result;
    }
    
    std::vector<std::string>
    Conversation::Impl::commitsEndedCalls()
    {
        auto shared = account_.lock();
        // Handle current calls
        std::vector<std::string> commits {};
        std::unique_lock lk(writeMtx_);
        std::unique_lock lkA(activeCallsMtx_);
        for (const auto& hostedCall : hostedCalls_) {
            // In this case, this means that we left
            // the conference while still hosting it, so activeCalls
            // will not be correctly updated
            // We don't need to send notifications there, as peers will sync with presence
            Json::Value value;
            auto uri = shared->getUsername();
            auto device = std::string(shared->currentDeviceId());
            value["uri"] = uri;
            value["device"] = device;
            value["confId"] = hostedCall.first;
            value["type"] = "application/call-history+json";
            auto now = std::chrono::system_clock::now();
            auto nowConverted = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch())
                                    .count();
            value["duration"] = std::to_string((nowConverted - hostedCall.second) * 1000);
            auto itActive = std::find_if(activeCalls_.begin(),
                                         activeCalls_.end(),
                                         [confId = hostedCall.first, uri, device](auto value) {
                                             return value.at("id") == confId && value.at("uri") == uri
                                                    && value.at("device") == device;
                                         });
            if (itActive != activeCalls_.end())
                activeCalls_.erase(itActive);
            commits.emplace_back(repository_->commitMessage(Json::writeString(jsonBuilder, value)));
    
            JAMI_DEBUG("Removing hosted conference... {:s}", hostedCall.first);
        }
        hostedCalls_.clear();
        saveActiveCalls();
        saveHostedCalls();
        return commits;
    }
    
    std::filesystem::path
    Conversation::Impl::repoPath() const
    {
        return fileutils::get_data_dir() / accountId_ / "conversations" / repository_->id();
    }
    
    std::vector<std::map<std::string, std::string>>
    Conversation::Impl::loadMessages(const LogOptions& options)
    {
        if (!repository_)
            return {};
        std::vector<ConversationCommit> commits;
        auto startLogging = options.from == "";
        auto breakLogging = false;
        repository_->log(
            [&](const auto& id, const auto& author, const auto& commit) {
                if (!commits.empty()) {
                    // Set linearized parent
                    commits.rbegin()->linearized_parent = id;
                }
                if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
                    return CallbackResult::Skip;
                }
                if ((options.nbOfCommits != 0 && commits.size() == options.nbOfCommits))
                    return CallbackResult::Break; // Stop logging
                if (breakLogging)
                    return CallbackResult::Break; // Stop logging
                if (id == options.to) {
                    if (options.includeTo)
                        breakLogging = true; // For the next commit
                    else
                        return CallbackResult::Break; // Stop logging
                }
    
                if (!startLogging && options.from != "" && options.from == id)
                    startLogging = true;
                if (!startLogging)
                    return CallbackResult::Skip; // Start logging after this one
    
                if (options.fastLog) {
                    if (options.authorUri != "") {
                        if (options.authorUri == repository_->uriFromDevice(author.email)) {
                            return CallbackResult::Break; // Found author, stop
                        }
                    }
                    // Used to only count commit
                    commits.emplace(commits.end(), ConversationCommit {});
                    return CallbackResult::Skip;
                }
    
                return CallbackResult::Ok; // Continue
            },
            [&](auto&& cc) { commits.emplace(commits.end(), std::forward<decltype(cc)>(cc)); },
            [](auto, auto, auto) { return false; },
            options.from,
            options.logIfNotFound);
        return repository_->convCommitsToMap(commits);
    }
    
    std::vector<libjami::SwarmMessage>
    Conversation::Impl::loadMessages2(const LogOptions& options, History* optHistory)
    {
        if (!optHistory) {
            std::lock_guard lock(historyMtx_);
            if (!repository_ || isLoadingHistory_)
                return {};
            isLoadingHistory_ = true;
        }
    
        auto startLogging = options.from == "";
        auto breakLogging = false;
        auto currentHistorySize = loadedHistory_.messageList.size();
        std::vector<std::string> replies;
        std::vector<std::shared_ptr<libjami::SwarmMessage>> msgList;
        repository_->log(
            [&](const auto& id, const auto& author, const auto& commit) {
                if (options.skipMerge && git_commit_parentcount(commit.get()) > 1) {
                    return CallbackResult::Skip;
                }
                if (replies.empty()) { // This avoid load until
                    // NOTE: in the future, we may want to add "Reply-Body" in commit to avoid to load
                    // until this commit
                    if ((options.nbOfCommits != 0
                         && (loadedHistory_.messageList.size() - currentHistorySize)
                                == options.nbOfCommits))
                        return CallbackResult::Break; // Stop logging
                    if (breakLogging)
                        return CallbackResult::Break; // Stop logging
                    if (id == options.to) {
                        if (options.includeTo)
                            breakLogging = true; // For the next commit
                        else
                            return CallbackResult::Break; // Stop logging
                    }
                }
    
                if (!startLogging && options.from != "" && options.from == id)
                    startLogging = true;
                if (!startLogging)
                    return CallbackResult::Skip; // Start logging after this one
    
                if (options.fastLog) {
                    if (options.authorUri != "") {
                        if (options.authorUri == repository_->uriFromDevice(author.email)) {
                            return CallbackResult::Break; // Found author, stop
                        }
                    }
                }
    
                return CallbackResult::Ok; // Continue
            },
            [&](auto&& cc) {
                auto optMessage = repository_->convCommitToMap(cc);
                if (!optMessage.has_value())
                    return;
                auto message = optMessage.value();
                if (message.find("reply-to") != message.end()) {
                    replies.emplace_back(message.at("reply-to"));
                }
                auto it = std::find(replies.begin(), replies.end(), message.at("id"));
                if (it != replies.end()) {
                    replies.erase(it);
                }
                std::shared_ptr<libjami::SwarmMessage> firstMsg;
                if (!optHistory && msgList.empty() && !loadedHistory_.messageList.empty()) {
                    firstMsg = *loadedHistory_.messageList.rbegin();
                }
                auto added = addToHistory({message}, false, false, optHistory);
                if (!added.empty() && firstMsg) {
                    emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_,
                                                                                 repository_->id(),
                                                                                 *firstMsg);
                }
                msgList.insert(msgList.end(), added.begin(), added.end());
            },
            [](auto, auto, auto) { return false; },
            options.from,
            options.logIfNotFound);
    
        // Convert for client (remove ptr)
        std::vector<libjami::SwarmMessage> ret;
        ret.reserve(msgList.size());
        for (const auto& msg: msgList) {
            ret.emplace_back(*msg);
        }
        if (!optHistory) {
            std::lock_guard lock(historyMtx_);
            isLoadingHistory_ = false;
            historyCv_.notify_all();
        }
    
        return ret;
    }
    
    void
    Conversation::Impl::handleReaction(History& history,
                                       const std::shared_ptr<libjami::SwarmMessage>& sharedCommit) const
    {
        auto it = history.quickAccess.find(sharedCommit->body.at("react-to"));
        auto peditIt = history.pendingEditions.find(sharedCommit->id);
        if (peditIt != history.pendingEditions.end()) {
            auto oldBody = sharedCommit->body;
            sharedCommit->body["body"] = peditIt->second.front()->body["body"];
            if (sharedCommit->body.at("body").empty())
                return;
            history.pendingEditions.erase(peditIt);
        }
        if (it != history.quickAccess.end()) {
            it->second->reactions.emplace_back(sharedCommit->body);
            emitSignal<libjami::ConversationSignal::ReactionAdded>(accountId_,
                                                                   repository_->id(),
                                                                   it->second->id,
                                                                   sharedCommit->body);
        } else {
            history.pendingReactions[sharedCommit->body.at("react-to")].emplace_back(sharedCommit->body);
        }
    }
    
    void
    Conversation::Impl::handleEdition(History& history,
                                      const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
                                      bool messageReceived) const
    {
        auto editId = sharedCommit->body.at("edit");
        auto it = history.quickAccess.find(editId);
        if (it != history.quickAccess.end()) {
            auto baseCommit = it->second;
            if (baseCommit) {
                auto itReact = baseCommit->body.find("react-to");
                std::string toReplace = (baseCommit->type == "application/data-transfer+json") ?
                    "tid" : "body";
                auto body = sharedCommit->body.at(toReplace);
                // Edit reaction
                if (itReact != baseCommit->body.end()) {
                    baseCommit->body[toReplace] = body; // Replace body if pending
                    it = history.quickAccess.find(itReact->second);
                    auto itPending = history.pendingReactions.find(itReact->second);
                    if (it != history.quickAccess.end()) {
                        baseCommit = it->second; // Base commit
                        auto itPreviousReact = std::find_if(baseCommit->reactions.begin(),
                                                            baseCommit->reactions.end(),
                                                            [&](const auto& reaction) {
                                                                return reaction.at("id") == editId;
                                                            });
                        if (itPreviousReact != baseCommit->reactions.end()) {
                            (*itPreviousReact)[toReplace] = body;
                            if (body.empty()) {
                                baseCommit->reactions.erase(itPreviousReact);
                                emitSignal<libjami::ConversationSignal::ReactionRemoved>(accountId_,
                                                                                         repository_
                                                                                             ->id(),
                                                                                         baseCommit->id,
                                                                                         editId);
                            }
                        }
                    } else if (itPending != history.pendingReactions.end()) {
                        // Else edit if pending
                        auto itReaction = std::find_if(itPending->second.begin(),
                                                       itPending->second.end(),
                                                       [&](const auto& reaction) {
                                                           return reaction.at("id") == editId;
                                                       });
                        if (itReaction != itPending->second.end()) {
                            (*itReaction)[toReplace] = body;
                            if (body.empty())
                                itPending->second.erase(itReaction);
                        }
                    } else {
                        // Add to pending edtions
                        messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
                                        : history.pendingEditions[editId].emplace_back(sharedCommit);
                    }
                } else {
                    // Normal message
                    it->second->editions.emplace(it->second->editions.begin(), it->second->body);
                    it->second->body[toReplace] = sharedCommit->body[toReplace];
                    if (toReplace == "tid") {
                        // Avoid to replace fileId in client
                        it->second->body["fileId"] = "";
                    }
                    // Remove reactions
                    if (sharedCommit->body.at(toReplace).empty())
                        it->second->reactions.clear();
                    emitSignal<libjami::ConversationSignal::SwarmMessageUpdated>(accountId_, repository_->id(), *it->second);
                }
            }
        } else {
            messageReceived ? history.pendingEditions[editId].emplace_front(sharedCommit)
                            : history.pendingEditions[editId].emplace_back(sharedCommit);
        }
    }
    
    bool
    Conversation::Impl::handleMessage(History& history,
                                      const std::shared_ptr<libjami::SwarmMessage>& sharedCommit,
                                      bool messageReceived) const
    {
        if (messageReceived) {
            // For a received message, we place it at the beginning of the list
            if (!history.messageList.empty())
                sharedCommit->linearizedParent = (*history.messageList.begin())->id;
            history.messageList.emplace_front(sharedCommit);
        } else {
            // For a loaded message, we load from newest to oldest
            // So we change the parent of the last message.
            if (!history.messageList.empty())
                (*history.messageList.rbegin())->linearizedParent = sharedCommit->id;
            history.messageList.emplace_back(sharedCommit);
        }
        // Handle pending reactions/editions
        auto reactIt = history.pendingReactions.find(sharedCommit->id);
        if (reactIt != history.pendingReactions.end()) {
            for (const auto& commitBody : reactIt->second)
                sharedCommit->reactions.emplace_back(commitBody);
            history.pendingReactions.erase(reactIt);
        }
        auto peditIt = history.pendingEditions.find(sharedCommit->id);
        if (peditIt != history.pendingEditions.end()) {
            auto oldBody = sharedCommit->body;
            if (sharedCommit->type == "application/data-transfer+json") {
                sharedCommit->body["tid"] = peditIt->second.front()->body["tid"];
                sharedCommit->body["fileId"] = "";
            } else {
                sharedCommit->body["body"] = peditIt->second.front()->body["body"];
            }
            peditIt->second.pop_front();
            for (const auto& commit : peditIt->second) {
                sharedCommit->editions.emplace_back(commit->body);
            }
            sharedCommit->editions.emplace_back(oldBody);
            history.pendingEditions.erase(peditIt);
        }
        // Announce to client
        if (messageReceived)
            emitSignal<libjami::ConversationSignal::SwarmMessageReceived>(accountId_,
                                                                          repository_->id(),
                                                                          *sharedCommit);
        return !messageReceived;
    }
    
    std::vector<std::shared_ptr<libjami::SwarmMessage>>
    Conversation::Impl::addToHistory(const std::vector<std::map<std::string, std::string>>& commits,
                                     bool messageReceived,
                                     bool commitFromSelf,
                                     History* optHistory) const
    {
        auto acc = account_.lock();
        if (!acc)
            return {};
        auto username = acc->getUsername();
        if (messageReceived && (!optHistory && isLoadingHistory_)) {
            std::unique_lock lk(historyMtx_);
            historyCv_.wait(lk, [&] { return !isLoadingHistory_; });
        }
        std::vector<std::shared_ptr<libjami::SwarmMessage>> messages;
        auto addCommit = [&](const auto& commit) {
            auto* history = optHistory ? optHistory : &loadedHistory_;
            auto commitId = commit.at("id");
            if (history->quickAccess.find(commitId) != history->quickAccess.end())
                return; // Already present
            auto typeIt = commit.find("type");
            auto reactToIt = commit.find("react-to");
            auto editIt = commit.find("edit");
            // Nothing to show for the client, skip
            if (typeIt != commit.end() && typeIt->second == "merge")
                return;
    
            auto sharedCommit = std::make_shared<libjami::SwarmMessage>();
            sharedCommit->fromMapStringString(commit);
            // Set message status based on cache (only on history for client)
            if (!commitFromSelf && optHistory == nullptr) {
                std::lock_guard lk(messageStatusMtx_);
                for (const auto& member: repository_->members()) {
                    // If we have a status cached, use it
                    auto itFuture = futureStatus.find(sharedCommit->id);
                    if (itFuture != futureStatus.end()) {
                        sharedCommit->status = std::move(itFuture->second);
                        futureStatus.erase(itFuture);
                        continue;
                    }
                    // Else we need to compute the status.
                    auto& cache = memberToStatus[member.uri];
                    if (cache == 0) {
                        // Message is sending, sent or displayed
                        cache = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
                    }
                    if (!messageReceived) {
                        // For loading previous messages, there is 3 cases. Last value cached is displayed, so is every previous commits
                        // Else, if last value is sent, we can compare to the last read commit to update the cache
                        // Finally if it's sending, we check last fetched commit
                        if (cache == static_cast<int32_t>(libjami::Account::MessageStates::SENT)) {
                            if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
                                cache = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
                            }
                        } else if (cache <= static_cast<int32_t>(libjami::Account::MessageStates::SENDING)) { // SENDING or UNKNOWN
                            // cache can be upgraded to displayed or sent
                            if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
                                cache = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
                            } else if (messagesStatus_[member.uri]["fetched"] == sharedCommit->id) {
                                cache = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
                            }
                        }
                        sharedCommit->status[member.uri] = static_cast<int32_t>(cache);
                    } else {
                        // If member is author of the message received, they already saw it
                        if (member.uri == commit.at("author")) {
                            // If member is the author of the commit, they are considered as displayed (same for all previous commits)
                            messagesStatus_[member.uri]["read"] = sharedCommit->id;
                            messagesStatus_[member.uri]["fetched"] = sharedCommit->id;
                            sharedCommit->status[commit.at("author")] = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
                            cache = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
                            continue;
                        }
                        // For receiving messages, every commit is considered as SENDING, unless we got a update
                        auto status = static_cast<int32_t>(libjami::Account::MessageStates::SENDING);
                        if (messagesStatus_[member.uri]["read"] == sharedCommit->id) {
                            status = static_cast<int32_t>(libjami::Account::MessageStates::DISPLAYED);
                        } else if (messagesStatus_[member.uri]["fetched"] == sharedCommit->id) {
                            status = static_cast<int32_t>(libjami::Account::MessageStates::SENT);
                        }
                        sharedCommit->status[member.uri] = static_cast<int32_t>(status);
                    }
                }
            }
            history->quickAccess[commitId] = sharedCommit;
    
            if (reactToIt != commit.end() && !reactToIt->second.empty()) {
                handleReaction(*history, sharedCommit);
            } else if (editIt != commit.end() && !editIt->second.empty()) {
                handleEdition(*history, sharedCommit, messageReceived);
            } else if (handleMessage(*history, sharedCommit, messageReceived)) {
                messages.emplace_back(sharedCommit);
            }
        };
        std::for_each(commits.begin(), commits.end(), addCommit);
    
        return messages;
    }
    
    Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
                               ConversationMode mode,
                               const std::string& otherMember)
        : pimpl_ {new Impl {account, mode, otherMember}}
    {}
    
    Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
                               const std::string& conversationId)
        : pimpl_ {new Impl {account, conversationId}}
    {}
    
    Conversation::Conversation(const std::shared_ptr<JamiAccount>& account,
                               const std::string& remoteDevice,
                               const std::string& conversationId)
        : pimpl_ {new Impl {account, remoteDevice, conversationId}}
    {}
    
    Conversation::~Conversation() {}
    
    std::string
    Conversation::id() const
    {
        return pimpl_->repository_ ? pimpl_->repository_->id() : "";
    }
    
    void
    Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb)
    {
        try {
            if (mode() == ConversationMode::ONE_TO_ONE) {
                // Only authorize to add left members
                auto initialMembers = getInitialMembers();
                auto it = std::find(initialMembers.begin(), initialMembers.end(), contactUri);
                if (it == initialMembers.end()) {
                    JAMI_WARN("Cannot add new member in one to one conversation");
                    cb(false, "");
                    return;
                }
            }
        } catch (const std::exception& e) {
            JAMI_WARN("Cannot get mode: %s", e.what());
            cb(false, "");
            return;
        }
        if (isMember(contactUri, true)) {
            JAMI_WARN("Could not add member %s because it's already a member", contactUri.c_str());
            cb(false, "");
            return;
        }
        if (isBanned(contactUri)) {
            if (pimpl_->isAdmin()) {
                dht::ThreadPool::io().run(
                    [w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
                        if (auto sthis = w.lock()) {
                            auto members = sthis->pimpl_->repository_->members();
                            auto type = sthis->pimpl_->bannedType(contactUri);
                            if (type.empty()) {
                                cb(false, {});
                                return;
                            }
                            sthis->pimpl_->voteUnban(contactUri, type, cb);
                        }
                    });
            } else {
                JAMI_WARN("Could not add member %s because this member is banned", contactUri.c_str());
                cb(false, "");
            }
            return;
        }
    
        dht::ThreadPool::io().run([w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] {
            if (auto sthis = w.lock()) {
                // Add member files and commit
                std::unique_lock lk(sthis->pimpl_->writeMtx_);
                auto commit = sthis->pimpl_->repository_->addMember(contactUri);
                sthis->pimpl_->announce(commit, true);
                lk.unlock();
                if (cb)
                    cb(!commit.empty(), commit);
            }
        });
    }
    
    std::shared_ptr<dhtnet::ChannelSocket>
    Conversation::gitSocket(const DeviceId& deviceId) const
    {
        return pimpl_->gitSocket(deviceId);
    }
    
    void
    Conversation::addGitSocket(const DeviceId& deviceId,
                               const std::shared_ptr<dhtnet::ChannelSocket>& socket)
    {
        pimpl_->addGitSocket(deviceId, socket);
    }
    
    void
    Conversation::removeGitSocket(const DeviceId& deviceId)
    {
        pimpl_->removeGitSocket(deviceId);
    }
    
    void
    Conversation::shutdownConnections()
    {
        pimpl_->fallbackTimer_->cancel();
        pimpl_->gitSocketList_.clear();
        if (pimpl_->swarmManager_)
            pimpl_->swarmManager_->shutdown();
        pimpl_->checkedMembers_.clear();
    }
    
    void
    Conversation::connectivityChanged()
    {
        if (pimpl_->swarmManager_)
            pimpl_->swarmManager_->maintainBuckets();
    }
    
    std::vector<jami::DeviceId>
    Conversation::getDeviceIdList() const
    {
        return pimpl_->swarmManager_->getRoutingTable().getAllNodes();
    }
    
    std::shared_ptr<Typers>
    Conversation::typers() const
    {
        return pimpl_->typers_;
    }
    
    bool
    Conversation::hasSwarmChannel(const std::string& deviceId)
    {
        if (!pimpl_->swarmManager_)
            return false;
        return pimpl_->swarmManager_->isConnectedWith(DeviceId(deviceId));
    }
    
    void
    Conversation::Impl::voteUnban(const std::string& contactUri,
                                  const std::string_view type,
                                  const OnDoneCb& cb)
    {
        // Check if admin
        if (!isAdmin()) {
            JAMI_WARN("You're not an admin of this repo. Cannot unban %s", contactUri.c_str());
            cb(false, {});
            return;
        }
    
        // Vote for removal
        std::unique_lock lk(writeMtx_);
        auto voteCommit = repository_->voteUnban(contactUri, type);
        if (voteCommit.empty()) {
            JAMI_WARN("Unbanning %s failed", contactUri.c_str());
            cb(false, "");
            return;
        }
    
        auto lastId = voteCommit;
        std::vector<std::string> commits;
        commits.emplace_back(voteCommit);
    
        // If admin, check vote
        auto resolveCommit = repository_->resolveVote(contactUri, type, "unban");
        if (!resolveCommit.empty()) {
            commits.emplace_back(resolveCommit);
            lastId = resolveCommit;
            JAMI_WARN("Vote solved for unbanning %s.", contactUri.c_str());
        }
        announce(commits, true);
        lk.unlock();
        if (cb)
            cb(!lastId.empty(), lastId);
    }
    
    void
    Conversation::removeMember(const std::string& contactUri, bool isDevice, const OnDoneCb& cb)
    {
        dht::ThreadPool::io().run([w = weak(),
                                   contactUri = std::move(contactUri),
                                   isDevice = std::move(isDevice),
                                   cb = std::move(cb)] {
            if (auto sthis = w.lock()) {
                // Check if admin
                if (!sthis->pimpl_->isAdmin()) {
                    JAMI_WARN("You're not an admin of this repo. Cannot ban %s", contactUri.c_str());
                    cb(false, {});
                    return;
                }
    
                // Get current user type
                std::string type;
                if (isDevice) {
                    type = "devices";
                } else {
                    auto members = sthis->pimpl_->repository_->members();
                    for (const auto& member : members) {
                        if (member.uri == contactUri) {
                            if (member.role == MemberRole::INVITED) {
                                type = "invited";
                            } else if (member.role == MemberRole::ADMIN) {
                                type = "admins";
                            } else if (member.role == MemberRole::MEMBER) {
                                type = "members";
                            }
                            break;
                        }
                    }
                    if (type.empty()) {
                        cb(false, {});
                        return;
                    }
                }
    
                // Vote for removal
                std::unique_lock lk(sthis->pimpl_->writeMtx_);
                auto voteCommit = sthis->pimpl_->repository_->voteKick(contactUri, type);
                if (voteCommit.empty()) {
                    JAMI_WARN("Kicking %s failed", contactUri.c_str());
                    cb(false, "");
                    return;
                }
    
                auto lastId = voteCommit;
                std::vector<std::string> commits;
                commits.emplace_back(voteCommit);
    
                // If admin, check vote
                auto resolveCommit = sthis->pimpl_->repository_->resolveVote(contactUri, type, "ban");
                if (!resolveCommit.empty()) {
                    commits.emplace_back(resolveCommit);
                    lastId = resolveCommit;
                    JAMI_WARN("Vote solved for %s. %s banned",
                              contactUri.c_str(),
                              isDevice ? "Device" : "Member");
    
                    const auto nodes = sthis->pimpl_->swarmManager_->getRoutingTable().getAllNodes();
                    // Remove nodes from swarmManager
                    std::vector<NodeId> toRemove;
                    for (const auto node : nodes)
                        if (contactUri == sthis->uriFromDevice(node.toString()))
                            toRemove.emplace_back(node);
                    sthis->pimpl_->swarmManager_->deleteNode(toRemove);
                    // Remove git sockets with this member
                    std::vector<DeviceId> gitToRm;
                    for (const auto& [deviceId, _] : sthis->pimpl_->gitSocketList_)
                        if (contactUri == sthis->uriFromDevice(deviceId.toString()))
                            gitToRm.emplace_back(deviceId);
                    for (const auto& did : gitToRm)
                        sthis->removeGitSocket(did);
                }
    
                sthis->pimpl_->announce(commits, true);
                lk.unlock();
                cb(!lastId.empty(), lastId);
            }
        });
    }
    
    std::vector<std::map<std::string, std::string>>
    Conversation::getMembers(bool includeInvited, bool includeLeft, bool includeBanned) const
    {
        return pimpl_->getMembers(includeInvited, includeLeft, includeBanned);
    }
    
    std::set<std::string>
    Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& filteredRoles) const
    {
        return pimpl_->repository_->memberUris(filter, filteredRoles);
    }
    
    std::vector<NodeId>
    Conversation::peersToSyncWith() const
    {
        const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
        const auto& nodes = routingTable.getNodes();
        const auto& mobiles = routingTable.getMobileNodes();
        std::vector<NodeId> s;
        s.reserve(nodes.size() + mobiles.size());
        s.insert(s.end(), nodes.begin(), nodes.end());
        s.insert(s.end(), mobiles.begin(), mobiles.end());
        for (const auto& [deviceId, _] : pimpl_->gitSocketList_)
            if (std::find(s.cbegin(), s.cend(), deviceId) == s.cend())
                s.emplace_back(deviceId);
        return s;
    }
    
    bool
    Conversation::isBootstraped() const
    {
        const auto& routingTable = pimpl_->swarmManager_->getRoutingTable();
        return !routingTable.getNodes().empty();
    }
    
    std::string
    Conversation::uriFromDevice(const std::string& deviceId) const
    {
        return pimpl_->repository_->uriFromDevice(deviceId);
    }
    
    void
    Conversation::monitor()
    {
        pimpl_->swarmManager_->getRoutingTable().printRoutingTable();
    }
    
    std::string
    Conversation::join()
    {
        return pimpl_->repository_->join();
    }
    
    bool
    Conversation::isMember(const std::string& uri, bool includeInvited) const
    {
        auto repoPath = pimpl_->repoPath();
        auto invitedPath = repoPath / "invited";
        auto adminsPath = repoPath / "admins";
        auto membersPath = repoPath / "members";
        std::vector<std::filesystem::path> pathsToCheck = {adminsPath, membersPath};
        if (includeInvited)
            pathsToCheck.emplace_back(invitedPath);
        for (const auto& path : pathsToCheck) {
            for (const auto& certificate : dhtnet::fileutils::readDirectory(path)) {
                std::string_view crtUri = certificate;
                auto crtIt = crtUri.find(".crt");
                if (path != invitedPath && crtIt == std::string_view::npos) {
                    JAMI_WARNING("Incorrect file found: {}/{}", path, certificate);
                    continue;
                }
                if (crtIt != std::string_view::npos)
                    crtUri = crtUri.substr(0, crtIt);
                if (crtUri == uri)
                    return true;
            }
        }
    
        if (includeInvited && mode() == ConversationMode::ONE_TO_ONE) {
            for (const auto& member : getInitialMembers()) {
                if (member == uri)
                    return true;
            }
        }
    
        return false;
    }
    
    bool
    Conversation::isBanned(const std::string& uri) const
    {
        return !pimpl_->bannedType(uri).empty();
    }
    
    void
    Conversation::sendMessage(std::string&& message,
                              const std::string& type,
                              const std::string& replyTo,
                              OnCommitCb&& onCommit,
                              OnDoneCb&& cb)
    {
        Json::Value json;
        json["body"] = std::move(message);
        json["type"] = type;
        sendMessage(std::move(json), replyTo, std::move(onCommit), std::move(cb));
    }
    
    void
    Conversation::sendMessage(Json::Value&& value,
                              const std::string& replyTo,
                              OnCommitCb&& onCommit,
                              OnDoneCb&& cb)
    {
        if (!replyTo.empty()) {
            auto commit = pimpl_->repository_->getCommit(replyTo);
            if (commit == std::nullopt) {
                JAMI_ERR("Replying to invalid commit %s", replyTo.c_str());
                return;
            }
            value["reply-to"] = replyTo;
        }
        dht::ThreadPool::io().run(
            [w = weak(), value = std::move(value), onCommit = std::move(onCommit), cb = std::move(cb)] {
                if (auto sthis = w.lock()) {
                    auto acc = sthis->pimpl_->account_.lock();
                    if (!acc)
                        return;
                    std::unique_lock lk(sthis->pimpl_->writeMtx_);
                    auto commit = sthis->pimpl_->repository_->commitMessage(
                        Json::writeString(jsonBuilder, value));
                    lk.unlock();
                    if (onCommit)
                        onCommit(commit);
                    sthis->pimpl_->announce(commit, true);
                    if (cb)
                        cb(!commit.empty(), commit);
                }
            });
    }
    
    void
    Conversation::sendMessages(std::vector<Json::Value>&& messages, OnMultiDoneCb&& cb)
    {
        dht::ThreadPool::io().run([w = weak(), messages = std::move(messages), cb = std::move(cb)] {
            if (auto sthis = w.lock()) {
                std::vector<std::string> commits;
                commits.reserve(messages.size());
                std::unique_lock lk(sthis->pimpl_->writeMtx_);
                for (const auto& message : messages) {
                    auto commit = sthis->pimpl_->repository_->commitMessage(
                        Json::writeString(jsonBuilder, message));
                    commits.emplace_back(std::move(commit));
                }
                lk.unlock();
                sthis->pimpl_->announce(commits, true);
                if (cb)
                    cb(commits);
            }
        });
    }
    
    std::optional<std::map<std::string, std::string>>
    Conversation::getCommit(const std::string& commitId) const
    {
        auto commit = pimpl_->repository_->getCommit(commitId);
        if (commit == std::nullopt)
            return std::nullopt;
        return pimpl_->repository_->convCommitToMap(*commit);
    }
    
    void
    Conversation::loadMessages(OnLoadMessages cb, const LogOptions& options)
    {
        if (!cb)
            return;
        dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
            if (auto sthis = w.lock()) {
                cb(sthis->pimpl_->loadMessages(options));
            }
        });
    }
    
    void
    Conversation::loadMessages2(const OnLoadMessages2& cb, const LogOptions& options)
    {
        if (!cb)
            return;
        dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), options] {
            if (auto sthis = w.lock()) {
                cb(sthis->pimpl_->loadMessages2(options));
            }
        });
    }
    
    void
    Conversation::clearCache()
    {
        pimpl_->loadedHistory_.messageList.clear();
        pimpl_->loadedHistory_.quickAccess.clear();
        pimpl_->loadedHistory_.pendingEditions.clear();
        pimpl_->loadedHistory_.pendingReactions.clear();
    }
    
    std::string
    Conversation::lastCommitId() const
    {
        LogOptions options;
        options.nbOfCommits = 1;
        options.skipMerge = true;
        History optHistory;
        {
            std::lock_guard lk(pimpl_->historyMtx_);
            if (!pimpl_->loadedHistory_.messageList.empty())
                return (*pimpl_->loadedHistory_.messageList.begin())->id;
        }
    
        std::lock_guard lk(pimpl_->writeMtx_);
        auto res = pimpl_->loadMessages2(options, &optHistory);
        if (res.empty())
            return {};
        return (*optHistory.messageList.begin())->id;
    }
    
    std::vector<std::map<std::string, std::string>>
    Conversation::Impl::mergeHistory(const std::string& uri)
    {
        if (not repository_) {
            JAMI_WARN("Invalid repo. Abort merge");
            return {};
        }
        auto remoteHead = repository_->remoteHead(uri);
        if (remoteHead.empty()) {
            JAMI_WARN("Could not get HEAD of %s", uri.c_str());
            return {};
        }
    
        // Validate commit
        auto [newCommits, err] = repository_->validFetch(uri);
        if (newCommits.empty()) {
            if (err)
                JAMI_ERR("Could not validate history with %s", uri.c_str());
            repository_->removeBranchWith(uri);
            return {};
        }
    
        // If validated, merge
        auto [ok, cid] = repository_->merge(remoteHead);
        if (!ok) {
            JAMI_ERR("Could not merge history with %s", uri.c_str());
            repository_->removeBranchWith(uri);
            return {};
        }
        if (!cid.empty()) {
            // A merge commit was generated, should be added in new commits
            auto commit = repository_->getCommit(cid);
            if (commit != std::nullopt)
                newCommits.emplace_back(*commit);
        }
    
        JAMI_DEBUG("Successfully merge history with {:s}", uri);
        auto result = repository_->convCommitsToMap(newCommits);
        for (const auto& commit : result) {
            auto it = commit.find("type");
            if (it != commit.end() && it->second == "member") {
                repository_->refreshMembers();
            }
        }
        return result;
    }
    
    bool
    Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId)
    {
        std::lock_guard lk(pimpl_->pullcbsMtx_);
        auto isInProgress = not pimpl_->pullcbs_.empty();
        auto itPull = std::find_if(pimpl_->pullcbs_.begin(),
                                   pimpl_->pullcbs_.end(),
                                   [&](const auto& elem) { return std::get<0>(elem) == deviceId && std::get<1>(elem) == commitId; });
        if (itPull != pimpl_->pullcbs_.end()) {
            cb(false);
            return false;
        }
        JAMI_INFO() << "Sync " << id() << " with " << deviceId;
        pimpl_->pullcbs_.emplace_back(deviceId, std::move(commitId), std::move(cb));
        if (isInProgress)
            return true;
        dht::ThreadPool::io().run([w = weak()] {
            if (auto sthis_ = w.lock())
                sthis_->pimpl_->pull();
        });
        return true;
    }
    
    void
    Conversation::Impl::pull()
    {
        auto& repo = repository_;
    
        std::string deviceId, commitId;
        OnPullCb cb;
        while (true) {
            decltype(pullcbs_)::value_type pullcb;
            decltype(fetchingRemotes_.begin()) it;
            {
                std::lock_guard lk(pullcbsMtx_);
                if (pullcbs_.empty())
                    return;
                auto& elem = pullcbs_.front();
                deviceId = std::move(std::get<0>(elem));
                commitId = std::move(std::get<1>(elem));
                cb = std::move(std::get<2>(elem));
                pullcbs_.pop_front();
    
                // Check if already using this remote, if so, no need to pull yet
                // One pull at a time to avoid any early EOF or fetch errors.
                auto itr = fetchingRemotes_.emplace(deviceId);
                if (!itr.second) {
                    // Go to next pull
                    pullcbs_.emplace_back(std::move(deviceId), std::move(commitId), std::move(cb));
                    continue;
                }
                it = itr.first;
            }
            // If recently fetched, the commit can already be there, so no need to do complex operations
            if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) {
                cb(true);
                std::lock_guard lk(pullcbsMtx_);
                fetchingRemotes_.erase(it);
                continue;
            }
            // Pull from remote
            auto fetched = repo->fetch(deviceId);
            {
                std::lock_guard lk(pullcbsMtx_);
                fetchingRemotes_.erase(it);
            }
    
            if (!fetched) {
                cb(false);
                continue;
            }
            auto oldHead = repo->getHead();
            std::string newHead = oldHead;
            std::unique_lock lk(writeMtx_);
            auto commits = mergeHistory(deviceId);
            if (!commits.empty()) {
                newHead = commits.rbegin()->at("id");
                // Note: Because clients needs to linearize the history, they need to know all commits
                // that can be updated.
                // In this case, all commits until the common merge base should be announced.
                // The client ill need to update it's model after this.
                std::string mergeBase = oldHead; // If fast-forward, the merge base is the previous head
                auto newHeadCommit = repo->getCommit(newHead);
                if (newHeadCommit != std::nullopt && newHeadCommit->parents.size() > 1) {
                    mergeBase = repo->mergeBase(newHeadCommit->parents[0], newHeadCommit->parents[1]);
                    LogOptions options;
                    options.to = mergeBase;
                    auto updatedCommits = loadMessages(options);
                    // We announce commits from oldest to update to newest. This generally avoid
                    // to get detached commits until they are all announced.
                    std::reverse(std::begin(updatedCommits), std::end(updatedCommits));
                    announce(updatedCommits);
                } else {
                    announce(commits);
                }
            }
            lk.unlock();
            if (cb)
                cb(true);
            // Announce if profile changed
            if (oldHead != newHead) {
                auto diffStats = repo->diffStats(newHead, oldHead);
                auto changedFiles = repo->changedFiles(diffStats);
                if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf")
                    != changedFiles.end()) {
                    if (auto account = account_.lock())
                        emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
                            account->getAccountID(), repo->id(), repo->infos());
                }
            }
        }
    }
    
    void
    Conversation::sync(const std::string& member,
                       const std::string& deviceId,
                       OnPullCb&& cb,
                       std::string commitId)
    {
        if (!pull(deviceId, std::move(cb), commitId))
            return;
        dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
            auto sthis = w.lock();
            if (auto account = a.lock()) {
                // For waiting request, downloadFile
                for (const auto& wr : sthis->dataTransfer()->waitingRequests()) {
                    auto path = fileutils::get_data_dir() / account->getAccountID()
                                / "conversation_data" / sthis->id() / wr.fileId;
                    auto start = fileutils::size(path.string());
                    if (start < 0)
                        start = 0;
                    sthis->downloadFile(wr.interactionId, wr.fileId, wr.path, member, deviceId, start);
                }
            }
        });
    }
    
    std::map<std::string, std::string>
    Conversation::generateInvitation() const
    {
        // Invite the new member to the conversation
        Json::Value root;
        auto& metadata = root[ConversationMapKeys::METADATAS];
        for (const auto& [k, v] : infos()) {
            if (v.size() >= 64000) {
                JAMI_WARNING("Cutting invite because the SIP message will be too long");
                continue;
            }
            metadata[k] = v;
        }
        root[ConversationMapKeys::CONVERSATIONID] = id();
        return {{"application/invite+json", Json::writeString(jsonBuilder, root)}};
    }
    
    std::string
    Conversation::leave()
    {
        setRemovingFlag();
        std::lock_guard lk(pimpl_->writeMtx_);
        return pimpl_->repository_->leave();
    }
    
    void
    Conversation::setRemovingFlag()
    {
        pimpl_->isRemoving_ = true;
    }
    
    bool
    Conversation::isRemoving()
    {
        return pimpl_->isRemoving_;
    }
    
    void
    Conversation::erase()
    {
        if (pimpl_->conversationDataPath_ != "")
            dhtnet::fileutils::removeAll(pimpl_->conversationDataPath_, true);
        if (!pimpl_->repository_)
            return;
        std::lock_guard lk(pimpl_->writeMtx_);
        pimpl_->repository_->erase();
    }
    
    ConversationMode
    Conversation::mode() const
    {
        return pimpl_->repository_->mode();
    }
    
    std::vector<std::string>
    Conversation::getInitialMembers() const
    {
        return pimpl_->repository_->getInitialMembers();
    }
    
    bool
    Conversation::isInitialMember(const std::string& uri) const
    {
        auto members = getInitialMembers();
        return std::find(members.begin(), members.end(), uri) != members.end();
    }
    
    void
    Conversation::updateInfos(const std::map<std::string, std::string>& map, const OnDoneCb& cb)
    {
        dht::ThreadPool::io().run([w = weak(), map = std::move(map), cb = std::move(cb)] {
            if (auto sthis = w.lock()) {
                auto& repo = sthis->pimpl_->repository_;
                std::unique_lock lk(sthis->pimpl_->writeMtx_);
                auto commit = repo->updateInfos(map);
                sthis->pimpl_->announce(commit, true);
                lk.unlock();
                if (cb)
                    cb(!commit.empty(), commit);
                if (auto account = sthis->pimpl_->account_.lock())
                    emitSignal<libjami::ConversationSignal::ConversationProfileUpdated>(
                        account->getAccountID(), repo->id(), repo->infos());
            }
        });
    }
    
    std::map<std::string, std::string>
    Conversation::infos() const
    {
        return pimpl_->repository_->infos();
    }
    
    void
    Conversation::updatePreferences(const std::map<std::string, std::string>& map)
    {
        auto filePath = pimpl_->conversationDataPath_ / "preferences";
        auto prefs = map;
        auto itLast = prefs.find(LAST_MODIFIED);
        if (itLast != prefs.end()) {
            if (std::filesystem::is_regular_file(filePath)) {
                auto lastModified = fileutils::lastWriteTimeInSeconds(filePath);
                try {
                    if (lastModified >= std::stoul(itLast->second))
                        return;
                } catch (...) {
                    return;
                }
            }
            prefs.erase(itLast);
        }
    
        std::ofstream file(filePath, std::ios::trunc | std::ios::binary);
        msgpack::pack(file, prefs);
        emitSignal<libjami::ConversationSignal::ConversationPreferencesUpdated>(pimpl_->accountId_,
                                                                                id(),
                                                                                std::move(prefs));
    }
    
    std::map<std::string, std::string>
    Conversation::preferences(bool includeLastModified) const
    {
        try {
            std::map<std::string, std::string> preferences;
            auto filePath = pimpl_->conversationDataPath_ / "preferences";
            auto file = fileutils::loadFile(filePath);
            msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size());
            oh.get().convert(preferences);
            if (includeLastModified)
                preferences[LAST_MODIFIED] = std::to_string(fileutils::lastWriteTimeInSeconds(filePath));
            return preferences;
        } catch (const std::exception& e) {
        }
        return {};
    }
    
    std::vector<uint8_t>
    Conversation::vCard() const
    {
        try {
            return fileutils::loadFile(pimpl_->repoPath() / "profile.vcf");
        } catch (...) {
        }
        return {};
    }
    
    std::shared_ptr<TransferManager>
    Conversation::dataTransfer() const
    {
        return pimpl_->transferManager_;
    }
    
    bool
    Conversation::onFileChannelRequest(const std::string& member,
                                       const std::string& fileId,
                                       bool verifyShaSum) const
    {
        if (!isMember(member))
            return false;
    
        auto sep = fileId.find('_');
        if (sep == std::string::npos)
            return false;
    
        auto interactionId = fileId.substr(0, sep);
        auto commit = getCommit(interactionId);
        if (commit == std::nullopt || commit->find("type") == commit->end()
            || commit->find("tid") == commit->end() || commit->find("sha3sum") == commit->end()
            || commit->at("type") != "application/data-transfer+json")
            return false;
    
        auto path = dataTransfer()->path(fileId);
    
        if (!std::filesystem::is_regular_file(path)) {
            // Check if dangling symlink
            if (std::filesystem::is_symlink(path)) {
                dhtnet::fileutils::remove(path, true);
            }
            JAMI_DEBUG("[Account {:s}] {:s} asked for non existing file {} in {:s}",
                       pimpl_->accountId_,
                       member,
                       fileId,
                       id());
            return false;
        }
        // Check that our file is correct before sending
        if (verifyShaSum && commit->at("sha3sum") != fileutils::sha3File(path)) {
            JAMI_DEBUG(
                "[Account {:s}] {:s} asked for file {:s} in {:s}, but our version is not complete",
                pimpl_->accountId_,
                member,
                fileId,
                id());
            return false;
        }
        return true;
    }
    
    bool
    Conversation::downloadFile(const std::string& interactionId,
                               const std::string& fileId,
                               const std::string& path,
                               const std::string&,
                               const std::string& deviceId,
                               std::size_t start,
                               std::size_t end)
    {
        auto commit = getCommit(interactionId);
        if (commit == std::nullopt || commit->at("type") != "application/data-transfer+json") {
            JAMI_ERROR("Commit doesn't exists or is not a file transfer {} (Conversation: {}) ", interactionId, id());
            return false;
        }
        auto tid = commit->find("tid");
        auto sha3sum = commit->find("sha3sum");
        auto size_str = commit->find("totalSize");
    
        if (tid == commit->end() || sha3sum == commit->end() || size_str == commit->end()) {
            JAMI_ERROR("Invalid file transfer commit (missing tid, size or sha3)");
            return false;
        }
    
        auto totalSize = to_int<ssize_t>(size_str->second, (ssize_t) -1);
        if (totalSize < 0) {
            JAMI_ERROR("Invalid file size {}", totalSize);
            return false;
        }
    
        // Be sure to not lock conversation
        dht::ThreadPool().io().run([w = weak(),
                                    deviceId,
                                    fileId,
                                    interactionId,
                                    sha3sum = sha3sum->second,
                                    path,
                                    totalSize,
                                    start,
                                    end] {
            if (auto shared = w.lock()) {
                auto acc = shared->pimpl_->account_.lock();
                if (!acc)
                    return;
                shared->dataTransfer()->waitForTransfer(fileId, interactionId, sha3sum, path, totalSize);
                acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
            }
        });
        return true;
    }
    
    void
    Conversation::hasFetched(const std::string& deviceId, const std::string& commitId)
    {
        dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() {
            auto sthis = w.lock();
            if (!sthis)
                return;
            auto acc = sthis->pimpl_->account_.lock();
            // Update fetched for Uri
            auto uri = sthis->uriFromDevice(deviceId);
            if (uri.empty() || !acc || uri == acc->getUsername())
                return;
            // When a user fetches a commit, the message is sent for this person
            sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::SENT, commitId, std::to_string(std::time(nullptr)), true);
        });
    }
    
    
    void
    Conversation::Impl::updateStatus(const std::string& uri,
                          libjami::Account::MessageStates st,
                          const std::string& commitId,
                          const std::string& ts,
                          bool emit)
    {
        // This method can be called if peer send us a status or if another device sync. Emit will be true if a peer send us a status and will emit to other connected devices.
        LogOptions options;
        std::map<std::string, std::map<std::string, std::string>> newStatus;
        {
            // Update internal structures.
            std::lock_guard lk(messageStatusMtx_);
            auto& status = messagesStatus_[uri];
            auto& oldStatus = status[st == libjami::Account::MessageStates::SENT ? "fetched" : "read"];
            if (oldStatus == commitId)
                return; // Nothing to do
            options.to = oldStatus;
            options.from = commitId;
            oldStatus = commitId;
            status[st == libjami::Account::MessageStates::SENT ? "fetched_ts" : "read_ts"] = ts;
            saveStatus();
            if (emit)
                newStatus[uri].insert(status.begin(), status.end());
        }
        if (emit && messageStatusCb_) {
            messageStatusCb_(newStatus);
        }
        // Update messages status for all commit between the old and new one
        options.logIfNotFound = false;
        options.fastLog = true;
        History optHistory;
        std::lock_guard lk(historyMtx_); // Avoid to announce messages while updating status.
        auto res = loadMessages2(options, &optHistory);
        if (res.size() == 0) {
            // In this case, commit is not received yet, so we cache it
            futureStatus[commitId][uri] = static_cast<int32_t>(st);
        }
        for (const auto& [cid, _]: optHistory.quickAccess) {
            auto message = loadedHistory_.quickAccess.find(cid);
            if (message != loadedHistory_.quickAccess.end()) {
                // Update message and emit to client,
                message->second->status[uri] = static_cast<int32_t>(st);
                emitSignal<libjami::ConfigurationSignal::AccountMessageStatusChanged>(
                    accountId_,
                    repository_->id(),
                    uri,
                    cid,
                    static_cast<int>(st));
            } else {
                // In this case, commit is not loaded by client, so we cache it
                // No need to emit to client, they will get a correct status on load.
                futureStatus[cid][uri] = static_cast<int32_t>(st);
            }
        }
    }
    
    bool
    Conversation::setMessageDisplayed(const std::string& uri, const std::string& interactionId)
    {
        std::lock_guard lk(pimpl_->messageStatusMtx_);
        if (pimpl_->messagesStatus_[uri]["read"] == interactionId)
            return false; // Nothing to do
        dht::ThreadPool::io().run([w = weak(), uri, interactionId]() {
            auto sthis = w.lock();
            if (!sthis)
                return;
            sthis->pimpl_->updateStatus(uri, libjami::Account::MessageStates::DISPLAYED, interactionId, std::to_string(std::time(nullptr)), true);
        });
        return true;
    }
    
    std::map<std::string, std::map<std::string, std::string>>
    Conversation::messageStatus() const
    {
        std::lock_guard lk(pimpl_->messageStatusMtx_);
        return pimpl_->messagesStatus_;
    }
    
    void
    Conversation::updateMessageStatus(const std::map<std::string, std::map<std::string, std::string>>& messageStatus)
    {
        auto acc = pimpl_->account_.lock();
        std::unique_lock lk(pimpl_->messageStatusMtx_);
        std::vector<std::tuple<libjami::Account::MessageStates, std::string, std::string, std::string>> stVec;
        for (const auto& [uri, status] : messageStatus) {
            auto& oldMs = pimpl_->messagesStatus_[uri];
            if (status.find("fetched_ts") != status.end() && status.at("fetched") != oldMs["fetched"]) {
                if (oldMs["fetched_ts"].empty() || std::stol(oldMs["fetched_ts"]) <= std::stol(status.at("fetched_ts"))) {
                    stVec.emplace_back(libjami::Account::MessageStates::SENT, uri, status.at("fetched"), status.at("fetched_ts"));
                }
            }
            if (status.find("read_ts") != status.end() && status.at("read") != oldMs["read"]) {
                if (oldMs["read_ts"].empty() || std::stol(oldMs["read_ts"]) <= std::stol(status.at("read_ts"))) {
                    stVec.emplace_back(libjami::Account::MessageStates::DISPLAYED, uri, status.at("read"), status.at("read_ts"));
                }
            }
        }
        lk.unlock();
    
        for (const auto& [status, uri, commitId, ts] : stVec) {
            pimpl_->updateStatus(uri, status, commitId, ts);
        }
    }
    
    void
    Conversation::onMessageStatusChanged(const std::function<void(const std::map<std::string, std::map<std::string, std::string>>&)>& cb)
    {
        std::unique_lock lk(pimpl_->messageStatusMtx_);
        pimpl_->messageStatusCb_ = cb;
    }
    
    #ifdef LIBJAMI_TESTABLE
    void
    Conversation::onBootstrapStatus(const std::function<void(std::string, BootstrapStatus)>& cb)
    {
        pimpl_->bootstrapCbTest_ = cb;
    }
    #endif
    
    void
    Conversation::checkBootstrapMember(const asio::error_code& ec,
                                       std::vector<std::map<std::string, std::string>> members)
    {
        if (ec == asio::error::operation_aborted)
            return;
        auto acc = pimpl_->account_.lock();
        if (pimpl_->swarmManager_->getRoutingTable().getNodes().size() > 0 or not acc)
            return;
        // We bootstrap the DRT with devices who already wrote in the repository.
        // However, in a conversation, a large number of devices may just watch
        // the conversation, but never write any message.
        std::string uri;
        while (!members.empty()) {
            auto member = members.back();
            members.pop_back();
            uri = member.at("uri");
            if (uri != acc->getUsername()
                && pimpl_->checkedMembers_.find(uri) == pimpl_->checkedMembers_.end())
                break;
        }
        auto fallbackFailed = [](auto sthis) {
            JAMI_WARNING("{}[SwarmManager {}] Bootstrap: Fallback failed. Wait for remote connections.",
                        sthis->pimpl_->toString(),
                        fmt::ptr(sthis->pimpl_->swarmManager_.get()));
    #ifdef LIBJAMI_TESTABLE
            if (sthis->pimpl_->bootstrapCbTest_)
                sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FAILED);
    #endif
        };
        // If members is empty, we finished the fallback un-successfully
        if (members.empty() && uri.empty()) {
            fallbackFailed(this);
            return;
        }
    
        // Fallback, check devices of a member (we didn't check yet) in the conversation
        pimpl_->checkedMembers_.emplace(uri);
        auto devices = std::make_shared<std::vector<NodeId>>();
        acc->forEachDevice(
            dht::InfoHash(uri),
            [w = weak(), devices](const std::shared_ptr<dht::crypto::PublicKey>& dev) {
                // Test if already sent
                if (auto sthis = w.lock()) {
                    if (!sthis->pimpl_->swarmManager_->getRoutingTable().hasKnownNode(dev->getLongId()))
                        devices->emplace_back(dev->getLongId());
                }
            },
            [w = weak(), devices, members = std::move(members), uri, fallbackFailed=std::move(fallbackFailed)](bool ok) {
                auto sthis = w.lock();
                if (!sthis)
                    return;
                auto checkNext = true;
                if (ok && devices->size() != 0) {
    #ifdef LIBJAMI_TESTABLE
                    if (sthis->pimpl_->bootstrapCbTest_)
                        sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::FALLBACK);
    #endif
                    JAMI_WARNING("{}[SwarmManager {}] Bootstrap: Fallback with member: {}",
                                 sthis->pimpl_->toString(),
                                 fmt::ptr(sthis->pimpl_->swarmManager_.get()),
                                 uri);
                    if (sthis->pimpl_->swarmManager_->setKnownNodes(*devices))
                        checkNext = false;
                }
                if (checkNext) {
                    // Check next member
                    sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
                    sthis->pimpl_->fallbackTimer_->async_wait(
                        std::bind(&Conversation::checkBootstrapMember,
                                  sthis,
                                  std::placeholders::_1,
                                  std::move(members)));
                } else {
                    // In this case, all members are checked. Fallback failed
                    fallbackFailed(sthis);
                }
            });
    }
    
    void
    Conversation::bootstrap(std::function<void()> onBootstraped,
                            const std::vector<DeviceId>& knownDevices)
    {
        if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_)
            return;
        // Here, we bootstrap the DRT with devices who already wrote in the conversation
        // If this doesn't work, it will try to fallback with checkBootstrapMember
        // If it works, the callback onConnectionChanged will be called with ok=true
        pimpl_->bootstrapCb_ = std::move(onBootstraped);
        std::vector<DeviceId> devices = knownDevices;
        for (const auto& m : pimpl_->repository_->devices())
            devices.insert(devices.end(), m.second.begin(), m.second.end());
        JAMI_DEBUG("{}[SwarmManager {}] Bootstrap with {} devices",
                   pimpl_->toString(),
                   fmt::ptr(pimpl_->swarmManager_.get()),
                   devices.size());
        // set callback
        auto fallback = [](auto sthis, bool now = false) {
            // Fallback
            auto acc = sthis->pimpl_->account_.lock();
            if (!acc)
                return;
            auto members = sthis->getMembers(false, false);
            std::shuffle(members.begin(), members.end(), acc->rand);
            if (now) {
                sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now());
            } else {
                auto timeForBootstrap = std::min(static_cast<size_t>(8), members.size());
                sthis->pimpl_->fallbackTimer_->expires_at(std::chrono::steady_clock::now() + 20s
                                                            - std::chrono::seconds(timeForBootstrap));
                JAMI_DEBUG("{}[SwarmManager {}] Fallback in {} seconds",
                            sthis->pimpl_->toString(),
                            fmt::ptr(sthis->pimpl_->swarmManager_.get()),
                            (20 - timeForBootstrap));
            }
            sthis->pimpl_->fallbackTimer_->async_wait(std::bind(&Conversation::checkBootstrapMember,
                                                                sthis,
                                                                std::placeholders::_1,
                                                                std::move(members)));
        };
    
        pimpl_->swarmManager_->onConnectionChanged([w = weak(), fallback](bool ok) {
            // This will call methods from accounts, so trigger on another thread.
            dht::ThreadPool::io().run([w, ok, fallback=std::move(fallback)] {
                auto sthis = w.lock();
                if (!sthis)
                    return;
                if (ok) {
                    // Bootstrap succeeded!
                    sthis->pimpl_->checkedMembers_.clear();
                    if (sthis->pimpl_->bootstrapCb_)
                        sthis->pimpl_->bootstrapCb_();
    #ifdef LIBJAMI_TESTABLE
                    if (sthis->pimpl_->bootstrapCbTest_)
                        sthis->pimpl_->bootstrapCbTest_(sthis->id(), BootstrapStatus::SUCCESS);
    #endif
                    return;
                }
                fallback(sthis);
            });
        });
        pimpl_->checkedMembers_.clear();
        // If is shutdown, the conversation was re-added, causing no new nodes to be connected, but just a classic connectivity change
        if (pimpl_->swarmManager_->isShutdown()) {
            pimpl_->swarmManager_->restart();
            pimpl_->swarmManager_->maintainBuckets();
        } else if (!pimpl_->swarmManager_->setKnownNodes(devices)) {
            fallback(this, true);
        }
    }
    
    std::vector<std::string>
    Conversation::commitsEndedCalls()
    {
        pimpl_->loadActiveCalls();
        pimpl_->loadHostedCalls();
        auto commits = pimpl_->commitsEndedCalls();
        if (!commits.empty()) {
            // Announce to client
            dht::ThreadPool::io().run([w = weak(), commits] {
                if (auto sthis = w.lock())
                    sthis->pimpl_->announce(commits, true);
            });
        }
        return commits;
    }
    
    void
    Conversation::onMembersChanged(OnMembersChanged&& cb)
    {
        pimpl_->onMembersChanged_ = std::move(cb);
    }
    
    void
    Conversation::onNeedSocket(NeedSocketCb needSocket)
    {
        pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket),
                                                w=weak()](const std::string& deviceId, ChannelCb&& cb) {
            if (auto sthis = w.lock())
                needSocket(sthis->id(), deviceId, std::move(cb), "application/im-gitmessage-id");
        };
    }
    
    void
    Conversation::addSwarmChannel(std::shared_ptr<dhtnet::ChannelSocket> channel)
    {
        auto deviceId = channel->deviceId();
        // Transmit avatar if necessary
        // We do this here, because at this point we know both sides are connected and in
        // the same conversation
        // addSwarmChannel is a bit more complex, but it should be the best moment to do this.
        auto cert = channel->peerCertificate();
        if (!cert || !cert->issuer)
            return;
        auto member = cert->issuer->getId().toString();
        pimpl_->swarmManager_->addChannel(std::move(channel));
        dht::ThreadPool::io().run([member, deviceId, a = pimpl_->account_, w = weak_from_this()] {
            auto sthis = w.lock();
            if (auto account = a.lock()) {
                account->sendProfile(sthis->id(), member, deviceId.toString());
            }
        });
    }
    
    uint32_t
    Conversation::countInteractions(const std::string& toId,
                                    const std::string& fromId,
                                    const std::string& authorUri) const
    {
        LogOptions options;
        options.to = toId;
        options.from = fromId;
        options.authorUri = authorUri;
        options.logIfNotFound = false;
        options.fastLog = true;
        History history;
        auto res = pimpl_->loadMessages2(options, &history);
        return res.size();
    }
    
    void
    Conversation::search(uint32_t req,
                         const Filter& filter,
                         const std::shared_ptr<std::atomic_int>& flag) const
    {
        // Because logging a conversation can take quite some time,
        // do it asynchronously
        dht::ThreadPool::io().run([w = weak(), req, filter, flag] {
            if (auto sthis = w.lock()) {
                auto acc = sthis->pimpl_->account_.lock();
                if (!acc)
                    return;
    
                History history;
                std::vector<std::map<std::string, std::string>> commits {};
                // std::regex_constants::ECMAScript is the default flag.
                auto re = std::regex(filter.regexSearch,
                                     filter.caseSensitive ? std::regex_constants::ECMAScript
                                                          : std::regex_constants::icase);
                sthis->pimpl_->repository_->log(
                    [&](const auto& id, const auto& author, auto& commit) {
                        if (!filter.author.empty()
                            && filter.author != sthis->uriFromDevice(author.email)) {
                            // Filter author
                            return CallbackResult::Skip;
                        }
                        auto commitTime = git_commit_time(commit.get());
                        if (filter.before && filter.before < commitTime) {
                            // Only get commits before this date
                            return CallbackResult::Skip;
                        }
                        if (filter.after && filter.after > commitTime) {
                            // Only get commits before this date
                            if (git_commit_parentcount(commit.get()) <= 1)
                                return CallbackResult::Break;
                            else
                                return CallbackResult::Skip; // Because we are sorting it with
                                                             // GIT_SORT_TOPOLOGICAL | GIT_SORT_TIME
                        }
    
                        return CallbackResult::Ok; // Continue
                    },
                    [&](auto&& cc) {
                        if (auto optMessage = sthis->pimpl_->repository_->convCommitToMap(cc))
                            sthis->pimpl_->addToHistory({optMessage.value()}, false, false, &history);
                    },
                    [&](auto id, auto, auto) {
                        if (id == filter.lastId)
                            return true;
                        return false;
                    },
                    "",
                    false);
                // Search on generated history
                for (auto& message : history.messageList) {
                    auto contentType = message->type;
                    auto isSearchable = contentType == "text/plain"
                                        || contentType == "application/data-transfer+json";
                    if (filter.type.empty() && !isSearchable) {
                        // Not searchable, at least for now
                        continue;
                    } else if (contentType == filter.type || filter.type.empty()) {
                        if (isSearchable) {
                            // If it's a text match the body, else the display name
                            auto body = contentType == "text/plain" ? message->body.at("body")
                                                                    : message->body.at("displayName");
                            std::smatch body_match;
                            if (std::regex_search(body, body_match, re)) {
                                auto commit = message->body;
                                commit["id"] = message->id;
                                commit["type"] = message->type;
                                commits.emplace_back(commit);
                            }
                        } else {
                            // Matching type, just add it to the results
                            commits.emplace_back(message->body);
                        }
    
                        if (filter.maxResult != 0 && commits.size() == filter.maxResult)
                            break;
                    }
                }
    
                if (commits.size() > 0)
                    emitSignal<libjami::ConversationSignal::MessagesFound>(req,
                                                                           acc->getAccountID(),
                                                                           sthis->id(),
                                                                           std::move(commits));
                // If we're the latest thread, inform client that the search is finished
                if ((*flag)-- == 1 /* decrement return the old value */) {
                    emitSignal<libjami::ConversationSignal::MessagesFound>(
                        req,
                        acc->getAccountID(),
                        std::string {},
                        std::vector<std::map<std::string, std::string>> {});
                }
            }
        });
    }
    
    void
    Conversation::hostConference(Json::Value&& message, OnDoneCb&& cb)
    {
        if (!message.isMember("confId")) {
            JAMI_ERR() << "Malformed commit";
            return;
        }
    
        auto now = std::chrono::system_clock::now();
        auto nowSecs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
        {
            std::lock_guard lk(pimpl_->activeCallsMtx_);
            pimpl_->hostedCalls_[message["confId"].asString()] = nowSecs;
            pimpl_->saveHostedCalls();
        }
    
        sendMessage(std::move(message), "", {}, std::move(cb));
    }
    
    bool
    Conversation::isHosting(const std::string& confId) const
    {
        auto shared = pimpl_->account_.lock();
        if (!shared)
            return false;
        auto info = infos();
        if (info["rdvDevice"] == shared->currentDeviceId() && info["rdvHost"] == shared->getUsername())
            return true; // We are the current device Host
        std::lock_guard lk(pimpl_->activeCallsMtx_);
        return pimpl_->hostedCalls_.find(confId) != pimpl_->hostedCalls_.end();
    }
    
    void
    Conversation::removeActiveConference(Json::Value&& message, OnDoneCb&& cb)
    {
        if (!message.isMember("confId")) {
            JAMI_ERR() << "Malformed commit";
            return;
        }
    
        auto erased = false;
        {
            std::lock_guard lk(pimpl_->activeCallsMtx_);
            erased = pimpl_->hostedCalls_.erase(message["confId"].asString());
        }
        if (erased) {
            pimpl_->saveHostedCalls();
            sendMessage(std::move(message), "", {}, std::move(cb));
        } else
            cb(false, "");
    }
    
    std::vector<std::map<std::string, std::string>>
    Conversation::currentCalls() const
    {
        std::lock_guard lk(pimpl_->activeCallsMtx_);
        return pimpl_->activeCalls_;
    }
    } // namespace jami