Commit 7a71ff44 authored by Sébastien Blin's avatar Sébastien Blin Committed by Adrien Béraud
Browse files

sync: synchronize avatars from swarm members

This work follow the JamiAccount split started before. This moves
the ConnectionManager's callbacks code related to data transfer into
transfer_channel_handler and improves syncing by sending contact's
avatars (where a swarm is present) to new devices.

Note: for now, contact's avatar are managed by the client. So, this
code will only transmit avatars if the profile is found. For now,
the only path tested is the one used by jami-libclient. To be able
to fully sync all avatars, the avatar management should be moved in
the daemon with future work.

Finally, in syncHistory a test is added to validate the behavior.

Doc: https://git.jami.net/savoirfairelinux/ring-project/-/wikis/technical/3.8-Sync-profiles

GitLab: https://git.jami.net/savoirfairelinux/ring-project/-/issues/1282
Change-Id: Ic98da34aabf1be070a57dcac55bba0a00c555445
parent 7e6d84ee
......@@ -20,14 +20,15 @@
#include "data_transfer.h"
#include "manager.h"
#include "base64.h"
#include "client/ring_signal.h"
#include "fileutils.h"
#include "jamidht/jamiaccount.h"
#include "jamidht/p2p.h"
#include "manager.h"
#include "map_utils.h"
#include "peer_connection.h"
#include "fileutils.h"
#include "string_utils.h"
#include "map_utils.h"
#include "client/ring_signal.h"
#include "jamidht/p2p.h"
#include <thread>
#include <stdexcept>
......@@ -354,13 +355,14 @@ private:
onRecvCb_(std::string_view(buf.data(), buf.size()));
}
JAMI_DBG() << "FTP#" << getId() << ": sent " << info_.bytesProgress << " bytes";
if (internalCompletionCb_)
internalCompletionCb_(info_.path);
if (info_.bytesProgress != info_.totalSize)
emit(DRing::DataTransferEventCode::closed_by_peer);
else
else {
if (internalCompletionCb_)
internalCompletionCb_(info_.path);
emit(DRing::DataTransferEventCode::finished);
}
});
}
......@@ -709,7 +711,7 @@ FileInfo::emit(DRing::DataTransferEventCode code)
{
if (finishedCb_ && code >= DRing::DataTransferEventCode::finished)
finishedCb_(uint32_t(code));
if (fileId_ != "profile.vcf") {
if (interactionId_ != "") {
// Else it's an internal transfer
runOnMainThread([info = info_, iid = interactionId_, fid = fileId_, code]() {
emitSignal<DRing::DataTransferSignal::DataTransferEvent>(info.accountId,
......@@ -882,6 +884,8 @@ public:
fileutils::check_dir(conversationDataPath_.c_str());
waitingPath_ = conversationDataPath_ + DIR_SEPARATOR_STR + "waiting";
}
profilesPath_ = fileutils::get_data_dir() + DIR_SEPARATOR_STR + accountId_
+ DIR_SEPARATOR_STR + "profiles";
loadWaiting();
}
......@@ -918,6 +922,7 @@ public:
std::string accountId_ {};
std::string to_ {};
std::string waitingPath_ {};
std::string profilesPath_ {};
std::string conversationDataPath_ {};
// Pre swarm
......@@ -928,7 +933,7 @@ public:
std::map<std::string, WaitingRequest> waitingIds_ {};
std::map<std::shared_ptr<ChannelSocket>, std::shared_ptr<OutgoingFile>> outgoings_ {};
std::map<std::string, std::shared_ptr<IncomingFile>> incomings_ {};
std::map<std::string, std::shared_ptr<IncomingFile>> vcards_ {};
std::map<std::pair<std::string, std::string>, std::shared_ptr<IncomingFile>> vcards_ {};
};
TransferManager::TransferManager(const std::string& accountId, const std::string& to)
......@@ -1272,34 +1277,48 @@ TransferManager::onIncomingProfile(const std::shared_ptr<ChannelSocket>& channel
{
if (!channel)
return;
auto name = channel->name();
auto lastSep = name.find_last_of('/');
auto fileId = name.substr(lastSep + 1);
auto deviceId = channel->deviceId().toString();
auto cert = channel->peerCertificate();
if (!cert || !cert->issuer)
if (!cert || !cert->issuer || fileId.find(".vcf") == std::string::npos)
return;
auto uri = fileId == "profile.vcf" ? cert->issuer->getId().toString()
: fileId.substr(0, fileId.size() - 4 /*.vcf*/);
std::lock_guard<std::mutex> lk(pimpl_->mapMutex_);
auto idx = std::pair<std::string, std::string> {deviceId, uri};
// Check if not already an incoming file for this id and that we are waiting this file
auto itV = pimpl_->vcards_.find(deviceId);
auto itV = pimpl_->vcards_.find(idx);
if (itV != pimpl_->vcards_.end()) {
channel->shutdown();
return;
}
auto tid = generateUID();
DRing::DataTransferInfo info;
info.accountId = pimpl_->accountId_;
info.conversationId = pimpl_->to_;
info.path = fileutils::get_cache_dir() + DIR_SEPARATOR_STR + pimpl_->accountId_
+ DIR_SEPARATOR_STR + "vcard" + DIR_SEPARATOR_STR + deviceId;
+ DIR_SEPARATOR_STR + "vcard" + DIR_SEPARATOR_STR + deviceId + "_" + uri + "_"
+ std::to_string(tid);
auto ifile = std::make_shared<IncomingFile>(std::move(channel), info, "profile.vcf", "");
auto res = pimpl_->vcards_.emplace(deviceId, std::move(ifile));
auto res = pimpl_->vcards_.emplace(idx, std::move(ifile));
if (res.second) {
res.first->second->onFinished([w = weak(),
uri = std::move(uri),
deviceId = std::move(deviceId),
accountId = pimpl_->accountId_,
cert = std::move(cert),
path = info.path](uint32_t code) {
// schedule destroy transfer as not needed
dht::ThreadPool().computation().run([w,
uri = std::move(uri),
deviceId = std::move(deviceId),
accountId = std::move(accountId),
cert = std::move(cert),
......@@ -1308,13 +1327,12 @@ TransferManager::onIncomingProfile(const std::shared_ptr<ChannelSocket>& channel
if (auto sthis_ = w.lock()) {
auto& pimpl = sthis_->pimpl_;
std::lock_guard<std::mutex> lk {pimpl->mapMutex_};
auto itO = pimpl->vcards_.find(deviceId);
auto itO = pimpl->vcards_.find({deviceId, uri});
if (itO != pimpl->vcards_.end())
pimpl->vcards_.erase(itO);
if (code == uint32_t(DRing::DataTransferEventCode::finished))
emitSignal<DRing::ConfigurationSignal::ProfileReceived>(accountId,
cert->issuer->getId()
.toString(),
uri,
path);
}
});
......@@ -1323,6 +1341,13 @@ TransferManager::onIncomingProfile(const std::shared_ptr<ChannelSocket>& channel
}
}
std::string
TransferManager::profilePath(const std::string& contactId) const
{
// TODO Android? iOS?
return pimpl_->profilesPath_ + DIR_SEPARATOR_STR + base64::encode(contactId) + ".vcf";
}
std::vector<WaitingRequest>
TransferManager::waitingRequests() const
{
......
......@@ -235,6 +235,12 @@ public:
bool isWaiting(const std::string& fileId) const;
void onIncomingProfile(const std::shared_ptr<ChannelSocket>& channel);
/**
* @param contactId contact's id
* @return where profile.vcf is stored
*/
std::string profilePath(const std::string& contactId) const;
private:
std::weak_ptr<TransferManager> weak()
{
......
......@@ -35,6 +35,8 @@ list (APPEND Source_Files__jamidht
"${CMAKE_CURRENT_SOURCE_DIR}/channel_handler.h"
"${CMAKE_CURRENT_SOURCE_DIR}/conversation_channel_handler.h"
"${CMAKE_CURRENT_SOURCE_DIR}/conversation_channel_handler.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/transfer_channel_handler.h"
"${CMAKE_CURRENT_SOURCE_DIR}/transfer_channel_handler.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/conversation_module.h"
"${CMAKE_CURRENT_SOURCE_DIR}/conversation_module.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/namedirectory.cpp"
......
......@@ -43,7 +43,9 @@ libringacc_la_SOURCES = \
./jamidht/sync_channel_handler.h \
./jamidht/sync_channel_handler.cpp \
./jamidht/sync_module.h \
./jamidht/sync_module.cpp
./jamidht/sync_module.cpp \
./jamidht/transfer_channel_handler.h \
./jamidht/transfer_channel_handler.cpp
if RINGNS
libringacc_la_SOURCES += \
......
......@@ -1091,7 +1091,7 @@ Conversation::downloadFile(const std::string& interactionId,
sha3sum,
path,
totalSize);
acc->askForFileChannel(shared->id(), deviceId, fileId, start, end);
acc->askForFileChannel(shared->id(), deviceId, interactionId, fileId, start, end);
}
});
return true;
......
......@@ -22,7 +22,7 @@
namespace jami {
ConversationChannelHandler::ConversationChannelHandler(std::weak_ptr<JamiAccount>&& acc,
ConversationChannelHandler::ConversationChannelHandler(const std::shared_ptr<JamiAccount>& acc,
ConnectionManager& cm)
: ChannelHandlerInterface()
, account_(acc)
......
......@@ -32,7 +32,7 @@ namespace jami {
class ConversationChannelHandler : public ChannelHandlerInterface
{
public:
ConversationChannelHandler(std::weak_ptr<JamiAccount>&& acc, ConnectionManager& cm);
ConversationChannelHandler(const std::shared_ptr<JamiAccount>& acc, ConnectionManager& cm);
~ConversationChannelHandler();
/**
......
......@@ -485,6 +485,20 @@ ConversationModule::Impl::handlePendingConversations()
*commits.rbegin(),
true);
});
// Download members profile on first sync
if (auto cert = tls::CertificateStore::instance().getCertificate(deviceId)) {
if (cert->issuer
&& cert->issuer->getId().toString() == sthis->username_) {
if (auto acc = sthis->account_.lock()) {
for (const auto& member : conversation->getMembers()) {
if (member.at("uri") != sthis->username_)
acc->askForProfile(conversationId,
deviceId,
member.at("uri"));
}
}
}
}
}
} catch (const std::exception& e) {
emitSignal<DRing::ConversationSignal::OnConversationError>(sthis->accountId_,
......
......@@ -40,6 +40,7 @@
#include "multiplexed_socket.h"
#include "conversation_channel_handler.h"
#include "sync_channel_handler.h"
#include "transfer_channel_handler.h"
#include "sip/sdp.h"
#include "sip/sipvoiplink.h"
......@@ -2192,38 +2193,6 @@ JamiAccount::doRegister_()
std::lock_guard<std::mutex> lk(transfersMtx_);
incomingFileTransfers_.emplace(tid);
return true;
} else if (isDataTransfer) {
// Check if sync request is from same account
std::promise<bool> accept;
std::future<bool> fut = accept.get_future();
auto idstr = name.substr(16);
auto sep = idstr.find('/');
auto lastSep = idstr.find_last_of('/');
auto conversationId = idstr.substr(0, sep);
auto fileHost = idstr.substr(sep + 1, lastSep - sep - 1);
auto fileId = idstr.substr(lastSep + 1);
if (fileHost == currentDeviceId())
return false;
sep = fileId.find_last_of('?');
if (sep != std::string::npos) {
fileId = fileId.substr(0, sep);
}
// Check if peer is member of the conversation
if (fileId == "profile.vcf") {
auto members = convModule()->getConversationMembers(conversationId);
return std::find_if(members.begin(),
members.end(),
[&](auto m) { return m["uri"] == issuer; })
!= members.end();
}
return convModule()->onFileChannelRequest(conversationId,
issuer,
fileId,
!noSha3sumVerification_);
}
return false;
});
......@@ -2248,7 +2217,7 @@ JamiAccount::doRegister_()
return;
incomingFileTransfers_.erase(it);
lk.unlock();
std::function<void(const std::string&)> cb;
InternalCompletionCb cb;
if (isVCard)
cb = [peerId, accountId = getAccountID()](const std::string& path) {
emitSignal<DRing::ConfigurationSignal::ProfileReceived>(accountId,
......@@ -2323,53 +2292,6 @@ JamiAccount::doRegister_()
shared->gitServers_.erase(serverId);
});
});
} else if (name.find(DATA_TRANSFER_URI) == 0) {
auto idstr = name.substr(16);
auto sep = idstr.find('/');
auto lastSep = idstr.find_last_of('/');
auto conversationId = idstr.substr(0, sep);
auto fileId = idstr.substr(lastSep + 1);
if (channel->isInitiator())
return;
sep = fileId.find_last_of('?');
std::string arguments;
if (sep != std::string::npos) {
arguments = fileId.substr(sep + 1);
fileId = fileId.substr(0, sep);
}
if (fileId == "profile.vcf") {
std::string path = fileutils::sha3File(idPath_ + DIR_SEPARATOR_STR
+ "profile.vcf");
dataTransfer()->transferFile(channel, fileId, "", path);
return;
}
auto dt = dataTransfer(conversationId);
sep = fileId.find('_');
if (!dt or sep == std::string::npos) {
channel->shutdown();
return;
}
auto interactionId = fileId.substr(0, sep);
std::string path = dt->path(fileId);
auto start = 0u, end = 0u;
for (const auto arg : jami::split_string(arguments, '&')) {
auto keyVal = jami::split_string(arg, '=');
if (keyVal.size() == 2) {
if (keyVal[0] == "start") {
std::from_chars(keyVal[1].data(),
keyVal[1].data() + keyVal[1].size(),
start);
} else if (keyVal[0] == "end") {
std::from_chars(keyVal[1].data(),
keyVal[1].data() + keyVal[1].size(),
end);
}
}
}
dt->transferFile(channel, fileId, interactionId, path, start, end);
} else {
// TODO move git://
auto uri = Uri(name);
......@@ -4172,12 +4094,16 @@ JamiAccount::sendProfile(const std::string& deviceId)
sendFile(deviceId,
idPath_ + DIR_SEPARATOR_STR + "profile.vcf",
[deviceId, accId = getAccountID()](const std::string&) {
[deviceId, this](const std::string&) {
// Mark the VCard as sent
auto path = fileutils::get_cache_dir() + DIR_SEPARATOR_STR + accId
auto path = fileutils::get_cache_dir() + DIR_SEPARATOR_STR + getAccountID()
+ DIR_SEPARATOR_STR + "vcard" + DIR_SEPARATOR_STR + deviceId;
std::lock_guard<std::mutex> lock(fileutils::getFileLock(path));
if (fileutils::isFile(path))
return;
fileutils::ofstream(path);
});
} catch (const std::exception& e) {
JAMI_ERR() << e.what();
}
......@@ -4413,6 +4339,7 @@ JamiAccount::transferFile(const std::string& conversationId,
void
JamiAccount::askForFileChannel(const std::string& conversationId,
const std::string& deviceId,
const std::string& interactionId,
const std::string& fileId,
size_t start,
size_t end)
......@@ -4432,21 +4359,23 @@ JamiAccount::askForFileChannel(const std::string& conversationId,
connectionManager_->connectDevice(
did,
channelName,
[this, conversationId, fileId](std::shared_ptr<ChannelSocket> channel, const DeviceId&) {
[this, conversationId, fileId, interactionId](std::shared_ptr<ChannelSocket> channel,
const DeviceId&) {
if (!channel)
return;
dht::ThreadPool::io().run([w = weak(), conversationId, channel, fileId] {
auto shared = w.lock();
if (!shared)
return;
auto dt = shared->dataTransfer(conversationId);
if (!dt)
return;
if (fileId == "profile.vcf")
dt->onIncomingProfile(channel);
else
dt->onIncomingFileTransfer(fileId, channel);
});
dht::ThreadPool::io().run(
[w = weak(), conversationId, channel, fileId, interactionId] {
auto shared = w.lock();
if (!shared)
return;
auto dt = shared->dataTransfer(conversationId);
if (!dt)
return;
if (interactionId.empty())
dt->onIncomingProfile(channel);
else
dt->onIncomingFileTransfer(fileId, channel);
});
},
false);
};
......@@ -4467,15 +4396,44 @@ JamiAccount::askForFileChannel(const std::string& conversationId,
}
}
void
JamiAccount::askForProfile(const std::string& conversationId,
const std::string& deviceId,
const std::string& memberUri)
{
std::lock_guard<std::mutex> lkCM(connManagerMtx_);
if (!connectionManager_)
return;
auto channelName = DATA_TRANSFER_URI + conversationId + "/profile/" + memberUri + ".vcf";
// We can avoid to negotiate new sessions, as the file notif
// probably come from an online device or last connected device.
connectionManager_->connectDevice(
DeviceId(deviceId),
channelName,
[this, conversationId](std::shared_ptr<ChannelSocket> channel, const DeviceId&) {
if (!channel)
return;
dht::ThreadPool::io().run([w = weak(), conversationId, channel] {
if (auto shared = w.lock())
if (auto dt = shared->dataTransfer(conversationId))
dt->onIncomingProfile(channel);
});
},
false);
}
void
JamiAccount::initConnectionManager()
{
if (!connectionManager_) {
connectionManager_ = std::make_unique<ConnectionManager>(*this);
channelHandlers_[Uri::Scheme::GIT]
= std::make_unique<ConversationChannelHandler>(weak(), *connectionManager_.get());
= std::make_unique<ConversationChannelHandler>(shared(), *connectionManager_.get());
channelHandlers_[Uri::Scheme::SYNC]
= std::make_unique<SyncChannelHandler>(weak(), *connectionManager_.get());
= std::make_unique<SyncChannelHandler>(shared(), *connectionManager_.get());
channelHandlers_[Uri::Scheme::DATA_TRANSFER]
= std::make_unique<TransferChannelHandler>(shared(), *connectionManager_.get());
}
}
......
......@@ -560,10 +560,15 @@ public:
void askForFileChannel(const std::string& conversationId,
const std::string& deviceId,
const std::string& interactionId,
const std::string& fileId,
size_t start = 0,
size_t end = 0);
void askForProfile(const std::string& conversationId,
const std::string& deviceId,
const std::string& memberUri);
/**
* Retrieve linked transfer manager
* @param id conversationId or empty for fallback
......@@ -581,7 +586,7 @@ public:
// Note: when swarm will be merged, this can be moved in transferManager
bool needToSendProfile(const std::string& deviceId);
/**
* Send Profile via cached SIP connection
* Send profile via cached SIP connection
* @param deviceId Device that will receive the profile
*/
void sendProfile(const std::string& deviceId);
......@@ -590,6 +595,8 @@ public:
AccountManager* accountManager() { return accountManager_.get(); }
bool sha3SumVerify() const { return !noSha3sumVerification_; }
private:
NON_COPYABLE(JamiAccount);
......
......@@ -257,7 +257,9 @@ void
MultiplexedSocket::Impl::onAccept(const std::string& name, uint16_t channel)
{
std::lock_guard<std::mutex> lkSockets(socketsMutex);
auto socket = makeSocket(name, channel);
auto& socket = sockets[channel];
if (!socket)
socket = makeSocket(name, channel);
onChannelReady_(deviceId, socket);
socket->ready();
}
......
......@@ -24,7 +24,8 @@ static constexpr const char SYNC_URI[] {"sync://"};
namespace jami {
SyncChannelHandler::SyncChannelHandler(std::weak_ptr<JamiAccount>&& acc, ConnectionManager& cm)
SyncChannelHandler::SyncChannelHandler(const std::shared_ptr<JamiAccount>& acc,
ConnectionManager& cm)
: ChannelHandlerInterface()
, account_(acc)
, connectionManager_(cm)
......@@ -69,7 +70,6 @@ SyncChannelHandler::onReady(const DeviceId& deviceId,
if (!cert || !acc || !acc->syncModule())
return;
acc->syncModule()->cacheSyncConnection(std::move(channel), cert->getIssuerUID(), deviceId);
acc->sendProfile(deviceId.toString());
}
} // namespace jami
\ No newline at end of file
......@@ -32,7 +32,7 @@ namespace jami {
class SyncChannelHandler : public ChannelHandlerInterface
{
public:
SyncChannelHandler(std::weak_ptr<JamiAccount>&& acc, ConnectionManager& cm);
SyncChannelHandler(const std::shared_ptr<JamiAccount>& acc, ConnectionManager& cm);
~SyncChannelHandler();
/**
......
/*
* Copyright (C) 2021 Savoir-faire Linux Inc.
*
* Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "jamidht/transfer_channel_handler.h"
#include <charconv>
#include "fileutils.h"
namespace jami {
TransferChannelHandler::TransferChannelHandler(const std::shared_ptr<JamiAccount>& account,
ConnectionManager& cm)
: ChannelHandlerInterface()
, account_(account)
, connectionManager_(cm)
{
auto acc = account_.lock();
idPath_ = fileutils::get_data_dir() + DIR_SEPARATOR_STR + acc->getAccountID();
}
TransferChannelHandler::~TransferChannelHandler() {}
void
TransferChannelHandler::connect(const DeviceId& deviceId,
const std::string& channelName,
ConnectCb&& cb)
{}
bool
TransferChannelHandler::onRequest(const DeviceId& deviceId, const std::string& name)
{
auto acc = account_.lock();
auto cert = tls::CertificateStore::instance().getCertificate(deviceId.toString());
if (!acc || !cert || !cert->issuer)
return false;
auto uri = cert->issuer->getId().toString();
auto idstr = name.substr(16);
auto sep = idstr.find('/');
auto lastSep = idstr.find_last_of('/');
auto conversationId = idstr.substr(0, sep);
auto fileHost = idstr.substr(sep + 1, lastSep - sep - 1);
auto fileId = idstr.substr(lastSep + 1);
if (fileHost == acc->currentDeviceId())
return false;