From 09f143cc823975a00f2d4385685122c0bda0606c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Blin?=
 <sebastien.blin@savoirfairelinux.com>
Date: Tue, 19 Oct 2021 14:27:23 -0400
Subject: [PATCH] channeled_transport: connect setOnRecv when ready to receive
 data

A ChannelSocket can starts to receive datas while waiting acceptance
for the channel. In this case, received datas will be injected when
setOnRecv will be called on the channel.

However, in channeled_transport, setOnRecv injects data to pjsip and
is handled by the SIPVoIPLink where transaction_request_cb will retrieve
the infos via the transports_. This means, that, in order to successfully
handle data, we MUST inject data when transports_ is correct.

In this patch, SipTransportBroker::getChanneledTransport is modified
to connect the callbacks after adding the transport to the map and
inject data in the correct methods.

This fix sporadic failures for testInviteFromMessageAfterRemoved

Change-Id: I2767801a9dad77439fb2f2adedbc9b900add8cea
---
 src/jamidht/channeled_transport.cpp | 11 ++++++++---
 src/jamidht/channeled_transport.h   |  6 ++++++
 src/jamidht/jamiaccount.cpp         |  5 +++--
 src/jamidht/multiplexed_socket.cpp  | 14 +++++++++++---
 src/sip/siptransport.cpp            |  8 ++++++--
 src/sip/siptransport.h              |  6 ++++--
 6 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/src/jamidht/channeled_transport.cpp b/src/jamidht/channeled_transport.cpp
index ed0745294e..90303e7eb0 100644
--- a/src/jamidht/channeled_transport.cpp
+++ b/src/jamidht/channeled_transport.cpp
@@ -39,6 +39,7 @@ ChanneledSIPTransport::ChanneledSIPTransport(pjsip_endpoint* endpt,
                                              const IpAddr& remote,
                                              onShutdownCb&& cb)
     : socket_(socket)
+    , shutdownCb_(std::move(cb))
     , local_ {local}
     , remote_ {remote}
     , trData_()
@@ -150,9 +151,13 @@ ChanneledSIPTransport::ChanneledSIPTransport(pjsip_endpoint* endpt,
     // Register callbacks
     if (pjsip_transport_register(base.tpmgr, &base) != PJ_SUCCESS)
         throw std::runtime_error("Can't register PJSIP transport.");
+}
 
