Skip to content
Snippets Groups Projects
Commit bf12b7e6 authored by Adrien Béraud's avatar Adrien Béraud
Browse files

sync module: factor channel shutdown

Change-Id: I6db2411fe91946017179bbba4bb471895d069e42
parent dd3d17bb
No related branches found
No related tags found
No related merge requests found
......@@ -37,14 +37,13 @@ public:
std::recursive_mutex syncConnectionsMtx_;
std::map<DeviceId /* deviceId */, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> syncConnections_;
std::weak_ptr<Impl> weak() { return std::static_pointer_cast<Impl>(shared_from_this()); }
/**
* Build SyncMsg and send it on socket
* @param socket
*/
void syncInfos(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
const std::shared_ptr<SyncMsg>& syncMsg);
void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device);
};
SyncModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account)
......@@ -149,6 +148,21 @@ SyncModule::SyncModule(std::weak_ptr<JamiAccount>&& account)
: pimpl_ {std::make_shared<Impl>(std::move(account))}
{}
void
SyncModule::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const DeviceId& device)
{
std::lock_guard lk(syncConnectionsMtx_);
auto connectionsIt = syncConnections_.find(device);
if (connectionsIt == syncConnections_.end())
return;
auto& connections = connectionsIt->second;
auto conn = std::find(connections.begin(), connections.end(), socket);
if (conn != connections.end())
connections.erase(conn);
if (connections.empty())
syncConnections_.erase(connectionsIt);
}
void
SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
const std::string& peerId,
......@@ -157,22 +171,11 @@ SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
std::lock_guard lk(pimpl_->syncConnectionsMtx_);
pimpl_->syncConnections_[device].emplace_back(socket);
socket->onShutdown([w = pimpl_->weak(), peerId, device, socket]() {
auto shared = w.lock();
if (!shared)
return;
std::lock_guard lk(shared->syncConnectionsMtx_);
auto& connections = shared->syncConnections_[device];
auto conn = connections.begin();
while (conn != connections.end()) {
if (*conn == socket)
conn = connections.erase(conn);
else
conn++;
}
socket->onShutdown([w = pimpl_->weak_from_this(), device, s=std::weak_ptr(socket)]() {
if (auto shared = w.lock())
shared->onChannelShutdown(s.lock(), device);
});
struct DecodingContext
{
msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; },
......@@ -191,10 +194,9 @@ SyncModule::cacheSyncConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
ctx->pac.buffer_consumed(len);
msgpack::object_handle oh;
SyncMsg msg;
try {
while (ctx->pac.next(oh)) {
SyncMsg msg;
oh.get().convert(msg);
if (auto manager = acc->accountManager())
manager->onSyncData(std::move(msg.ds), false);
......@@ -222,23 +224,9 @@ SyncModule::syncWith(const DeviceId& deviceId,
return;
{
std::lock_guard lk(pimpl_->syncConnectionsMtx_);
socket->onShutdown([w = pimpl_->weak(), socket, deviceId]() {
// When sock is shutdown update syncConnections_ to be able to resync asap
auto shared = w.lock();
if (!shared)
return;
std::lock_guard lk(shared->syncConnectionsMtx_);
auto& connections = shared->syncConnections_[deviceId];
auto conn = connections.begin();
while (conn != connections.end()) {
if (*conn == socket)
conn = connections.erase(conn);
else
conn++;
}
if (connections.empty()) {
shared->syncConnections_.erase(deviceId);
}
socket->onShutdown([w = pimpl_->weak_from_this(), deviceId, s=std::weak_ptr(socket)] {
if (auto shared = w.lock())
shared->onChannelShutdown(s.lock(), deviceId);
});
pimpl_->syncConnections_[deviceId].emplace_back(socket);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment