diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index 21ce9e035284b393eab70ec8c7e56a49d2970e31..acf15b26f2d23ff0816976f8fe0c687954327638 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -584,13 +584,12 @@ public: std::vector<std::map<std::string, std::string>> loadMessages(const LogOptions& options); std::vector<libjami::SwarmMessage> loadMessages2(const LogOptions& options, History* optHistory = nullptr); - void pull(); + void pull(const std::string& deviceId); std::vector<std::map<std::string, std::string>> mergeHistory(const std::string& uri); // Avoid multiple fetch/merges at the same time. std::mutex pullcbsMtx_ {}; - std::set<std::string> fetchingRemotes_ {}; // store current remote in fetch - std::deque<std::tuple<std::string, std::string, OnPullCb>> pullcbs_ {}; + std::map<std::string, std::deque<std::pair<std::string, OnPullCb>>> fetchingRemotes_ {}; // store current remote in fetch std::shared_ptr<TransferManager> transferManager_ {}; std::filesystem::path conversationDataPath_ {}; std::filesystem::path fetchedPath_ {}; @@ -1695,70 +1694,58 @@ Conversation::Impl::mergeHistory(const std::string& uri) bool Conversation::pull(const std::string& deviceId, OnPullCb&& cb, std::string commitId) { + JAMI_DEBUG("Pulling from {:s} with commit {:s}", deviceId, commitId); std::lock_guard lk(pimpl_->pullcbsMtx_); - auto isInProgress = not pimpl_->pullcbs_.empty(); - auto itPull = std::find_if(pimpl_->pullcbs_.begin(), - pimpl_->pullcbs_.end(), - [&](const auto& elem) { return std::get<0>(elem) == deviceId && std::get<1>(elem) == commitId; }); - if (itPull != pimpl_->pullcbs_.end()) { + auto [it, notInProgress] = pimpl_->fetchingRemotes_.emplace(deviceId, std::deque<std::pair<std::string, OnPullCb>>()); + auto& pullcbs = it->second; + auto itPull = std::find_if(pullcbs.begin(), + pullcbs.end(), + [&](const auto& elem) { return std::get<0>(elem) == commitId; }); + if (itPull != pullcbs.end()) { cb(false); return false; } - JAMI_INFO() << "Sync " << id() << " with " << deviceId; - pimpl_->pullcbs_.emplace_back(deviceId, std::move(commitId), std::move(cb)); - if (isInProgress) - return true; - dht::ThreadPool::io().run([w = weak()] { - if (auto sthis_ = w.lock()) - sthis_->pimpl_->pull(); - }); + pullcbs.emplace_back(std::move(commitId), std::move(cb)); + if (notInProgress) + dht::ThreadPool::io().run([w = weak(), deviceId] { + if (auto sthis_ = w.lock()) + sthis_->pimpl_->pull(deviceId); + }); return true; } void -Conversation::Impl::pull() +Conversation::Impl::pull(const std::string& deviceId) { auto& repo = repository_; - std::string deviceId, commitId; + std::string commitId; OnPullCb cb; while (true) { - decltype(pullcbs_)::value_type pullcb; - decltype(fetchingRemotes_.begin()) it; { std::lock_guard lk(pullcbsMtx_); - if (pullcbs_.empty()) - return; - auto& elem = pullcbs_.front(); - deviceId = std::move(std::get<0>(elem)); - commitId = std::move(std::get<1>(elem)); - cb = std::move(std::get<2>(elem)); - pullcbs_.pop_front(); - - // Check if already using this remote, if so, no need to pull yet - // One pull at a time to avoid any early EOF or fetch errors. - auto itr = fetchingRemotes_.emplace(deviceId); - if (!itr.second) { - // Go to next pull - pullcbs_.emplace_back(std::move(deviceId), std::move(commitId), std::move(cb)); - continue; + auto it = fetchingRemotes_.find(deviceId); + if (it == fetchingRemotes_.end()) { + JAMI_ERROR("Could not find device {:s} in fetchingRemotes", deviceId); + break; + } + auto& pullcbs = it->second; + if (pullcbs.empty()) { + fetchingRemotes_.erase(it); + break; } - it = itr.first; + auto& elem = pullcbs.front(); + commitId = std::move(std::get<0>(elem)); + cb = std::move(std::get<1>(elem)); + pullcbs.pop_front(); } // If recently fetched, the commit can already be there, so no need to do complex operations if (commitId != "" && repo->getCommit(commitId, false) != std::nullopt) { cb(true); - std::lock_guard lk(pullcbsMtx_); - fetchingRemotes_.erase(it); continue; } // Pull from remote auto fetched = repo->fetch(deviceId); - { - std::lock_guard lk(pullcbsMtx_); - fetchingRemotes_.erase(it); - } - if (!fetched) { cb(false); continue;