+void
+ChanneledSIPTransport::start()
+{
     // Link to Channel Socket
-    socket->setOnRecv([this](const uint8_t* buf, size_t len) {
+    socket_->setOnRecv([this](const uint8_t* buf, size_t len) {
         pj_gettimeofday(&rdata_.pkt_info.timestamp);
         size_t remaining {len};
         while (remaining) {
@@ -176,7 +181,7 @@ ChanneledSIPTransport::ChanneledSIPTransport(pjsip_endpoint* endpt,
         }
         return len;
     });
-    socket->onShutdown([cb = std::move(cb), this] {
+    socket_->onShutdown([this] {
         disconnected_ = true;
         if (auto state_cb = pjsip_tpmgr_get_state_cb(trData_.base.tpmgr)) {
             JAMI_WARN("[SIPS] process disconnect event");
@@ -185,7 +190,7 @@ ChanneledSIPTransport::ChanneledSIPTransport(pjsip_endpoint* endpt,
             state_info.status = PJ_SUCCESS;
             (*state_cb)(&trData_.base, PJSIP_TP_STATE_DISCONNECTED, &state_info);
         }
-        cb();
+        shutdownCb_();
     });
 }
 
diff --git a/src/jamidht/channeled_transport.h b/src/jamidht/channeled_transport.h
index 42588e7afd..ed3a79b778 100644
--- a/src/jamidht/channeled_transport.h
+++ b/src/jamidht/channeled_transport.h
@@ -56,6 +56,11 @@ public:
                           onShutdownCb&& cb);
     ~ChanneledSIPTransport();
 
+    /**
+     * Connect callbacks for channeled socket, must be done when the channel is ready to be used
+     */
+    void start();
+
     pjsip_transport* getTransportBase() override { return &trData_.base; }
 
     IpAddr getLocalAddress() const override { return local_; }
@@ -65,6 +70,7 @@ private:
 
     // The SIP transport uses a ChannelSocket to send and receive datas
     std::shared_ptr<ChannelSocket> socket_ {};
+    onShutdownCb shutdownCb_ {};
     IpAddr local_ {};
     IpAddr remote_ {};
 
diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp
index 542cf42f24..69c02e34b3 100644
--- a/src/jamidht/jamiaccount.cpp
+++ b/src/jamidht/jamiaccount.cpp
@@ -4217,12 +4217,13 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket,
         // SIP channel to make the call pass through
         shared->callConnectionClosed(key.second, false);
     };
-    auto sip_tr = link_.sipTransportBroker->getChanneledTransport(socket, std::move(onShutdown));
+    auto sip_tr = link_.sipTransportBroker->getChanneledTransport(shared(),
+                                                                  socket,
+                                                                  std::move(onShutdown));
     if (!sip_tr) {
         JAMI_ERR() << "No channeled transport found";
         return;
     }
-    sip_tr->setAccount(shared());
     // Store the connection
     connections.emplace_back(SipConnection {sip_tr, socket});
     JAMI_WARN("[Account %s] New SIP channel opened with %s",
diff --git a/src/jamidht/multiplexed_socket.cpp b/src/jamidht/multiplexed_socket.cpp
index e971968c9c..5930a15614 100644
--- a/src/jamidht/multiplexed_socket.cpp
+++ b/src/jamidht/multiplexed_socket.cpp
@@ -120,11 +120,16 @@ public:
         clearSockets();
     }
 
-    std::shared_ptr<ChannelSocket> makeSocket(const std::string& name, uint16_t channel, bool isInitiator = false)
+    std::shared_ptr<ChannelSocket> makeSocket(const std::string& name,
+                                              uint16_t channel,
+                                              bool isInitiator = false)
     {
         auto& channelSocket = sockets[channel];
         if (not channelSocket)
-            channelSocket = std::make_shared<ChannelSocket>(parent_.weak(), name, channel, isInitiator);
+            channelSocket = std::make_shared<ChannelSocket>(parent_.weak(),
+                                                            name,
+                                                            channel,
+                                                            isInitiator);
         else {
             JAMI_WARN("A channel is already present on that socket, accepting "
                       "the request will close the previous one %s",
@@ -705,7 +710,10 @@ MultiplexedSocket::sendVersion()
 class ChannelSocket::Impl
 {
 public:
-    Impl(std::weak_ptr<MultiplexedSocket> endpoint, const std::string& name, const uint16_t& channel, bool isInitiator)
+    Impl(std::weak_ptr<MultiplexedSocket> endpoint,
+         const std::string& name,
+         const uint16_t& channel,
+         bool isInitiator)
         : name(name)
         , channel(channel)
         , endpoint(std::move(endpoint))
diff --git a/src/sip/siptransport.cpp b/src/sip/siptransport.cpp
index ffdd0b001d..5030edb135 100644
--- a/src/sip/siptransport.cpp
+++ b/src/sip/siptransport.cpp
@@ -397,7 +397,8 @@ SipTransportBroker::getTlsTransport(const std::shared_ptr<TlsListener>& l,
 }
 
 std::shared_ptr<SipTransport>
-SipTransportBroker::getChanneledTransport(const std::shared_ptr<ChannelSocket>& socket,
+SipTransportBroker::getChanneledTransport(const std::shared_ptr<SIPAccountBase>& account,
+                                          const std::shared_ptr<ChannelSocket>& socket,
                                           onShutdownCb&& cb)
 {
     auto ice = socket->underlyingICE();
@@ -415,7 +416,7 @@ SipTransportBroker::getChanneledTransport(const std::shared_ptr<ChannelSocket>&
     auto tr = sips_tr->getTransportBase();
     auto sip_tr = std::make_shared<SipTransport>(tr);
     sip_tr->setDeviceId(socket->deviceId().toString());
-    sips_tr.release(); // managed by PJSIP now
+    sip_tr->setAccount(account);
 
     {
         std::lock_guard<std::mutex> lock(transportMapMutex_);
@@ -423,6 +424,9 @@ SipTransportBroker::getChanneledTransport(const std::shared_ptr<ChannelSocket>&
         // (member of new SipIceTransport instance)
         transports_.emplace(tr, sip_tr);
     }
+
+    sips_tr->start();
+    sips_tr.release(); // managed by PJSIP now
     return sip_tr;
 }
 
diff --git a/src/sip/siptransport.h b/src/sip/siptransport.h
index 6d81183cdc..914c68b434 100644
--- a/src/sip/siptransport.h
+++ b/src/sip/siptransport.h
@@ -161,8 +161,10 @@ public:
 
     std::shared_ptr<SipTransport> addTransport(pjsip_transport*);
 
-    std::shared_ptr<SipTransport> getChanneledTransport(const std::shared_ptr<ChannelSocket>& socket,
-                                                        onShutdownCb&& cb);
+    std::shared_ptr<SipTransport> getChanneledTransport(
+        const std::shared_ptr<SIPAccountBase>& account,
+        const std::shared_ptr<ChannelSocket>& socket,
+        onShutdownCb&& cb);
 
     /**
      * Start graceful shutdown procedure for all transports
-- 
GitLab