diff --git a/CMakeLists.txt b/CMakeLists.txt index 4a72860358d8d0aea09339706ee6000de3015c66..b27b0f39dfa3939075787ecc055c2d254b7066ab 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,6 +30,9 @@ pkg_check_modules (pjproject REQUIRED IMPORTED_TARGET libpjproject) set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DMSGPACK_NO_BOOST -DMSGPACK_DISABLE_LEGACY_NIL -DMSGPACK_DISABLE_LEGACY_CONVERT") +option(DHTNET_PUPNP "Enable UPnP support" ON) +option(DHTNET_NATPMP "Enable NAT-PMP support" ON) + # Sources list (APPEND dhtnet_SOURCES src/connectionmanager.cpp @@ -48,8 +51,6 @@ list (APPEND dhtnet_SOURCES src/upnp/upnp_context.cpp src/upnp/upnp_control.cpp src/upnp/protocol/mapping.cpp - src/upnp/upnp_context.cpp - src/upnp/upnp_control.cpp src/upnp/protocol/igd.cpp ) @@ -70,6 +71,37 @@ list (APPEND dhtnet_HEADERS include/upnp/upnp_control.h ) +if (DHTNET_PUPNP) + pkg_search_module (upnp IMPORTED_TARGET upnp libupnp) + if (NOT upnp_FOUND) + message("libupnp not found: disabling") + set(DHTNET_PUPNP Off) + else() + list (APPEND dhtnet_SOURCES + src/upnp/protocol/pupnp/pupnp.cpp + src/upnp/protocol/pupnp/upnp_igd.cpp + ) + endif() +endif() +if (DHTNET_NATPMP) + pkg_search_module (natpmp IMPORTED_TARGET natpmp) + if (NOT natpmp_FOUND) + find_library(natpmp_LIBRARIES natpmp) + if (NOT natpmp_LIBRARIES) + message("NAT-PMP not found: disabling") + set(DHTNET_NATPMP Off) + else() + message("NAT-PMP found: ${natpmp_LIBRARIES}") + endif() + endif() + if (DHTNET_NATPMP) + list (APPEND dhtnet_SOURCES + src/upnp/protocol/natpmp/nat_pmp.cpp + src/upnp/protocol/natpmp/pmp_igd.cpp + ) + endif() +endif() + add_library(dhtnet ${dhtnet_SOURCES}) target_link_libraries(dhtnet PUBLIC PkgConfig::opendht PkgConfig::pjproject fmt::fmt ${MSGPACK_LIB}) if (APPLE) @@ -80,6 +112,14 @@ target_include_directories(dhtnet PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:include> ) +if (DHTNET_PUPNP) + target_compile_definitions(dhtnet PRIVATE HAVE_LIBUPNP) + target_link_libraries(dhtnet PRIVATE PkgConfig::upnp) +endif() +if (DHTNET_NATPMP) + target_compile_definitions(dhtnet PRIVATE HAVE_LIBNATPMP) + target_link_libraries(dhtnet PRIVATE ${natpmp_LIBRARIES}) +endif() if (BUILD_TESTING) target_compile_definitions(dhtnet PUBLIC DHTNET_TESTABLE) endif() diff --git a/include/upnp/mapping.h b/include/upnp/mapping.h index a9cff54f115fe52061ea58308ec3016bfae68975..4b3f4273b20cb9a15eda902babd5f4e3eafd40b0 100644 --- a/include/upnp/mapping.h +++ b/include/upnp/mapping.h @@ -36,7 +36,7 @@ enum class MappingState { PENDING, IN_PROGRESS, FAILED, OPEN }; enum class NatProtocolType; class IGD; -class Mapping +class Mapping : std::enable_shared_from_this<Mapping> { friend class UPnPContext; friend class NatPmp; @@ -112,6 +112,7 @@ private: void setIgd(const std::shared_ptr<IGD>& igd); void setAvailable(bool val); void setState(const MappingState& state); + void updateState(const MappingState& state, bool notify = true); void updateDescription(); #if HAVE_LIBNATPMP void setRenewalTime(sys_clock::time_point time); diff --git a/include/upnp/upnp_context.h b/include/upnp/upnp_context.h index a486eb5fbe74d2233635c99fe0c2ea71362051dc..62ada121e361c4fe3374e3578513431aa5a2e801 100644 --- a/include/upnp/upnp_context.h +++ b/include/upnp/upnp_context.h @@ -27,7 +27,6 @@ #include "../ip_utils.h" -#include "upnp_thread_util.h" #include "mapping.h" #include <opendht/rng.h> @@ -82,7 +81,7 @@ public: virtual void onMappingRemoved(const std::shared_ptr<IGD>& igd, const Mapping& map) = 0; }; -class UPnPContext : public UpnpMappingObserver, protected UpnpThreadUtil +class UPnPContext : public UpnpMappingObserver { private: struct MappingStatus @@ -166,8 +165,6 @@ private: Mapping::sharedPtr_t registerMapping(Mapping& map); // Removes the mapping from the list. - std::map<Mapping::key_t, Mapping::sharedPtr_t>::iterator unregisterMapping( - std::map<Mapping::key_t, Mapping::sharedPtr_t>::iterator it); void unregisterMapping(const Mapping::sharedPtr_t& map); // Perform the request on the provided IGD. @@ -179,11 +176,6 @@ private: // Remove all mappings of the given type. void deleteAllMappings(PortType type); - // Update the state and notify the listener - void updateMappingState(const Mapping::sharedPtr_t& map, - MappingState newState, - bool notify = true); - // Provision ports. uint16_t getAvailablePortNumber(PortType type); @@ -297,6 +289,8 @@ private: int maxOpenPortLimit_[2] {8, 12}; //std::shared_ptr<Task> mappingListUpdateTimer_ {}; + std::shared_ptr<asio::io_context> ctx; + std::shared_ptr<dht::log::Logger> logger_; asio::steady_timer mappingListUpdateTimer_;// {}; // Current preferred IGD. Can be null if there is no valid IGD. diff --git a/include/upnp/upnp_thread_util.h b/include/upnp/upnp_thread_util.h deleted file mode 100644 index 22b25028461ebad1baafbb303865f39e1173a771..0000000000000000000000000000000000000000 --- a/include/upnp/upnp_thread_util.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (C) 2004-2023 Savoir-faire Linux Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <https://www.gnu.org/licenses/>. - */ -#pragma once - -#include <thread> -#include <memory> -#include <asio/io_context.hpp> -#include <fmt/format.h> - -// This macro is used to validate that a code is executed from the expected -// thread. It's useful to detect unexpected race on data members. -#define CHECK_VALID_THREAD() \ - if (not isValidThread()) \ - fmt::print("The calling thread {} is not the expected thread: {}\n", getCurrentThread(), threadId_); - /*JAMI_ERR() << "The calling thread " << getCurrentThread() \ - << " is not the expected thread: " << threadId_;*/ - -namespace dhtnet { -namespace upnp { - -class UpnpThreadUtil -{ -protected: - std::thread::id getCurrentThread() const { return std::this_thread::get_id(); } - - bool isValidThread() const { return threadId_ == getCurrentThread(); } - - // Upnp context execution queue (same as manager's scheduler) - // Helpers to run tasks on upnp context queue. - //static ScheduledExecutor* getScheduler() { return &Manager::instance().scheduler(); } - - template<typename Callback> - static void runOnUpnpContextQueue(Callback&& cb) - { - //getScheduler()->run([cb = std::forward<Callback>(cb)]() mutable { cb(); }); - //ioContext->post(std::move(cb)); - } - - std::shared_ptr<asio::io_context> ioContext; - std::thread::id threadId_; -}; - -} // namespace upnp -} // namespace dhtnet diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp index 47a67e6b8fdc6d8e6217d20091c0604e44b5aa16..a31fdb38827b49c2086bf72abdcdc71b76dfcc59 100644 --- a/src/ice_transport.cpp +++ b/src/ice_transport.cpp @@ -448,8 +448,8 @@ IceTransport::Impl::initIceInstance(const IceTransportOptions& options) config_.stun_tp_cnt = 0; - if (logger_) - logger_->debug("[ice:{}] Add host candidates", fmt::ptr(this)); + // if (logger_) + // logger_->debug("[ice:{}] Add host candidates", fmt::ptr(this)); addStunConfig(pj_AF_INET()); addStunConfig(pj_AF_INET6()); @@ -459,8 +459,8 @@ IceTransport::Impl::initIceInstance(const IceTransportOptions& options) upnpSrflxCand = setupUpnpReflexiveCandidates(); if (not upnpSrflxCand.empty()) { addServerReflexiveCandidates(upnpSrflxCand); - if (logger_) - logger_->debug("[ice:{}] Added UPNP srflx candidates:", fmt::ptr(this)); + // if (logger_) + // logger_->debug("[ice:{}] Added UPNP srflx candidates:", fmt::ptr(this)); } } @@ -472,8 +472,8 @@ IceTransport::Impl::initIceInstance(const IceTransportOptions& options) if (upnpSrflxCand.empty() or (upnpSrflxCand[0].second.toString() != genericSrflxCand[0].second.toString())) { addServerReflexiveCandidates(genericSrflxCand); - if (logger_) - logger_->debug("[ice:{}] Added generic srflx candidates:", fmt::ptr(this)); + // if (logger_) + // logger_->debug("[ice:{}] Added generic srflx candidates:", fmt::ptr(this)); } } @@ -908,10 +908,10 @@ IceTransport::Impl::addStunConfig(int af) stun.af = af; stun.conn_type = config_.stun.conn_type; - if (logger_) - logger_->debug("[ice:{}] added host stun config for {:s} transport", - fmt::ptr(this), - config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"); + // if (logger_) + // logger_->debug("[ice:{}] added host stun config for {:s} transport", + // fmt::ptr(this), + // config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP"); return true; } diff --git a/src/upnp/protocol/mapping.cpp b/src/upnp/protocol/mapping.cpp index 3aa24b345c6ed49ae0818c3caca18d4bc5e9bdcc..4b9d5ba0cbbac062f4a562775ea5594b49293436 100644 --- a/src/upnp/protocol/mapping.cpp +++ b/src/upnp/protocol/mapping.cpp @@ -94,6 +94,20 @@ Mapping::setState(const MappingState& state) state_ = state; } +void +Mapping::updateState(const MappingState& newState, bool notify) +{ + std::unique_lock<std::mutex> lock(mutex_); + if (newState == state_) + return; + state_ = newState; + + if (notify && notifyCb_) { + lock.unlock(); + notifyCb_(shared_from_this()); + } +} + const char* Mapping::getStateStr() const { diff --git a/src/upnp/protocol/natpmp/nat_pmp.cpp b/src/upnp/protocol/natpmp/nat_pmp.cpp index 599205e2d990d1a102f523cf0e31804664cf5af3..2f48713fc4c5fb8cccd9d8ef09800ee46c8b2d93 100644 --- a/src/upnp/protocol/natpmp/nat_pmp.cpp +++ b/src/upnp/protocol/natpmp/nat_pmp.cpp @@ -21,11 +21,11 @@ namespace dhtnet { namespace upnp { -NatPmp::NatPmp() +NatPmp::NatPmp(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger) + : UPnPProtocol(logger), ioContext(ctx), searchForIgdTimer_(*ctx) { // JAMI_DBG("NAT-PMP: Instance [%p] created", this); - runOnNatPmpQueue([this] { - threadId_ = getCurrentThread(); + ioContext->dispatch([this] { igd_ = std::make_shared<PMPIGD>(); }); } @@ -38,15 +38,6 @@ NatPmp::~NatPmp() void NatPmp::initNatPmp() { - if (not isValidThread()) { - runOnNatPmpQueue([w = weak()] { - if (auto pmpThis = w.lock()) { - pmpThis->initNatPmp(); - } - }); - return; - } - initialized_ = false; { @@ -82,8 +73,7 @@ NatPmp::initNatPmp() err = NATPMP_ERR_CANNOTGETGATEWAY; } else { // JAMI_WARN("NAT-PMP: Trying to initialize using detected gateway %s", - localGw.toString().c_str()); - + // localGw.toString().c_str()); struct in_addr inaddr; inet_pton(AF_INET, localGw.toString().c_str(), &inaddr); err = initnatpmp(&natpmpHdl_, 1, inaddr.s_addr); @@ -119,17 +109,6 @@ NatPmp::initNatPmp() void NatPmp::setObserver(UpnpMappingObserver* obs) { - if (not isValidThread()) { - runOnNatPmpQueue([w = weak(), obs] { - if (auto pmpThis = w.lock()) { - pmpThis->setObserver(obs); - } - }); - return; - } - - // JAMI_DBG("NAT-PMP: Setting observer to %p", obs); - observer_ = obs; } @@ -152,10 +131,8 @@ NatPmp::terminate() std::unique_lock<std::mutex> lk(natpmpMutex_); std::condition_variable cv {}; - runOnNatPmpQueue([w = weak(), &cv = cv] { - if (auto pmpThis = w.lock()) { - pmpThis->terminate(cv); - } + ioContext->dispatch([&] { + terminate(cv); }); if (cv.wait_for(lk, std::chrono::seconds(10), [this] { return shutdownComplete_; })) { @@ -175,15 +152,6 @@ NatPmp::getHostAddress() const void NatPmp::clearIgds() { - if (not isValidThread()) { - runOnNatPmpQueue([w = weak()] { - if (auto pmpThis = w.lock()) { - pmpThis->clearIgds(); - } - }); - return; - } - bool do_close = false; if (igd_) { @@ -194,8 +162,7 @@ NatPmp::clearIgds() } initialized_ = false; - if (searchForIgdTimer_) - searchForIgdTimer_->cancel(); + searchForIgdTimer_.cancel(); igdSearchCounter_ = 0; @@ -208,15 +175,6 @@ NatPmp::clearIgds() void NatPmp::searchForIgd() { - if (not isValidThread()) { - runOnNatPmpQueue([w = weak()] { - if (auto pmpThis = w.lock()) { - pmpThis->searchForIgd(); - } - }); - return; - } - if (not initialized_) { initNatPmp(); } @@ -227,12 +185,11 @@ NatPmp::searchForIgd() // JAMI_DBG("NAT-PMP: Start search for IGDs. Attempt %i", igdSearchCounter_); // Cancel the current timer (if any) and re-schedule. - if (searchForIgdTimer_) - searchForIgdTimer_->cancel(); - - searchForIgdTimer_ = getNatpmpScheduler()->scheduleIn([this] { searchForIgd(); }, - NATPMP_SEARCH_RETRY_UNIT - * igdSearchCounter_); + searchForIgdTimer_.expires_after(NATPMP_SEARCH_RETRY_UNIT * igdSearchCounter_); + searchForIgdTimer_.async_wait([this](const asio::error_code& ec) { + if (!ec) + searchForIgd(); + }); } else { // JAMI_WARN("NAT-PMP: Setup failed after %u trials. NAT-PMP will be disabled!", // MAX_RESTART_SEARCH_RETRIES); @@ -291,14 +248,14 @@ void NatPmp::requestMappingAdd(const Mapping& mapping) { // Process on nat-pmp thread. - if (not isValidThread()) { - runOnNatPmpQueue([w = weak(), mapping] { + /*if (not isValidThread()) { + ioContext->post([w = weak(), mapping] { if (auto pmpThis = w.lock()) { pmpThis->requestMappingAdd(mapping); } }); return; - } + }*/ Mapping map(mapping); assert(map.getIgd()); @@ -329,14 +286,14 @@ void NatPmp::requestMappingRenew(const Mapping& mapping) { // Process on nat-pmp thread. - if (not isValidThread()) { - runOnNatPmpQueue([w = weak(), mapping] { + /*if (not isValidThread()) { + ioContext->post([w = weak(), mapping] { if (auto pmpThis = w.lock()) { pmpThis->requestMappingRenew(mapping); } }); return; - } + }*/ Mapping map(mapping); auto err = addPortMapping(map); @@ -401,7 +358,7 @@ NatPmp::readResponse(natpmp_t& handle, natpmpresp_t& response) int NatPmp::sendMappingRequest(const Mapping& mapping, uint32_t& lifetime) { - CHECK_VALID_THREAD(); + //CHECK_VALID_THREAD(); int err = sendnewportmappingrequest(&natpmpHdl_, mapping.getType() == PortType::UDP ? NATPMP_PROTOCOL_UDP @@ -478,16 +435,12 @@ NatPmp::addPortMapping(Mapping& mapping) void NatPmp::requestMappingRemove(const Mapping& mapping) { - // Process on nat-pmp thread. - if (not isValidThread()) { - runOnNatPmpQueue([w = weak(), mapping] { - if (auto pmpThis = w.lock()) { - Mapping map {mapping}; - pmpThis->removePortMapping(map); - } - }); - return; - } + ioContext->dispatch([w = weak(), mapping] { + if (auto pmpThis = w.lock()) { + Mapping map {mapping}; + pmpThis->removePortMapping(map); + } + }); } void @@ -522,7 +475,7 @@ NatPmp::removePortMapping(Mapping& mapping) void NatPmp::getIgdPublicAddress() { - CHECK_VALID_THREAD(); + //CHECK_VALID_THREAD(); // Set the public address for this IGD if it does not // have one already. @@ -585,7 +538,7 @@ NatPmp::getIgdPublicAddress() void NatPmp::removeAllMappings() { - CHECK_VALID_THREAD(); + //CHECK_VALID_THREAD(); // JAMI_WARN("NAT-PMP: Send request to close all existing mappings to IGD %s", // igd_->toString().c_str()); @@ -721,7 +674,7 @@ NatPmp::processIgdUpdate(UpnpIgdEvent event) if (observer_ == nullptr) return; // Process the response on the context thread. - runOnUpnpContextQueue([obs = observer_, igd = igd_, event] { obs->onIgdUpdated(igd, event); }); + ioContext->post([obs = observer_, igd = igd_, event] { obs->onIgdUpdated(igd, event); }); } void @@ -731,7 +684,7 @@ NatPmp::processMappingAdded(const Mapping& map) return; // Process the response on the context thread. - runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingAdded(igd, map); }); + ioContext->post([obs = observer_, igd = igd_, map] { obs->onMappingAdded(igd, map); }); } void @@ -741,7 +694,7 @@ NatPmp::processMappingRequestFailed(const Mapping& map) return; // Process the response on the context thread. - runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRequestFailed(map); }); + ioContext->post([obs = observer_, igd = igd_, map] { obs->onMappingRequestFailed(map); }); } void @@ -751,7 +704,7 @@ NatPmp::processMappingRenewed(const Mapping& map) return; // Process the response on the context thread. - runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRenewed(igd, map); }); + ioContext->post([obs = observer_, igd = igd_, map] { obs->onMappingRenewed(igd, map); }); } void @@ -761,7 +714,7 @@ NatPmp::processMappingRemoved(const Mapping& map) return; // Process the response on the context thread. - runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRemoved(igd, map); }); + ioContext->post([obs = observer_, igd = igd_, map] { obs->onMappingRemoved(igd, map); }); } } // namespace upnp diff --git a/src/upnp/protocol/natpmp/nat_pmp.h b/src/upnp/protocol/natpmp/nat_pmp.h index 30642a0ba1fbf83af8dcec11e19415fdf1a79bab..fbc2fc0bebe7d00305c3257d7b3589beb307776f 100644 --- a/src/upnp/protocol/natpmp/nat_pmp.h +++ b/src/upnp/protocol/natpmp/nat_pmp.h @@ -16,14 +16,10 @@ */ #pragma once -#include "connectivity/upnp/protocol/upnp_protocol.h" -#include "connectivity/upnp/protocol/igd.h" +#include "../upnp_protocol.h" +#include "../igd.h" #include "pmp_igd.h" - -#include "logger.h" -#include "connectivity/ip_utils.h" -#include "noncopyable.h" -#include "compiler_intrinsics.h" +#include "ip_utils.h" // uncomment to enable native natpmp error messages //#define ENABLE_STRNATPMPERR 1 @@ -53,7 +49,7 @@ constexpr static auto NATPMP_SEARCH_RETRY_UNIT {std::chrono::seconds(10)}; class NatPmp : public UPnPProtocol { public: - NatPmp(); + NatPmp(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger); ~NatPmp(); // Set the observer. @@ -94,21 +90,11 @@ public: void terminate() override; private: - NON_COPYABLE(NatPmp); + NatPmp& operator=(const NatPmp&) = delete; + NatPmp(const NatPmp&) = delete; std::weak_ptr<NatPmp> weak() { return std::static_pointer_cast<NatPmp>(shared_from_this()); } - // Helpers to run tasks on NAT-PMP internal execution queue. - ScheduledExecutor* getNatpmpScheduler() { return &natpmpScheduler_; } - template<typename Callback> - void runOnNatPmpQueue(Callback&& cb) - { - natpmpScheduler_.run([cb = std::forward<Callback>(cb)]() mutable { cb(); }); - } - - // Helpers to run tasks on UPNP context execution queue. - ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); } - void terminate(std::condition_variable& cv); void initNatPmp(); @@ -147,8 +133,8 @@ private: // Data members std::shared_ptr<PMPIGD> igd_; natpmp_t natpmpHdl_; - ScheduledExecutor natpmpScheduler_ {"natpmp"}; - std::shared_ptr<Task> searchForIgdTimer_ {}; + std::shared_ptr<asio::io_context> ioContext; + asio::steady_timer searchForIgdTimer_; unsigned int igdSearchCounter_ {0}; UpnpMappingObserver* observer_ {nullptr}; IpAddr hostAddress_ {}; diff --git a/src/upnp/protocol/natpmp/pmp_igd.h b/src/upnp/protocol/natpmp/pmp_igd.h index df1c44b2dd8359fd06f226c9f74f8a00719e5bc2..1ca7fe74a6869a803317778dc08c5616ce7842a3 100644 --- a/src/upnp/protocol/natpmp/pmp_igd.h +++ b/src/upnp/protocol/natpmp/pmp_igd.h @@ -17,8 +17,7 @@ #pragma once #include "../igd.h" -#include "noncopyable.h" -#include "connectivity/ip_utils.h" +#include "ip_utils.h" #include <map> #include <atomic> diff --git a/src/upnp/protocol/pupnp/pupnp.cpp b/src/upnp/protocol/pupnp/pupnp.cpp index e6ad85d174fb78cd078615090baf7ad4a3fe1500..df68696c8af74249d0c9b13606e1a2c237c25ee7 100644 --- a/src/upnp/protocol/pupnp/pupnp.cpp +++ b/src/upnp/protocol/pupnp/pupnp.cpp @@ -15,6 +15,7 @@ * along with this program. If not, see <https://www.gnu.org/licenses/>. */ #include "pupnp.h" +#include "string_utils.h" #include <opendht/thread_pool.h> #include <opendht/http.h> @@ -95,13 +96,10 @@ errorOnResponse(IXML_Document* doc) // UPNP class implementation -PUPnP::PUPnP() +PUPnP::PUPnP(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger) + : UPnPProtocol(logger), ioContext(ctx), searchForIgdTimer_(*ctx) { - // JAMI_DBG("PUPnP: Creating instance [%p] ...", this); - runOnPUPnPQueue([this] { - threadId_ = getCurrentThread(); - // JAMI_DBG("PUPnP: Instance [%p] created", this); - }); + // JAMI_LOG("PUPnP: Creating instance [{}] ...", fmt::ptr(this)); } PUPnP::~PUPnP() @@ -142,10 +140,10 @@ PUPnP::initUpnpLib() ip_address6 = UpnpGetServerIp6Address(); port6 = UpnpGetServerPort6(); #endif - if (ip_address6 and port6) - // JAMI_DBG("PUPnP: Initialized on %s:%u | %s:%u", ip_address, port, ip_address6, port6); + /*if (ip_address6 and port6) + JAMI_DBG("PUPnP: Initialized on %s:%u | %s:%u", ip_address, port, ip_address6, port6); else - // JAMI_DBG("PUPnP: Initialized on %s:%u", ip_address, port); + JAMI_DBG("PUPnP: Initialized on %s:%u", ip_address, port);*/ // Relax the parser to allow malformed XML text. ixmlRelaxParser(1); @@ -165,8 +163,6 @@ PUPnP::registerClient() { assert(not clientRegistered_); - CHECK_VALID_THREAD(); - // Register Upnp control point. int upnp_err = UpnpRegisterClient(ctrlPtCallback, this, &ctrlptHandle_); if (upnp_err != UPNP_E_SUCCESS) { @@ -180,17 +176,6 @@ PUPnP::registerClient() void PUPnP::setObserver(UpnpMappingObserver* obs) { - if (not isValidThread()) { - runOnPUPnPQueue([w = weak(), obs] { - if (auto upnpThis = w.lock()) { - upnpThis->setObserver(obs); - } - }); - return; - } - - // JAMI_DBG("PUPnP: Setting observer to %p", obs); - observer_ = obs; } @@ -236,10 +221,8 @@ PUPnP::terminate() std::unique_lock<std::mutex> lk(pupnpMutex_); std::condition_variable cv {}; - runOnPUPnPQueue([w = weak(), &cv = cv] { - if (auto upnpThis = w.lock()) { - upnpThis->terminate(cv); - } + ioContext->dispatch([&] { + terminate(cv); }); if (cv.wait_for(lk, std::chrono::seconds(10), [this] { return shutdownComplete_; })) { @@ -254,8 +237,6 @@ PUPnP::terminate() void PUPnP::searchForDevices() { - CHECK_VALID_THREAD(); - // JAMI_DBG("PUPnP: Send IGD search request"); // Send out search for multiple types of devices, as some routers may possibly @@ -293,19 +274,9 @@ PUPnP::searchForDevices() void PUPnP::clearIgds() { - if (not isValidThread()) { - runOnPUPnPQueue([w = weak()] { - if (auto upnpThis = w.lock()) { - upnpThis->clearIgds(); - } - }); - return; - } - // JAMI_DBG("PUPnP: clearing IGDs and devices lists"); - if (searchForIgdTimer_) - searchForIgdTimer_->cancel(); + searchForIgdTimer_.cancel(); igdSearchCounter_ = 0; @@ -324,15 +295,6 @@ PUPnP::clearIgds() void PUPnP::searchForIgd() { - if (not isValidThread()) { - runOnPUPnPQueue([w = weak()] { - if (auto upnpThis = w.lock()) { - upnpThis->searchForIgd(); - } - }); - return; - } - // Update local address before searching. updateHostAddress(); @@ -375,15 +337,13 @@ PUPnP::searchForIgd() // The connectivity change may be received while the the local // interface is not fully setup. The rescheduling typically // usefull to mitigate this race. - if (searchForIgdTimer_) - searchForIgdTimer_->cancel(); - - searchForIgdTimer_ = getUpnContextScheduler()->scheduleIn( - [w = weak()] { + searchForIgdTimer_.expires_after(PUPNP_SEARCH_RETRY_UNIT * igdSearchCounter_); + searchForIgdTimer_.async_wait([w = weak()] (const asio::error_code& ec) { + if (not ec) { if (auto upnpThis = w.lock()) upnpThis->searchForIgd(); - }, - PUPNP_SEARCH_RETRY_UNIT * igdSearchCounter_); + } + }); } std::list<std::shared_ptr<IGD>> @@ -453,8 +413,6 @@ PUPnP::incrementErrorsCounter(const std::shared_ptr<IGD>& igd) bool PUPnP::validateIgd(const std::string& location, IXML_Document* doc_container_ptr) { - CHECK_VALID_THREAD(); - assert(doc_container_ptr != nullptr); XMLDocument document(doc_container_ptr, ixmlDocument_free); @@ -574,7 +532,7 @@ PUPnP::validateIgd(const std::string& location, IXML_Document* doc_container_ptr } // Report to the listener. - runOnUpnpContextQueue([w = weak(), igd_candidate] { + ioContext->post([w = weak(), igd_candidate] { if (auto upnpThis = w.lock()) { if (upnpThis->observer_) upnpThis->observer_->onIgdUpdated(igd_candidate, UpnpIgdEvent::ADDED); @@ -587,7 +545,7 @@ PUPnP::validateIgd(const std::string& location, IXML_Document* doc_container_ptr void PUPnP::requestMappingAdd(const Mapping& mapping) { - runOnPUPnPQueue([w = weak(), mapping] { + ioContext->post([w = weak(), mapping] { if (auto upnpThis = w.lock()) { if (not upnpThis->isRunning()) return; @@ -609,7 +567,7 @@ void PUPnP::requestMappingRemove(const Mapping& mapping) { // Send remove request using the matching IGD - runOnPUPnPQueue([w = weak(), mapping] { + ioContext->dispatch([w = weak(), mapping] { if (auto upnpThis = w.lock()) { // Abort if we are shutting down. if (not upnpThis->isRunning()) @@ -650,12 +608,10 @@ PUPnP::findMatchingIgd(const std::string& ctrlURL) const void PUPnP::processAddMapAction(const Mapping& map) { - CHECK_VALID_THREAD(); - if (observer_ == nullptr) return; - runOnUpnpContextQueue([w = weak(), map] { + ioContext->post([w = weak(), map] { if (auto upnpThis = w.lock()) { if (upnpThis->observer_) upnpThis->observer_->onMappingAdded(map.getIgd(), std::move(map)); @@ -666,12 +622,10 @@ PUPnP::processAddMapAction(const Mapping& map) void PUPnP::processRequestMappingFailure(const Mapping& map) { - CHECK_VALID_THREAD(); - if (observer_ == nullptr) return; - runOnUpnpContextQueue([w = weak(), map] { + ioContext->post([w = weak(), map] { if (auto upnpThis = w.lock()) { // JAMI_DBG("PUPnP: Failed to request mapping %s", map.toString().c_str()); if (upnpThis->observer_) @@ -683,12 +637,10 @@ PUPnP::processRequestMappingFailure(const Mapping& map) void PUPnP::processRemoveMapAction(const Mapping& map) { - CHECK_VALID_THREAD(); - if (observer_ == nullptr) return; - runOnUpnpContextQueue([map, obs = observer_] { + ioContext->post([map, obs = observer_] { // JAMI_DBG("PUPnP: Closed mapping %s", map.toString().c_str()); obs->onMappingRemoved(map.getIgd(), std::move(map)); }); @@ -779,8 +731,6 @@ PUPnP::processDiscoverySearchResult(const std::string& cpDeviceId, const std::string& igdLocationUrl, const IpAddr& dstAddr) { - CHECK_VALID_THREAD(); - // Update host address if needed. if (not hasValidHostAddress()) updateHostAddress(); @@ -833,12 +783,12 @@ PUPnP::downLoadIgdDescription(const std::string& locationUrl) int upnp_err = UpnpDownloadXmlDoc(locationUrl.c_str(), &doc_container_ptr); if (upnp_err != UPNP_E_SUCCESS or not doc_container_ptr) { - // JAMI_WARN("PUPnP: Error downloading device XML document from %s -> %s", - // locationUrl.c_str(), - // UpnpGetErrorMessage(upnp_err)); + if(logger_) logger_->warn("PUPnP: Error downloading device XML document from {} -> {}", + locationUrl, + UpnpGetErrorMessage(upnp_err)); } else { - // JAMI_DBG("PUPnP: Succeeded to download device XML document from %s", locationUrl.c_str()); - runOnPUPnPQueue([w = weak(), url = locationUrl, doc_container_ptr] { + if(logger_) logger_->debug("PUPnP: Succeeded to download device XML document from {}", locationUrl); + ioContext->post([w = weak(), url = locationUrl, doc_container_ptr] { if (auto upnpThis = w.lock()) { upnpThis->validateIgd(url, doc_container_ptr); } @@ -849,8 +799,6 @@ PUPnP::downLoadIgdDescription(const std::string& locationUrl) void PUPnP::processDiscoveryAdvertisementByebye(const std::string& cpDeviceId) { - CHECK_VALID_THREAD(); - discoveredIgdList_.erase(cpDeviceId); std::shared_ptr<IGD> igd; @@ -882,8 +830,6 @@ PUPnP::processDiscoveryAdvertisementByebye(const std::string& cpDeviceId) void PUPnP::processDiscoverySubscriptionExpired(Upnp_EventType event_type, const std::string& eventSubUrl) { - CHECK_VALID_THREAD(); - std::lock_guard<std::mutex> lk(pupnpMutex_); for (auto& it : validIgdList_) { if (auto igd = std::dynamic_pointer_cast<UPnPIGD>(it)) { @@ -925,7 +871,7 @@ PUPnP::handleCtrlPtUPnPEvents(Upnp_EventType event_type, const void* event) std::string deviceId {UpnpDiscovery_get_DeviceID_cstr(d_event)}; std::string location {UpnpDiscovery_get_Location_cstr(d_event)}; IpAddr dstAddr(*(const pj_sockaddr*) (UpnpDiscovery_get_DestAddr(d_event))); - runOnPUPnPQueue([w = weak(), + ioContext->post([w = weak(), deviceId = std::move(deviceId), location = std::move(location), dstAddr = std::move(dstAddr)] { @@ -941,7 +887,7 @@ PUPnP::handleCtrlPtUPnPEvents(Upnp_EventType event_type, const void* event) std::string deviceId(UpnpDiscovery_get_DeviceID_cstr(d_event)); // Process the response on the main thread. - runOnPUPnPQueue([w = weak(), deviceId = std::move(deviceId)] { + ioContext->post([w = weak(), deviceId = std::move(deviceId)] { if (auto upnpThis = w.lock()) { upnpThis->processDiscoveryAdvertisementByebye(deviceId); } @@ -971,7 +917,7 @@ PUPnP::handleCtrlPtUPnPEvents(Upnp_EventType event_type, const void* event) std::string publisherUrl(UpnpEventSubscribe_get_PublisherUrl_cstr(es_event)); // Process the response on the main thread. - runOnPUPnPQueue([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] { + ioContext->post([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] { if (auto upnpThis = w.lock()) { upnpThis->processDiscoverySubscriptionExpired(event_type, publisherUrl); } @@ -1332,7 +1278,7 @@ PUPnP::getMappingsListByDescr(const std::shared_ptr<IGD>& igd, const std::string break; } else { auto errorDescription = getFirstDocItem(response.get(), "errorDescription"); - JAMI_ERROR("PUPnP: GetGenericPortMappingEntry returned with error: {:s}: {:s}", + if (logger_) logger_->error("PUPnP: GetGenericPortMappingEntry returned with error: {:s}: {:s}", errorCode, errorDescription); break; @@ -1356,7 +1302,7 @@ PUPnP::getMappingsListByDescr(const std::shared_ptr<IGD>& igd, const std::string std::string transport(getFirstDocItem(response.get(), "NewProtocol")); if (port_internal.empty() || port_external.empty() || transport.empty()) { - // JAMI_ERR("PUPnP: GetGenericPortMappingEntry returned an invalid entry at index %i", + // if (logger_) logger_->e("PUPnP: GetGenericPortMappingEntry returned an invalid entry at index %i", // entry_idx); continue; } @@ -1372,9 +1318,9 @@ PUPnP::getMappingsListByDescr(const std::shared_ptr<IGD>& igd, const std::string mapList.emplace(map.getMapKey(), std::move(map)); } - JAMI_DEBUG("PUPnP: Found {:d} allocated mappings on IGD {:s}", - mapList.size(), - upnpIgd->toString()); + // if (logger_) logger_->debug("PUPnP: Found {:d} allocated mappings on IGD {:s}", + // mapList.size(), + // upnpIgd->toString()); return mapList; } @@ -1385,22 +1331,23 @@ PUPnP::deleteMappingsByDescription(const std::shared_ptr<IGD>& igd, const std::s if (not(clientRegistered_ and igd->getLocalIp())) return; - // JAMI_DBG("PUPnP: Remove all mappings (if any) on IGD %s matching descr prefix %s", - // igd->toString().c_str(), + // if (logger_) logger_->debug("PUPnP: Remove all mappings (if any) on IGD {} matching descr prefix {}", + // igd->toString(), // Mapping::UPNP_MAPPING_DESCRIPTION_PREFIX); - auto mapList = getMappingsListByDescr(igd, description); - - for (auto const& [_, map] : mapList) { - requestMappingRemove(map); - } + ioContext->post([w=weak(), igd, description]{ + if (auto sthis = w.lock()) { + auto mapList = sthis->getMappingsListByDescr(igd, description); + for (auto const& [_, map] : mapList) { + sthis->requestMappingRemove(map); + } + } + }); } bool PUPnP::actionAddPortMapping(const Mapping& mapping) { - CHECK_VALID_THREAD(); - if (not clientRegistered_) return false; @@ -1475,13 +1422,13 @@ PUPnP::actionAddPortMapping(const Mapping& mapping) bool success = true; if (upnp_err != UPNP_E_SUCCESS) { - // JAMI_WARN("PUPnP: Failed to send action %s for mapping %s. %d: %s", + // if (logger_) logger_->warn("PUPnP: Failed to send action {} for mapping {}. {:d}: {}", // ACTION_ADD_PORT_MAPPING, - // mapping.toString().c_str(), + // mapping.toString(), // upnp_err, // UpnpGetErrorMessage(upnp_err)); - // JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str()); - // JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str()); + // if (logger_) logger_->warn("PUPnP: IGD ctrlUrl {}", igd->getControlURL()); + // if (logger_) logger_->warn("PUPnP: IGD service type {}", igd->getServiceType()); success = false; } @@ -1496,7 +1443,7 @@ PUPnP::actionAddPortMapping(const Mapping& mapping) errorDescription = getFirstDocItem(response.get(), "errorDescription"); } - // JAMI_WARNING("PUPnP: {:s} returned with error: {:s} {:s}", + // if (logger_) logger_->warn("PUPnP: {:s} returned with error: {:s} {:s}", // ACTION_ADD_PORT_MAPPING, // errorCode, // errorDescription); @@ -1507,8 +1454,6 @@ PUPnP::actionAddPortMapping(const Mapping& mapping) bool PUPnP::actionDeletePortMapping(const Mapping& mapping) { - CHECK_VALID_THREAD(); - if (not clientRegistered_) return false; @@ -1558,19 +1503,20 @@ PUPnP::actionDeletePortMapping(const Mapping& mapping) bool success = true; if (upnp_err != UPNP_E_SUCCESS) { - // JAMI_WARN("PUPnP: Failed to send action %s for mapping from %s. %d: %s", + // if (logger_) { + // logger_->warn("PUPnP: Failed to send action {} for mapping from {}. {:d}: {}", // ACTION_DELETE_PORT_MAPPING, - // mapping.toString().c_str(), + // mapping.toString(), // upnp_err, // UpnpGetErrorMessage(upnp_err)); - // JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str()); - // JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str()); - + // logger_->warn("PUPnP: IGD ctrlUrl {}", igd->getControlURL()); + // logger_->warn("PUPnP: IGD service type {}", igd->getServiceType()); + // } success = false; } if (not response) { - // JAMI_WARN("PUPnP: Failed to get response for %s", ACTION_DELETE_PORT_MAPPING); + // if (logger_) logger_->warn("PUPnP: Failed to get response for {}", ACTION_DELETE_PORT_MAPPING); success = false; } @@ -1578,7 +1524,7 @@ PUPnP::actionDeletePortMapping(const Mapping& mapping) auto errorCode = getFirstDocItem(response.get(), "errorCode"); if (not errorCode.empty()) { auto errorDescription = getFirstDocItem(response.get(), "errorDescription"); - // JAMI_WARNING("PUPnP: {:s} returned with error: {:s}: {:s}", + // if (logger_) logger_->warn("PUPnP: {:s} returned with error: {:s}: {:s}", // ACTION_DELETE_PORT_MAPPING, // errorCode, // errorDescription); diff --git a/src/upnp/protocol/pupnp/pupnp.h b/src/upnp/protocol/pupnp/pupnp.h index d45fddea2c11471750b08ccb211956222872f2df..4c0ea789d53c54ea29ed1fd8163e8d178d3667dc 100644 --- a/src/upnp/protocol/pupnp/pupnp.h +++ b/src/upnp/protocol/pupnp/pupnp.h @@ -24,11 +24,7 @@ #include "../upnp_protocol.h" #include "../igd.h" #include "upnp_igd.h" - -#include "logger.h" -#include "connectivity/ip_utils.h" -#include "noncopyable.h" -#include "compiler_intrinsics.h" +#include "ip_utils.h" #include <upnp/upnp.h> #include <upnp/upnptools.h> @@ -68,7 +64,7 @@ public: GET_EXTERNAL_IP_ADDRESS }; - PUPnP(); + PUPnP(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger); ~PUPnP(); // Set the observer @@ -113,18 +109,8 @@ public: void terminate() override; private: - NON_COPYABLE(PUPnP); - - // Helpers to run tasks on PUPNP private execution queue. - ScheduledExecutor* getPUPnPScheduler() { return &pupnpScheduler_; } - template<typename Callback> - void runOnPUPnPQueue(Callback&& cb) - { - pupnpScheduler_.run([cb = std::forward<Callback>(cb)]() mutable { cb(); }); - } - - // Helper to run tasks on UPNP context execution queue. - ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); } + PUPnP& operator=(const PUPnP&) = delete; + PUPnP(const PUPnP&) = delete; void terminate(std::condition_variable& cv); @@ -224,15 +210,13 @@ private: std::weak_ptr<PUPnP> weak() { return std::static_pointer_cast<PUPnP>(shared_from_this()); } - // Execution queue to run lib upnp actions - ScheduledExecutor pupnpScheduler_ {"pupnp"}; - // Initialization status. std::atomic_bool initialized_ {false}; // Client registration status. std::atomic_bool clientRegistered_ {false}; - std::shared_ptr<Task> searchForIgdTimer_ {}; + std::shared_ptr<asio::io_context> ioContext; + asio::steady_timer searchForIgdTimer_; unsigned int igdSearchCounter_ {0}; // List of discovered IGDs. diff --git a/src/upnp/protocol/pupnp/upnp_igd.h b/src/upnp/protocol/pupnp/upnp_igd.h index 9f335058e76f77e037c92cb7790e07489179f070..5ea7fe565f5b56dbe27c31e15a6b2e4a7d9c4ad2 100644 --- a/src/upnp/protocol/pupnp/upnp_igd.h +++ b/src/upnp/protocol/pupnp/upnp_igd.h @@ -16,10 +16,8 @@ */ #pragma once -#include "connectivity/upnp/protocol/igd.h" - -#include "noncopyable.h" -#include "connectivity/ip_utils.h" +#include "../igd.h" +#include "ip_utils.h" #include <map> #include <string> diff --git a/src/upnp/protocol/upnp_protocol.h b/src/upnp/protocol/upnp_protocol.h index ce891e178e9c88d09edaaae322e4e8629c103159..3dde4abd13406627215e6ea76db269a9b751f5cb 100644 --- a/src/upnp/protocol/upnp_protocol.h +++ b/src/upnp/protocol/upnp_protocol.h @@ -16,7 +16,8 @@ */ #pragma once -#include "igd.h" +#include "./igd.h" +#include "upnp/upnp_context.h" #include "upnp/mapping.h" #include "ip_utils.h" @@ -48,7 +49,7 @@ class UPnPProtocol : public std::enable_shared_from_this<UPnPProtocol>//, protec public: enum class UpnpError : int { INVALID_ERR = -1, ERROR_OK, CONFLICT_IN_MAPPING }; - UPnPProtocol() {}; + UPnPProtocol(const std::shared_ptr<dht::log::Logger>& logger) : logger_(logger) {}; virtual ~UPnPProtocol() {}; // Get protocol type. @@ -93,6 +94,8 @@ public: // Terminate virtual void terminate() = 0; + + std::shared_ptr<dht::log::Logger> logger_; }; } // namespace upnp diff --git a/src/upnp/upnp_context.cpp b/src/upnp/upnp_context.cpp index 6b1ff90861c75b0f9c287bf60bb7dff7f455f4c8..5022eb3913eb9929d3365f72b8774d424c51593b 100644 --- a/src/upnp/upnp_context.cpp +++ b/src/upnp/upnp_context.cpp @@ -17,6 +17,13 @@ #include "upnp/upnp_context.h" #include "protocol/upnp_protocol.h" +#if HAVE_LIBNATPMP +#include "protocol/natpmp/nat_pmp.h" +#endif +#if HAVE_LIBUPNP +#include "protocol/pupnp/pupnp.h" +#endif + #include <asio/steady_timer.hpp> #if __has_include(<fmt/std.h>) #include <fmt/std.h> @@ -37,7 +44,7 @@ constexpr static uint16_t UPNP_UDP_PORT_MIN {20000}; constexpr static uint16_t UPNP_UDP_PORT_MAX {UPNP_UDP_PORT_MIN + 5000}; UPnPContext::UPnPContext(const std::shared_ptr<asio::io_context>& ioContext, const std::shared_ptr<dht::log::Logger>& logger) - : mappingListUpdateTimer_(*ioContext) + : mappingListUpdateTimer_(*ioContext), ctx(ioContext), logger_(logger) { // JAMI_DBG("Creating UPnPContext instance [%p]", this); @@ -45,7 +52,7 @@ UPnPContext::UPnPContext(const std::shared_ptr<asio::io_context>& ioContext, con portRange_.emplace(PortType::TCP, std::make_pair(UPNP_TCP_PORT_MIN, UPNP_TCP_PORT_MAX)); portRange_.emplace(PortType::UDP, std::make_pair(UPNP_UDP_PORT_MIN, UPNP_UDP_PORT_MAX)); - ioContext->post([this] { init(); }); + ctx->post([this] { init(); }); } /*std::shared_ptr<UPnPContext> @@ -86,8 +93,8 @@ UPnPContext::shutdown() std::unique_lock<std::mutex> lk(mappingMutex_); std::condition_variable cv; - runOnUpnpContextQueue([&, this] { shutdown(cv); }); - + ctx->post([&, this] { shutdown(cv); }); + // JAMI_DBG("Waiting for shutdown ..."); if (cv.wait_for(lk, std::chrono::seconds(30), [this] { return shutdownComplete_; })) { @@ -105,17 +112,14 @@ UPnPContext::~UPnPContext() void UPnPContext::init() { - threadId_ = getCurrentThread(); - CHECK_VALID_THREAD(); - #if HAVE_LIBNATPMP - auto natPmp = std::make_shared<NatPmp>(); + auto natPmp = std::make_shared<NatPmp>(ctx, logger_); natPmp->setObserver(this); protocolList_.emplace(NatProtocolType::NAT_PMP, std::move(natPmp)); #endif #if HAVE_LIBUPNP - auto pupnp = std::make_shared<PUPnP>(); + auto pupnp = std::make_shared<PUPnP>(ctx, logger_); pupnp->setObserver(this); protocolList_.emplace(NatProtocolType::PUPNP, std::move(pupnp)); #endif @@ -126,13 +130,11 @@ UPnPContext::startUpnp() { assert(not controllerList_.empty()); - CHECK_VALID_THREAD(); - // JAMI_DBG("Starting UPNP context"); // Request a new IGD search. for (auto const& [_, protocol] : protocolList_) { - protocol->searchForIgd(); + ctx->dispatch([p=protocol] { p->searchForIgd(); }); } started_ = true; @@ -141,10 +143,10 @@ UPnPContext::startUpnp() void UPnPContext::stopUpnp(bool forceRelease) { - if (not isValidThread()) { - runOnUpnpContextQueue([this, forceRelease] { stopUpnp(forceRelease); }); + /*if (not isValidThread()) { + ctx->post([this, forceRelease] { stopUpnp(forceRelease); }); return; - } + }*/ // JAMI_DBG("Stopping UPNP context"); @@ -170,13 +172,12 @@ UPnPContext::stopUpnp(bool forceRelease) for (auto const& map : toRemoveList) { requestRemoveMapping(map); - // Notify is not needed in updateMappingState when + // Notify is not needed in updateState when // shutting down (hence set it to false). NotifyCallback // would trigger a new SIP registration and create a // false registered state upon program close. // It's handled by upper layers. - - updateMappingState(map, MappingState::FAILED, false); + map->updateState(MappingState::FAILED, false); // We dont remove mappings with auto-update enabled, // unless forceRelease is true. if (not map->getAutoUpdate() or forceRelease) { @@ -187,7 +188,7 @@ UPnPContext::stopUpnp(bool forceRelease) // Clear all current IGDs. for (auto const& [_, protocol] : protocolList_) { - protocol->clearIgds(); + ctx->dispatch([p=protocol]{ p->clearIgds(); }); } started_ = false; @@ -221,10 +222,10 @@ UPnPContext::generateRandomPort(PortType type, bool mustBeEven) void UPnPContext::connectivityChanged() { - if (not isValidThread()) { - runOnUpnpContextQueue([this] { connectivityChanged(); }); + /*if (not isValidThread()) { + ctx->post([this] { connectivityChanged(); }); return; - } + }*/ auto hostAddr = ip_utils::getLocalAddr(AF_INET); @@ -367,10 +368,10 @@ UPnPContext::reserveMapping(Mapping& requestedMap) void UPnPContext::releaseMapping(const Mapping& map) { - if (not isValidThread()) { - runOnUpnpContextQueue([this, map] { releaseMapping(map); }); + /*if (not isValidThread()) { + ctx->post([this, map] { releaseMapping(map); }); return; - } + }*/ auto mapPtr = getMappingWithKey(map.getMapKey()); @@ -401,10 +402,10 @@ UPnPContext::registerController(void* controller) } } - if (not isValidThread()) { - runOnUpnpContextQueue([this, controller] { registerController(controller); }); + /*if (not isValidThread()) { + ctx->post([this, controller] { registerController(controller); }); return; - } + }*/ auto ret = controllerList_.emplace(controller); if (not ret.second) { @@ -420,10 +421,10 @@ UPnPContext::registerController(void* controller) void UPnPContext::unregisterController(void* controller) { - if (not isValidThread()) { - runOnUpnpContextQueue([this, controller] { unregisterController(controller); }); + /*if (not isValidThread()) { + ctx->post([this, controller] { unregisterController(controller); }); return; - } + }*/ if (controllerList_.erase(controller) == 1) { // JAMI_DBG("Successfully unregistered controller %p", controller); @@ -462,10 +463,10 @@ UPnPContext::requestMapping(const Mapping::sharedPtr_t& map) { assert(map); - if (not isValidThread()) { - runOnUpnpContextQueue([this, map] { requestMapping(map); }); + /*if (not isValidThread()) { + ctx->post([this, map] { requestMapping(map); }); return; - } + }*/ auto const& igd = getPreferredIgd(); // We must have at least a valid IGD pointer if we get here. @@ -484,8 +485,7 @@ UPnPContext::requestMapping(const Mapping::sharedPtr_t& map) // igd->getProtocolName(), // igd->toString().c_str()); - if (map->getState() != MappingState::IN_PROGRESS) - updateMappingState(map, MappingState::IN_PROGRESS); + map->updateState(MappingState::IN_PROGRESS); auto const& protocol = protocolList_.at(igd->getProtocol()); protocol->requestMappingAdd(*map); @@ -522,7 +522,7 @@ UPnPContext::deleteUnneededMappings(PortType type, int portCount) assert(portCount > 0); - CHECK_VALID_THREAD(); + //CHECK_VALID_THREAD(); std::lock_guard<std::mutex> lock(mappingMutex_); auto& mappingList = getMappingList(type); @@ -539,14 +539,14 @@ UPnPContext::deleteUnneededMappings(PortType type, int portCount) if (map->getState() == MappingState::OPEN and portCount > 0) { // Close portCount mappings in "OPEN" state. requestRemoveMapping(map); - it = unregisterMapping(it); + it = mappingList.erase(it); portCount--; } else if (map->getState() != MappingState::OPEN) { // If this methods is called, it means there are more open // mappings than required. So, all mappings in a state other // than "OPEN" state (typically in in-progress state) will // be deleted as well. - it = unregisterMapping(it); + it = mappingList.erase(it); } else { it++; } @@ -558,7 +558,7 @@ UPnPContext::deleteUnneededMappings(PortType type, int portCount) void UPnPContext::updatePreferredIgd() { - CHECK_VALID_THREAD(); + //CHECK_VALID_THREAD(); if (preferredIgd_ and preferredIgd_->isValid()) return; @@ -594,7 +594,7 @@ UPnPContext::updatePreferredIgd() std::shared_ptr<IGD> UPnPContext::getPreferredIgd() const { - CHECK_VALID_THREAD(); + //CHECK_VALID_THREAD(); return preferredIgd_; } @@ -604,11 +604,11 @@ UPnPContext::updateMappingList(bool async) { // Run async if requested. if (async) { - runOnUpnpContextQueue([this] { updateMappingList(false); }); + ctx->post([this] { updateMappingList(false); }); return; } - CHECK_VALID_THREAD(); + //CHECK_VALID_THREAD(); // Update the preferred IGD. updatePreferredIgd(); @@ -712,7 +712,7 @@ UPnPContext::updateMappingList(bool async) void UPnPContext::pruneMappingList() { - CHECK_VALID_THREAD(); + //CHECK_VALID_THREAD(); MappingStatus status; getMappingStatus(status); @@ -776,7 +776,7 @@ UPnPContext::pruneUnMatchedMappings(const std::shared_ptr<IGD>& igd, } for (auto const& map : toRemoveList) { - updateMappingState(map, MappingState::FAILED); + map->updateState(MappingState::FAILED); unregisterMapping(map); } } @@ -817,7 +817,7 @@ UPnPContext::pruneUnTrackedMappings(const std::shared_ptr<IGD>& igd, void UPnPContext::pruneMappingsWithInvalidIgds(const std::shared_ptr<IGD>& igd) { - CHECK_VALID_THREAD(); + //CHECK_VALID_THREAD(); // Use temporary list to avoid holding the lock while // processing the mapping list. @@ -840,7 +840,7 @@ UPnPContext::pruneMappingsWithInvalidIgds(const std::shared_ptr<IGD>& igd) // map->toString().c_str(), // igd->toString().c_str(), // igd->getProtocolName()); - updateMappingState(map, MappingState::FAILED); + map->updateState(MappingState::FAILED); unregisterMapping(map); } } @@ -926,10 +926,10 @@ UPnPContext::onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) { assert(igd); - if (not isValidThread()) { - runOnUpnpContextQueue([this, igd, event] { onIgdUpdated(igd, event); }); + /*if (not isValidThread()) { + ctx->post([this, igd, event] { onIgdUpdated(igd, event); }); return; - } + }*/ // Reset to start search for a new best IGD. preferredIgd_.reset(); @@ -1005,7 +1005,7 @@ UPnPContext::onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) void UPnPContext::onMappingAdded(const std::shared_ptr<IGD>& igd, const Mapping& mapRes) { - CHECK_VALID_THREAD(); + //CHECK_VALID_THREAD(); // Check if we have a pending request for this response. auto map = getMappingWithKey(mapRes.getMapKey()); @@ -1024,7 +1024,7 @@ UPnPContext::onMappingAdded(const std::shared_ptr<IGD>& igd, const Mapping& mapR map->setExternalPort(mapRes.getExternalPort()); // Update the state and report to the owner. - updateMappingState(map, MappingState::OPEN); + map->updateState(MappingState::OPEN); // JAMI_DBG("Mapping %s (on IGD %s [%s]) successfully performed", // map->toString().c_str(), @@ -1066,18 +1066,10 @@ UPnPContext::onMappingRenewed(const std::shared_ptr<IGD>& igd, const Mapping& ma void UPnPContext::requestRemoveMapping(const Mapping::sharedPtr_t& map) { - CHECK_VALID_THREAD(); - - if (not map) { - // JAMI_ERR("Mapping shared pointer is null!"); - return; - } - - if (not map->isValid()) { + if (not map or not map->isValid()) { // Silently ignore if the mapping is invalid return; } - auto protocol = protocolList_.at(map->getIgd()->getProtocol()); protocol->requestMappingRemove(*map); } @@ -1085,10 +1077,10 @@ UPnPContext::requestRemoveMapping(const Mapping::sharedPtr_t& map) void UPnPContext::deleteAllMappings(PortType type) { - if (not isValidThread()) { - runOnUpnpContextQueue([this, type] { deleteAllMappings(type); }); + /*if (not isValidThread()) { + ctx->post([this, type] { deleteAllMappings(type); }); return; - } + }*/ std::lock_guard<std::mutex> lock(mappingMutex_); auto& mappingList = getMappingList(type); @@ -1104,10 +1096,10 @@ UPnPContext::onMappingRemoved(const std::shared_ptr<IGD>& igd, const Mapping& ma if (not mapRes.isValid()) return; - if (not isValidThread()) { - runOnUpnpContextQueue([this, igd, mapRes] { onMappingRemoved(igd, mapRes); }); + /*if (not isValidThread()) { + ctx->post([this, igd, mapRes] { onMappingRemoved(igd, mapRes); }); return; - } + }*/ auto map = getMappingWithKey(mapRes.getMapKey()); // Notify the listener. @@ -1154,23 +1146,10 @@ UPnPContext::registerMapping(Mapping& map) return mapPtr; } -std::map<Mapping::key_t, Mapping::sharedPtr_t>::iterator -UPnPContext::unregisterMapping(std::map<Mapping::key_t, Mapping::sharedPtr_t>::iterator it) -{ - assert(it->second); - - CHECK_VALID_THREAD(); - auto descr = it->second->toString(); - auto& mappingList = getMappingList(it->second->getType()); - auto ret = mappingList.erase(it); - - return ret; -} - void UPnPContext::unregisterMapping(const Mapping::sharedPtr_t& map) { - CHECK_VALID_THREAD(); + //CHECK_VALID_THREAD(); if (not map) { // JAMI_ERR("Mapping pointer is null"); @@ -1256,8 +1235,6 @@ UPnPContext::getMappingStatus(MappingStatus& status) void UPnPContext::onMappingRequestFailed(const Mapping& mapRes) { - CHECK_VALID_THREAD(); - auto const& map = getMappingWithKey(mapRes.getMapKey()); if (not map) { // We may receive a response for a removed request. Just ignore it. @@ -1273,7 +1250,7 @@ UPnPContext::onMappingRequestFailed(const Mapping& mapRes) return; } - updateMappingState(map, MappingState::FAILED); + map->updateState(MappingState::FAILED); unregisterMapping(map); // JAMI_WARN("Mapping request for %s failed on IGD %s [%s]", @@ -1282,32 +1259,11 @@ UPnPContext::onMappingRequestFailed(const Mapping& mapRes) // igd->getProtocolName()); } -void -UPnPContext::updateMappingState(const Mapping::sharedPtr_t& map, MappingState newState, bool notify) -{ - CHECK_VALID_THREAD(); - - assert(map); - - // Ignore if the state did not change. - if (newState == map->getState()) { - // JAMI_DBG("Mapping %s already in state %s", map->toString().c_str(), map->getStateStr()); - return; - } - - // Update the state. - map->setState(newState); - - // Notify the listener if set. - if (notify and map->getNotifyCallback()) - map->getNotifyCallback()(map); -} - #if HAVE_LIBNATPMP void UPnPContext::renewAllocations() { - CHECK_VALID_THREAD(); + //CHECK_VALID_THREAD(); // Check if the we have valid PMP IGD. auto pmpProto = protocolList_.at(NatProtocolType::NAT_PMP);