Project 'savoirfairelinux/ring-daemon' was moved to 'savoirfairelinux/jami-daemon'. Please update any links and bookmarks that may still have the old path.
Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
conversation_module.cpp 129.70 KiB
/*
* Copyright (C) 2004-2025 Savoir-faire Linux Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "conversation_module.h"
#include <algorithm>
#include <fstream>
#include <opendht/thread_pool.h>
#include "account_const.h"
#include "call.h"
#include "client/ring_signal.h"
#include "fileutils.h"
#include "jamidht/account_manager.h"
#include "jamidht/jamiaccount.h"
#include "manager.h"
#include "sip/sipcall.h"
#include "vcard.h"
namespace jami {
using ConvInfoMap = std::map<std::string, ConvInfo>;
struct PendingConversationFetch
{
bool ready {false};
bool cloning {false};
std::string deviceId {};
std::string removeId {};
std::map<std::string, std::string> preferences {};
std::map<std::string, std::map<std::string, std::string>> status {};
std::set<std::string> connectingTo {};
std::shared_ptr<dhtnet::ChannelSocket> socket {};
};
constexpr std::chrono::seconds MAX_FALLBACK {12 * 3600s};
struct SyncedConversation
{
std::mutex mtx;
std::unique_ptr<asio::steady_timer> fallbackClone;
std::chrono::seconds fallbackTimer {5s};
ConvInfo info;
std::unique_ptr<PendingConversationFetch> pending;
std::shared_ptr<Conversation> conversation;
SyncedConversation(const std::string& convId)
: info {convId}
{
fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext());
}
SyncedConversation(const ConvInfo& info)
: info {info}
{
fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext());
}
bool startFetch(const std::string& deviceId, bool checkIfConv = false)
{
// conversation mtx must be locked
if (checkIfConv && conversation)
return false; // Already a conversation
if (pending) {
if (pending->ready)
return false; // Already doing stuff
// if (pending->deviceId == deviceId)
// return false; // Already fetching
if (pending->connectingTo.find(deviceId) != pending->connectingTo.end())
return false; // Already connecting to this device
} else {
pending = std::make_unique<PendingConversationFetch>();
pending->connectingTo.insert(deviceId);
return true;
}
return true;
}
void stopFetch(const std::string& deviceId)
{
// conversation mtx must be locked
if (!pending)
return;
pending->connectingTo.erase(deviceId);
if (pending->connectingTo.empty())
pending.reset();
}
std::vector<std::map<std::string, std::string>> getMembers(bool includeLeft,
bool includeBanned) const
{
// conversation mtx must be locked
if (conversation)
return conversation->getMembers(true, includeLeft, includeBanned);
// If we're cloning, we can return the initial members
std::vector<std::map<std::string, std::string>> result;
result.reserve(info.members.size());
for (const auto& uri : info.members) {
result.emplace_back(std::map<std::string, std::string> {{"uri", uri}});
}
return result;
}
};
class ConversationModule::Impl : public std::enable_shared_from_this<Impl>
{
public:
Impl(std::shared_ptr<JamiAccount>&& account,
std::shared_ptr<AccountManager>&& accountManager,
NeedsSyncingCb&& needsSyncingCb,
SengMsgCb&& sendMsgCb,
NeedSocketCb&& onNeedSocket,
NeedSocketCb&& onNeedSwarmSocket,
OneToOneRecvCb&& oneToOneRecvCb);
template<typename S, typename T>
inline auto withConv(const S& convId, T&& cb) const
{
if (auto conv = getConversation(convId)) {
std::lock_guard lk(conv->mtx);
return cb(*conv);
} else {
JAMI_WARNING("Conversation {} not found", convId);
}
return decltype(cb(std::declval<SyncedConversation&>()))();
}
template<typename S, typename T>
inline auto withConversation(const S& convId, T&& cb)
{
if (auto conv = getConversation(convId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation)
return cb(*conv->conversation);
} else {
JAMI_WARNING("Conversation {} not found", convId);
}
return decltype(cb(std::declval<Conversation&>()))();
}
// Retrieving recent commits
/**
* Clone a conversation (initial) from device
* @param deviceId
* @param convId
*/
void cloneConversation(const std::string& deviceId,
const std::string& peer,
const std::string& convId);
void cloneConversation(const std::string& deviceId,
const std::string& peer,
const std::shared_ptr<SyncedConversation>& conv);
/**
* Pull remote device
* @param peer Contact URI
* @param deviceId Contact's device
* @param conversationId
* @param commitId (optional)
*/
void fetchNewCommits(const std::string& peer,
const std::string& deviceId,
const std::string& conversationId,
const std::string& commitId = "");
/**
* Handle events to receive new commits
*/
void handlePendingConversation(const std::string& conversationId, const std::string& deviceId);
// Requests
std::optional<ConversationRequest> getRequest(const std::string& id) const;
// Conversations
/**
* Get members
* @param conversationId
* @param includeBanned
* @return a map of members with their role and details
*/
std::vector<std::map<std::string, std::string>> getConversationMembers(
const std::string& conversationId, bool includeBanned = false) const;
void setConversationMembers(const std::string& convId, const std::set<std::string>& members);
/**
* Remove a repository and all files
* @param convId
* @param sync If we send an update to other account's devices
* @param force True if ignore the removing flag
*/
void removeRepository(const std::string& convId, bool sync, bool force = false);
void removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force = false);
/**
* Remove a conversation
* @param conversationId
*/
bool removeConversation(const std::string& conversationId);
bool removeConversationImpl(SyncedConversation& conv);
/**
* Send a message notification to all members
* @param conversation
* @param commit
* @param sync If we send an update to other account's devices
* @param deviceId If we need to filter a specific device
*/
void sendMessageNotification(const std::string& conversationId,
bool sync,
const std::string& commitId = "",
const std::string& deviceId = "");
void sendMessageNotification(Conversation& conversation,
bool sync,
const std::string& commitId = "",
const std::string& deviceId = "");
/**
* @return if a convId is a valid conversation (repository cloned & usable)
*/
bool isConversation(const std::string& convId) const
{
std::lock_guard lk(conversationsMtx_);
auto c = conversations_.find(convId);
return c != conversations_.end() && c->second;
}
void addConvInfo(const ConvInfo& info)
{
std::lock_guard lk(convInfosMtx_);
convInfos_[info.id] = info;
saveConvInfos();
}
std::string getOneToOneConversation(const std::string& uri) const noexcept;
bool updateConvForContact(const std::string& uri,
const std::string& oldConv,
const std::string& newConv);
std::shared_ptr<SyncedConversation> getConversation(std::string_view convId) const
{
std::lock_guard lk(conversationsMtx_);
auto c = conversations_.find(convId);
return c != conversations_.end() ? c->second : nullptr;
}
std::shared_ptr<SyncedConversation> getConversation(std::string_view convId)
{
std::lock_guard lk(conversationsMtx_);
auto c = conversations_.find(convId);
return c != conversations_.end() ? c->second : nullptr;
}
std::shared_ptr<SyncedConversation> startConversation(const std::string& convId)
{
std::lock_guard lk(conversationsMtx_);
auto& c = conversations_[convId];
if (!c)
c = std::make_shared<SyncedConversation>(convId);
return c;
}
std::shared_ptr<SyncedConversation> startConversation(const ConvInfo& info)
{
std::lock_guard lk(conversationsMtx_);
auto& c = conversations_[info.id];
if (!c)
c = std::make_shared<SyncedConversation>(info);
return c;
}
std::vector<std::shared_ptr<SyncedConversation>> getSyncedConversations() const
{
std::lock_guard lk(conversationsMtx_);
std::vector<std::shared_ptr<SyncedConversation>> result;
result.reserve(conversations_.size());
for (const auto& [_, c] : conversations_)
result.emplace_back(c);
return result;
}
std::vector<std::shared_ptr<Conversation>> getConversations() const
{
std::lock_guard lk(conversationsMtx_);
std::vector<std::shared_ptr<Conversation>> result;
result.reserve(conversations_.size());
for (const auto& [_, sc] : conversations_) {
if (auto c = sc->conversation)
result.emplace_back(std::move(c));
}
return result;
}
// Message send/load
void sendMessage(const std::string& conversationId,
Json::Value&& value,
const std::string& replyTo = "",
bool announce = true,
OnCommitCb&& onCommit = {},
OnDoneCb&& cb = {});
void sendMessage(const std::string& conversationId,
std::string message,
const std::string& replyTo = "",
const std::string& type = "text/plain",
bool announce = true,
OnCommitCb&& onCommit = {},
OnDoneCb&& cb = {});
void editMessage(const std::string& conversationId,
const std::string& newBody,
const std::string& editedId);
void bootstrapCb(std::string convId);
// The following methods modify what is stored on the disk
/**
* @note convInfosMtx_ should be locked
*/
void saveConvInfos() const { ConversationModule::saveConvInfos(accountId_, convInfos_); }
/**
* @note conversationsRequestsMtx_ should be locked
*/
void saveConvRequests() const
{
ConversationModule::saveConvRequests(accountId_, conversationsRequests_);
}
void declineOtherConversationWith(const std::string& uri) noexcept;
bool addConversationRequest(const std::string& id, const ConversationRequest& req)
{
// conversationsRequestsMtx_ MUST BE LOCKED
if (isConversation(id))
return false;
auto it = conversationsRequests_.find(id);
if (it != conversationsRequests_.end()) {
// We only remove requests (if accepted) or change .declined
if (!req.declined)
return false;
} else if (req.isOneToOne()) {
// Check that we're not adding a second one to one trust request
// NOTE: If a new one to one request is received, we can decline the previous one.
declineOtherConversationWith(req.from);
}
JAMI_DEBUG("Adding conversation request from {} ({})", req.from, id);
conversationsRequests_[id] = req;
saveConvRequests();
return true;
}
void rmConversationRequest(const std::string& id)
{
// conversationsRequestsMtx_ MUST BE LOCKED
auto it = conversationsRequests_.find(id);
if (it != conversationsRequests_.end()) {
auto& md = syncingMetadatas_[id];
md = it->second.metadatas;
md["syncing"] = "true";
md["created"] = std::to_string(it->second.received);
}
saveMetadata();
conversationsRequests_.erase(id);
saveConvRequests();
}
std::weak_ptr<JamiAccount> account_;
std::shared_ptr<AccountManager> accountManager_;
const std::string accountId_ {};
NeedsSyncingCb needsSyncingCb_;
SengMsgCb sendMsgCb_;
NeedSocketCb onNeedSocket_;
NeedSocketCb onNeedSwarmSocket_;
OneToOneRecvCb oneToOneRecvCb_;
std::string deviceId_ {};
std::string username_ {};
// Requests
mutable std::mutex conversationsRequestsMtx_;
std::map<std::string, ConversationRequest> conversationsRequests_;
// Conversations
mutable std::mutex conversationsMtx_ {};
std::map<std::string, std::shared_ptr<SyncedConversation>, std::less<>> conversations_;
// The following information are stored on the disk
mutable std::mutex convInfosMtx_; // Note, should be locked after conversationsMtx_ if needed
std::map<std::string, ConvInfo> convInfos_;
// When sending a new message, we need to send the notification to some peers of the
// conversation However, the conversation may be not bootstraped, so the list will be empty.
// notSyncedNotification_ will store the notifiaction to announce until we have peers to sync
// with.
std::mutex notSyncedNotificationMtx_;
std::map<std::string, std::string> notSyncedNotification_;
std::weak_ptr<Impl> weak() { return std::static_pointer_cast<Impl>(shared_from_this()); }
// Replay conversations (after erasing/re-adding)
std::mutex replayMtx_;
std::map<std::string, std::vector<std::map<std::string, std::string>>> replay_;
std::map<std::string, uint64_t> refreshMessage;
std::atomic_int syncCnt {0};
#ifdef LIBJAMI_TESTABLE
std::function<void(std::string, Conversation::BootstrapStatus)> bootstrapCbTest_;
#endif
void fixStructures(
std::shared_ptr<JamiAccount> account,
const std::vector<std::tuple<std::string, std::string, std::string>>& updateContactConv,
const std::set<std::string>& toRm);
void cloneConversationFrom(const std::shared_ptr<SyncedConversation> conv,
const std::string& deviceId,
const std::string& oldConvId = "");
void bootstrap(const std::string& convId);
void fallbackClone(const asio::error_code& ec, const std::string& conversationId);
void cloneConversationFrom(const std::string& conversationId,
const std::string& uri,
const std::string& oldConvId = "");
// While syncing, we do not want to lose metadata (avatar/title and mode)
std::map<std::string, std::map<std::string, std::string>> syncingMetadatas_;
void saveMetadata()
{
auto path = fileutils::get_data_dir() / accountId_;
std::ofstream file(path / "syncingMetadatas", std::ios::trunc | std::ios::binary);
msgpack::pack(file, syncingMetadatas_);
}
void loadMetadata()
{
try {
// read file
auto path = fileutils::get_data_dir() / accountId_;
std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "syncingMetadatas"));
auto file = fileutils::loadFile("syncingMetadatas", path);
// load values
msgpack::unpacked result;
msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
result.get().convert(syncingMetadatas_);
} catch (const std::exception& e) {
JAMI_WARNING("[Account {}] [ConversationModule] unable to load syncing metadata: {}", accountId_, e.what());
}
}
};
ConversationModule::Impl::Impl(std::shared_ptr<JamiAccount>&& account,
std::shared_ptr<AccountManager>&& accountManager,
NeedsSyncingCb&& needsSyncingCb,
SengMsgCb&& sendMsgCb,
NeedSocketCb&& onNeedSocket,
NeedSocketCb&& onNeedSwarmSocket,
OneToOneRecvCb&& oneToOneRecvCb)
: account_(account)
, accountManager_(accountManager)
, accountId_(account->getAccountID())
, needsSyncingCb_(needsSyncingCb)
, sendMsgCb_(sendMsgCb)
, onNeedSocket_(onNeedSocket)
, onNeedSwarmSocket_(onNeedSwarmSocket)
, oneToOneRecvCb_(oneToOneRecvCb)
{
if (auto accm = account->accountManager())
if (const auto* info = accm->getInfo()) {
deviceId_ = info->deviceId;
username_ = info->accountId;
}
conversationsRequests_ = convRequests(accountId_);
loadMetadata();
}
void
ConversationModule::Impl::cloneConversation(const std::string& deviceId,
const std::string& peerUri,
const std::string& convId)
{
JAMI_DEBUG("[Account {}] Clone conversation on device {}", accountId_, deviceId);
auto conv = startConversation(convId);
std::unique_lock lk(conv->mtx);
cloneConversation(deviceId, peerUri, conv);
}
void
ConversationModule::Impl::cloneConversation(const std::string& deviceId,
const std::string& peerUri,
const std::shared_ptr<SyncedConversation>& conv)
{
// conv->mtx must be locked
if (!conv->conversation) {
// Note: here we don't return and connect to all members
// the first that will successfully connect will be used for
// cloning.
// This avoid the case when we try to clone from convInfos + sync message
// at the same time.
if (!conv->startFetch(deviceId, true)) {
JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conv->info.id);
addConvInfo(conv->info);
return;
}
onNeedSocket_(
conv->info.id,
deviceId,
[w = weak(), conv, deviceId](const auto& channel) {
std::lock_guard lk(conv->mtx);
if (conv->pending && !conv->pending->ready) {
if (channel) {
conv->pending->ready = true;
conv->pending->deviceId = channel->deviceId().toString();
conv->pending->socket = channel;
if (!conv->pending->cloning) {
conv->pending->cloning = true;
dht::ThreadPool::io().run([w,
convId = conv->info.id,
deviceId = conv->pending->deviceId]() {
if (auto sthis = w.lock())
sthis->handlePendingConversation(convId, deviceId);
});
}
return true;
} else {
conv->stopFetch(deviceId);
}
}
return false;
},
MIME_TYPE_GIT);
JAMI_LOG("[Account {}] New conversation detected: {}. Ask device {} to clone it",
accountId_,
conv->info.id,
deviceId);
conv->info.members.emplace(username_);
conv->info.members.emplace(peerUri);
addConvInfo(conv->info);
} else {
JAMI_DEBUG("[Account {}] Already have conversation {}", accountId_, conv->info.id);
}
}
void
ConversationModule::Impl::fetchNewCommits(const std::string& peer,
const std::string& deviceId,
const std::string& conversationId,
const std::string& commitId)
{
{
std::lock_guard lk(convInfosMtx_);
auto itConv = convInfos_.find(conversationId);
if (itConv != convInfos_.end() && itConv->second.isRemoved()) {
// If the conversation is removed and we receives a new commit,
// it means that the contact was removed but not banned.
// If he wants a new conversation, they must removes/re-add the contact who declined.
JAMI_WARNING("[Account {:s}] Received a commit for {}, but conversation is removed",
accountId_,
conversationId);
return;
}
}
std::optional<ConversationRequest> oldReq;
{
std::lock_guard lk(conversationsRequestsMtx_);
oldReq = getRequest(conversationId);
if (oldReq != std::nullopt && oldReq->declined) {
JAMI_DEBUG("[Account {}] Received a request for a conversation already declined.",
accountId_);
return;
}
}
JAMI_DEBUG("[Account {:s}] fetch commits from {:s}, for {:s}, commit {:s}",
accountId_,
peer,
conversationId,
commitId);
auto conv = getConversation(conversationId);
if (!conv) {
if (oldReq == std::nullopt) {
// We didn't find a conversation or a request with the given ID.
// This suggests that someone tried to send us an invitation but
// that we didn't receive it, so we ask for a new one.
JAMI_WARNING("[Account {}] Unable to find conversation {}, ask for an invite",
accountId_,
conversationId);
sendMsgCb_(peer,
{},
std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}},
0);
}
return;
}
std::unique_lock lk(conv->mtx);
if (conv->conversation) {
// Check if we already have the commit
if (not commitId.empty() && conv->conversation->getCommit(commitId) != std::nullopt) {
return;
}
if (conv->conversation->isRemoving()) {
JAMI_WARNING("[Account {}] Conversation {} is being removed",
accountId_,
conversationId);
return;
}
if (!conv->conversation->isMember(peer, true)) {
JAMI_WARNING("[Account {}] {} is not a member of {}", accountId_, peer, conversationId);
return;
}
if (conv->conversation->isBanned(deviceId)) {
JAMI_WARNING("[Account {}] {} is a banned device in conversation {}",
accountId_,
deviceId,
conversationId);
return;
}
// Retrieve current last message
auto lastMessageId = conv->conversation->lastCommitId();
if (lastMessageId.empty()) {
JAMI_ERROR("[Account {}] No message detected. This is a bug", accountId_);
return;
}
if (!conv->startFetch(deviceId)) {
JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conversationId);
return;
}
syncCnt.fetch_add(1);
onNeedSocket_(
conversationId,
deviceId,
[w = weak(),
conv,
conversationId = std::move(conversationId),
peer = std::move(peer),
deviceId = std::move(deviceId),
commitId = std::move(commitId)](const auto& channel) {
auto sthis = w.lock();
auto acc = sthis ? sthis->account_.lock() : nullptr;
std::unique_lock lk(conv->mtx);
auto conversation = conv->conversation;
if (!channel || !acc || !conversation) {
conv->stopFetch(deviceId);
if (sthis)
sthis->syncCnt.fetch_sub(1);
return false;
}
conversation->addGitSocket(channel->deviceId(), channel);
lk.unlock();
conversation->sync(
peer,
deviceId,
[w,
conv,
conversationId = std::move(conversationId),
peer = std::move(peer),
deviceId = std::move(deviceId),
commitId = std::move(commitId)](bool ok) {
auto shared = w.lock();
if (!shared)
return;
if (!ok) {
JAMI_WARNING("[Account {}] Unable to fetch new commit from "
"{} for {}, other "
"peer may be disconnected",
shared->accountId_,
deviceId,
conversationId);
JAMI_LOG("[Account {}] Relaunch sync with {} for {}",
shared->accountId_,
deviceId,
conversationId);
}
{
std::lock_guard lk(conv->mtx);
conv->pending.reset();
// Notify peers that a new commit is there (DRT)
if (not commitId.empty() && ok) {
shared->sendMessageNotification(*conv->conversation,
false,
commitId,
deviceId);
}
}
if (shared->syncCnt.fetch_sub(1) == 1) {
emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(shared->accountId_);
}
},
commitId);
return true;
},
"");
} else {
if (oldReq != std::nullopt)
return;
if (conv->pending)
return;
bool clone = !conv->info.isRemoved();
if (clone) {
cloneConversation(deviceId, peer, conv);
return;
}
lk.unlock();
JAMI_WARNING("[Account {}] Unable to find conversation {}, ask for an invite",
accountId_,
conversationId);
sendMsgCb_(peer,
{},
std::map<std::string, std::string> {{MIME_TYPE_INVITE, conversationId}},
0);
}
}
// Clone and store conversation
void
ConversationModule::Impl::handlePendingConversation(const std::string& conversationId,
const std::string& deviceId)
{
auto acc = account_.lock();
if (!acc)
return;
std::vector<DeviceId> kd;
{
std::unique_lock lk(conversationsMtx_);
const auto& devices = accountManager_->getKnownDevices();
kd.reserve(devices.size());
for (const auto& [id, _] : devices)
kd.emplace_back(id);
}
auto conv = getConversation(conversationId);
if (!conv)
return;
std::unique_lock lk(conv->mtx, std::defer_lock);
auto erasePending = [&] {
std::string toRm;
if (conv->pending && !conv->pending->removeId.empty())
toRm = std::move(conv->pending->removeId);
conv->pending.reset();
lk.unlock();
if (!toRm.empty())
removeConversation(toRm);
};
try {
auto conversation = std::make_shared<Conversation>(acc, deviceId, conversationId);
conversation->onMembersChanged([w=weak_from_this(), conversationId](const auto& members) {
// Delay in another thread to avoid deadlocks
dht::ThreadPool::io().run([w, conversationId, members = std::move(members)] {
if (auto sthis = w.lock())
sthis->setConversationMembers(conversationId, members);
});
});
conversation->onMessageStatusChanged([this, conversationId](const auto& status) {
auto msg = std::make_shared<SyncMsg>();
msg->ms = {{conversationId, status}};
needsSyncingCb_(std::move(msg));
});
conversation->onNeedSocket(onNeedSwarmSocket_);
if (!conversation->isMember(username_, true)) {
JAMI_ERR("Conversation cloned but does not seems to be a valid member");
conversation->erase();
lk.lock();
erasePending();
return;
}
// Make sure that the list of members stored in convInfos_ matches the
// one from the conversation's repository.
// (https://git.jami.net/savoirfairelinux/jami-daemon/-/issues/1026)
setConversationMembers(conversationId, conversation->memberUris("", {}));
lk.lock();
if (conv->pending && conv->pending->socket)
conversation->addGitSocket(DeviceId(deviceId), std::move(conv->pending->socket));
auto removeRepo = false;
// Note: a removeContact while cloning. In this case, the conversation
// must not be announced and removed.
if (conv->info.isRemoved())
removeRepo = true;
std::map<std::string, std::string> preferences;
std::map<std::string, std::map<std::string, std::string>> status;
if (conv->pending) {
preferences = std::move(conv->pending->preferences);
status = std::move(conv->pending->status);
}
conv->conversation = conversation;
if (removeRepo) {
removeRepositoryImpl(*conv, false, true);
erasePending();
return;
}
auto commitId = conversation->join();
std::vector<std::map<std::string, std::string>> messages;
{
std::lock_guard lk(replayMtx_);
auto replayIt = replay_.find(conversationId);
if (replayIt != replay_.end()) {
messages = std::move(replayIt->second);
replay_.erase(replayIt);
}
}
if (!commitId.empty())
sendMessageNotification(*conversation, false, commitId);
erasePending(); // Will unlock
#ifdef LIBJAMI_TESTABLE
conversation->onBootstrapStatus(bootstrapCbTest_);
#endif // LIBJAMI_TESTABLE
conversation->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb,
this,
conversation->id()),
kd);
if (!preferences.empty())
conversation->updatePreferences(preferences);
if (!status.empty())
conversation->updateMessageStatus(status);
syncingMetadatas_.erase(conversationId);
saveMetadata();
// Inform user that the conversation is ready
emitSignal<libjami::ConversationSignal::ConversationReady>(accountId_, conversationId);
needsSyncingCb_({});
std::vector<Json::Value> values;
values.reserve(messages.size());
for (const auto& message : messages) {
// For now, only replay text messages.
// File transfers will need more logic, and don't care about calls for now.
if (message.at("type") == "text/plain" && message.at("author") == username_) {
Json::Value json;
json["body"] = message.at("body");
json["type"] = "text/plain";
values.emplace_back(std::move(json));
}
}
if (!values.empty())
conversation->sendMessages(std::move(values),
[w = weak(), conversationId](const auto& commits) {
auto shared = w.lock();
if (shared and not commits.empty())
shared->sendMessageNotification(conversationId,
true,
*commits.rbegin());
});
// Download members profile on first sync
auto isOneOne = conversation->mode() == ConversationMode::ONE_TO_ONE;
auto askForProfile = isOneOne;
if (!isOneOne) {
// If not 1:1 only download profiles from self (to avoid non checked files)
auto cert = acc->certStore().getCertificate(deviceId);
askForProfile = cert && cert->issuer
&& cert->issuer->getId().toString() == username_;
}
if (askForProfile) {
for (const auto& member : conversation->memberUris(username_)) {
acc->askForProfile(conversationId, deviceId, member);
}
}
} catch (const std::exception& e) {
JAMI_WARNING("Something went wrong when cloning conversation: {}. Re-clone in {}s", e.what(), conv->fallbackTimer.count());
conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer);
conv->fallbackTimer *= 2;
if (conv->fallbackTimer > MAX_FALLBACK)
conv->fallbackTimer = MAX_FALLBACK;
conv->fallbackClone->async_wait(
std::bind(&ConversationModule::Impl::fallbackClone,
shared_from_this(),
std::placeholders::_1,
conversationId));
}
lk.lock();
erasePending();
}
std::optional<ConversationRequest>
ConversationModule::Impl::getRequest(const std::string& id) const
{
// ConversationsRequestsMtx MUST BE LOCKED
auto it = conversationsRequests_.find(id);
if (it != conversationsRequests_.end())
return it->second;
return std::nullopt;
}
std::string
ConversationModule::Impl::getOneToOneConversation(const std::string& uri) const noexcept
{
auto details = accountManager_->getContactDetails(uri);
auto itRemoved = details.find("removed");
// If contact is removed there is no conversation
if (itRemoved != details.end() && itRemoved->second != "0") {
auto itBanned = details.find("banned");
// If banned, conversation is still on disk
if (itBanned == details.end() || itBanned->second == "0") {
// Check if contact is removed
auto itAdded = details.find("added");
if (std::stoi(itRemoved->second) > std::stoi(itAdded->second))
return {};
}
}
auto it = details.find(libjami::Account::TrustRequest::CONVERSATIONID);
if (it != details.end())
return it->second;
return {};
}
bool
ConversationModule::Impl::updateConvForContact(const std::string& uri,
const std::string& oldConv,
const std::string& newConv)
{
if (newConv != oldConv) {
auto conversation = getOneToOneConversation(uri);
if (conversation != oldConv) {
JAMI_DEBUG("Old conversation is not found in details {} - found: {}",
oldConv,
conversation);
return false;
}
accountManager_->updateContactConversation(uri, newConv);
return true;
}
return false;
}
void
ConversationModule::Impl::declineOtherConversationWith(const std::string& uri) noexcept
{
// conversationsRequestsMtx_ MUST BE LOCKED
for (auto& [id, request] : conversationsRequests_) {
if (request.declined)
continue; // Ignore already declined requests
if (request.isOneToOne() && request.from == uri) {
JAMI_WARNING("Decline conversation request ({}) from {}", id, uri);
request.declined = std::time(nullptr);
syncingMetadatas_.erase(id);
saveMetadata();
emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(accountId_, id);
}
}
}
std::vector<std::map<std::string, std::string>>
ConversationModule::Impl::getConversationMembers(const std::string& conversationId,
bool includeBanned) const
{
return withConv(conversationId,
[&](const auto& conv) { return conv.getMembers(true, includeBanned); });
}
void
ConversationModule::Impl::removeRepository(const std::string& conversationId, bool sync, bool force)
{
auto conv = getConversation(conversationId);
if (!conv)
return;
std::unique_lock lk(conv->mtx);
removeRepositoryImpl(*conv, sync, force);
}
void
ConversationModule::Impl::removeRepositoryImpl(SyncedConversation& conv, bool sync, bool force)
{
if (conv.conversation && (force || conv.conversation->isRemoving())) {
// Stop fetch!
conv.pending.reset();
JAMI_LOG("Remove conversation: {}", conv.info.id);
try {
if (conv.conversation->mode() == ConversationMode::ONE_TO_ONE) {
for (const auto& member : conv.conversation->getInitialMembers()) {
if (member != username_) {
// Note: this can happen while re-adding a contact.
// In this case, check that we are removing the linked conversation.
if (conv.info.id == getOneToOneConversation(member)) {
accountManager_->removeContactConversation(member);
}
}
}
}
} catch (const std::exception& e) {
JAMI_ERR() << e.what();
}
conv.conversation->erase();
conv.conversation.reset();
if (!sync)
return;
conv.info.erased = std::time(nullptr);
needsSyncingCb_({});
addConvInfo(conv.info);
}
}
bool
ConversationModule::Impl::removeConversation(const std::string& conversationId)
{
return withConv(conversationId, [this](auto& conv) { return removeConversationImpl(conv); });
}
bool
ConversationModule::Impl::removeConversationImpl(SyncedConversation& conv)
{
auto members = conv.getMembers(false, false);
auto isSyncing = !conv.conversation;
auto hasMembers = !isSyncing // If syncing there is no member to inform
&& std::find_if(members.begin(),
members.end(),
[&](const auto& member) {
return member.at("uri") == username_;
})
!= members.end() // We must be still a member
&& members.size() != 1; // If there is only ourself
conv.info.removed = std::time(nullptr);
if (isSyncing)
conv.info.erased = std::time(nullptr);
// Sync now, because it can take some time to really removes the datas
needsSyncingCb_({});
addConvInfo(conv.info);
emitSignal<libjami::ConversationSignal::ConversationRemoved>(accountId_, conv.info.id);
if (isSyncing)
return true;
if (conv.conversation->mode() != ConversationMode::ONE_TO_ONE) {
// For one to one, we do not notify the leave. The other can still generate request
// and this is managed by the banned part. If we re-accept, the old conversation will be
// retrieved
auto commitId = conv.conversation->leave();
if (hasMembers) {
JAMI_LOG("Wait that someone sync that user left conversation {}", conv.info.id);
// Commit that we left
if (!commitId.empty()) {
// Do not sync as it's synched by convInfos
sendMessageNotification(*conv.conversation, false, commitId);
} else {
JAMI_ERROR("Failed to send message to conversation {}", conv.info.id);
}
// In this case, we wait that another peer sync the conversation
// to definitely remove it from the device. This is to inform the
// peer that we left the conversation and never want to receive
// any messages
return true;
}
} else {
for (const auto& m : members)
if (username_ != m.at("uri"))
updateConvForContact(m.at("uri"), conv.info.id, "");
}
// Else we are the last member, so we can remove
removeRepositoryImpl(conv, true);
return true;
}
void
ConversationModule::Impl::sendMessageNotification(const std::string& conversationId,
bool sync,
const std::string& commitId,
const std::string& deviceId)
{
if (auto conv = getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation)
sendMessageNotification(*conv->conversation, sync, commitId, deviceId);
}
}
void
ConversationModule::Impl::sendMessageNotification(Conversation& conversation,
bool sync,
const std::string& commitId,
const std::string& deviceId)
{
auto acc = account_.lock();
if (!acc)
return;
Json::Value message;
auto commit = commitId == "" ? conversation.lastCommitId() : commitId;
message["id"] = conversation.id();
message["commit"] = commit;
message["deviceId"] = deviceId_;
Json::StreamWriterBuilder builder;
const auto text = Json::writeString(builder, message);
// Send message notification will announce the new commit in 3 steps.
// First, because our account can have several devices, announce to other devices
if (sync) {
// Announce to our devices
refreshMessage[username_] = sendMsgCb_(username_,
{},
std::map<std::string, std::string> {
{MIME_TYPE_GIT, text}},
refreshMessage[username_]);
}
// Then, we announce to 2 random members in the conversation that aren't in the DRT
// This allow new devices without the ability to sync to their other devices to sync with us.
// Or they can also use an old backup.
std::vector<std::string> nonConnectedMembers;
std::vector<NodeId> devices;
{
std::lock_guard lk(notSyncedNotificationMtx_);
devices = conversation.peersToSyncWith();
auto members = conversation.memberUris(username_, {MemberRole::BANNED});
std::vector<std::string> connectedMembers;
// print all members
for (const auto& device : devices) {
auto cert = acc->certStore().getCertificate(device.toString());
if (cert && cert->issuer)
connectedMembers.emplace_back(cert->issuer->getId().toString());
}
std::sort(std::begin(connectedMembers), std::end(connectedMembers));
std::set_difference(members.begin(),
members.end(),
connectedMembers.begin(),
connectedMembers.end(),
std::inserter(nonConnectedMembers, nonConnectedMembers.begin()));
std::shuffle(nonConnectedMembers.begin(), nonConnectedMembers.end(), acc->rand);
if (nonConnectedMembers.size() > 2)
nonConnectedMembers.resize(2);
if (!conversation.isBootstraped()) {
JAMI_DEBUG("[Conversation {}] Not yet bootstraped, save notification",
conversation.id());
// Because we can get some git channels but not bootstraped, we should keep this
// to refresh when bootstraped.
notSyncedNotification_[conversation.id()] = commit;
}
}
for (const auto& member : nonConnectedMembers) {
refreshMessage[member] = sendMsgCb_(member,
{},
std::map<std::string, std::string> {
{MIME_TYPE_GIT, text}},
refreshMessage[member]);
}
// Finally we send to devices that the DRT choose.
for (const auto& device : devices) {
auto deviceIdStr = device.toString();
auto memberUri = conversation.uriFromDevice(deviceIdStr);
if (memberUri.empty() || deviceIdStr == deviceId)
continue;
refreshMessage[deviceIdStr] = sendMsgCb_(memberUri,
device,
std::map<std::string, std::string> {
{MIME_TYPE_GIT, text}},
refreshMessage[deviceIdStr]);
}
}
void
ConversationModule::Impl::sendMessage(const std::string& conversationId,
std::string message,
const std::string& replyTo,
const std::string& type,
bool announce,
OnCommitCb&& onCommit,
OnDoneCb&& cb)
{
Json::Value json;
json["body"] = std::move(message);
json["type"] = type;
sendMessage(conversationId,
std::move(json),
replyTo,
announce,
std::move(onCommit),
std::move(cb));
}
void
ConversationModule::Impl::sendMessage(const std::string& conversationId,
Json::Value&& value,
const std::string& replyTo,
bool announce,
OnCommitCb&& onCommit,
OnDoneCb&& cb)
{
if (auto conv = getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation)
conv->conversation
->sendMessage(std::move(value),
replyTo,
std::move(onCommit),
[this,
conversationId,
announce,
cb = std::move(cb)](bool ok, const std::string& commitId) {
if (cb)
cb(ok, commitId);
if (!announce)
return;
if (ok)
sendMessageNotification(conversationId, true, commitId);
else
JAMI_ERR("Failed to send message to conversation %s",
conversationId.c_str());
});
}
}
void
ConversationModule::Impl::editMessage(const std::string& conversationId,
const std::string& newBody,
const std::string& editedId)
{
// Check that editedId is a valid commit, from ourself and plain/text
auto validCommit = false;
std::string type, tid;
if (auto conv = getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation) {
auto commit = conv->conversation->getCommit(editedId);
if (commit != std::nullopt) {
type = commit->at("type");
if (type == "application/data-transfer+json")
tid = commit->at("tid");
validCommit = commit->at("author") == username_
&& (type == "text/plain" || type == "application/data-transfer+json");
}
}
}
if (!validCommit) {
JAMI_ERROR("Unable to edit commit {:s}", editedId);
return;
}
// Commit message edition
Json::Value json;
if (type == "application/data-transfer+json") {
json["tid"] = "";
// Remove file!
auto path = fileutils::get_data_dir() / accountId_
/ "conversation_data" / conversationId
/ fmt::format("{}_{}", editedId, tid);
dhtnet::fileutils::remove(path, true);
} else {
json["body"] = newBody;
}
json["edit"] = editedId;
json["type"] = type;
sendMessage(conversationId, std::move(json));
}
void
ConversationModule::Impl::bootstrapCb(std::string convId)
{
std::string commitId;
{
std::lock_guard lk(notSyncedNotificationMtx_);
auto it = notSyncedNotification_.find(convId);
if (it != notSyncedNotification_.end()) {
commitId = it->second;
notSyncedNotification_.erase(it);
}
}
JAMI_DEBUG("[Conversation {}] Resend last message notification", convId);
dht::ThreadPool::io().run([w = weak(), convId, commitId = std::move(commitId)] {
if (auto sthis = w.lock())
sthis->sendMessageNotification(convId, true, commitId);
});
}
void
ConversationModule::Impl::fixStructures(
std::shared_ptr<JamiAccount> acc,
const std::vector<std::tuple<std::string, std::string, std::string>>& updateContactConv,
const std::set<std::string>& toRm)
{
for (const auto& [uri, oldConv, newConv] : updateContactConv) {
updateConvForContact(uri, oldConv, newConv);
}
////////////////////////////////////////////////////////////////
// Note: This is only to homogenize trust and convRequests
std::vector<std::string> invalidPendingRequests;
{
auto requests = acc->getTrustRequests();
std::lock_guard lk(conversationsRequestsMtx_);
for (const auto& request : requests) {
auto itConvId = request.find(libjami::Account::TrustRequest::CONVERSATIONID);
auto itConvFrom = request.find(libjami::Account::TrustRequest::FROM);
if (itConvId != request.end() && itConvFrom != request.end()) {
// Check if requests exists or is declined.
auto itReq = conversationsRequests_.find(itConvId->second);
auto declined = itReq == conversationsRequests_.end() || itReq->second.declined;
if (declined) {
JAMI_WARNING("Invalid trust request found: {:s}", itConvId->second);
invalidPendingRequests.emplace_back(itConvFrom->second);
}
}
}
auto requestRemoved = false;
for (auto it = conversationsRequests_.begin(); it != conversationsRequests_.end();) {
if (it->second.from == username_) {
JAMI_WARNING("Detected request from ourself, this makes no sense. Remove {}",
it->first);
it = conversationsRequests_.erase(it);
} else {
++it;
}
}
if (requestRemoved) {
saveConvRequests();
}
}
for (const auto& invalidPendingRequest : invalidPendingRequests)
acc->discardTrustRequest(invalidPendingRequest);
////////////////////////////////////////////////////////////////
for (const auto& conv : toRm) {
JAMI_ERROR("Remove conversation ({})", conv);
removeConversation(conv);
}
JAMI_DEBUG("[Account {}] Conversations loaded!", accountId_);
}
void
ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConversation> conv,
const std::string& deviceId,
const std::string& oldConvId)
{
std::lock_guard lk(conv->mtx);
const auto& conversationId = conv->info.id;
if (!conv->startFetch(deviceId, true)) {
JAMI_WARNING("[Account {}] Already fetching {}", accountId_, conversationId);
return;
}
onNeedSocket_(
conversationId,
deviceId,
[wthis=weak_from_this(), conv, conversationId, oldConvId, deviceId](const auto& channel) {
std::lock_guard lk(conv->mtx);
if (conv->pending && !conv->pending->ready) {
conv->pending->removeId = oldConvId;
if (channel) {
conv->pending->ready = true;
conv->pending->deviceId = channel->deviceId().toString();
conv->pending->socket = channel;
if (!conv->pending->cloning) {
conv->pending->cloning = true;
dht::ThreadPool::io().run([wthis,
conversationId,
deviceId = conv->pending->deviceId]() {
if (auto sthis = wthis.lock())
sthis->handlePendingConversation(conversationId, deviceId);
});
}
return true;
} else if (auto sthis = wthis.lock()) {
conv->stopFetch(deviceId);
JAMI_WARNING("Clone failed. Re-clone in {}s", conv->fallbackTimer.count());
conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer);
conv->fallbackTimer *= 2;
if (conv->fallbackTimer > MAX_FALLBACK)
conv->fallbackTimer = MAX_FALLBACK;
conv->fallbackClone->async_wait(
std::bind(&ConversationModule::Impl::fallbackClone,
sthis,
std::placeholders::_1,
conversationId));
}
}
return false;
},
MIME_TYPE_GIT);
}
void
ConversationModule::Impl::fallbackClone(const asio::error_code& ec, const std::string& conversationId)
{
if (ec == asio::error::operation_aborted)
return;
auto conv = getConversation(conversationId);
if (!conv || conv->conversation)
return;
auto members = getConversationMembers(conversationId);
for (const auto& member : members)
if (member.at("uri") != username_)
cloneConversationFrom(conversationId, member.at("uri"));
}
void
ConversationModule::Impl::bootstrap(const std::string& convId)
{
std::vector<DeviceId> kd;
{
std::unique_lock lk(conversationsMtx_);
const auto& devices = accountManager_->getKnownDevices();
kd.reserve(devices.size());
for (const auto& [id, _] : devices)
kd.emplace_back(id);
}
auto bootstrap = [&](auto& conv) {
if (conv) {
#ifdef LIBJAMI_TESTABLE
conv->onBootstrapStatus(bootstrapCbTest_);
#endif // LIBJAMI_TESTABLE
conv->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb, this, conv->id()), kd);
}
};
std::vector<std::string> toClone;
if (convId.empty()) {
std::lock_guard lk(convInfosMtx_);
for (const auto& [conversationId, convInfo] : convInfos_) {
auto conv = getConversation(conversationId);
if (!conv)
return;
if ((!conv->conversation && !conv->info.isRemoved())) {
// Because we're not tracking contact presence in order to sync now,
// we need to ask to clone requests when bootstraping all conversations
// else it can stay syncing
toClone.emplace_back(conversationId);
} else if (conv->conversation) {
bootstrap(conv->conversation);
}
}
} else if (auto conv = getConversation(convId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation)
bootstrap(conv->conversation);
}
for (const auto& cid : toClone) {
auto members = getConversationMembers(cid);
for (const auto& member : members) {
if (member.at("uri") != username_)
cloneConversationFrom(cid, member.at("uri"));
}
}
}
void
ConversationModule::Impl::cloneConversationFrom(const std::string& conversationId,
const std::string& uri,
const std::string& oldConvId)
{
auto memberHash = dht::InfoHash(uri);
if (!memberHash) {
JAMI_WARNING("Invalid member detected: {}", uri);
return;
}
auto conv = startConversation(conversationId);
std::lock_guard lk(conv->mtx);
conv->info = {conversationId};
conv->info.created = std::time(nullptr);
conv->info.members.emplace(username_);
conv->info.members.emplace(uri);
accountManager_->forEachDevice(
memberHash,
[w = weak(), conv, conversationId, oldConvId](
const std::shared_ptr<dht::crypto::PublicKey>& pk) {
auto sthis = w.lock();
auto deviceId = pk->getLongId().toString();
if (!sthis or deviceId == sthis->deviceId_)
return;
sthis->cloneConversationFrom(conv, deviceId, oldConvId);
});
addConvInfo(conv->info);
}
////////////////////////////////////////////////////////////////
void
ConversationModule::saveConvRequests(
const std::string& accountId,
const std::map<std::string, ConversationRequest>& conversationsRequests)
{
auto path = fileutils::get_data_dir() / accountId;
saveConvRequestsToPath(path, conversationsRequests);
}
void
ConversationModule::saveConvRequestsToPath(
const std::filesystem::path& path,
const std::map<std::string, ConversationRequest>& conversationsRequests)
{
auto p = path / "convRequests";
std::lock_guard lock(dhtnet::fileutils::getFileLock(p));
std::ofstream file(p, std::ios::trunc | std::ios::binary);
msgpack::pack(file, conversationsRequests);
}
void
ConversationModule::saveConvInfos(const std::string& accountId, const ConvInfoMap& conversations)
{
auto path = fileutils::get_data_dir() / accountId;
saveConvInfosToPath(path, conversations);
}
void
ConversationModule::saveConvInfosToPath(const std::filesystem::path& path,
const ConvInfoMap& conversations)
{
std::ofstream file(path / "convInfo", std::ios::trunc | std::ios::binary);
msgpack::pack(file, conversations);
}
////////////////////////////////////////////////////////////////
ConversationModule::ConversationModule(std::shared_ptr<JamiAccount> account,
std::shared_ptr<AccountManager> accountManager,
NeedsSyncingCb&& needsSyncingCb,
SengMsgCb&& sendMsgCb,
NeedSocketCb&& onNeedSocket,
NeedSocketCb&& onNeedSwarmSocket,
OneToOneRecvCb&& oneToOneRecvCb,
bool autoLoadConversations)
: pimpl_ {std::make_unique<Impl>(std::move(account),
std::move(accountManager),
std::move(needsSyncingCb),
std::move(sendMsgCb),
std::move(onNeedSocket),
std::move(onNeedSwarmSocket),
std::move(oneToOneRecvCb))}
{
if (autoLoadConversations) {
loadConversations();
}
}
void
ConversationModule::setAccountManager(std::shared_ptr<AccountManager> accountManager)
{
std::unique_lock lk(pimpl_->conversationsMtx_);
pimpl_->accountManager_ = accountManager;
}
#ifdef LIBJAMI_TESTABLE
void
ConversationModule::onBootstrapStatus(
const std::function<void(std::string, Conversation::BootstrapStatus)>& cb)
{
pimpl_->bootstrapCbTest_ = cb;
for (auto& c : pimpl_->getConversations())
c->onBootstrapStatus(pimpl_->bootstrapCbTest_);
}
#endif
void
ConversationModule::loadConversations()
{
auto acc = pimpl_->account_.lock();
if (!acc)
return;
JAMI_LOG("[Account {}] Start loading conversations…", pimpl_->accountId_);
auto conversationsRepositories = dhtnet::fileutils::readDirectory(
fileutils::get_data_dir() / pimpl_->accountId_ / "conversations");
std::unique_lock lk(pimpl_->conversationsMtx_);
auto contacts = pimpl_->accountManager_->getContacts(true); // Avoid to lock configurationMtx while conv Mtx is locked
std::unique_lock ilk(pimpl_->convInfosMtx_);
pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
pimpl_->conversations_.clear();
struct Ctx
{
std::mutex cvMtx;
std::condition_variable cv;
std::mutex toRmMtx;
std::set<std::string> toRm;
std::mutex convMtx;
size_t convNb;
std::vector<std::map<std::string, std::string>> contacts;
std::vector<std::tuple<std::string, std::string, std::string>> updateContactConv;
};
auto ctx = std::make_shared<Ctx>();
ctx->convNb = conversationsRepositories.size();
ctx->contacts = std::move(contacts);
for (auto&& r : conversationsRepositories) {
dht::ThreadPool::io().run([this, ctx, repository=std::move(r), acc] {
try {
auto sconv = std::make_shared<SyncedConversation>(repository);
auto conv = std::make_shared<Conversation>(acc, repository);
conv->onMessageStatusChanged([this, repository](const auto& status) {
auto msg = std::make_shared<SyncMsg>();
msg->ms = {{repository, status}};
pimpl_->needsSyncingCb_(std::move(msg));
});
conv->onMembersChanged(
[w = pimpl_->weak_from_this(), repository](const auto& members) {
// Delay in another thread to avoid deadlocks
dht::ThreadPool::io().run([w, repository, members = std::move(members)] {
if (auto sthis = w.lock())
sthis->setConversationMembers(repository, members);
});
});
conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
auto members = conv->memberUris(acc->getUsername(), {});
// NOTE: The following if is here to protect against any incorrect state
// that can be introduced
if (conv->mode() == ConversationMode::ONE_TO_ONE && members.size() == 1) {
// If we got a 1:1 conversation, but not in the contact details, it's rather a
// duplicate or a weird state
auto otherUri = *members.begin();
auto itContact = std::find_if(ctx->contacts.cbegin(),
ctx->contacts.cend(),
[&](const auto& c) {
return c.at("id") == otherUri;
});
if (itContact == ctx->contacts.end()) {
JAMI_WARNING("Contact {} not found", otherUri);
std::lock_guard lkCv {ctx->cvMtx};
--ctx->convNb;
ctx->cv.notify_all();
return;
}
const std::string& convFromDetails = itContact->at("conversationId");
auto removed = std::stoul(itContact->at("removed"));
auto added = std::stoul(itContact->at("added"));
auto isRemoved = removed > added;
if (convFromDetails != repository) {
if (convFromDetails.empty()) {
if (isRemoved) {
// If details is empty, contact is removed and not banned.
JAMI_ERROR("Conversation {} detected for {} and should be removed",
repository,
otherUri);
std::lock_guard lkMtx {ctx->toRmMtx};
ctx->toRm.insert(repository);
} else {
JAMI_ERROR("No conversation detected for {} but one exists ({}). "
"Update details",
otherUri,
repository);
std::lock_guard lkMtx {ctx->toRmMtx};
ctx->updateContactConv.emplace_back(
std::make_tuple(otherUri, convFromDetails, repository));
}
} else {
JAMI_ERROR("Multiple conversation detected for {} but ({} & {})",
otherUri,
repository,
convFromDetails);
std::lock_guard lkMtx {ctx->toRmMtx};
ctx->toRm.insert(repository);
}
}
}
{
std::lock_guard lkMtx {ctx->convMtx};
auto convInfo = pimpl_->convInfos_.find(repository);
if (convInfo == pimpl_->convInfos_.end()) {
JAMI_ERROR("Missing conv info for {}. This is a bug!", repository);
sconv->info.created = std::time(nullptr);
sconv->info.lastDisplayed
= conv->infos()[ConversationMapKeys::LAST_DISPLAYED];
} else {
sconv->info = convInfo->second;
if (convInfo->second.isRemoved()) {
// A conversation was removed, but repository still exists
conv->setRemovingFlag();
std::lock_guard lkMtx {ctx->toRmMtx};
ctx->toRm.insert(repository);
}
}
// Even if we found the conversation in convInfos_, unable to assume that the list of members
// stored in `convInfo` is correct (https://git.jami.net/savoirfairelinux/jami-daemon/-/issues/1025).
// For this reason, we always use the list we got from the conversation repository to set
// the value of `sconv->info.members`.
members.emplace(acc->getUsername());
sconv->info.members = std::move(members);
// convInfosMtx_ is already locked
pimpl_->convInfos_[repository] = sconv->info;
}
auto commits = conv->commitsEndedCalls();
if (!commits.empty()) {
// Note: here, this means that some calls were actives while the
// daemon finished (can be a crash).
// Notify other in the conversation that the call is finished
pimpl_->sendMessageNotification(*conv, true, *commits.rbegin());
}
sconv->conversation = conv;
std::lock_guard lkMtx {ctx->convMtx};
pimpl_->conversations_.emplace(repository, std::move(sconv));
} catch (const std::logic_error& e) {
JAMI_WARNING("[Account {}] Conversations not loaded: {}",
pimpl_->accountId_,
e.what());
}
std::lock_guard lkCv {ctx->cvMtx};
--ctx->convNb;
ctx->cv.notify_all();
});
}
std::unique_lock lkCv(ctx->cvMtx);
ctx->cv.wait(lkCv, [&] { return ctx->convNb == 0; });
// Prune any invalid conversations without members and
// set the removed flag if needed
std::set<std::string> removed;
for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
const auto& info = itInfo->second;
if (info.members.empty()) {
itInfo = pimpl_->convInfos_.erase(itInfo);
continue;
}
if (info.isRemoved())
removed.insert(info.id);
auto itConv = pimpl_->conversations_.find(info.id);
if (itConv == pimpl_->conversations_.end()) {
// convInfos_ can contain a conversation that is not yet cloned
// so we need to add it there.
itConv = pimpl_->conversations_
.emplace(info.id, std::make_shared<SyncedConversation>(info))
.first;
}
if (itConv != pimpl_->conversations_.end() && itConv->second && itConv->second->conversation
&& info.isRemoved())
itConv->second->conversation->setRemovingFlag();
if (!info.isRemoved() && itConv == pimpl_->conversations_.end()) {
// In this case, the conversation is not synced and we only know ourself
if (info.members.size() == 1 && *info.members.begin() == acc->getUsername()) {
JAMI_WARNING("[Account {:s}] Conversation {:s} seems not present/synced.",
pimpl_->accountId_,
info.id);
emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_,
info.id);
itInfo = pimpl_->convInfos_.erase(itInfo);
continue;
}
}
++itInfo;
}
// On oldest version, removeConversation didn't update "appdata/contacts"
// causing a potential incorrect state between "appdata/contacts" and "appdata/convInfos"
if (!removed.empty())
acc->unlinkConversations(removed);
// Save if we've removed some invalid entries
pimpl_->saveConvInfos();
ilk.unlock();
lk.unlock();
dht::ThreadPool::io().run([w = pimpl_->weak(),
acc,
updateContactConv = std::move(ctx->updateContactConv),
toRm = std::move(ctx->toRm)]() {
// Will lock account manager
if (auto shared = w.lock())
shared->fixStructures(acc, updateContactConv, toRm);
});
}
void
ConversationModule::loadSingleConversation(const std::string& convId)
{
auto acc = pimpl_->account_.lock();
if (!acc)
return;
JAMI_LOG("[Account {}] Start loading conversation {}", pimpl_->accountId_, convId);
std::unique_lock lk(pimpl_->conversationsMtx_);
std::unique_lock ilk(pimpl_->convInfosMtx_);
// Load convInfos to retrieve requests that have been accepted but not yet synchronized.
pimpl_->convInfos_ = convInfos(pimpl_->accountId_);
pimpl_->conversations_.clear();
try {
auto sconv = std::make_shared<SyncedConversation>(convId);
auto conv = std::make_shared<Conversation>(acc, convId);
conv->onNeedSocket(pimpl_->onNeedSwarmSocket_);
sconv->conversation = conv;
pimpl_->conversations_.emplace(convId, std::move(sconv));
} catch (const std::logic_error& e) {
JAMI_WARNING("[Account {}] Conversations not loaded: {}", pimpl_->accountId_, e.what());
}
// Add all other conversations as dummy conversations to indicate their existence so
// isConversation could detect conversations correctly.
auto conversationsRepositoryIds = dhtnet::fileutils::readDirectory(
fileutils::get_data_dir() / pimpl_->accountId_ / "conversations");
for (auto repositoryId : conversationsRepositoryIds) {
if (repositoryId != convId) {
auto conv = std::make_shared<SyncedConversation>(convId);
pimpl_->conversations_.emplace(repositoryId, conv);
}
}
// Add conversations from convInfos_ so isConversation could detect conversations correctly.
// This includes conversations that have been accepted but are not yet synchronized.
for (auto itInfo = pimpl_->convInfos_.begin(); itInfo != pimpl_->convInfos_.end();) {
const auto& info = itInfo->second;
if (info.members.empty()) {
itInfo = pimpl_->convInfos_.erase(itInfo);
continue;
}
auto itConv = pimpl_->conversations_.find(info.id);
if (itConv == pimpl_->conversations_.end()) {
// convInfos_ can contain a conversation that is not yet cloned
// so we need to add it there.
pimpl_->conversations_.emplace(info.id, std::make_shared<SyncedConversation>(info)).first;
}
++itInfo;
}
ilk.unlock();
lk.unlock();
}
void
ConversationModule::bootstrap(const std::string& convId)
{
pimpl_->bootstrap(convId);
}
void
ConversationModule::monitor()
{
for (auto& conv : pimpl_->getConversations())
conv->monitor();
}
void
ConversationModule::clearPendingFetch()
{
// Note: This is a workaround. convModule() is kept if account is disabled/re-enabled.
// iOS uses setAccountActive() a lot, and if for some reason the previous pending fetch
// is not erased (callback not called), it will block the new messages as it will not
// sync. The best way to debug this is to get logs from the last ICE connection for
// syncing the conversation. It may have been killed in some un-expected way avoiding to
// call the callbacks. This should never happen, but if it's the case, this will allow
// new messages to be synced correctly.
for (auto& conv : pimpl_->getSyncedConversations()) {
std::lock_guard lk(conv->mtx);
if (conv && conv->pending) {
JAMI_ERR("This is a bug, seems to still fetch to some device on initializing");
conv->pending.reset();
}
}
}
void
ConversationModule::reloadRequests()
{
pimpl_->conversationsRequests_ = convRequests(pimpl_->accountId_);
}
std::vector<std::string>
ConversationModule::getConversations() const
{
std::vector<std::string> result;
std::lock_guard lk(pimpl_->convInfosMtx_);
result.reserve(pimpl_->convInfos_.size());
for (const auto& [key, conv] : pimpl_->convInfos_) {
if (conv.isRemoved())
continue;
result.emplace_back(key);
}
return result;
}
std::string
ConversationModule::getOneToOneConversation(const std::string& uri) const noexcept
{
return pimpl_->getOneToOneConversation(uri);
}
bool
ConversationModule::updateConvForContact(const std::string& uri,
const std::string& oldConv,
const std::string& newConv)
{
return pimpl_->updateConvForContact(uri, oldConv, newConv);
}
std::vector<std::map<std::string, std::string>>
ConversationModule::getConversationRequests() const
{
std::vector<std::map<std::string, std::string>> requests;
std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
requests.reserve(pimpl_->conversationsRequests_.size());
for (const auto& [id, request] : pimpl_->conversationsRequests_) {
if (request.declined)
continue; // Do not add declined requests
requests.emplace_back(request.toMap());
}
return requests;
}
void
ConversationModule::onTrustRequest(const std::string& uri,
const std::string& conversationId,
const std::vector<uint8_t>& payload,
time_t received)
{
auto oldConv = getOneToOneConversation(uri);
if (!oldConv.empty() && pimpl_->isConversation(oldConv)) {
// If there is already an active one to one conversation here, it's an active
// contact and the contact will reclone this activeConv, so ignore the request
JAMI_WARNING(
"Contact is sending a request for a non active conversation. Ignore. They will "
"clone the old one");
return;
}
std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
ConversationRequest req;
req.from = uri;
req.conversationId = conversationId;
req.received = std::time(nullptr);
req.metadatas = ConversationRepository::infosFromVCard(vCard::utils::toMap(
std::string_view(reinterpret_cast<const char*>(payload.data()), payload.size())));
auto reqMap = req.toMap();
if (pimpl_->addConversationRequest(conversationId, std::move(req))) {
lk.unlock();
emitSignal<libjami::ConfigurationSignal::IncomingTrustRequest>(pimpl_->accountId_,
conversationId,
uri,
payload,
received);
emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
conversationId,
reqMap);
pimpl_->needsSyncingCb_({});
} else {
JAMI_DEBUG("[Account {}] Received a request for a conversation "
"already existing. Ignore",
pimpl_->accountId_);
}
}
void
ConversationModule::onConversationRequest(const std::string& from, const Json::Value& value)
{
ConversationRequest req(value);
auto isOneToOne = req.isOneToOne();
std::string oldConv;
if (isOneToOne) {
oldConv = pimpl_->getOneToOneConversation(from);
}
std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
JAMI_DEBUG("[Account {}] Receive a new conversation request for conversation {} from {}",
pimpl_->accountId_,
req.conversationId,
from);
auto convId = req.conversationId;
// Already accepted request, do nothing
if (pimpl_->isConversation(convId))
return;
auto oldReq = pimpl_->getRequest(convId);
if (oldReq != std::nullopt) {
JAMI_DEBUG("[Account {}] Received a request for a conversation already existing. "
"Ignore. Declined: {}",
pimpl_->accountId_,
static_cast<int>(oldReq->declined));
return;
}
if (!oldConv.empty()) {
lk.unlock();
// Already a conversation with the contact.
// If there is already an active one to one conversation here, it's an active
// contact and the contact will reclone this activeConv, so ignore the request
JAMI_WARNING(
"Contact is sending a request for a non active conversation. Ignore. They will "
"clone the old one");
return;
}
req.received = std::time(nullptr);
req.from = from;
auto reqMap = req.toMap();
if (pimpl_->addConversationRequest(convId, std::move(req))) {
lk.unlock();
// Note: no need to sync here because other connected devices should receive
// the same conversation request. Will sync when the conversation will be added
if (isOneToOne)
pimpl_->oneToOneRecvCb_(convId, from);
emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
convId,
reqMap);
}
}
std::string
ConversationModule::peerFromConversationRequest(const std::string& convId) const
{
std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
auto it = pimpl_->conversationsRequests_.find(convId);
if (it != pimpl_->conversationsRequests_.end()) {
return it->second.from;
}
return {};
}
void
ConversationModule::onNeedConversationRequest(const std::string& from,
const std::string& conversationId)
{
pimpl_->withConversation(conversationId, [&](auto& conversation) {
if (!conversation.isMember(from, true)) {
JAMI_WARNING("{} is asking a new invite for {}, but not a member", from, conversationId);
return;
}
JAMI_LOG("{} is asking a new invite for {}", from, conversationId);
pimpl_->sendMsgCb_(from, {}, conversation.generateInvitation(), 0);
});
}
void
ConversationModule::acceptConversationRequest(const std::string& conversationId,
const std::string& deviceId)
{
// For all conversation members, try to open a git channel with this conversation ID
std::unique_lock lkCr(pimpl_->conversationsRequestsMtx_);
auto request = pimpl_->getRequest(conversationId);
if (request == std::nullopt) {
lkCr.unlock();
if (auto conv = pimpl_->getConversation(conversationId)) {
std::unique_lock lk(conv->mtx);
if (!conv->conversation) {
lk.unlock();
pimpl_->cloneConversationFrom(conv, deviceId);
}
}
JAMI_WARNING("[Account {}] Request not found for conversation {}",
pimpl_->accountId_,
conversationId);
return;
}
pimpl_->rmConversationRequest(conversationId);
lkCr.unlock();
pimpl_->accountManager_->acceptTrustRequest(request->from, true);
cloneConversationFrom(conversationId, request->from);
}
void
ConversationModule::declineConversationRequest(const std::string& conversationId)
{
std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
auto it = pimpl_->conversationsRequests_.find(conversationId);
if (it != pimpl_->conversationsRequests_.end()) {
it->second.declined = std::time(nullptr);
pimpl_->saveConvRequests();
}
pimpl_->syncingMetadatas_.erase(conversationId);
pimpl_->saveMetadata();
emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_,
conversationId);
pimpl_->needsSyncingCb_({});
}
std::string
ConversationModule::startConversation(ConversationMode mode, const dht::InfoHash& otherMember)
{
auto acc = pimpl_->account_.lock();
if (!acc)
return {};
std::vector<DeviceId> kd;
for (const auto& [id, _] : acc->getKnownDevices())
kd.emplace_back(id);
// Create the conversation object
std::shared_ptr<Conversation> conversation;
try {
conversation = std::make_shared<Conversation>(acc, mode, otherMember.toString());
auto conversationId = conversation->id();
conversation->onMessageStatusChanged([this, conversationId](const auto& status) {
auto msg = std::make_shared<SyncMsg>();
msg->ms = {{conversationId, status}};
pimpl_->needsSyncingCb_(std::move(msg));
});
conversation->onMembersChanged([w=pimpl_->weak_from_this(), conversationId](const auto& members) {
// Delay in another thread to avoid deadlocks
dht::ThreadPool::io().run([w, conversationId, members = std::move(members)] {
if (auto sthis = w.lock())
sthis->setConversationMembers(conversationId, members);
});
});
conversation->onNeedSocket(pimpl_->onNeedSwarmSocket_);
#ifdef LIBJAMI_TESTABLE
conversation->onBootstrapStatus(pimpl_->bootstrapCbTest_);
#endif // LIBJAMI_TESTABLE
conversation->bootstrap(std::bind(&ConversationModule::Impl::bootstrapCb,
pimpl_.get(),
conversationId),
kd);
} catch (const std::exception& e) {
JAMI_ERROR("[Account {}] Error while generating a conversation {}",
pimpl_->accountId_, e.what());
return {};
}
auto convId = conversation->id();
auto conv = pimpl_->startConversation(convId);
std::unique_lock lk(conv->mtx);
conv->info.created = std::time(nullptr);
conv->info.members.emplace(pimpl_->username_);
if (otherMember)
conv->info.members.emplace(otherMember.toString());
conv->conversation = conversation;
addConvInfo(conv->info);
lk.unlock();
pimpl_->needsSyncingCb_({});
emitSignal<libjami::ConversationSignal::ConversationReady>(pimpl_->accountId_, convId);
return convId;
}
void
ConversationModule::cloneConversationFrom(const std::string& conversationId,
const std::string& uri,
const std::string& oldConvId)
{
pimpl_->cloneConversationFrom(conversationId, uri, oldConvId);
}
// Message send/load
void
ConversationModule::sendMessage(const std::string& conversationId,
std::string message,
const std::string& replyTo,
const std::string& type,
bool announce,
OnCommitCb&& onCommit,
OnDoneCb&& cb)
{
pimpl_->sendMessage(conversationId,
std::move(message),
replyTo,
type,
announce,
std::move(onCommit),
std::move(cb));
}
void
ConversationModule::sendMessage(const std::string& conversationId,
Json::Value&& value,
const std::string& replyTo,
bool announce,
OnCommitCb&& onCommit,
OnDoneCb&& cb)
{
pimpl_->sendMessage(conversationId,
std::move(value),
replyTo,
announce,
std::move(onCommit),
std::move(cb));
}
void
ConversationModule::editMessage(const std::string& conversationId,
const std::string& newBody,
const std::string& editedId)
{
pimpl_->editMessage(conversationId, newBody, editedId);
}
void
ConversationModule::reactToMessage(const std::string& conversationId,
const std::string& newBody,
const std::string& reactToId)
{
// Commit message edition
Json::Value json;
json["body"] = newBody;
json["react-to"] = reactToId;
json["type"] = "text/plain";
pimpl_->sendMessage(conversationId, std::move(json));
}
void
ConversationModule::addCallHistoryMessage(const std::string& uri,
uint64_t duration_ms,
const std::string& reason)
{
auto finalUri = uri.substr(0, uri.find("@ring.dht"));
finalUri = finalUri.substr(0, uri.find("@jami.dht"));
auto convId = getOneToOneConversation(finalUri);
if (!convId.empty()) {
Json::Value value;
value["to"] = finalUri;
value["type"] = "application/call-history+json";
value["duration"] = std::to_string(duration_ms);
if (!reason.empty())
value["reason"] = reason;
sendMessage(convId, std::move(value));
}
}
bool
ConversationModule::onMessageDisplayed(const std::string& peer,
const std::string& conversationId,
const std::string& interactionId)
{
if (auto conv = pimpl_->getConversation(conversationId)) {
std::unique_lock lk(conv->mtx);
if (auto conversation = conv->conversation) {
lk.unlock();
return conversation->setMessageDisplayed(peer, interactionId);
}
}
return false;
}
std::map<std::string, std::map<std::string, std::map<std::string, std::string>>>
ConversationModule::convMessageStatus() const
{
std::map<std::string, std::map<std::string, std::map<std::string, std::string>>> messageStatus;
for (const auto& conv : pimpl_->getConversations()) {
auto d = conv->messageStatus();
if (!d.empty())
messageStatus[conv->id()] = std::move(d);
}
return messageStatus;
}
uint32_t
ConversationModule::loadConversationMessages(const std::string& conversationId,
const std::string& fromMessage,
size_t n)
{
auto acc = pimpl_->account_.lock();
if (auto conv = pimpl_->getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation) {
const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
LogOptions options;
options.from = fromMessage;
options.nbOfCommits = n;
conv->conversation->loadMessages(
[accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
emitSignal<libjami::ConversationSignal::ConversationLoaded>(id,
accountId,
conversationId,
messages);
},
options);
return id;
}
}
return 0;
}
void
ConversationModule::clearCache(const std::string& conversationId)
{
if (auto conv = pimpl_->getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation) {
conv->conversation->clearCache();
}
}
}
uint32_t
ConversationModule::loadConversation(const std::string& conversationId,
const std::string& fromMessage,
size_t n)
{
auto acc = pimpl_->account_.lock();
if (auto conv = pimpl_->getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation) {
const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
LogOptions options;
options.from = fromMessage;
options.nbOfCommits = n;
conv->conversation->loadMessages2(
[accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
emitSignal<libjami::ConversationSignal::SwarmLoaded>(id,
accountId,
conversationId,
messages);
},
options);
return id;
}
}
return 0;
}
uint32_t
ConversationModule::loadConversationUntil(const std::string& conversationId,
const std::string& fromMessage,
const std::string& toMessage)
{
auto acc = pimpl_->account_.lock();
if (auto conv = pimpl_->getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation) {
const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
LogOptions options;
options.from = fromMessage;
options.to = toMessage;
options.includeTo = true;
conv->conversation->loadMessages(
[accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
emitSignal<libjami::ConversationSignal::ConversationLoaded>(id,
accountId,
conversationId,
messages);
},
options);
return id;
}
}
return 0;
}
uint32_t
ConversationModule::loadSwarmUntil(const std::string& conversationId,
const std::string& fromMessage,
const std::string& toMessage)
{
auto acc = pimpl_->account_.lock();
if (auto conv = pimpl_->getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation) {
const uint32_t id = std::uniform_int_distribution<uint32_t> {}(acc->rand);
LogOptions options;
options.from = fromMessage;
options.to = toMessage;
options.includeTo = true;
conv->conversation->loadMessages2(
[accountId = pimpl_->accountId_, conversationId, id](auto&& messages) {
emitSignal<libjami::ConversationSignal::SwarmLoaded>(id,
accountId,
conversationId,
messages);
},
options);
return id;
}
}
return 0;
}
std::shared_ptr<TransferManager>
ConversationModule::dataTransfer(const std::string& conversationId) const
{
return pimpl_->withConversation(conversationId,
[](auto& conversation) { return conversation.dataTransfer(); });
}
bool
ConversationModule::onFileChannelRequest(const std::string& conversationId,
const std::string& member,
const std::string& fileId,
bool verifyShaSum) const
{
if (auto conv = pimpl_->getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation)
return conv->conversation->onFileChannelRequest(member, fileId, verifyShaSum);
}
return false;
}
bool
ConversationModule::downloadFile(const std::string& conversationId,
const std::string& interactionId,
const std::string& fileId,
const std::string& path,
size_t start,
size_t end)
{
if (auto conv = pimpl_->getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation)
return conv->conversation->downloadFile(interactionId, fileId, path, "", "", start, end);
}
return false;
}
void
ConversationModule::syncConversations(const std::string& peer, const std::string& deviceId)
{
// Sync conversations where peer is member
std::set<std::string> toFetch;
std::set<std::string> toClone;
for (const auto& conv : pimpl_->getSyncedConversations()) {
std::lock_guard lk(conv->mtx);
if (conv->conversation) {
if (!conv->conversation->isRemoving() && conv->conversation->isMember(peer, false)) {
toFetch.emplace(conv->info.id);
}
} else if (!conv->info.isRemoved()
&& std::find(conv->info.members.begin(), conv->info.members.end(), peer)
!= conv->info.members.end()) {
// In this case the conversation was never cloned (can be after an import)
toClone.emplace(conv->info.id);
}
}
for (const auto& cid : toFetch)
pimpl_->fetchNewCommits(peer, deviceId, cid);
for (const auto& cid : toClone)
pimpl_->cloneConversation(deviceId, peer, cid);
if (pimpl_->syncCnt.load() == 0)
emitSignal<libjami::ConversationSignal::ConversationSyncFinished>(pimpl_->accountId_);
}
void
ConversationModule::onSyncData(const SyncMsg& msg,
const std::string& peerId,
const std::string& deviceId)
{
std::vector<std::string> toClone;
for (const auto& [key, convInfo] : msg.c) {
const auto& convId = convInfo.id;
{
std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
pimpl_->rmConversationRequest(convId);
}
auto conv = pimpl_->startConversation(convInfo);
std::unique_lock lk(conv->mtx);
// Skip outdated info
if (std::max(convInfo.created, convInfo.removed)
< std::max(conv->info.created, conv->info.removed))
continue;
if (not convInfo.isRemoved()) {
// If multi devices, it can detect a conversation that was already
// removed, so just check if the convinfo contains a removed conv
if (conv->info.removed) {
if (conv->info.removed >= convInfo.created) {
// Only reclone if re-added, else the peer is not synced yet (could be
// offline before)
continue;
}
JAMI_DEBUG("Re-add previously removed conversation {:s}", convId);
}
conv->info = convInfo;
if (!conv->conversation) {
if (deviceId != "") {
pimpl_->cloneConversation(deviceId, peerId, conv);
} else {
// In this case, information is from JAMS
// JAMS does not store the conversation itself, so we
// must use information to clone the conversation
addConvInfo(convInfo);
toClone.emplace_back(convId);
}
}
} else {
if (conv->conversation && !conv->conversation->isRemoving()) {
emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_,
convId);
conv->conversation->setRemovingFlag();
}
auto update = false;
if (!conv->info.removed) {
update = true;
conv->info.removed = std::time(nullptr);
}
if (convInfo.erased && !conv->info.erased) {
conv->info.erased = std::time(nullptr);
pimpl_->addConvInfo(conv->info);
pimpl_->removeRepositoryImpl(*conv, false);
} else if (update) {
pimpl_->addConvInfo(conv->info);
}
}
}
for (const auto& cid : toClone) {
auto members = getConversationMembers(cid);
for (const auto& member : members) {
if (member.at("uri") != pimpl_->username_)
cloneConversationFrom(cid, member.at("uri"));
}
}
for (const auto& [convId, req] : msg.cr) {
if (req.from == pimpl_->username_) {
JAMI_WARNING("Detected request from ourself, ignore {}.", convId);
continue;
}
std::unique_lock lk(pimpl_->conversationsRequestsMtx_);
if (pimpl_->isConversation(convId)) {
// Already handled request
pimpl_->rmConversationRequest(convId);
continue;
}
// New request
if (!pimpl_->addConversationRequest(convId, req))
continue;
lk.unlock();
if (req.declined != 0) {
// Request declined
JAMI_LOG("[Account {:s}] Declined request detected for conversation {:s} (device {:s})",
pimpl_->accountId_,
convId,
deviceId);
pimpl_->syncingMetadatas_.erase(convId);
pimpl_->saveMetadata();
emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_,
convId);
continue;
}
JAMI_LOG("[Account {:s}] New request detected for conversation {:s} (device {:s})",
pimpl_->accountId_,
convId,
deviceId);
emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_,
convId,
req.toMap());
}
// Updates preferences for conversations
for (const auto& [convId, p] : msg.p) {
if (auto conv = pimpl_->getConversation(convId)) {
std::unique_lock lk(conv->mtx);
if (conv->conversation) {
auto conversation = conv->conversation;
lk.unlock();
conversation->updatePreferences(p);
} else if (conv->pending) {
conv->pending->preferences = p;
}
}
}
// Updates displayed for conversations
for (const auto& [convId, ms] : msg.ms) {
if (auto conv = pimpl_->getConversation(convId)) {
std::unique_lock lk(conv->mtx);
if (conv->conversation) {
auto conversation = conv->conversation;
lk.unlock();
conversation->updateMessageStatus(ms);
} else if (conv->pending) {
conv->pending->status = ms;
}
}
}
}
bool
ConversationModule::needsSyncingWith(const std::string& memberUri, const std::string& deviceId) const
{
// Check if a conversation needs to fetch remote or to be cloned
std::lock_guard lk(pimpl_->conversationsMtx_);
for (const auto& [key, ci] : pimpl_->conversations_) {
std::lock_guard lk(ci->mtx);
if (ci->conversation) {
if (ci->conversation->isRemoving() && ci->conversation->isMember(memberUri, false))
return true;
} else if (!ci->info.removed
&& std::find(ci->info.members.begin(), ci->info.members.end(), memberUri)
!= ci->info.members.end()) {
// In this case the conversation was never cloned (can be after an import)
return true;
}
}
return false;
}
void
ConversationModule::setFetched(const std::string& conversationId,
const std::string& deviceId,
const std::string& commitId)
{
if (auto conv = pimpl_->getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation) {
bool remove = conv->conversation->isRemoving();
conv->conversation->hasFetched(deviceId, commitId);
if (remove)
pimpl_->removeRepositoryImpl(*conv, true);
}
}
}
void
ConversationModule::fetchNewCommits(const std::string& peer,
const std::string& deviceId,
const std::string& conversationId,
const std::string& commitId)
{
pimpl_->fetchNewCommits(peer, deviceId, conversationId, commitId);
}
void
ConversationModule::addConversationMember(const std::string& conversationId,
const dht::InfoHash& contactUri,
bool sendRequest)
{
auto conv = pimpl_->getConversation(conversationId);
if (not conv || not conv->conversation) {
JAMI_ERROR("Conversation {:s} does not exist", conversationId);
return;
}
std::unique_lock lk(conv->mtx);
auto contactUriStr = contactUri.toString();
if (conv->conversation->isMember(contactUriStr, true)) {
JAMI_DEBUG("{:s} is already a member of {:s}, resend invite", contactUriStr, conversationId);
// Note: This should not be necessary, but if for whatever reason the other side didn't
// join we should not forbid new invites
auto invite = conv->conversation->generateInvitation();
lk.unlock();
pimpl_->sendMsgCb_(contactUriStr, {}, std::move(invite), 0);
return;
}
conv->conversation->addMember(
contactUriStr,
[this, conv, conversationId, sendRequest, contactUriStr](bool ok, const std::string& commitId) {
if (ok) {
std::unique_lock lk(conv->mtx);
pimpl_->sendMessageNotification(*conv->conversation,
true,
commitId); // For the other members
if (sendRequest) {
auto invite = conv->conversation->generateInvitation();
lk.unlock();
pimpl_->sendMsgCb_(contactUriStr, {}, std::move(invite), 0);
}
}
});
}
void
ConversationModule::removeConversationMember(const std::string& conversationId,
const dht::InfoHash& contactUri,
bool isDevice)
{
auto contactUriStr = contactUri.toString();
if (auto conv = pimpl_->getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation)
return conv->conversation->removeMember(
contactUriStr, isDevice, [this, conversationId](bool ok, const std::string& commitId) {
if (ok) {
pimpl_->sendMessageNotification(conversationId, true, commitId);
}
});
}
}
std::vector<std::map<std::string, std::string>>
ConversationModule::getConversationMembers(const std::string& conversationId,
bool includeBanned) const
{
return pimpl_->getConversationMembers(conversationId, includeBanned);
}
uint32_t
ConversationModule::countInteractions(const std::string& convId,
const std::string& toId,
const std::string& fromId,
const std::string& authorUri) const
{
if (auto conv = pimpl_->getConversation(convId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation)
return conv->conversation->countInteractions(toId, fromId, authorUri);
}
return 0;
}
void
ConversationModule::search(uint32_t req, const std::string& convId, const Filter& filter) const
{
if (convId.empty()) {
auto convs = pimpl_->getConversations();
if (convs.empty()) {
emitSignal<libjami::ConversationSignal::MessagesFound>(
req,
pimpl_->accountId_,
std::string {},
std::vector<std::map<std::string, std::string>> {});
return;
}
auto finishedFlag = std::make_shared<std::atomic_int>(convs.size());
for (const auto& conv : convs) {
conv->search(req, filter, finishedFlag);
}
} else if (auto conv = pimpl_->getConversation(convId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation)
conv->conversation->search(req, filter, std::make_shared<std::atomic_int>(1));
}
}
void
ConversationModule::updateConversationInfos(const std::string& conversationId,
const std::map<std::string, std::string>& infos,
bool sync)
{
auto conv = pimpl_->getConversation(conversationId);
if (not conv or not conv->conversation) {
JAMI_ERROR("Conversation {:s} does not exist", conversationId);
return;
}
std::lock_guard lk(conv->mtx);
conv->conversation
->updateInfos(infos, [this, conversationId, sync](bool ok, const std::string& commitId) {
if (ok && sync) {
pimpl_->sendMessageNotification(conversationId, true, commitId);
} else if (sync)
JAMI_WARNING("Unable to update info on {:s}", conversationId);
});
}
std::map<std::string, std::string>
ConversationModule::conversationInfos(const std::string& conversationId) const
{
{
std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
auto itReq = pimpl_->conversationsRequests_.find(conversationId);
if (itReq != pimpl_->conversationsRequests_.end())
return itReq->second.metadatas;
}
if (auto conv = pimpl_->getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
std::map<std::string, std::string> md;
{
auto syncingMetadatasIt = pimpl_->syncingMetadatas_.find(conversationId);
if (syncingMetadatasIt != pimpl_->syncingMetadatas_.end()) {
if (conv->conversation) {
pimpl_->syncingMetadatas_.erase(syncingMetadatasIt);
pimpl_->saveMetadata();
} else {
md = syncingMetadatasIt->second;
}
}
}
if (conv->conversation)
return conv->conversation->infos();
else
return md;
}
JAMI_ERROR("Conversation {:s} does not exist", conversationId);
return {};
}
void
ConversationModule::setConversationPreferences(const std::string& conversationId,
const std::map<std::string, std::string>& prefs)
{
if (auto conv = pimpl_->getConversation(conversationId)) {
std::unique_lock lk(conv->mtx);
if (not conv->conversation) {
JAMI_ERROR("Conversation {:s} does not exist", conversationId);
return;
}
auto conversation = conv->conversation;
lk.unlock();
conversation->updatePreferences(prefs);
auto msg = std::make_shared<SyncMsg>();
msg->p = {{conversationId, conversation->preferences(true)}};
pimpl_->needsSyncingCb_(std::move(msg));
}
}
std::map<std::string, std::string>
ConversationModule::getConversationPreferences(const std::string& conversationId,
bool includeCreated) const
{
if (auto conv = pimpl_->getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation)
return conv->conversation->preferences(includeCreated);
}
return {};
}
std::map<std::string, std::map<std::string, std::string>>
ConversationModule::convPreferences() const
{
std::map<std::string, std::map<std::string, std::string>> p;
for (const auto& conv : pimpl_->getConversations()) {
auto prefs = conv->preferences(true);
if (!prefs.empty())
p[conv->id()] = std::move(prefs);
}
return p;
}
std::vector<uint8_t>
ConversationModule::conversationVCard(const std::string& conversationId) const
{
if (auto conv = pimpl_->getConversation(conversationId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation)
return conv->conversation->vCard();
}
JAMI_ERROR("Conversation {:s} does not exist", conversationId);
return {};
}
bool
ConversationModule::isBanned(const std::string& convId, const std::string& uri) const
{
if (auto conv = pimpl_->getConversation(convId)) {
std::lock_guard lk(conv->mtx);
if (!conv->conversation)
return true;
if (conv->conversation->mode() != ConversationMode::ONE_TO_ONE)
return conv->conversation->isBanned(uri);
}
// If 1:1 we check the certificate status
std::lock_guard lk(pimpl_->conversationsMtx_);
return pimpl_->accountManager_->getCertificateStatus(uri) == dhtnet::tls::TrustStore::PermissionStatus::BANNED;
}
void
ConversationModule::removeContact(const std::string& uri, bool banned)
{
// Remove linked conversation's requests
{
std::lock_guard lk(pimpl_->conversationsRequestsMtx_);
auto update = false;
for (auto it = pimpl_->conversationsRequests_.begin();
it != pimpl_->conversationsRequests_.end();
++it) {
if (it->second.from == uri && !it->second.declined) {
JAMI_DEBUG("Declining conversation request {:s} from {:s}", it->first, uri);
pimpl_->syncingMetadatas_.erase(it->first);
pimpl_->saveMetadata();
emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(
pimpl_->accountId_, it->first);
update = true;
it->second.declined = std::time(nullptr);
}
}
if (update) {
pimpl_->saveConvRequests();
pimpl_->needsSyncingCb_({});
}
}
if (banned) {
auto conversationId = getOneToOneConversation(uri);
pimpl_->withConversation(conversationId, [&](auto& conv) { conv.shutdownConnections(); });
return; // Keep the conversation in banned model but stop connections
}
// Removed contacts should not be linked to any conversation
pimpl_->accountManager_->updateContactConversation(uri, "");
// Remove all one-to-one conversations with the removed contact
auto isSelf = uri == pimpl_->username_;
std::vector<std::string> toRm;
auto removeConvInfo = [&](const auto& conv, const auto& members) {
if ((isSelf && members.size() == 1)
|| (!isSelf && std::find(members.begin(), members.end(), uri) != members.end())) {
// Mark the conversation as removed if it wasn't already
if (!conv->info.isRemoved()) {
conv->info.removed = std::time(nullptr);
emitSignal<libjami::ConversationSignal::ConversationRemoved>(pimpl_->accountId_, conv->info.id);
pimpl_->addConvInfo(conv->info);
return true;
}
}
return false;
};
{
std::lock_guard lk(pimpl_->conversationsMtx_);
for (auto& [convId, conv] : pimpl_->conversations_) {
std::lock_guard lk(conv->mtx);
if (conv->conversation) {
try {
// Note it's important to check getUsername(), else
// removing self can remove all conversations
if (conv->conversation->mode() == ConversationMode::ONE_TO_ONE) {
auto initMembers = conv->conversation->getInitialMembers();
if (removeConvInfo(conv, initMembers))
toRm.emplace_back(convId);
}
} catch (const std::exception& e) {
JAMI_WARN("%s", e.what());
}
} else {
removeConvInfo(conv, conv->info.members);
}
}
}
for (const auto& id : toRm)
pimpl_->removeRepository(id, true, true);
}
bool
ConversationModule::removeConversation(const std::string& conversationId)
{
return pimpl_->removeConversation(conversationId);
}
void
ConversationModule::initReplay(const std::string& oldConvId, const std::string& newConvId)
{
if (auto conv = pimpl_->getConversation(oldConvId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation) {
std::promise<bool> waitLoad;
std::future<bool> fut = waitLoad.get_future();
// we should wait for loadMessage, because it will be deleted after this.
conv->conversation->loadMessages(
[&](auto&& messages) {
std::reverse(messages.begin(),
messages.end()); // Log is inverted as we want to replay
std::lock_guard lk(pimpl_->replayMtx_);
pimpl_->replay_[newConvId] = std::move(messages);
waitLoad.set_value(true);
},
{});
fut.wait();
}
}
}
bool
ConversationModule::isHosting(const std::string& conversationId, const std::string& confId) const
{
if (conversationId.empty()) {
std::lock_guard lk(pimpl_->conversationsMtx_);
return std::find_if(pimpl_->conversations_.cbegin(),
pimpl_->conversations_.cend(),
[&](const auto& conv) {
return conv.second->conversation
&& conv.second->conversation->isHosting(confId);
})
!= pimpl_->conversations_.cend();
} else if (auto conv = pimpl_->getConversation(conversationId)) {
if (conv->conversation) {
return conv->conversation->isHosting(confId);
}
}
return false;
}
std::vector<std::map<std::string, std::string>>
ConversationModule::getActiveCalls(const std::string& conversationId) const
{
return pimpl_->withConversation(conversationId, [](const auto& conversation) {
return conversation.currentCalls();
});
}
std::shared_ptr<SIPCall>
ConversationModule::call(const std::string& url,
const std::vector<libjami::MediaMap>& mediaList,
std::function<void(const std::string&, const DeviceId&, const std::shared_ptr<SIPCall>&)>&& cb)
{
std::string conversationId = "", confId = "", uri = "", deviceId = "";
if (url.find('/') == std::string::npos) {
conversationId = url;
} else {
auto parameters = jami::split_string(url, '/');
if (parameters.size() != 4) {
JAMI_ERROR("Incorrect url {:s}", url);
return {};
}
conversationId = parameters[0];
uri = parameters[1];
deviceId = parameters[2];
confId = parameters[3];
}
auto conv = pimpl_->getConversation(conversationId);
if (!conv)
return {};
std::unique_lock lk(conv->mtx);
if (!conv->conversation) {
JAMI_ERROR("Conversation {:s} not found", conversationId);
return {};
}
// Check if we want to join a specific conference
// So, if confId is specified or if there is some activeCalls
// or if we are the default host.
auto activeCalls = conv->conversation->currentCalls();
auto infos = conv->conversation->infos();
auto itRdvAccount = infos.find("rdvAccount");
auto itRdvDevice = infos.find("rdvDevice");
auto sendCallRequest = false;
if (!confId.empty()) {
sendCallRequest = true;
JAMI_DEBUG("Calling self, join conference");
} else if (!activeCalls.empty()) {
// Else, we try to join active calls
sendCallRequest = true;
auto& ac = *activeCalls.rbegin();
confId = ac.at("id");
uri = ac.at("uri");
deviceId = ac.at("device");
} else if (itRdvAccount != infos.end() && itRdvDevice != infos.end()
&& !itRdvAccount->second.empty()) {
// Else, creates "to" (accountId/deviceId/conversationId/confId) and ask remote host
sendCallRequest = true;
uri = itRdvAccount->second;
deviceId = itRdvDevice->second;
confId = "0";
JAMI_DEBUG("Remote host detected. Calling {:s} on device {:s}", uri, deviceId);
}
lk.unlock();
auto account = pimpl_->account_.lock();
std::vector<libjami::MediaMap> mediaMap = mediaList.empty()
? MediaAttribute::mediaAttributesToMediaMaps(
pimpl_->account_.lock()->createDefaultMediaList(
pimpl_->account_.lock()->isVideoEnabled()))
: mediaList;
if (!sendCallRequest
|| (uri == pimpl_->username_ && deviceId == pimpl_->deviceId_)) {
confId = confId == "0" ? Manager::instance().callFactory.getNewCallID() : confId;
// TODO attach host with media list
hostConference(conversationId, confId, "", mediaMap);
return {};
}
// Else we need to create a call
auto& manager = Manager::instance();
std::shared_ptr<SIPCall> call = manager.callFactory.newSipCall(account, Call::CallType::OUTGOING, mediaMap);
if (not call)
return {};
auto callUri = fmt::format("{}/{}/{}/{}", conversationId, uri, deviceId, confId);
account->getIceOptions([call, accountId = account->getAccountID(), callUri, uri = std::move(uri), conversationId, deviceId, cb=std::move(cb)](auto&& opts) {
if (call->isIceEnabled()) {
if (not call->createIceMediaTransport(false)
or not call->initIceMediaTransport(true,
std::forward<dhtnet::IceTransportOptions>(opts))) {
return;
}
}
JAMI_DEBUG("New outgoing call with {}", uri);
call->setPeerNumber(uri);
call->setPeerUri("swarm:" + uri);
JAMI_DEBUG("Calling: {:s}", callUri);
call->setState(Call::ConnectionState::TRYING);
call->setPeerNumber(callUri);
call->setPeerUri("rdv:" + callUri);
call->addStateListener([accountId, conversationId](Call::CallState call_state,
Call::ConnectionState cnx_state,
int) {
if (cnx_state == Call::ConnectionState::DISCONNECTED
&& call_state == Call::CallState::MERROR) {
emitSignal<libjami::ConfigurationSignal::NeedsHost>(accountId, conversationId);
return true;
}
return true;
});
cb(callUri, DeviceId(deviceId), call);
});
return call;
}
void
ConversationModule::hostConference(const std::string& conversationId,
const std::string& confId,
const std::string& callId,
const std::vector<libjami::MediaMap>& mediaList)
{
auto acc = pimpl_->account_.lock();
if (!acc)
return;
auto conf = acc->getConference(confId);
auto createConf = !conf;
std::shared_ptr<SIPCall> call;
if (!callId.empty()) {
call = std::dynamic_pointer_cast<SIPCall>(acc->getCall(callId));
if (!call) {
JAMI_WARNING("No call with id {} found", callId);
return;
}
}
if (createConf) {
conf = std::make_shared<Conference>(acc, confId);
acc->attach(conf);
}
if (!callId.empty())
conf->addSubCall(callId);
if (callId.empty())
conf->attachHost(mediaList);
if (createConf) {
emitSignal<libjami::CallSignal::ConferenceCreated>(acc->getAccountID(), conversationId, conf->getConfId());
} else {
conf->reportMediaNegotiationStatus();
emitSignal<libjami::CallSignal::ConferenceChanged>(acc->getAccountID(),
conf->getConfId(),
conf->getStateStr());
return;
}
auto conv = pimpl_->getConversation(conversationId);
if (!conv)
return;
std::unique_lock lk(conv->mtx);
if (!conv->conversation) {
JAMI_ERROR("Conversation {} not found", conversationId);
return;
}
// Add commit to conversation
Json::Value value;
value["uri"] = pimpl_->username_;
value["device"] = pimpl_->deviceId_;
value["confId"] = conf->getConfId();
value["type"] = "application/call-history+json";
conv->conversation->hostConference(std::move(value),
[w = pimpl_->weak(),
conversationId](bool ok, const std::string& commitId) {
if (ok) {
if (auto shared = w.lock())
shared->sendMessageNotification(conversationId,
true,
commitId);
} else {
JAMI_ERR("Failed to send message to conversation %s",
conversationId.c_str());
}
});
// When conf finished = remove host & commit
// Master call, so when it's stopped, the conference will be stopped (as we use the hold
// state for detaching the call)
conf->onShutdown(
[w = pimpl_->weak(), accountUri = pimpl_->username_, confId=conf->getConfId(), conversationId, conv](
int duration) {
auto shared = w.lock();
if (shared) {
Json::Value value;
value["uri"] = accountUri;
value["device"] = shared->deviceId_;
value["confId"] = confId;
value["type"] = "application/call-history+json";
value["duration"] = std::to_string(duration);
std::lock_guard lk(conv->mtx);
if (!conv->conversation) {
JAMI_ERROR("Conversation {} not found", conversationId);
return;
}
conv->conversation->removeActiveConference(
std::move(value), [w, conversationId](bool ok, const std::string& commitId) {
if (ok) {
if (auto shared = w.lock()) {
shared->sendMessageNotification(conversationId, true, commitId);
}
} else {
JAMI_ERROR("Failed to send message to conversation {}", conversationId);
}
});
}
});
}
std::map<std::string, ConvInfo>
ConversationModule::convInfos(const std::string& accountId)
{
return convInfosFromPath(fileutils::get_data_dir() / accountId);
}
std::map<std::string, ConvInfo>
ConversationModule::convInfosFromPath(const std::filesystem::path& path)
{
std::map<std::string, ConvInfo> convInfos;
try {
// read file
std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convInfo"));
auto file = fileutils::loadFile("convInfo", path);
// load values
msgpack::unpacked result;
msgpack::unpack(result, (const char*) file.data(), file.size());
result.get().convert(convInfos);
} catch (const std::exception& e) {
JAMI_WARN("[convInfo] error loading convInfo: %s", e.what());
}
return convInfos;
}
std::map<std::string, ConversationRequest>
ConversationModule::convRequests(const std::string& accountId)
{
auto path = fileutils::get_data_dir() / accountId;
return convRequestsFromPath(path.string());
}
std::map<std::string, ConversationRequest>
ConversationModule::convRequestsFromPath(const std::filesystem::path& path)
{
std::map<std::string, ConversationRequest> convRequests;
try {
// read file
std::lock_guard lock(dhtnet::fileutils::getFileLock(path / "convRequests"));
auto file = fileutils::loadFile("convRequests", path);
// load values
msgpack::unpacked result;
msgpack::unpack(result, (const char*) file.data(), file.size(), 0);
result.get().convert(convRequests);
} catch (const std::exception& e) {
JAMI_WARN("[convInfo] error loading convInfo: %s", e.what());
}
return convRequests;
}
void
ConversationModule::addConvInfo(const ConvInfo& info)
{
pimpl_->addConvInfo(info);
}
void
ConversationModule::Impl::setConversationMembers(const std::string& convId,
const std::set<std::string>& members)
{
if (auto conv = getConversation(convId)) {
std::lock_guard lk(conv->mtx);
conv->info.members = members;
addConvInfo(conv->info);
}
}
std::shared_ptr<Conversation>
ConversationModule::getConversation(const std::string& convId)
{
if (auto conv = pimpl_->getConversation(convId)) {
std::lock_guard lk(conv->mtx);
return conv->conversation;
}
return nullptr;
}
std::shared_ptr<dhtnet::ChannelSocket>
ConversationModule::gitSocket(std::string_view deviceId, std::string_view convId) const
{
if (auto conv = pimpl_->getConversation(convId)) {
std::lock_guard lk(conv->mtx);
if (conv->conversation)
return conv->conversation->gitSocket(DeviceId(deviceId));
else if (conv->pending)
return conv->pending->socket;
}
return nullptr;
}
void
ConversationModule::addGitSocket(std::string_view deviceId,
std::string_view convId,
const std::shared_ptr<dhtnet::ChannelSocket>& channel)
{
if (auto conv = pimpl_->getConversation(convId)) {
std::lock_guard lk(conv->mtx);
conv->conversation->addGitSocket(DeviceId(deviceId), channel);
} else
JAMI_WARNING("addGitSocket: Unable to find conversation {:s}", convId);
}
void
ConversationModule::removeGitSocket(std::string_view deviceId, std::string_view convId)
{
pimpl_->withConversation(convId, [&](auto& conv) { conv.removeGitSocket(DeviceId(deviceId)); });
}
void
ConversationModule::shutdownConnections()
{
for (const auto& c : pimpl_->getSyncedConversations()) {
std::lock_guard lkc(c->mtx);
if (c->conversation)
c->conversation->shutdownConnections();
if (c->pending)
c->pending->socket = {};
}
}
void
ConversationModule::addSwarmChannel(const std::string& conversationId,
std::shared_ptr<dhtnet::ChannelSocket> channel)
{
pimpl_->withConversation(conversationId,
[&](auto& conv) { conv.addSwarmChannel(std::move(channel)); });
}
void
ConversationModule::connectivityChanged()
{
for (const auto& conv : pimpl_->getConversations())
conv->connectivityChanged();
}
std::shared_ptr<Typers>
ConversationModule::getTypers(const std::string& convId)
{
if (auto c = pimpl_->getConversation(convId)) {
std::lock_guard lk(c->mtx);
if (c->conversation)
return c->conversation->typers();
}
return nullptr;
}
} // namespace jami