Skip to content
Snippets Groups Projects
Commit e781b9c7 authored by Sébastien Blin's avatar Sébastien Blin Committed by Adrien Béraud
Browse files

multiplexedsocket: various fixes

Check for endpoint existance to avoid nullptr exception.
Avoid to write multiple packets at the same time on the socket. This avoid
to mix packet.
Avoid to be able to shut the channel multiple times

Change-Id: If5158b51f55f368091616062ced4d641130c8468
parent c789b853
Branches
No related tags found
No related merge requests found
......@@ -67,17 +67,18 @@ public:
stop.store(true);
isShutdown_ = true;
if (onShutdown_) onShutdown_();
endpoint->setOnStateChange({});
endpoint->shutdown();
{
std::lock_guard<std::mutex> lkSockets(socketsMutex);
for (auto& socket : sockets) {
// Just trigger onShutdown() to make client know
// No need to write the EOF for the channel, the write will fail because endpoint is already shutdown
if (socket.second) socket.second->stop();
}
sockets.clear();
if (endpoint) {
endpoint->setOnStateChange({});
std::unique_lock<std::mutex> lk(writeMtx);
endpoint->shutdown();
}
std::lock_guard<std::mutex> lkSockets(socketsMutex);
for (auto& socket : sockets) {
// Just trigger onShutdown() to make client know
// No need to write the EOF for the channel, the write will fail because endpoint is already shutdown
if (socket.second) socket.second->stop();
}
sockets.clear();
}
/**
......@@ -123,6 +124,8 @@ public:
std::mutex channelCbsMtx_ {};
std::map<uint16_t, GenericSocket<uint8_t>::RecvCb> channelCbs_ {};
std::atomic_bool isShutdown_ {false};
std::mutex writeMtx;
};
void
......@@ -160,11 +163,10 @@ MultiplexedSocket::Impl::eventLoop()
try {
auto msg = oh.get().as<ChanneledMessage>();
if (msg.channel == 0) {
if (msg.channel == 0)
handleControlPacket(std::move(msg.data));
} else {
else
handleChannelPacket(msg.channel, std::move(msg.data));
}
} catch (const msgpack::unpack_error &e) {
JAMI_WARN("Error when decoding msgpack message: %s", e.what());
}
......@@ -273,7 +275,7 @@ MultiplexedSocket::Impl::handleChannelPacket(uint16_t channel, const std::vector
auto cb = channelCbs_.find(channel);
if (cb != channelCbs_.end()) {
lk.unlock();
cb->second(&pkt[0], pkt.size());
if (cb->second) cb->second(&pkt[0], pkt.size());
return;
}
lk.unlock();
......@@ -413,7 +415,9 @@ MultiplexedSocket::write(const uint16_t& channel, const uint8_t* buf, std::size_
pk.pack_bin(len);
pk.pack_bin_body((const char*)buf, len);
std::unique_lock<std::mutex> lk(pimpl_->writeMtx);
int res = pimpl_->endpoint->write((const unsigned char*)buffer.data(), buffer.size(), ec);
lk.unlock();
if (res < 0) {
if (ec)
JAMI_ERR("Error when writing on socket: %s", ec.message().c_str());
......@@ -481,7 +485,7 @@ public:
Impl(std::weak_ptr<MultiplexedSocket> endpoint, const std::string& name, const uint16_t& channel)
: name(name), channel(channel), endpoint(std::move(endpoint)) {}
~Impl() {}
~Impl() { }
OnShutdownCb shutdownCb_;
std::atomic_bool isShutdown_ {false};
......@@ -564,13 +568,16 @@ ChannelSocket::underlyingICE() const
void
ChannelSocket::stop()
{
if (pimpl_->isShutdown_) return;
pimpl_->isShutdown_ = true;
if (pimpl_->shutdownCb_) pimpl_->shutdownCb_();
if (pimpl_->shutdownCb_)
pimpl_->shutdownCb_();
}
void
ChannelSocket::shutdown()
{
if (pimpl_->isShutdown_) return;
stop();
if (auto ep = pimpl_->endpoint.lock()) {
std::error_code ec;
......
......@@ -158,6 +158,11 @@ public:
std::size_t write(const ValueType* buf, std::size_t len, std::error_code& ec) override;
int waitForData(std::chrono::milliseconds timeout, std::error_code&) const override;
/**
* set a callback when receiving data
* @note: this callback should take a little time and not block
* but you can move it in a thread
*/
void setOnRecv(RecvCb&&) override;
std::shared_ptr<IceTransport> underlyingICE() const;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment