Skip to content
Snippets Groups Projects
Commit 69c98285 authored by Adrien Béraud's avatar Adrien Béraud Committed by Sébastien Blin
Browse files

MultiplexedSocket: more refactor, remove channelCbs

Change-Id: I34eeff81cb639bab901e939dd6436f6645c32b4d
parent 4b2386c8
No related branches found
No related tags found
No related merge requests found
...@@ -578,6 +578,14 @@ ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& ...@@ -578,6 +578,14 @@ ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>&
const dht::Value::Id& vid) const dht::Value::Id& vid)
{ {
auto channelSock = sock->addChannel(name); auto channelSock = sock->addChannel(name);
channelSock->onReady([wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()]() {
auto shared = w.lock();
auto channelSock = wSock.lock();
if (shared and channelSock)
for (const auto& pending : shared->extractPendingCallbacks(deviceId, vid))
pending.cb(channelSock, deviceId);
});
ChannelRequest val; ChannelRequest val;
val.name = channelSock->name(); val.name = channelSock->name();
val.state = ChannelRequestState::REQUEST; val.state = ChannelRequestState::REQUEST;
...@@ -585,11 +593,6 @@ ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& ...@@ -585,11 +593,6 @@ ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>&
msgpack::sbuffer buffer(256); msgpack::sbuffer buffer(256);
msgpack::pack(buffer, val); msgpack::pack(buffer, val);
sock->setOnChannelReady(channelSock->channel(), [channelSock, name, deviceId, vid, w = weak()]() {
if (auto shared = w.lock())
for (const auto& pending : shared->extractPendingCallbacks(deviceId, vid))
pending.cb(channelSock, deviceId);
});
std::error_code ec; std::error_code ec;
int res = sock->write(CONTROL_CHANNEL, int res = sock->write(CONTROL_CHANNEL,
reinterpret_cast<const uint8_t*>(buffer.data()), reinterpret_cast<const uint8_t*>(buffer.data()),
......
...@@ -122,6 +122,10 @@ public: ...@@ -122,6 +122,10 @@ public:
auto& channelSocket = sockets[channel]; auto& channelSocket = sockets[channel];
if (not channelSocket) if (not channelSocket)
channelSocket = std::make_shared<ChannelSocket>(parent_.weak(), name, channel); channelSocket = std::make_shared<ChannelSocket>(parent_.weak(), name, channel);
else {
JAMI_WARN("A channel is already present on that socket, accepting "
"the request will close the previous one");
}
return channelSocket; return channelSocket;
} }
...@@ -167,9 +171,6 @@ public: ...@@ -167,9 +171,6 @@ public:
std::mutex socketsMutex {}; std::mutex socketsMutex {};
std::map<uint16_t, std::shared_ptr<ChannelSocket>> sockets {}; std::map<uint16_t, std::shared_ptr<ChannelSocket>> sockets {};
// Contains callback triggered when a channel is ready
std::mutex channelCbsMutex {};
std::map<uint16_t, onChannelReadyCb> channelCbs {};
// Main loop to parse incoming packets // Main loop to parse incoming packets
std::atomic_bool stop {false}; std::atomic_bool stop {false};
...@@ -247,12 +248,9 @@ void ...@@ -247,12 +248,9 @@ void
MultiplexedSocket::Impl::onAccept(const std::string& name, uint16_t channel) MultiplexedSocket::Impl::onAccept(const std::string& name, uint16_t channel)
{ {
std::lock_guard<std::mutex> lkSockets(socketsMutex); std::lock_guard<std::mutex> lkSockets(socketsMutex);
onChannelReady_(deviceId, makeSocket(name, channel)); auto socket = makeSocket(name, channel);
std::lock_guard<std::mutex> lk(channelCbsMutex); onChannelReady_(deviceId, socket);
auto channelCbsIt = channelCbs.find(channel); socket->ready();
if (channelCbsIt != channelCbs.end()) {
(channelCbsIt->second)();
}
} }
void void
...@@ -375,11 +373,7 @@ MultiplexedSocket::Impl::onRequest(const std::string& name, uint16_t channel) ...@@ -375,11 +373,7 @@ MultiplexedSocket::Impl::onRequest(const std::string& name, uint16_t channel)
if (accept) { if (accept) {
onChannelReady_(deviceId, channelSocket); onChannelReady_(deviceId, channelSocket);
std::lock_guard<std::mutex> lk(channelCbsMutex); channelSocket->ready();
auto channelCbsIt = channelCbs.find(channel);
if (channelCbsIt != channelCbs.end()) {
channelCbsIt->second();
}
} }
} }
...@@ -534,12 +528,6 @@ MultiplexedSocket::setOnRequest(OnConnectionRequestCb&& cb) ...@@ -534,12 +528,6 @@ MultiplexedSocket::setOnRequest(OnConnectionRequestCb&& cb)
pimpl_->onRequest_ = std::move(cb); pimpl_->onRequest_ = std::move(cb);
} }
void
MultiplexedSocket::setOnChannelReady(uint16_t channel, onChannelReadyCb&& cb)
{
pimpl_->channelCbs[channel] = std::move(cb);
}
bool bool
MultiplexedSocket::isReliable() const MultiplexedSocket::isReliable() const
{ {
...@@ -708,6 +696,7 @@ public: ...@@ -708,6 +696,7 @@ public:
~Impl() {} ~Impl() {}
ChannelReadyCb readyCb_ {};
OnShutdownCb shutdownCb_ {}; OnShutdownCb shutdownCb_ {};
std::atomic_bool isShutdown_ {false}; std::atomic_bool isShutdown_ {false};
std::string name {}; std::string name {};
...@@ -819,6 +808,13 @@ ChannelSocket::underlyingSocket() const ...@@ -819,6 +808,13 @@ ChannelSocket::underlyingSocket() const
} }
#endif #endif
void
ChannelSocket::ready()
{
if (pimpl_->readyCb_)
pimpl_->readyCb_();
}
void void
ChannelSocket::stop() ChannelSocket::stop()
{ {
...@@ -899,6 +895,11 @@ ChannelSocket::onShutdown(OnShutdownCb&& cb) ...@@ -899,6 +895,11 @@ ChannelSocket::onShutdown(OnShutdownCb&& cb)
} }
} }
void
ChannelSocket::onReady(ChannelReadyCb&& cb){
pimpl_->readyCb_ = std::move(cb);
}
void void
ChannelSocket::sendBeacon(const std::chrono::milliseconds& timeout) ChannelSocket::sendBeacon(const std::chrono::milliseconds& timeout)
{ {
......
...@@ -32,7 +32,7 @@ using OnConnectionRequestCb = std::function< ...@@ -32,7 +32,7 @@ using OnConnectionRequestCb = std::function<
bool(const DeviceId& /* device id */, const uint16_t& /* id */, const std::string& /* name */)>; bool(const DeviceId& /* device id */, const uint16_t& /* id */, const std::string& /* name */)>;
using OnConnectionReadyCb using OnConnectionReadyCb
= std::function<void(const DeviceId& /* device id */, const std::shared_ptr<ChannelSocket>&)>; = std::function<void(const DeviceId& /* device id */, const std::shared_ptr<ChannelSocket>&)>;
using onChannelReadyCb = std::function<void(void)>; using ChannelReadyCb = std::function<void(void)>;
using OnShutdownCb = std::function<void(void)>; using OnShutdownCb = std::function<void(void)>;
static constexpr auto SEND_BEACON_TIMEOUT = std::chrono::milliseconds(3000); static constexpr auto SEND_BEACON_TIMEOUT = std::chrono::milliseconds(3000);
...@@ -97,11 +97,6 @@ public: ...@@ -97,11 +97,6 @@ public:
* Will be triggered when the peer asks for a new channel * Will be triggered when the peer asks for a new channel
*/ */
void setOnRequest(OnConnectionRequestCb&& cb); void setOnRequest(OnConnectionRequestCb&& cb);
/**
* Triggered when a specific channel is ready
* Used by ConnectionManager::connectDevice()
*/
void setOnChannelReady(uint16_t channel, onChannelReadyCb&& cb);
std::size_t write(const uint16_t& channel, std::size_t write(const uint16_t& channel,
const uint8_t* buf, const uint8_t* buf,
...@@ -197,6 +192,13 @@ public: ...@@ -197,6 +192,13 @@ public:
* Will trigger onShutdown's callback * Will trigger onShutdown's callback
*/ */
void shutdown() override; void shutdown() override;
void ready();
/**
* Triggered when a specific channel is ready
* Used by ConnectionManager::connectDevice()
*/
void onReady(ChannelReadyCb&& cb);
/** /**
* Will trigger that callback when shutdown() is called * Will trigger that callback when shutdown() is called
*/ */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment