Skip to content
Snippets Groups Projects
Commit a17a44c4 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

Conversation: refactor pull

Change-Id: I9560cb0bd8bfab5a07039e93d4db8827906982ef
parent c7be4ea4
No related branches found
No related tags found
No related merge requests found
...@@ -211,8 +211,8 @@ public: ...@@ -211,8 +211,8 @@ public:
// Announce member events // Announce member events
if (c.at("type") == "member") { if (c.at("type") == "member") {
if (c.find("uri") != c.end() && c.find("action") != c.end()) { if (c.find("uri") != c.end() && c.find("action") != c.end()) {
auto uri = c.at("uri"); const auto& uri = c.at("uri");
auto actionStr = c.at("action"); const auto& actionStr = c.at("action");
auto action = -1; auto action = -1;
if (actionStr == "add") if (actionStr == "add")
action = 0; action = 0;
...@@ -354,6 +354,8 @@ public: ...@@ -354,6 +354,8 @@ public:
std::vector<std::map<std::string, std::string>> loadMessages(const std::string& fromMessage = "", std::vector<std::map<std::string, std::string>> loadMessages(const std::string& fromMessage = "",
const std::string& toMessage = "", const std::string& toMessage = "",
size_t n = 0); size_t n = 0);
void pull();
std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri);
std::mutex pullcbsMtx_ {}; std::mutex pullcbsMtx_ {};
std::set<std::string> fetchingRemotes_ {}; // store current remote in fetch std::set<std::string> fetchingRemotes_ {}; // store current remote in fetch
...@@ -879,47 +881,47 @@ Conversation::fetchFrom(const std::string& uri) ...@@ -879,47 +881,47 @@ Conversation::fetchFrom(const std::string& uri)
} }
std::vector<std::map<std::string, std::string>> std::vector<std::map<std::string, std::string>>
Conversation::mergeHistory(const std::string& uri) Conversation::Impl::mergeHistory(const std::string& uri)
{ {
if (not pimpl_ or not pimpl_->repository_) { if (not repository_) {
JAMI_WARN("Invalid repo. Abort merge"); JAMI_WARN("Invalid repo. Abort merge");
return {}; return {};
} }
auto remoteHead = pimpl_->repository_->remoteHead(uri); auto remoteHead = repository_->remoteHead(uri);
if (remoteHead.empty()) { if (remoteHead.empty()) {
JAMI_WARN("Could not get HEAD of %s", uri.c_str()); JAMI_WARN("Could not get HEAD of %s", uri.c_str());
return {}; return {};
} }
// Validate commit // Validate commit
auto [newCommits, err] = pimpl_->repository_->validFetch(uri); auto [newCommits, err] = repository_->validFetch(uri);
if (newCommits.empty()) { if (newCommits.empty()) {
if (err) if (err)
JAMI_ERR("Could not validate history with %s", uri.c_str()); JAMI_ERR("Could not validate history with %s", uri.c_str());
pimpl_->repository_->removeBranchWith(uri); repository_->removeBranchWith(uri);
return {}; return {};
} }
// If validated, merge // If validated, merge
auto [ok, cid] = pimpl_->repository_->merge(remoteHead); auto [ok, cid] = repository_->merge(remoteHead);
if (!ok) { if (!ok) {
JAMI_ERR("Could not merge history with %s", uri.c_str()); JAMI_ERR("Could not merge history with %s", uri.c_str());
pimpl_->repository_->removeBranchWith(uri); repository_->removeBranchWith(uri);
return {}; return {};
} }
if (!cid.empty()) { if (!cid.empty()) {
// A merge commit was generated, should be added in new commits // A merge commit was generated, should be added in new commits
auto commit = pimpl_->repository_->getCommit(cid); auto commit = repository_->getCommit(cid);
if (commit != std::nullopt) if (commit != std::nullopt)
newCommits.emplace_back(*commit); newCommits.emplace_back(*commit);
} }
JAMI_DBG("Successfully merge history with %s", uri.c_str()); JAMI_DBG("Successfully merge history with %s", uri.c_str());
auto result = pimpl_->convCommitToMap(newCommits); auto result = convCommitToMap(newCommits);
for (const auto& commit : result) { for (const auto& commit : result) {
auto it = commit.find("type"); auto it = commit.find("type");
if (it != commit.end() && it->second == "member") { if (it != commit.end() && it->second == "member") {
pimpl_->repository_->refreshMembers(); repository_->refreshMembers();
} }
} }
return result; return result;
...@@ -930,49 +932,47 @@ Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commi ...@@ -930,49 +932,47 @@ Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commi
{ {
std::lock_guard<std::mutex> lk(pimpl_->pullcbsMtx_); std::lock_guard<std::mutex> lk(pimpl_->pullcbsMtx_);
auto isInProgress = not pimpl_->pullcbs_.empty(); auto isInProgress = not pimpl_->pullcbs_.empty();
pimpl_->pullcbs_.emplace_back( pimpl_->pullcbs_.emplace_back(deviceId, std::move(commitId), std::move(cb));
std::make_tuple<std::string, std::string, OnPullCb>(std::string(deviceId),
std::move(commitId),
std::move(cb)));
if (isInProgress) if (isInProgress)
return; return;
dht::ThreadPool::io().run([w = weak()] { dht::ThreadPool::io().run([w = weak()] {
auto sthis_ = w.lock(); if (auto sthis_ = w.lock())
if (!sthis_) sthis_->pimpl_->pull();
return; });
auto& repo = sthis_->pimpl_->repository_; }
void
Conversation::Impl::pull()
{
auto& repo = repository_;
std::string deviceId, commitId; std::string deviceId, commitId;
OnPullCb cb; OnPullCb cb;
while (true) { while (true) {
decltype(sthis_->pimpl_->pullcbs_)::value_type pullcb; decltype(pullcbs_)::value_type pullcb;
decltype(sthis_->pimpl_->fetchingRemotes_.begin()) it; decltype(fetchingRemotes_.begin()) it;
{ {
std::lock_guard<std::mutex> lk(sthis_->pimpl_->pullcbsMtx_); std::lock_guard<std::mutex> lk(pullcbsMtx_);
if (sthis_->pimpl_->pullcbs_.empty()) { if (pullcbs_.empty()) {
if (auto account = sthis_->pimpl_->account_.lock()) if (auto account = account_.lock())
emitSignal<DRing::ConversationSignal::ConversationSyncFinished>( emitSignal<DRing::ConversationSignal::ConversationSyncFinished>(
account->getAccountID().c_str()); account->getAccountID().c_str());
return; return;
} }
auto elem = sthis_->pimpl_->pullcbs_.front(); auto& elem = pullcbs_.front();
deviceId = std::get<0>(elem); deviceId = std::move(std::get<0>(elem));
commitId = std::get<1>(elem); commitId = std::move(std::get<1>(elem));
cb = std::move(std::get<2>(elem)); cb = std::move(std::get<2>(elem));
sthis_->pimpl_->pullcbs_.pop_front(); pullcbs_.pop_front();
// Check if already using this remote, if so, no need to pull yet // Check if already using this remote, if so, no need to pull yet
// One pull at a time to avoid any early EOF or fetch errors. // One pull at a time to avoid any early EOF or fetch errors.
if (sthis_->pimpl_->fetchingRemotes_.find(deviceId) if (fetchingRemotes_.find(deviceId) != fetchingRemotes_.end()) {
!= sthis_->pimpl_->fetchingRemotes_.end()) { pullcbs_.emplace_back(std::move(deviceId), std::move(commitId), std::move(cb));
sthis_->pimpl_->pullcbs_.emplace_back(
std::make_tuple<std::string, std::string, OnPullCb>(std::string(deviceId),
std::move(commitId),
std::move(cb)));
// Go to next pull // Go to next pull
continue; continue;
} }
auto itr = sthis_->pimpl_->fetchingRemotes_.emplace(deviceId); auto itr = fetchingRemotes_.emplace(deviceId);
if (!itr.second) { if (!itr.second) {
cb(false); cb(false);
continue; continue;
...@@ -982,15 +982,15 @@ Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commi ...@@ -982,15 +982,15 @@ Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commi
// If recently fetched, the commit can already be there, so no need to do complex operations // If recently fetched, the commit can already be there, so no need to do complex operations
if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) { if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) {
cb(true); cb(true);
std::lock_guard<std::mutex> lk(sthis_->pimpl_->pullcbsMtx_); std::lock_guard<std::mutex> lk(pullcbsMtx_);
sthis_->pimpl_->fetchingRemotes_.erase(it); fetchingRemotes_.erase(it);
continue; continue;
} }
// Pull from remote // Pull from remote
auto fetched = sthis_->fetchFrom(deviceId); auto fetched = repo->fetch(deviceId);
{ {
std::lock_guard<std::mutex> lk(sthis_->pimpl_->pullcbsMtx_); std::lock_guard<std::mutex> lk(pullcbsMtx_);
sthis_->pimpl_->fetchingRemotes_.erase(it); fetchingRemotes_.erase(it);
} }
if (!fetched) { if (!fetched) {
...@@ -998,9 +998,9 @@ Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commi ...@@ -998,9 +998,9 @@ Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commi
continue; continue;
} }
auto oldId = repo->getHead(); auto oldId = repo->getHead();
std::unique_lock<std::mutex> lk(sthis_->pimpl_->writeMtx_); std::unique_lock<std::mutex> lk(writeMtx_);
auto newCommits = sthis_->mergeHistory(deviceId); auto newCommits = mergeHistory(deviceId);
sthis_->pimpl_->announce(newCommits); announce(newCommits);
lk.unlock(); lk.unlock();
if (cb) if (cb)
cb(true); cb(true);
...@@ -1011,13 +1011,12 @@ Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commi ...@@ -1011,13 +1011,12 @@ Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commi
auto changedFiles = repo->changedFiles(diffStats); auto changedFiles = repo->changedFiles(diffStats);
if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf") if (find(changedFiles.begin(), changedFiles.end(), "profile.vcf")
!= changedFiles.end()) { != changedFiles.end()) {
if (auto account = sthis_->pimpl_->account_.lock()) if (auto account = account_.lock())
emitSignal<DRing::ConversationSignal::ConversationProfileUpdated>( emitSignal<DRing::ConversationSignal::ConversationProfileUpdated>(
account->getAccountID(), repo->id(), repo->infos()); account->getAccountID(), repo->id(), repo->infos());
} }
} }
} }
});
} }
void void
......
...@@ -218,13 +218,6 @@ public: ...@@ -218,13 +218,6 @@ public:
*/ */
bool fetchFrom(const std::string& uri); bool fetchFrom(const std::string& uri);
/**
* Analyze if merge is possible and merge history
* @param uri the peer
* @return new commits
*/
std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri);
/** /**
* Fetch and merge from peer * Fetch and merge from peer
* @param deviceId Peer device * @param deviceId Peer device
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment