Skip to content
Snippets Groups Projects
Commit 3dc8883c authored by Sébastien Blin's avatar Sébastien Blin
Browse files

conversation_module: add a fallback if the clone fails

This fix testAddAcceptOfflineThenConnects without reverting
7463a8a2
The idea is in case of a sporadic clone failure (peer offline but
present, socket broken while cloning, etc), we schedule a new
clone operation later.

GitLab: #991
Change-Id: Ifacd6bfe383902bff6cb818e41ff8f5ddd0f6917
parent 3357b8fb
No related branches found
No related tags found
No related merge requests found
...@@ -51,19 +51,27 @@ struct PendingConversationFetch ...@@ -51,19 +51,27 @@ struct PendingConversationFetch
std::shared_ptr<dhtnet::ChannelSocket> socket {}; std::shared_ptr<dhtnet::ChannelSocket> socket {};
}; };
constexpr std::chrono::seconds MAX_FALLBACK {12 * 3600s};
struct SyncedConversation struct SyncedConversation
{ {
std::mutex mtx; std::mutex mtx;
std::unique_ptr<asio::steady_timer> fallbackClone;
std::chrono::seconds fallbackTimer {5s};
ConvInfo info; ConvInfo info;
std::unique_ptr<PendingConversationFetch> pending; std::unique_ptr<PendingConversationFetch> pending;
std::shared_ptr<Conversation> conversation; std::shared_ptr<Conversation> conversation;
SyncedConversation(const std::string& convId) SyncedConversation(const std::string& convId)
: info {convId} : info {convId}
{} {
fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext());
}
SyncedConversation(const ConvInfo& info) SyncedConversation(const ConvInfo& info)
: info {info} : info {info}
{} {
fallbackClone = std::make_unique<asio::steady_timer>(*Manager::instance().ioContext());
}
bool startFetch(const std::string& deviceId, bool checkIfConv = false) bool startFetch(const std::string& deviceId, bool checkIfConv = false)
{ {
...@@ -410,6 +418,7 @@ public: ...@@ -410,6 +418,7 @@ public:
const std::string& deviceId, const std::string& deviceId,
const std::string& oldConvId = ""); const std::string& oldConvId = "");
void bootstrap(const std::string& convId); void bootstrap(const std::string& convId);
void fallbackClone(const asio::error_code& ec, const std::string& conversationId);
void cloneConversationFrom(const std::string& conversationId, void cloneConversationFrom(const std::string& conversationId,
const std::string& uri, const std::string& uri,
const std::string& oldConvId = ""); const std::string& oldConvId = "");
...@@ -845,7 +854,17 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat ...@@ -845,7 +854,17 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat
} }
} }
} catch (const std::exception& e) { } catch (const std::exception& e) {
JAMI_WARN("Something went wrong when cloning conversation: %s", e.what()); JAMI_WARNING("Something went wrong when cloning conversation: {}. Re-clone in {}s", e.what(), conv->fallbackTimer.count());
conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer);
conv->fallbackTimer *= 2;
if (conv->fallbackTimer > MAX_FALLBACK)
conv->fallbackTimer = MAX_FALLBACK;
conv->fallbackClone->async_wait(
std::bind(&ConversationModule::Impl::fallbackClone,
shared_from_this(),
std::placeholders::_1,
conversationId));
} }
lk.lock(); lk.lock();
erasePending(); erasePending();
...@@ -1306,6 +1325,18 @@ ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConv ...@@ -1306,6 +1325,18 @@ ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConv
return true; return true;
} else { } else {
conv->stopFetch(deviceId); conv->stopFetch(deviceId);
JAMI_WARNING("Clone failed. Re-clone in {}s", conv->fallbackTimer.count());
conv->fallbackClone->expires_at(std::chrono::steady_clock::now() + conv->fallbackTimer);
conv->fallbackTimer *= 2;
if (conv->fallbackTimer > MAX_FALLBACK)
conv->fallbackTimer = MAX_FALLBACK;
conv->fallbackClone->async_wait(
std::bind(&ConversationModule::Impl::fallbackClone,
sthis,
std::placeholders::_1,
conversationId));
} }
} }
return false; return false;
...@@ -1313,6 +1344,20 @@ ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConv ...@@ -1313,6 +1344,20 @@ ConversationModule::Impl::cloneConversationFrom(const std::shared_ptr<SyncedConv
MIME_TYPE_GIT); MIME_TYPE_GIT);
} }
void
ConversationModule::Impl::fallbackClone(const asio::error_code& ec, const std::string& conversationId)
{
if (ec == asio::error::operation_aborted)
return;
auto conv = getConversation(conversationId);
if (!conv || conv->conversation)
return;
auto members = getConversationMembers(conversationId);
for (const auto& member : members)
if (member.at("uri") != username_)
cloneConversationFrom(conversationId, member.at("uri"));
}
void void
ConversationModule::Impl::bootstrap(const std::string& convId) ConversationModule::Impl::bootstrap(const std::string& convId)
{ {
...@@ -2289,11 +2334,6 @@ ConversationModule::syncConversations(const std::string& peer, const std::string ...@@ -2289,11 +2334,6 @@ ConversationModule::syncConversations(const std::string& peer, const std::string
if (conv->conversation) { if (conv->conversation) {
if (!conv->conversation->isRemoving() && conv->conversation->isMember(peer, false)) { if (!conv->conversation->isRemoving() && conv->conversation->isMember(peer, false)) {
toFetch.emplace(conv->info.id); toFetch.emplace(conv->info.id);
// TODO connect to Swarm
// if (!conv->conversation->hasSwarmChannel(deviceId)) {
// if (auto acc = pimpl_->account_.lock()) {
// }
// }
} }
} else if (!conv->info.isRemoved() } else if (!conv->info.isRemoved()
&& std::find(conv->info.members.begin(), conv->info.members.end(), peer) && std::find(conv->info.members.begin(), conv->info.members.end(), peer)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment