diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp index 72ff80693919e630104cd481caa6056a1cdd955e..db16c086d18ced92a6d2b0ee9740c2ae9b5d9e33 100644 --- a/src/jamidht/conversation_module.cpp +++ b/src/jamidht/conversation_module.cpp @@ -51,19 +51,27 @@ struct PendingConversationFetch std::shared_ptr<dhtnet::ChannelSocket> socket {}; }; +constexpr std::chrono::seconds MAX_FALLBACK {12 * 3600s}; + struct SyncedConversation { std::mutex mtx; + std::unique_ptr<asio::steady_timer> fallbackClone; + std::chrono::seconds fallbackTimer {5s}; ConvInfo info; std::unique_ptr<PendingConversationFetch> pending; std::shared_ptr<Conversation> conversation; SyncedConversation(const std::string& convId) : info {convId} - {} + { + fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext()); + } SyncedConversation(const ConvInfo& info) : info {info} - {} + { + fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext()); + } bool startFetch(const std::string& deviceId, bool checkIfConv = false) { @@ -410,6 +418,7 @@ public: const std::string& deviceId, const std::string& oldConvId = ""); void bootstrap(const std::string& convId); + void fallbackClone(const asio::error_code& ec, const std::string& conversationId); void cloneConversationFrom(const std::string& conversationId, const std::string& uri, const std::string& oldConvId = ""); @@ -845,7 +854,17 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat } } } catch (const std::exception& e) { - JAMI_WARN("Something went wrong when cloning conversation: %s", e.what()); + JAMI_WARNING("Something went wrong when cloning conversation: {}. Re-clone in {}s", e.what(), conv->fallbackTimer.count()); + conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer); + conv->fallbackTimer *= 2; + if (conv->fallbackTimer > MAX_FALLBACK) + conv->fallbackTimer = MAX_FALLBACK; + conv->fallbackClone->async_wait( + std::bind(&ConversationModule::Impl::fallbackClone, + shared_from_this(), + std::placeholders::_1, + conversationId)); + } lk.lock(); erasePending(); @@ -1306,6 +1325,18 @@ ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConv return true; } else { conv->stopFetch(deviceId); + JAMI_WARNING("Clone failed. Re-clone in {}s", conv->fallbackTimer.count()); + conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer); + conv->fallbackTimer *= 2; + if (conv->fallbackTimer > MAX_FALLBACK) + conv->fallbackTimer = MAX_FALLBACK; + conv->fallbackClone->async_wait( + std::bind(&ConversationModule::Impl::fallbackClone, + sthis, + std::placeholders::_1, + conversationId)); + + } } return false; @@ -1313,6 +1344,20 @@ ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConv MIME_TYPE_GIT); } +void +ConversationModule::Impl::fallbackClone(const asio::error_code& ec, const std::string& conversationId) +{ + if (ec == asio::error::operation_aborted) + return; + auto conv = getConversation(conversationId); + if (!conv || conv->conversation) + return; + auto members = getConversationMembers(conversationId); + for (const auto& member : members) + if (member.at("uri") != username_) + cloneConversationFrom(conversationId, member.at("uri")); +} + void ConversationModule::Impl::bootstrap(const std::string& convId) { @@ -2289,11 +2334,6 @@ ConversationModule::syncConversations(const std::string& peer, const std::string if (conv->conversation) { if (!conv->conversation->isRemoving() && conv->conversation->isMember(peer, false)) { toFetch.emplace(conv->info.id); - // TODO connect to Swarm - // if (!conv->conversation->hasSwarmChannel(deviceId)) { - // if (auto acc = pimpl_->account_.lock()) { - // } - // } } } else if (!conv->info.isRemoved() && std::find(conv->info.members.begin(), conv->info.members.end(), peer)