Skip to content
Snippets Groups Projects
Commit c2e092e3 authored by Adrien Béraud's avatar Adrien Béraud Committed by Adrien Béraud
Browse files

ConnectionManager: clarify variable names

* mtx -> mutex
* cbIds_ -> pendingCbs_

Change-Id: Iea5c1ed55efe197024f25adda4860e599bc94892
parent a4872393
Branches
No related tags found
No related merge requests found
...@@ -100,7 +100,7 @@ struct ConnectionInfo ...@@ -100,7 +100,7 @@ struct ConnectionInfo
// Used to store currently non ready TLS Socket // Used to store currently non ready TLS Socket
std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr}; std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
std::shared_ptr<MultiplexedSocket> socket_ {}; std::shared_ptr<MultiplexedSocket> socket_ {};
std::set<dht::Value::Id> cbIds_ {}; std::set<dht::Value::Id> pendingCbs_ {};
std::function<void(bool)> onConnected_; std::function<void(bool)> onConnected_;
std::unique_ptr<asio::steady_timer> waitForAnswer_ {}; std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
...@@ -157,7 +157,7 @@ struct PendingCb { ...@@ -157,7 +157,7 @@ struct PendingCb {
struct DeviceInfo { struct DeviceInfo {
const DeviceId deviceId; const DeviceId deviceId;
mutable std::mutex mtx_ {}; mutable std::mutex mutex_ {};
std::shared_ptr<dht::crypto::Certificate> cert; std::shared_ptr<dht::crypto::Certificate> cert;
std::map<dht::Value::Id, std::shared_ptr<ConnectionInfo>> info; std::map<dht::Value::Id, std::shared_ptr<ConnectionInfo>> info;
std::map<dht::Value::Id, PendingCb> connecting; std::map<dht::Value::Id, PendingCb> connecting;
...@@ -300,7 +300,7 @@ struct DeviceInfo { ...@@ -300,7 +300,7 @@ struct DeviceInfo {
cb.cb(sock, deviceId); cb.cb(sock, deviceId);
} }
void executePendingOperations(dht::Value::Id vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true) { void executePendingOperations(dht::Value::Id vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true) {
std::unique_lock lock(mtx_); std::unique_lock lock(mutex_);
executePendingOperations(lock, vid, sock, accepted); executePendingOperations(lock, vid, sock, accepted);
} }
...@@ -324,7 +324,7 @@ struct DeviceInfo { ...@@ -324,7 +324,7 @@ struct DeviceInfo {
std::vector<std::map<std::string, std::string>> std::vector<std::map<std::string, std::string>>
getConnectionList(tls::CertificateStore& certStore) const { getConnectionList(tls::CertificateStore& certStore) const {
std::lock_guard lk(mtx_); std::lock_guard lk(mutex_);
std::vector<std::map<std::string, std::string>> ret; std::vector<std::map<std::string, std::string>> ret;
ret.reserve(info.size() + connecting.size() + waiting.size()); ret.reserve(info.size() + connecting.size() + waiting.size());
for (auto& [id, ci] : info) { for (auto& [id, ci] : info) {
...@@ -355,7 +355,7 @@ struct DeviceInfo { ...@@ -355,7 +355,7 @@ struct DeviceInfo {
class DeviceInfoSet { class DeviceInfoSet {
public: public:
std::shared_ptr<DeviceInfo> getDeviceInfo(const DeviceId& deviceId) { std::shared_ptr<DeviceInfo> getDeviceInfo(const DeviceId& deviceId) {
std::lock_guard lk(mtx_); std::lock_guard lk(mutex_);
auto it = infos_.find(deviceId); auto it = infos_.find(deviceId);
if (it != infos_.end()) if (it != infos_.end())
return it->second; return it->second;
...@@ -364,7 +364,7 @@ public: ...@@ -364,7 +364,7 @@ public:
std::vector<std::shared_ptr<DeviceInfo>> getDeviceInfos() { std::vector<std::shared_ptr<DeviceInfo>> getDeviceInfos() {
std::vector<std::shared_ptr<DeviceInfo>> deviceInfos; std::vector<std::shared_ptr<DeviceInfo>> deviceInfos;
std::lock_guard lk(mtx_); std::lock_guard lk(mutex_);
deviceInfos.reserve(infos_.size()); deviceInfos.reserve(infos_.size());
for (auto& [deviceId, info] : infos_) for (auto& [deviceId, info] : infos_)
deviceInfos.emplace_back(info); deviceInfos.emplace_back(info);
...@@ -372,7 +372,7 @@ public: ...@@ -372,7 +372,7 @@ public:
} }
std::shared_ptr<DeviceInfo> createDeviceInfo(const DeviceId& deviceId) { std::shared_ptr<DeviceInfo> createDeviceInfo(const DeviceId& deviceId) {
std::lock_guard lk(mtx_); std::lock_guard lk(mutex_);
auto& info = infos_[deviceId]; auto& info = infos_[deviceId];
if (!info) if (!info)
info = std::make_shared<DeviceInfo>(deviceId); info = std::make_shared<DeviceInfo>(deviceId);
...@@ -380,13 +380,13 @@ public: ...@@ -380,13 +380,13 @@ public:
} }
bool removeDeviceInfo(const DeviceId& deviceId) { bool removeDeviceInfo(const DeviceId& deviceId) {
std::lock_guard lk(mtx_); std::lock_guard lk(mutex_);
return infos_.erase(deviceId) != 0; return infos_.erase(deviceId) != 0;
} }
std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id) { std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id) {
if (auto info = getDeviceInfo(deviceId)) { if (auto info = getDeviceInfo(deviceId)) {
std::lock_guard lk(info->mtx_); std::lock_guard lk(info->mutex_);
auto it = info->info.find(id); auto it = info->info.find(id);
if (it != info->info.end()) if (it != info->info.end())
return it->second; return it->second;
...@@ -399,7 +399,7 @@ public: ...@@ -399,7 +399,7 @@ public:
std::vector<std::shared_ptr<ConnectionInfo>> ret; std::vector<std::shared_ptr<ConnectionInfo>> ret;
ret.reserve(deviceInfos.size()); ret.reserve(deviceInfos.size());
for (auto& info : deviceInfos) { for (auto& info : deviceInfos) {
std::lock_guard lk(info->mtx_); std::lock_guard lk(info->mutex_);
for (auto& [id, ci] : info->info) { for (auto& [id, ci] : info->info) {
if (ci->socket_) if (ci->socket_)
ret.emplace_back(ci); ret.emplace_back(ci);
...@@ -409,7 +409,7 @@ public: ...@@ -409,7 +409,7 @@ public:
} }
std::vector<std::shared_ptr<DeviceInfo>> shutdown() { std::vector<std::shared_ptr<DeviceInfo>> shutdown() {
std::vector<std::shared_ptr<DeviceInfo>> ret; std::vector<std::shared_ptr<DeviceInfo>> ret;
std::lock_guard lk(mtx_); std::lock_guard lk(mutex_);
ret.reserve(infos_.size()); ret.reserve(infos_.size());
for (auto& [deviceId, info] : infos_) { for (auto& [deviceId, info] : infos_) {
ret.emplace_back(std::move(info)); ret.emplace_back(std::move(info));
...@@ -419,7 +419,7 @@ public: ...@@ -419,7 +419,7 @@ public:
} }
private: private:
std::mutex mtx_ {}; std::mutex mutex_ {};
std::map<DeviceId, std::shared_ptr<DeviceInfo>> infos_ {}; std::map<DeviceId, std::shared_ptr<DeviceInfo>> infos_ {};
}; };
...@@ -462,7 +462,7 @@ public: ...@@ -462,7 +462,7 @@ public:
std::vector<std::shared_ptr<ConnectionInfo>> unused; std::vector<std::shared_ptr<ConnectionInfo>> unused;
std::vector<std::pair<DeviceId, std::vector<PendingCb>>> pending; std::vector<std::pair<DeviceId, std::vector<PendingCb>>> pending;
for (auto& dinfo: infos_.shutdown()) { for (auto& dinfo: infos_.shutdown()) {
std::lock_guard lk(dinfo->mtx_); std::lock_guard lk(dinfo->mutex_);
auto p = dinfo->extractPendingOperations(0, nullptr, false); auto p = dinfo->extractPendingOperations(0, nullptr, false);
if (!p.empty()) if (!p.empty())
pending.emplace_back(dinfo->deviceId, std::move(p)); pending.emplace_back(dinfo->deviceId, std::move(p));
...@@ -920,7 +920,7 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif ...@@ -920,7 +920,7 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif
return; return;
} }
auto di = sthis->infos_.createDeviceInfo(deviceId); auto di = sthis->infos_.createDeviceInfo(deviceId);
std::unique_lock lk(di->mtx_); std::unique_lock lk(di->mutex_);
if (!di->cert) { if (!di->cert) {
di->cert = cert; di->cert = cert;
} }
...@@ -946,7 +946,7 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif ...@@ -946,7 +946,7 @@ ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certif
if (auto info = di->getConnectedInfo()) { if (auto info = di->getConnectedInfo()) {
std::unique_lock lkc(info->mutex_); std::unique_lock lkc(info->mutex_);
if (auto sock = info->socket_) { if (auto sock = info->socket_) {
info->cbIds_.emplace(vid); info->pendingCbs_.emplace(vid);
diw.requested = true; diw.requested = true;
lkc.unlock(); lkc.unlock();
lk.unlock(); lk.unlock();
...@@ -982,7 +982,7 @@ ConnectionManager::Impl::startConnection(const std::shared_ptr<DeviceInfo>& di, ...@@ -982,7 +982,7 @@ ConnectionManager::Impl::startConnection(const std::shared_ptr<DeviceInfo>& di,
// all stored structures. // all stored structures.
auto eraseInfo = [w = weak_from_this(), diw=std::weak_ptr(di), vid] { auto eraseInfo = [w = weak_from_this(), diw=std::weak_ptr(di), vid] {
if (auto di = diw.lock()) { if (auto di = diw.lock()) {
std::unique_lock lk(di->mtx_); std::unique_lock lk(di->mutex_);
di->info.erase(vid); di->info.erase(vid);
auto ops = di->extractPendingOperations(vid, nullptr); auto ops = di->extractPendingOperations(vid, nullptr);
if (di->empty()) { if (di->empty()) {
...@@ -1117,7 +1117,7 @@ ConnectionManager::Impl::sendChannelRequest(const std::weak_ptr<DeviceInfo>& din ...@@ -1117,7 +1117,7 @@ ConnectionManager::Impl::sendChannelRequest(const std::weak_ptr<DeviceInfo>& din
dht::ThreadPool::io().run([cinfow, vid]() { dht::ThreadPool::io().run([cinfow, vid]() {
if (auto cinfo = cinfow.lock()) { if (auto cinfo = cinfow.lock()) {
std::lock_guard lk(cinfo->mutex_); std::lock_guard lk(cinfo->mutex_);
cinfo->cbIds_.erase(vid); cinfo->pendingCbs_.erase(vid);
} }
}); });
} }
...@@ -1251,7 +1251,7 @@ ConnectionManager::Impl::onTlsNegotiationDone(const std::shared_ptr<DeviceInfo>& ...@@ -1251,7 +1251,7 @@ ConnectionManager::Impl::onTlsNegotiationDone(const std::shared_ptr<DeviceInfo>&
dinfo->executePendingOperations(vid, nullptr); dinfo->executePendingOperations(vid, nullptr);
} }
std::unique_lock lk(dinfo->mtx_); std::unique_lock lk(dinfo->mutex_);
dinfo->info.erase(vid); dinfo->info.erase(vid);
if (dinfo->empty()) { if (dinfo->empty()) {
...@@ -1273,13 +1273,13 @@ ConnectionManager::Impl::onTlsNegotiationDone(const std::shared_ptr<DeviceInfo>& ...@@ -1273,13 +1273,13 @@ ConnectionManager::Impl::onTlsNegotiationDone(const std::shared_ptr<DeviceInfo>&
} }
// Note: do not remove pending there it's done in sendChannelRequest // Note: do not remove pending there it's done in sendChannelRequest
std::unique_lock lk2 {dinfo->mtx_}; std::unique_lock lk2 {dinfo->mutex_};
auto pendingIds = dinfo->requestPendingOps(); auto pendingIds = dinfo->requestPendingOps();
auto previousConnections = dinfo->getConnectedInfos(); auto previousConnections = dinfo->getConnectedInfos();
std::unique_lock lk {info->mutex_}; std::unique_lock lk {info->mutex_};
addNewMultiplexedSocket(dinfo, deviceId, vid, info); addNewMultiplexedSocket(dinfo, deviceId, vid, info);
for (const auto& [id, name]: pendingIds) for (const auto& [id, name]: pendingIds)
info->cbIds_.emplace(id); info->pendingCbs_.emplace(id);
lk.unlock(); lk.unlock();
lk2.unlock(); lk2.unlock();
// send beacon to existing connections for this device // send beacon to existing connections for this device
...@@ -1459,7 +1459,7 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, ...@@ -1459,7 +1459,7 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
auto eraseInfo = [w, wdi, id = req.id] { auto eraseInfo = [w, wdi, id = req.id] {
auto shared = w.lock(); auto shared = w.lock();
if (auto di = wdi.lock()) { if (auto di = wdi.lock()) {
std::unique_lock lk(di->mtx_); std::unique_lock lk(di->mutex_);
di->info.erase(id); di->info.erase(id);
auto ops = di->extractPendingOperations(id, nullptr); auto ops = di->extractPendingOperations(id, nullptr);
if (di->empty()) { if (di->empty()) {
...@@ -1519,7 +1519,7 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, ...@@ -1519,7 +1519,7 @@ ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
// Negotiate a new ICE socket // Negotiate a new ICE socket
{ {
std::lock_guard lk(di->mtx_); std::lock_guard lk(di->mutex_);
di->info[req.id] = info; di->info[req.id] = info;
} }
...@@ -1570,9 +1570,9 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const std::weak_ptr<DeviceInfo> ...@@ -1570,9 +1570,9 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const std::weak_ptr<DeviceInfo>
dht::ThreadPool::io().run([w, dinfo, wi, vid] { dht::ThreadPool::io().run([w, dinfo, wi, vid] {
if (auto info = wi.lock()) { if (auto info = wi.lock()) {
if (auto deviceInfo = dinfo.lock()) { if (auto deviceInfo = dinfo.lock()) {
std::unique_lock lkd(deviceInfo->mtx_); std::unique_lock lkd(deviceInfo->mutex_);
std::unique_lock lkc(info->mutex_); std::unique_lock lkc(info->mutex_);
auto ids = std::move(info->cbIds_); auto ids = std::move(info->pendingCbs_);
auto [ops, retry] = deviceInfo->resetPendingOperations(ids); auto [ops, retry] = deviceInfo->resetPendingOperations(ids);
auto erased = deviceInfo->info.erase(vid); auto erased = deviceInfo->info.erase(vid);
if (!retry && deviceInfo->empty()) { if (!retry && deviceInfo->empty()) {
...@@ -1597,7 +1597,7 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const std::weak_ptr<DeviceInfo> ...@@ -1597,7 +1597,7 @@ ConnectionManager::Impl::addNewMultiplexedSocket(const std::weak_ptr<DeviceInfo>
void void
ConnectionManager::Impl::retryOnError(const std::shared_ptr<DeviceInfo>& deviceInfo) ConnectionManager::Impl::retryOnError(const std::shared_ptr<DeviceInfo>& deviceInfo)
{ {
std::unique_lock<std::mutex> lk(deviceInfo->mtx_); std::unique_lock<std::mutex> lk(deviceInfo->mutex_);
if (not deviceInfo->isConnecting()) if (not deviceInfo->isConnecting())
return; return;
if (auto i = deviceInfo->getConnectedInfo()) { if (auto i = deviceInfo->getConnectedInfo()) {
...@@ -1894,7 +1894,7 @@ bool ...@@ -1894,7 +1894,7 @@ bool
ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
{ {
if (auto dinfo = pimpl_->infos_.getDeviceInfo(deviceId)) { if (auto dinfo = pimpl_->infos_.getDeviceInfo(deviceId)) {
std::unique_lock lk {dinfo->mtx_}; std::unique_lock lk {dinfo->mutex_};
return dinfo->isConnecting(name); return dinfo->isConnecting(name);
} }
return false; return false;
...@@ -1904,7 +1904,7 @@ bool ...@@ -1904,7 +1904,7 @@ bool
ConnectionManager::isConnected(const DeviceId& deviceId) const ConnectionManager::isConnected(const DeviceId& deviceId) const
{ {
if (auto dinfo = pimpl_->infos_.getDeviceInfo(deviceId)) { if (auto dinfo = pimpl_->infos_.getDeviceInfo(deviceId)) {
std::unique_lock lk {dinfo->mtx_}; std::unique_lock lk {dinfo->mutex_};
return dinfo->getConnectedInfo() != nullptr; return dinfo->getConnectedInfo() != nullptr;
} }
return false; return false;
...@@ -1915,7 +1915,7 @@ ConnectionManager::closeConnectionsWith(const std::string& peerUri) ...@@ -1915,7 +1915,7 @@ ConnectionManager::closeConnectionsWith(const std::string& peerUri)
{ {
std::vector<std::shared_ptr<DeviceInfo>> dInfos; std::vector<std::shared_ptr<DeviceInfo>> dInfos;
for (const auto& dinfo: pimpl_->infos_.getDeviceInfos()) { for (const auto& dinfo: pimpl_->infos_.getDeviceInfos()) {
std::unique_lock lk(dinfo->mtx_); std::unique_lock lk(dinfo->mutex_);
bool isPeer = false; bool isPeer = false;
for (auto const& [id, cinfo]: dinfo->info) { for (auto const& [id, cinfo]: dinfo->info) {
std::lock_guard lkv {cinfo->mutex_}; std::lock_guard lkv {cinfo->mutex_};
...@@ -1935,7 +1935,7 @@ ConnectionManager::closeConnectionsWith(const std::string& peerUri) ...@@ -1935,7 +1935,7 @@ ConnectionManager::closeConnectionsWith(const std::string& peerUri)
} }
// Stop connections to all peers devices // Stop connections to all peers devices
for (const auto& dinfo : dInfos) { for (const auto& dinfo : dInfos) {
std::unique_lock lk {dinfo->mtx_}; std::unique_lock lk {dinfo->mutex_};
auto unused = dinfo->extractUnusedConnections(); auto unused = dinfo->extractUnusedConnections();
auto pending = dinfo->extractPendingOperations(0, nullptr); auto pending = dinfo->extractPendingOperations(0, nullptr);
pimpl_->infos_.removeDeviceInfo(dinfo->deviceId); pimpl_->infos_.removeDeviceInfo(dinfo->deviceId);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment