From 7b1b0b5516e763bf9a93cb7da9fdcfb37b4607c3 Mon Sep 17 00:00:00 2001 From: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> Date: Tue, 23 Mar 2021 09:38:43 -0400 Subject: [PATCH] upnp: Connectivity change handling and clean-up This patch introduces three major changes: 1- When a "Connectivity Change" notification is received from the configuration manager, the current IGDs and the provisioned mappings were reset and a new search and provisioning was started anew. Now, the reset/restart is performed only if the host address actually changes. 2- Many code simplification and clean-up. Obsolete code used to handle asynchronous operations on libupnp was also removed. Most of the calls to libupnp are now synchronous. Blocking calls are dispatched on the internal execution queue (worker thread). 3- Add synchronization to avoid thread race at shutdown Gitlab: #416 Change-Id: I26970d78db494795f23379c7a9af5a20a665bb06 --- src/manager.cpp | 2 + src/upnp/protocol/igd.cpp | 7 +- src/upnp/protocol/igd.h | 3 + src/upnp/protocol/mapping.cpp | 23 +- src/upnp/protocol/mapping.h | 4 +- src/upnp/protocol/natpmp/nat_pmp.cpp | 273 ++++++--- src/upnp/protocol/natpmp/nat_pmp.h | 70 ++- src/upnp/protocol/natpmp/pmp_igd.cpp | 9 +- src/upnp/protocol/natpmp/pmp_igd.h | 5 +- src/upnp/protocol/pupnp/pupnp.cpp | 832 ++++++++++++--------------- src/upnp/protocol/pupnp/pupnp.h | 127 ++-- src/upnp/protocol/pupnp/upnp_igd.cpp | 3 +- src/upnp/protocol/pupnp/upnp_igd.h | 11 +- src/upnp/protocol/upnp_protocol.h | 25 +- src/upnp/upnp_context.cpp | 326 ++++++----- src/upnp/upnp_context.h | 23 +- src/upnp/upnp_control.cpp | 3 +- src/upnp/upnp_control.h | 3 +- src/upnp/upnp_thread_util.h | 7 +- 19 files changed, 899 insertions(+), 857 deletions(-) diff --git a/src/manager.cpp b/src/manager.cpp index eb6a3f8d2c..800dd4d79e 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -890,6 +890,8 @@ Manager::finish() noexcept pimpl_->audiodriver_.reset(); } + JAMI_DBG("Stopping schedulers and worker threads"); + // Flush remaining tasks (free lambda' with capture) pimpl_->scheduler_.stop(); dht::ThreadPool::io().join(); diff --git a/src/upnp/protocol/igd.cpp b/src/upnp/protocol/igd.cpp index 8159a89367..88cc72b68b 100644 --- a/src/upnp/protocol/igd.cpp +++ b/src/upnp/protocol/igd.cpp @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -44,7 +45,7 @@ IGD::setValid(bool valid) // Reset errors counter. errorsCounter_ = 0; } else { - JAMI_WARN("IGD %s [%s] was disabled", localIp_.toString().c_str(), getProtocolName()); + JAMI_WARN("IGD %s [%s] was disabled", toString().c_str(), getProtocolName()); } } @@ -56,7 +57,7 @@ IGD::incrementErrorsCounter() if (++errorsCounter_ >= MAX_ERRORS_COUNT) { JAMI_WARN("IGD %s [%s] has too many errors, it will be disabled", - localIp_.toString().c_str(), + toString().c_str(), getProtocolName()); setValid(false); return false; diff --git a/src/upnp/protocol/igd.h b/src/upnp/protocol/igd.h index ceb523bcbe..b38b391dbd 100644 --- a/src/upnp/protocol/igd.h +++ b/src/upnp/protocol/igd.h @@ -2,6 +2,7 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -95,6 +96,8 @@ public: bool incrementErrorsCounter(); int getErrorsCount() const; + virtual const std::string toString() const = 0; + protected: const NatProtocolType protocol_ {NatProtocolType::UNKNOWN}; std::atomic_bool valid_ {false}; diff --git a/src/upnp/protocol/mapping.cpp b/src/upnp/protocol/mapping.cpp index 58a5a367f5..0773065ff9 100644 --- a/src/upnp/protocol/mapping.cpp +++ b/src/upnp/protocol/mapping.cpp @@ -3,6 +3,7 @@ * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -34,7 +35,6 @@ Mapping::Mapping(PortType type, uint16_t portExternal, uint16_t portInternal, bo , available_(available) , state_(MappingState::PENDING) , notifyCb_(nullptr) - , timeoutTimer_() , autoUpdate_(false) #if HAVE_LIBNATPMP , renewalTime_(sys_clock::now()) @@ -171,27 +171,6 @@ Mapping::hasPublicAddress() const return igd_ and igd_->getPublicIp() and not igd_->getPublicIp().isPrivate(); } -void -Mapping::setTimeoutTimer(std::shared_ptr<Task> timer) -{ - // Cancel current timer if any. - cancelTimeoutTimer(); - - std::lock_guard<std::mutex> lock(mutex_); - timeoutTimer_ = std::move(timer); -} - -void -Mapping::cancelTimeoutTimer() -{ - std::lock_guard<std::mutex> lock(mutex_); - - if (timeoutTimer_) { - timeoutTimer_->cancel(); - timeoutTimer_.reset(); - } -} - Mapping::key_t Mapping::getMapKey() const { diff --git a/src/upnp/protocol/mapping.h b/src/upnp/protocol/mapping.h index 4388591760..44006cd650 100644 --- a/src/upnp/protocol/mapping.h +++ b/src/upnp/protocol/mapping.h @@ -2,6 +2,7 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -124,8 +125,6 @@ private: void setAvailable(bool val); void setState(const MappingState& state); void updateDescription(); - void setTimeoutTimer(std::shared_ptr<Task> timer); - void cancelTimeoutTimer(); #if HAVE_LIBNATPMP void setRenewalTime(sys_clock::time_point time); #endif @@ -142,7 +141,6 @@ private: // Track the state of the mapping MappingState state_; NotifyCallback notifyCb_; - std::shared_ptr<Task> timeoutTimer_ {}; // If true, a new mapping will be requested on behave of the mapping // owner when the mapping state changes from "OPEN" to "FAILED". bool autoUpdate_; diff --git a/src/upnp/protocol/natpmp/nat_pmp.cpp b/src/upnp/protocol/natpmp/nat_pmp.cpp index c7d20f8905..328f8f3b86 100644 --- a/src/upnp/protocol/natpmp/nat_pmp.cpp +++ b/src/upnp/protocol/natpmp/nat_pmp.cpp @@ -2,6 +2,7 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -28,7 +29,7 @@ namespace upnp { NatPmp::NatPmp() { JAMI_DBG("NAT-PMP: Instance [%p] created", this); - getNatpmpScheduler()->run([this] { + runOnNatPmpQueue([this] { threadId_ = getCurrentThread(); igd_ = std::make_shared<PMPIGD>(); }); @@ -43,12 +44,20 @@ void NatPmp::initNatPmp() { if (not isValidThread()) { - getNatpmpScheduler()->run([this] { initNatPmp(); }); + runOnNatPmpQueue([w = weak()] { + if (auto pmpThis = w.lock()) { + pmpThis->initNatPmp(); + } + }); return; } initialized_ = false; - hostAddress_ = ip_utils::getLocalAddr(pj_AF_INET()); + + { + std::lock_guard<std::mutex> lock(natpmpMutex_); + hostAddress_ = ip_utils::getLocalAddr(AF_INET); + } // Local address must be valid. if (not getHostAddress() or getHostAddress().isLoopback()) { @@ -112,11 +121,26 @@ NatPmp::initNatPmp() }; } +void +NatPmp::waitForShutdown() +{ + std::unique_lock<std::mutex> lk(natpmpMutex_); + if (shutdownCv_.wait_for(lk, std::chrono::seconds(10), [this] { return shutdownComplete_; })) { + JAMI_DBG("NAT-PMP: Shutdown completed"); + } else { + JAMI_ERR("NAT-PMP: Shutdown timed-out"); + } +} + void NatPmp::setObserver(UpnpMappingObserver* obs) { if (not isValidThread()) { - getNatpmpScheduler()->run([this, obs] { setObserver(obs); }); + runOnNatPmpQueue([w = weak(), obs] { + if (auto pmpThis = w.lock()) { + pmpThis->setObserver(obs); + } + }); return; } @@ -129,21 +153,48 @@ void NatPmp::terminate() { if (not isValidThread()) { - getNatpmpScheduler()->run([this] { terminate(); }); + runOnNatPmpQueue([w = weak()] { + if (auto pmpThis = w.lock()) { + pmpThis->terminate(); + } + }); + waitForShutdown(); return; } + initialized_ = false; observer_ = nullptr; + + { + std::lock_guard<std::mutex> lock(natpmpMutex_); + shutdownComplete_ = true; + } + + shutdownCv_.notify_one(); +} + +const IpAddr +NatPmp::getHostAddress() const +{ + std::lock_guard<std::mutex> lock(natpmpMutex_); + return hostAddress_; } void NatPmp::clearIgds() { if (not isValidThread()) { - getNatpmpScheduler()->run([this] { clearIgds(); }); + runOnNatPmpQueue([w = weak()] { + if (auto pmpThis = w.lock()) { + pmpThis->clearIgds(); + } + }); return; } + if (igd_) + igd_->setValid(false); + initialized_ = false; if (searchForIgdTimer_) searchForIgdTimer_->cancel(); @@ -158,7 +209,11 @@ void NatPmp::searchForIgd() { if (not isValidThread()) { - getNatpmpScheduler()->run([this] { searchForIgd(); }); + runOnNatPmpQueue([w = weak()] { + if (auto pmpThis = w.lock()) { + pmpThis->searchForIgd(); + } + }); return; } @@ -169,12 +224,15 @@ NatPmp::searchForIgd() // Schedule a retry in case init failed. if (not initialized_) { if (igdSearchCounter_++ < MAX_RESTART_SEARCH_RETRIES) { + JAMI_DBG("NAT-PMP: Start search for IGDs. Attempt %i", igdSearchCounter_); + // Cancel the current timer (if any) and re-schedule. if (searchForIgdTimer_) searchForIgdTimer_->cancel(); searchForIgdTimer_ = getNatpmpScheduler()->scheduleIn([this] { searchForIgd(); }, - TIMEOUT_BEFORE_IGD_SEARCH_RETRY); + NATPMP_SEARCH_RETRY_UNIT + * igdSearchCounter_); } else { JAMI_WARN("NAT-PMP: Setup failed after %u trials. NAT-PMP will be disabled!", MAX_RESTART_SEARCH_RETRIES); @@ -182,11 +240,14 @@ NatPmp::searchForIgd() } } -void -NatPmp::getIgdList(std::list<std::shared_ptr<IGD>>& igdList) const +std::list<std::shared_ptr<IGD>> +NatPmp::getIgdList() const { + std::lock_guard<std::mutex> lock(natpmpMutex_); + std::list<std::shared_ptr<IGD>> igdList; if (igd_->isValid()) igdList.emplace_back(igd_); + return igdList; } bool @@ -200,6 +261,7 @@ NatPmp::isReady() const // Must at least have a valid local address. if (not getHostAddress() or getHostAddress().isLoopback()) return false; + return igd_ and igd_->isValid(); } @@ -226,37 +288,78 @@ NatPmp::incrementErrorsCounter(const std::shared_ptr<IGD>& igdIn) } void -NatPmp::requestMappingAdd(const std::shared_ptr<IGD>& igd, const Mapping& mapping) +NatPmp::requestMappingAdd(const Mapping& mapping) { // Process on nat-pmp thread. - getNatpmpScheduler()->run([this, igd, mapping] { - JAMI_DBG("NAT-PMP: Request mapping %s on %s", - mapping.toString().c_str(), - igd_->getLocalIp().toString().c_str()); + if (not isValidThread()) { + runOnNatPmpQueue([w = weak(), mapping] { + if (auto pmpThis = w.lock()) { + pmpThis->requestMappingAdd(mapping); + } + }); + return; + } - Mapping map {mapping}; - addPortMapping(igd, map, false); - }); + Mapping map(mapping); + assert(map.getIgd()); + auto err = addPortMapping(map); + if (err < 0) { + JAMI_WARN("NAT-PMP: Request for mapping %s on %s failed with error %i: %s", + map.toString().c_str(), + igd_->toString().c_str(), + err, + getNatPmpErrorStr(err)); + + if (isErrorFatal(err)) { + // Fatal error, increment the counter. + incrementErrorsCounter(igd_); + } + // Notify the listener. + processMappingRequestFailed(std::move(map)); + } else { + JAMI_DBG("NAT-PMP: Request for mapping %s on %s succeeded", + map.toString().c_str(), + igd_->toString().c_str()); + // Notify the listener. + processMappingAdded(std::move(map)); + } } void NatPmp::requestMappingRenew(const Mapping& mapping) { // Process on nat-pmp thread. - getNatpmpScheduler()->run([this, mapping] { - if (not mapping.getIgd() or not mapping.getIgd()->isValid()) { - JAMI_WARN("NAT-PMP: Mapping %s has an invalid IGD. Ignoring.", - mapping.toString().c_str()); - return; - } + if (not isValidThread()) { + runOnNatPmpQueue([w = weak(), mapping] { + if (auto pmpThis = w.lock()) { + pmpThis->requestMappingRenew(mapping); + } + }); + return; + } - JAMI_DBG("NAT-PMP: Renew mapping %s on %s", - mapping.toString().c_str(), - mapping.getIgd()->getLocalIp().toString().c_str()); + Mapping map(mapping); + auto err = addPortMapping(map); + if (err < 0) { + JAMI_WARN("NAT-PMP: Renewal request for mapping %s on %s failed with error %i: %s", + map.toString().c_str(), + igd_->toString().c_str(), + err, + getNatPmpErrorStr(err)); + // Notify the listener. + processMappingRequestFailed(std::move(map)); - Mapping map {mapping}; - addPortMapping(mapping.getIgd(), map, true); - }); + if (isErrorFatal(err)) { + // Fatal error, increment the counter. + incrementErrorsCounter(igd_); + } + } else { + JAMI_DBG("NAT-PMP: Renewal request for mapping %s on %s succeeded", + map.toString().c_str(), + igd_->toString().c_str()); + // Notify the listener. + processMappingRenewed(map); + } } int @@ -323,14 +426,14 @@ NatPmp::sendMappingRequest(const Mapping& mapping, uint32_t& lifetime) if (err < 0) { JAMI_WARN("NAT-PMP: Read response on IGD %s failed with error %s", - igd_->getLocalIp().toString().c_str(), + igd_->toString().c_str(), getNatPmpErrorStr(err)); } else if (response.type != NATPMP_RESPTYPE_TCPPORTMAPPING and response.type != NATPMP_RESPTYPE_UDPPORTMAPPING) { JAMI_ERR("NAT-PMP: Unexpected response type (%i) for mapping %s from IGD %s.", response.type, mapping.toString().c_str(), - igd_->getLocalIp().toString().c_str()); + igd_->toString().c_str()); // Try to read again. continue; } @@ -343,67 +446,48 @@ NatPmp::sendMappingRequest(const Mapping& mapping, uint32_t& lifetime) return err; } -void -NatPmp::addPortMapping(const std::shared_ptr<IGD>& igdIn, Mapping& mapping, bool renew) +int +NatPmp::addPortMapping(Mapping& mapping) { + auto const& igdIn = mapping.getIgd(); assert(igdIn); assert(igdIn->getProtocol() == NatProtocolType::NAT_PMP); - if (not igdIn->isValid()) - return; - - if (not validIgdInstance(igdIn)) { - return; + if (not igdIn->isValid() or not validIgdInstance(igdIn)) { + mapping.setState(MappingState::FAILED); + return NATPMP_ERR_INVALIDARGS; } - Mapping mapToAdd(mapping); - mapToAdd.setInternalAddress(getHostAddress().toString()); - mapToAdd.setIgd(igd_); + mapping.setInternalAddress(getHostAddress().toString()); uint32_t lifetime = MAPPING_ALLOCATION_LIFETIME; int err = sendMappingRequest(mapping, lifetime); if (err < 0) { - JAMI_WARN("NAT-PMP: Add mapping request failed with error %s %i", - getNatPmpErrorStr(err), - errno); - - if (isErrorFatal(err)) { - // Fatal error, increment the counter. - incrementErrorsCounter(igd_); - } - // Mark as failed and notify. - mapToAdd.setState(MappingState::FAILED); - processMappingAdded(std::move(mapToAdd)); - } else { - // Success! Set renewal and update. - // Renewal time is set before the allocation expires. - mapToAdd.setRenewalTime(sys_clock::now() + std::chrono::seconds(lifetime * 4 / 5)); - mapToAdd.setState(MappingState::OPEN); - if (not renew) { - JAMI_DBG("NAT-PMP: Allocated mapping %s on %s", - mapToAdd.toString().c_str(), - igd_->getLocalIp().toString().c_str()); - // Notify the listener. - processMappingAdded(std::move(mapToAdd)); - } else { - JAMI_DBG("NAT-PMP: Renewed mapping %s on %s", - mapToAdd.toString().c_str(), - igd_->getLocalIp().toString().c_str()); - // Notify. - processMappingRenewed(std::move(mapToAdd)); - } + mapping.setState(MappingState::FAILED); + return err; } + + // Set the renewal time and update. + mapping.setRenewalTime(sys_clock::now() + std::chrono::seconds(lifetime * 4 / 5)); + mapping.setState(MappingState::OPEN); + + return 0; } void NatPmp::requestMappingRemove(const Mapping& mapping) { // Process on nat-pmp thread. - getNatpmpScheduler()->run([this, mapping] { - Mapping map {mapping}; - removePortMapping(map); - }); + if (not isValidThread()) { + runOnNatPmpQueue([w = weak(), mapping] { + if (auto pmpThis = w.lock()) { + Mapping map {mapping}; + pmpThis->removePortMapping(map); + } + }); + return; + } } void @@ -425,6 +509,7 @@ NatPmp::removePortMapping(Mapping& mapping) int err = sendMappingRequest(mapping, lifetime); if (err < 0) { + // Nothing to do if the request fails, just log the error. JAMI_WARN("NAT-PMP: Send remove request failed with error %s. Ignoring", getNatPmpErrorStr(err)); } @@ -443,7 +528,7 @@ NatPmp::getIgdPublicAddress() // have one already. if (igd_->getPublicIp()) { JAMI_WARN("NAT-PMP: IGD %s already have a public address (%s)", - igd_->getLocalIp().toString().c_str(), + igd_->toString().c_str(), igd_->getPublicIp().toString().c_str()); return; } @@ -453,7 +538,7 @@ NatPmp::getIgdPublicAddress() if (err < 0) { JAMI_ERR("NAT-PMP: send public address request on IGD %s failed with error: %s", - igd_->getLocalIp().toString().c_str(), + igd_->toString().c_str(), getNatPmpErrorStr(err)); if (isErrorFatal(err)) { @@ -468,7 +553,7 @@ NatPmp::getIgdPublicAddress() if (err < 0) { JAMI_ERR("NAT-PMP: read response on IGD %s failed with error %s", - igd_->getLocalIp().toString().c_str(), + igd_->toString().c_str(), getNatPmpErrorStr(err)); return; } @@ -476,7 +561,7 @@ NatPmp::getIgdPublicAddress() if (response.type != NATPMP_RESPTYPE_PUBLICADDRESS) { JAMI_ERR("NAT-PMP: Unexpected response type (%i) for public address request from IGD %s.", response.type, - igd_->getLocalIp().toString().c_str()); + igd_->toString().c_str()); return; } @@ -484,7 +569,7 @@ NatPmp::getIgdPublicAddress() if (not publicAddr) { JAMI_ERR("NAT-PMP: IGD %s returned an invalid public address %s", - igd_->getLocalIp().toString().c_str(), + igd_->toString().c_str(), publicAddr.toString().c_str()); } @@ -493,7 +578,7 @@ NatPmp::getIgdPublicAddress() igd_->setValid(true); JAMI_DBG("NAT-PMP: Setting IGD %s public address to %s", - igd_->getLocalIp().toString().c_str(), + igd_->toString().c_str(), igd_->getPublicIp().toString().c_str()); } @@ -503,7 +588,7 @@ NatPmp::removeAllMappings() CHECK_VALID_THREAD(); JAMI_WARN("NAT-PMP: Send request to close all existing mappings to IGD %s", - igd_->getLocalIp().toString().c_str()); + igd_->toString().c_str()); int err = sendnewportmappingrequest(&natpmpHdl_, NATPMP_PROTOCOL_TCP, 0, 0, 0); if (err < 0) { @@ -616,9 +701,9 @@ bool NatPmp::validIgdInstance(const std::shared_ptr<IGD>& igdIn) { if (igd_.get() != igdIn.get()) { - JAMI_ERR("NAT-PMP: IGD (%s) does not match in local instance (%s)", - igdIn->getLocalIp().toString().c_str(), - igd_->getLocalIp().toString().c_str()); + JAMI_ERR("NAT-PMP: IGD (%s) does not match local instance (%s)", + igdIn->toString().c_str(), + igd_->toString().c_str()); return false; } @@ -636,7 +721,7 @@ NatPmp::processIgdUpdate(UpnpIgdEvent event) if (observer_ == nullptr) return; // Process the response on the context thread. - runOnUpnpContextThread([obs = observer_, igd = igd_, event] { obs->onIgdUpdated(igd, event); }); + runOnUpnpContextQueue([obs = observer_, igd = igd_, event] { obs->onIgdUpdated(igd, event); }); } void @@ -646,7 +731,17 @@ NatPmp::processMappingAdded(const Mapping& map) return; // Process the response on the context thread. - runOnUpnpContextThread([obs = observer_, igd = igd_, map] { obs->onMappingAdded(igd, map); }); + runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingAdded(igd, map); }); +} + +void +NatPmp::processMappingRequestFailed(const Mapping& map) +{ + if (observer_ == nullptr) + return; + + // Process the response on the context thread. + runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRequestFailed(map); }); } void @@ -656,7 +751,7 @@ NatPmp::processMappingRenewed(const Mapping& map) return; // Process the response on the context thread. - runOnUpnpContextThread([obs = observer_, igd = igd_, map] { obs->onMappingRenewed(igd, map); }); + runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRenewed(igd, map); }); } void @@ -666,7 +761,7 @@ NatPmp::processMappingRemoved(const Mapping& map) return; // Process the response on the context thread. - runOnUpnpContextThread([obs = observer_, igd = igd_, map] { obs->onMappingRemoved(igd, map); }); + runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRemoved(igd, map); }); } } // namespace upnp diff --git a/src/upnp/protocol/natpmp/nat_pmp.h b/src/upnp/protocol/natpmp/nat_pmp.h index fe2bf22956..66b2ab5816 100644 --- a/src/upnp/protocol/natpmp/nat_pmp.h +++ b/src/upnp/protocol/natpmp/nat_pmp.h @@ -2,6 +2,7 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -24,8 +25,8 @@ #include "config.h" #endif -#include "../upnp_protocol.h" -#include "../igd.h" +#include "upnp/protocol/upnp_protocol.h" +#include "upnp/protocol/igd.h" #include "pmp_igd.h" #include "logger.h" @@ -55,8 +56,8 @@ constexpr static unsigned int MAX_RESTART_SEARCH_RETRIES {5}; constexpr static auto TIMEOUT_BEFORE_READ_RETRY {std::chrono::milliseconds(300)}; // Max number of read attempts before failure. constexpr static unsigned int MAX_READ_RETRIES {5}; -// Time-out between two successive IGD search. -constexpr static auto TIMEOUT_BEFORE_IGD_SEARCH_RETRY {std::chrono::seconds(60)}; +// Base unit for the timeout between two successive IGD search. +constexpr static auto NATPMP_SEARCH_RETRY_UNIT {std::chrono::seconds(15)}; class NatPmp : public UPnPProtocol { @@ -80,58 +81,75 @@ public: void searchForIgd() override; // Get the IGD list. - void getIgdList(std::list<std::shared_ptr<IGD>>& igdList) const override; + std::list<std::shared_ptr<IGD>> getIgdList() const override; // Return true if it has at least one valid IGD. bool isReady() const override; - // Increment errors counter. - void incrementErrorsCounter(const std::shared_ptr<IGD>& igd) override; - // Request a new mapping. - void requestMappingAdd(const std::shared_ptr<IGD>& igd, const Mapping& mapping) override; + void requestMappingAdd(const Mapping& mapping) override; // Renew an allocated mapping. void requestMappingRenew(const Mapping& mapping) override; // Removes a mapping. - void requestMappingRemove(const Mapping& igdMapping) override; + void requestMappingRemove(const Mapping& mapping) override; + + // Get the host (local) address. + const IpAddr getHostAddress() const override; // Terminate. Nothing to do here, the clean-up is done when // the IGD is cleared. void terminate() override; private: + NON_COPYABLE(NatPmp); + + std::weak_ptr<NatPmp> weak() { return std::static_pointer_cast<NatPmp>(shared_from_this()); } + + // Helpers to run tasks on NAT-PMP internal execution queue. + ScheduledExecutor* getNatpmpScheduler() { return &natpmpScheduler_; } + template<typename Callback> + void runOnNatPmpQueue(Callback&& cb) + { + natpmpScheduler_.run([cb = std::forward<Callback>(cb)]() mutable { cb(); }); + } + + // Helpers to run tasks on UPNP context execution queue. + ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); } + void initNatPmp(); + void waitForShutdown(); void getIgdPublicAddress(); void removeAllMappings(); int readResponse(natpmp_t& handle, natpmpresp_t& response); int sendMappingRequest(const Mapping& mapping, uint32_t& lifetime); // Adds a port mapping. - void addPortMapping(const std::shared_ptr<IGD>& igd, Mapping& mapping, bool renew); + int addPortMapping(Mapping& mapping); // Removes a port mapping. void removePortMapping(Mapping& mapping); + // True if the error is fatal. bool isErrorFatal(int error); + // Gets NAT-PMP error code string. + const char* getNatPmpErrorStr(int errorCode) const; // Get local getaway. std::unique_ptr<IpAddr> getLocalGateway() const; - // Helpers to process user callbacks + // Helpers to process user's callbacks void processIgdUpdate(UpnpIgdEvent event); void processMappingAdded(const Mapping& map); + void processMappingRequestFailed(const Mapping& map); void processMappingRenewed(const Mapping& map); void processMappingRemoved(const Mapping& map); -private: - NON_COPYABLE(NatPmp); + // Check if the IGD has a local match + bool validIgdInstance(const std::shared_ptr<IGD>& igdIn); - // Gets NAT-PMP error code string. - const char* getNatPmpErrorStr(int errorCode) const; + // Increment errors counter. + void incrementErrorsCounter(const std::shared_ptr<IGD>& igd); - ScheduledExecutor* getNatpmpScheduler() { return &natpmpScheduler_; } - ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); } - bool validIgdInstance(const std::shared_ptr<IGD>& igdIn); std::atomic_bool initialized_ {false}; // Data members @@ -140,6 +158,20 @@ private: ScheduledExecutor natpmpScheduler_ {}; std::shared_ptr<Task> searchForIgdTimer_ {}; unsigned int igdSearchCounter_ {0}; + UpnpMappingObserver* observer_ {nullptr}; + IpAddr hostAddress_ {}; + + // Calls from other threads that does not need synchronous access are + // rescheduled on the NatPmp private queue. This will avoid the need to + // protect most of the data members of this class. + // For some internal members (such as the igd instance and the host + // address) that need to be synchronously accessed, are protected by + // this mutex. + mutable std::mutex natpmpMutex_; + + // Shutdown synchronization + std::condition_variable shutdownCv_ {}; + bool shutdownComplete_ {false}; }; } // namespace upnp diff --git a/src/upnp/protocol/natpmp/pmp_igd.cpp b/src/upnp/protocol/natpmp/pmp_igd.cpp index cfb6cd2bd3..5942da388b 100644 --- a/src/upnp/protocol/natpmp/pmp_igd.cpp +++ b/src/upnp/protocol/natpmp/pmp_igd.cpp @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -52,5 +53,11 @@ PMPIGD::operator==(PMPIGD& other) const return getPublicIp() == other.getPublicIp() and getLocalIp() == other.getLocalIp(); } +const std::string +PMPIGD::toString() const +{ + return getLocalIp().toString(); +} + } // namespace upnp } // namespace jami diff --git a/src/upnp/protocol/natpmp/pmp_igd.h b/src/upnp/protocol/natpmp/pmp_igd.h index d987979ea8..bf49b2480b 100644 --- a/src/upnp/protocol/natpmp/pmp_igd.h +++ b/src/upnp/protocol/natpmp/pmp_igd.h @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -49,6 +50,8 @@ public: bool operator==(IGD& other) const; bool operator==(PMPIGD& other) const; + + const std::string toString() const override; }; } // namespace upnp diff --git a/src/upnp/protocol/pupnp/pupnp.cpp b/src/upnp/protocol/pupnp/pupnp.cpp index 20aef2bede..0d43d00a73 100644 --- a/src/upnp/protocol/pupnp/pupnp.cpp +++ b/src/upnp/protocol/pupnp/pupnp.cpp @@ -4,6 +4,7 @@ * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -98,8 +99,11 @@ errorOnResponse(IXML_Document* doc) PUPnP::PUPnP() { - JAMI_DBG("PUPnP: Instance [%p] created", this); - threadId_ = getCurrentThread(); + JAMI_DBG("PUPnP: Creating instance [%p] ...", this); + runOnPUPnPQueue([this] { + threadId_ = getCurrentThread(); + JAMI_DBG("PUPnP: Instance [%p] created", this); + }); } PUPnP::~PUPnP() @@ -151,6 +155,26 @@ PUPnP::initUpnpLib() initialized_ = true; } +void +PUPnP::waitForShutdown() +{ + std::unique_lock<std::mutex> lk(pupnpMutex_); + if (shutdownCv_.wait_for(lk, std::chrono::seconds(10), [this] { return shutdownComplete_; })) { + JAMI_DBG("PUPnP: Shutdown completed"); + } else { + JAMI_ERR("PUPnP: Shutdown timed-out"); + // Force stop if the shutdown take too much time. + shutdownComplete_ = true; + } +} + +bool +PUPnP::isRunning() const +{ + std::unique_lock<std::mutex> lk(pupnpMutex_); + return not shutdownComplete_; +} + void PUPnP::registerClient() { @@ -159,7 +183,6 @@ PUPnP::registerClient() CHECK_VALID_THREAD(); // Register Upnp control point. - std::unique_lock<std::mutex> lk(ctrlptMutex_); int upnp_err = UpnpRegisterClient(ctrlPtCallback, this, &ctrlptHandle_); if (upnp_err != UPNP_E_SUCCESS) { JAMI_ERR("PUPnP: Can't register client: %s", UpnpGetErrorMessage(upnp_err)); @@ -170,143 +193,117 @@ PUPnP::registerClient() } void -PUPnP::startPUPnP() +PUPnP::setObserver(UpnpMappingObserver* obs) { - JAMI_DBG("PUPnP: Starting PUPNP internal thread"); - - pupnpThread_ = std::thread([this] { - JAMI_DBG("PUPnP: Internal thread started"); - pupnpRun_ = true; - while (pupnpRun_) { - { - std::unique_lock<std::mutex> lk(igdListMutex_); - pupnpCv_.wait(lk, [this] { - return not pupnpRun_ or searchForIgd_ or not dwnldlXmlList_.empty(); - }); + if (not isValidThread()) { + runOnPUPnPQueue([w = weak(), obs] { + if (auto upnpThis = w.lock()) { + upnpThis->setObserver(obs); } + }); + return; + } - if (not pupnpRun_) - break; - - if (clientRegistered_) { - if (searchForIgd_.exchange(false)) { - searchForDevices(); - } - - std::unique_lock<std::mutex> lk(igdListMutex_); - if (not dwnldlXmlList_.empty()) { - auto xmlList = std::move(dwnldlXmlList_); - decltype(xmlList) finished {}; - - // Wait on futures asynchronously - lk.unlock(); - for (auto it = xmlList.begin(); it != xmlList.end();) { - if (it->wait_for(std::chrono::seconds(1)) == std::future_status::ready) { - finished.splice(finished.end(), xmlList, it++); - } else { - ++it; - } - } - lk.lock(); - - // Move back timed-out items to list - dwnldlXmlList_.splice(dwnldlXmlList_.begin(), xmlList); - // Handle successful downloads - for (auto& item : finished) { - auto result = item.get(); - if (not result->document or not validateIgd(*result)) { - discoveredIgdList_.erase(result->location); - } - } - } - for (auto it = cancelXmlList_.begin(); it != cancelXmlList_.end();) { - if (it->wait_for(std::chrono::seconds(0)) == std::future_status::ready) { - it = cancelXmlList_.erase(it); - } else { - ++it; - } - } - } - } + JAMI_DBG("PUPnP: Setting observer to %p", obs); - JAMI_DBG("PUPnP: Internal thread stopped"); - }); + observer_ = obs; } -void -PUPnP::setObserver(UpnpMappingObserver* obs) +const IpAddr +PUPnP::getHostAddress() const { - CHECK_VALID_THREAD(); - - JAMI_DBG("PUPnP: Setting observer to %p", obs); - - observer_ = obs; + std::lock_guard<std::mutex> lock(pupnpMutex_); + return hostAddress_; } void PUPnP::terminate() { - CHECK_VALID_THREAD(); + if (not isValidThread()) { + runOnPUPnPQueue([w = weak()] { + if (auto upnpThis = w.lock()) { + upnpThis->terminate(); + } + }); + waitForShutdown(); + return; + } JAMI_DBG("PUPnP: Terminate instance %p", this); clientRegistered_ = false; observer_ = nullptr; - { - std::lock_guard<std::mutex> lock(validIgdListMutex_); - std::lock_guard<std::mutex> lk(ctrlptMutex_); - - UpnpUnRegisterClient(ctrlptHandle_); - } + UpnpUnRegisterClient(ctrlptHandle_); if (UpnpFinish() != UPNP_E_SUCCESS) { JAMI_ERR("PUPnP: Failed to properly close lib-upnp"); } - pupnpRun_ = false; - // Notify thread to terminate. - pupnpCv_.notify_all(); - if (pupnpThread_.joinable()) - pupnpThread_.join(); - // Clear all the lists. + discoveredIgdList_.clear(); + { - std::lock_guard<std::mutex> lock(validIgdListMutex_); + std::lock_guard<std::mutex> lock(pupnpMutex_); validIgdList_.clear(); + shutdownComplete_ = true; } - { - std::lock_guard<std::mutex> lk2(igdListMutex_); - discoveredIgdList_.clear(); - dwnldlXmlList_.clear(); - cancelXmlList_.clear(); - } + shutdownCv_.notify_one(); } void PUPnP::searchForDevices() { - std::unique_lock<std::mutex> lk(ctrlptMutex_); + CHECK_VALID_THREAD(); JAMI_DBG("PUPnP: Send IGD search request"); // Send out search for multiple types of devices, as some routers may possibly // only reply to one. - UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_ROOT_DEVICE, this); - UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_IGD_DEVICE, this); - UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_WANIP_SERVICE, this); - UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_WANPPP_SERVICE, this); + + auto err = UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_ROOT_DEVICE, this); + if (err != UPNP_E_SUCCESS) { + JAMI_WARN("PUPnP: Send search for UPNP_ROOT_DEVICE failed. Error %d: %s", + err, + UpnpGetErrorMessage(err)); + } + + err = UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_IGD_DEVICE, this); + if (err != UPNP_E_SUCCESS) { + JAMI_WARN("PUPnP: Send search for UPNP_IGD_DEVICE failed. Error %d: %s", + err, + UpnpGetErrorMessage(err)); + } + + err = UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_WANIP_SERVICE, this); + if (err != UPNP_E_SUCCESS) { + JAMI_WARN("PUPnP: Send search for UPNP_WANIP_SERVICE failed. Error %d: %s", + err, + UpnpGetErrorMessage(err)); + } + + err = UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_WANPPP_SERVICE, this); + if (err != UPNP_E_SUCCESS) { + JAMI_WARN("PUPnP: Send search for UPNP_WANPPP_SERVICE failed. Error %d: %s", + err, + UpnpGetErrorMessage(err)); + } } void PUPnP::clearIgds() { - JAMI_DBG("PUPnP: clearing IGDs and devices lists"); - - CHECK_VALID_THREAD(); + if (not isValidThread()) { + runOnPUPnPQueue([w = weak()] { + if (auto upnpThis = w.lock()) { + upnpThis->clearIgds(); + } + }); + return; + } - hostAddress_ = {}; + JAMI_DBG("PUPnP: clearing IGDs and devices lists"); if (searchForIgdTimer_) searchForIgdTimer_->cancel(); @@ -314,29 +311,34 @@ PUPnP::clearIgds() igdSearchCounter_ = 0; { - std::lock_guard<std::mutex> lock(validIgdListMutex_); + std::lock_guard<std::mutex> lock(pupnpMutex_); + for (auto const& igd : validIgdList_) { + igd->setValid(false); + } validIgdList_.clear(); + hostAddress_ = {}; } - { - std::lock_guard<std::mutex> lk(igdListMutex_); - - // Clear all internal lists. - cancelXmlList_.splice(cancelXmlList_.end(), dwnldlXmlList_); - discoveredIgdList_.clear(); - } + discoveredIgdList_.clear(); } void PUPnP::searchForIgd() { - CHECK_VALID_THREAD(); + if (not isValidThread()) { + runOnPUPnPQueue([w = weak()] { + if (auto upnpThis = w.lock()) { + upnpThis->searchForIgd(); + } + }); + return; + } // Update local address before searching. - hostAddress_ = ip_utils::getLocalAddr(pj_AF_INET()); + updateHostAddress(); if (isReady()) { - JAMI_WARN("PUPnP: Already have a valid IGD. Skipping the search request"); + JAMI_DBG("PUPnP: Already have a valid IGD. Skip the search request"); return; } @@ -348,14 +350,10 @@ PUPnP::searchForIgd() JAMI_DBG("PUPnP: Start search for IGD: attempt %u", igdSearchCounter_); - if (not pupnpRun_) { - startPUPnP(); - } - // Do not init if the host is not valid. Otherwise, the init will fail // anyway and may put libupnp in an unstable state (mainly deadlocks) // even if the UpnpFinish() method is called. - if (not hostAddress_ or hostAddress_.isLoopback()) { + if (not hasValidHostAddress()) { JAMI_WARN("PUPnP: Host address is invalid. Skipping the IGD search"); } else { // Init and register if needed @@ -365,19 +363,19 @@ PUPnP::searchForIgd() if (initialized_ and not clientRegistered_) { registerClient(); } - } - - // Start search - if (clientRegistered_ and pupnpRun_) { - assert(initialized_); - JAMI_DBG("PUPnP: Start search for IGD"); - searchForIgd_ = true; - pupnpCv_.notify_one(); - } else { - JAMI_WARN("PUPnP: PUPNP not fully setup. Skipping the IGD search"); + // Start searching + if (clientRegistered_) { + assert(initialized_); + searchForDevices(); + } else { + JAMI_WARN("PUPnP: PUPNP not fully setup. Skipping the IGD search"); + } } // Cancel the current timer (if any) and re-schedule. + // The connectivity change may be received while the the local + // interface is not fully setup. The rescheduling typically + // usefull to mitigate this race. if (searchForIgdTimer_) searchForIgdTimer_->cancel(); @@ -386,19 +384,21 @@ PUPnP::searchForIgd() if (auto upnpThis = w.lock()) upnpThis->searchForIgd(); }, - PUPNP_TIMEOUT_BEFORE_IGD_SEARCH_RETRY); + PUPNP_SEARCH_RETRY_UNIT * igdSearchCounter_); } -void -PUPnP::getIgdList(std::list<std::shared_ptr<IGD>>& igdList) const +std::list<std::shared_ptr<IGD>> +PUPnP::getIgdList() const { - std::lock_guard<std::mutex> lock(validIgdListMutex_); + std::lock_guard<std::mutex> lock(pupnpMutex_); + std::list<std::shared_ptr<IGD>> igdList; for (auto& it : validIgdList_) { // Return only active IGDs. if (it->isValid()) { igdList.emplace_back(it); } } + return igdList; } bool @@ -414,6 +414,7 @@ PUPnP::isReady() const bool PUPnP::hasValidIgd() const { + std::lock_guard<std::mutex> lock(pupnpMutex_); for (auto& it : validIgdList_) { if (it->isValid()) { return true; @@ -422,21 +423,24 @@ PUPnP::hasValidIgd() const return false; } -bool -PUPnP::updateAndCheckHostAddress() +void +PUPnP::updateHostAddress() { - if (hostAddress_ and not hostAddress_.isLoopback()) - return true; - - hostAddress_ = ip_utils::getLocalAddr(pj_AF_INET()); + std::lock_guard<std::mutex> lock(pupnpMutex_); + hostAddress_ = ip_utils::getLocalAddr(AF_INET); +} +bool +PUPnP::hasValidHostAddress() +{ + std::lock_guard<std::mutex> lock(pupnpMutex_); return hostAddress_ and not hostAddress_.isLoopback(); } void PUPnP::incrementErrorsCounter(const std::shared_ptr<IGD>& igd) { - if (not igd->isValid()) + if (not igd or not igd->isValid()) return; if (not igd->incrementErrorsCounter()) { // Disable this IGD. @@ -448,9 +452,14 @@ PUPnP::incrementErrorsCounter(const std::shared_ptr<IGD>& igd) } bool -PUPnP::validateIgd(const IGDInfo& info) +PUPnP::validateIgd(const std::string& location, IXML_Document* doc_container_ptr) { - auto descDoc = info.document.get(); + CHECK_VALID_THREAD(); + + assert(doc_container_ptr != nullptr); + + XMLDocument document(doc_container_ptr, ixmlDocument_free); + auto descDoc = document.get(); // Check device type. std::string deviceType = getFirstDocItem(descDoc, "deviceType"); if (deviceType.empty()) { @@ -463,7 +472,7 @@ PUPnP::validateIgd(const IGDInfo& info) return false; } - std::shared_ptr<UPnPIGD> igd_candidate = parseIgd(descDoc, info.location); + std::shared_ptr<UPnPIGD> igd_candidate = parseIgd(descDoc, location); if (not igd_candidate) { // No valid IGD candidate. return false; @@ -523,7 +532,7 @@ PUPnP::validateIgd(const IGDInfo& info) { // Add the IGD if not already present in the list. - std::lock_guard<std::mutex> lock(validIgdListMutex_); + std::lock_guard<std::mutex> lock(pupnpMutex_); for (auto& igd : validIgdList_) { // Must not be a null pointer assert(igd.get() != nullptr); @@ -531,7 +540,7 @@ PUPnP::validateIgd(const IGDInfo& info) JAMI_DBG("PUPnP: Device [%s] with int/ext addresses [%s:%s] is already in the list " "of valid IGDs", igd_candidate->getUID().c_str(), - igd_candidate->getLocalIp().toString().c_str(), + igd_candidate->toString().c_str(), igd_candidate->getPublicIp().toString().c_str()); return true; } @@ -545,22 +554,9 @@ PUPnP::validateIgd(const IGDInfo& info) igd_candidate->getUID().c_str()); JAMI_DBG("PUPnP: New IGD addresses [int: %s - ext: %s]", - igd_candidate->getLocalIp().toString().c_str(), + igd_candidate->toString().c_str(), igd_candidate->getPublicIp().toString().c_str()); - { - // This is a new IGD, move it the list. - std::lock_guard<std::mutex> lock(validIgdListMutex_); - validIgdList_.emplace_back(igd_candidate); - } - - // Clear mappings from previous instances. - deleteMappingsByDescription(igd_candidate, Mapping::UPNP_MAPPING_DESCRIPTION_PREFIX); - - // Report to the listener. - if (observer_) - observer_->onIgdUpdated(igd_candidate, UpnpIgdEvent::ADDED); - // Subscribe to IGD events. int upnp_err = UpnpSubscribeAsync(ctrlptHandle_, eventSub.c_str(), @@ -572,180 +568,137 @@ PUPnP::validateIgd(const IGDInfo& info) igd_candidate->getUID().c_str(), upnp_err, UpnpGetErrorMessage(upnp_err)); + // return false; } else { - JAMI_DBG("PUPnP: Successfully sent subscribe request to %s", - igd_candidate->getUID().c_str()); + JAMI_DBG("PUPnP: Successfully subscribed to IGD %s", igd_candidate->getUID().c_str()); } + { + // This is a new (and hopefully valid) IGD. + std::lock_guard<std::mutex> lock(pupnpMutex_); + validIgdList_.emplace_back(igd_candidate); + } + + // Report to the listener. + runOnUpnpContextQueue([w = weak(), igd_candidate] { + if (auto upnpThis = w.lock()) { + if (upnpThis->observer_) + upnpThis->observer_->onIgdUpdated(igd_candidate, UpnpIgdEvent::ADDED); + } + }); + return true; } void -PUPnP::requestMappingAdd(const std::shared_ptr<IGD>& igd, const Mapping& mapping) +PUPnP::requestMappingAdd(const Mapping& mapping) { - if (auto pupnp_igd = std::dynamic_pointer_cast<UPnPIGD>(igd)) { - dht::ThreadPool::io().run([w = weak(), pupnp_igd, mapping] { - if (auto upnpThis = w.lock()) - upnpThis->actionAddPortMapping(pupnp_igd, mapping); - }); - } + runOnPUPnPQueue([w = weak(), mapping] { + if (auto upnpThis = w.lock()) { + if (not upnpThis->isRunning()) + return; + Mapping mapRes(mapping); + if (upnpThis->actionAddPortMapping(mapRes)) { + mapRes.setState(MappingState::OPEN); + mapRes.setInternalAddress(upnpThis->getHostAddress().toString()); + upnpThis->processAddMapAction(mapRes); + } else { + upnpThis->incrementErrorsCounter(mapRes.getIgd()); + mapRes.setState(MappingState::FAILED); + upnpThis->processRequestMappingFailure(mapRes); + } + } + }); } -bool -PUPnP::actionAddPortMappingAsync(const std::shared_ptr<UPnPIGD>& igd, const Mapping& mapping) +void +PUPnP::requestMappingRemove(const Mapping& mapping) { - if (not clientRegistered_) { - return false; - } - XMLDocument action(nullptr, ixmlDocument_free); // Action pointer. - IXML_Document* action_container_ptr = nullptr; - // Set action sequence. - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewRemoteHost", - ""); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewExternalPort", - mapping.getExternalPortStr().c_str()); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewProtocol", - mapping.getTypeStr()); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewInternalPort", - mapping.getInternalPortStr().c_str()); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewInternalClient", - getHostAddress().toString().c_str()); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewEnabled", - "1"); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewPortMappingDescription", - mapping.toString().c_str()); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewLeaseDuration", - "0"); - action.reset(action_container_ptr); + // Send remove request using the matching IGD + runOnPUPnPQueue([w = weak(), mapping] { + if (auto upnpThis = w.lock()) { + // Abort if we are shutting down. + if (not upnpThis->isRunning()) + return; + if (upnpThis->actionDeletePortMapping(mapping)) { + upnpThis->processRemoveMapAction(mapping); + } else { + assert(mapping.getIgd()); + // Dont need to report in case of failure. + upnpThis->incrementErrorsCounter(mapping.getIgd()); + } + } + }); +} - int upnp_err = UpnpSendActionAsync(ctrlptHandle_, - igd->getControlURL().c_str(), - igd->getServiceType().c_str(), - nullptr, - action.get(), - ctrlPtCallback, - this); - if (upnp_err != UPNP_E_SUCCESS) { - JAMI_WARN("PUPnP: Failed to send async action %s from: %s, %d: %s", - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - upnp_err, - UpnpGetErrorMessage(upnp_err)); - return false; +std::shared_ptr<UPnPIGD> +PUPnP::findMatchingIgd(const std::string& ctrlURL) const +{ + std::lock_guard<std::mutex> lock(pupnpMutex_); + + auto iter = std::find_if(validIgdList_.begin(), + validIgdList_.end(), + [&ctrlURL](const std::shared_ptr<IGD>& igd) { + if (auto upnpIgd = std::dynamic_pointer_cast<UPnPIGD>(igd)) { + return upnpIgd->getControlURL() == ctrlURL; + } + return false; + }); + + if (iter == validIgdList_.end()) { + JAMI_WARN("PUPnP: Did not find the IGD matching ctrl URL [%s]", ctrlURL.c_str()); + return {}; } - JAMI_DBG("PUPnP: Sent request to open port %s", mapping.toString().c_str()); - - return true; + return std::dynamic_pointer_cast<UPnPIGD>(*iter); } void -PUPnP::processAddMapAction(const std::string& ctrlURL, - uint16_t ePort, - uint16_t iPort, - PortType portType) +PUPnP::processAddMapAction(const Mapping& map) { - Mapping mapToAdd(portType, ePort, iPort); + CHECK_VALID_THREAD(); - { - std::lock_guard<std::mutex> lock(validIgdListMutex_); - for (auto const& it : validIgdList_) { - if (auto igd = std::dynamic_pointer_cast<UPnPIGD>(it)) { - if (igd->getControlURL() == ctrlURL) { - mapToAdd.setInternalAddress(getHostAddress().toString()); - mapToAdd.setIgd(igd); - break; - } - } - } - } + if (observer_ == nullptr) + return; - if (mapToAdd.getIgd()) { - JAMI_DBG("PUPnP: Opened port %s", mapToAdd.toString().c_str()); - if (observer_) - observer_->onMappingAdded(mapToAdd.getIgd(), std::move(mapToAdd)); - } else { - JAMI_WARN("PUPnP: Did not find matching ctrl URL [%s] for %s", - ctrlURL.c_str(), - mapToAdd.toString().c_str()); - } + runOnUpnpContextQueue([w = weak(), map] { + if (auto upnpThis = w.lock()) { + JAMI_DBG("PUPnP: Opened mapping %s", map.toString().c_str()); + if (upnpThis->observer_) + upnpThis->observer_->onMappingAdded(map.getIgd(), std::move(map)); + } + }); } void -PUPnP::processRemoveMapAction(const std::string& ctrlURL, - uint16_t ePort, - uint16_t iPort, - PortType portType) +PUPnP::processRequestMappingFailure(const Mapping& map) { - Mapping mapToRemove(portType, ePort, iPort); + CHECK_VALID_THREAD(); - { - std::lock_guard<std::mutex> lock(validIgdListMutex_); - for (auto const& it : validIgdList_) { - if (auto igd = std::dynamic_pointer_cast<UPnPIGD>(it)) { - if (igd->getControlURL() == ctrlURL) { - mapToRemove.setIgd(igd); - break; - } - } - } - } + if (observer_ == nullptr) + return; - if (mapToRemove.getIgd()) { - JAMI_DBG("PUPnP: Closed port %s", mapToRemove.toString().c_str()); - if (observer_) - observer_->onMappingRemoved(mapToRemove.getIgd(), std::move(mapToRemove)); - } else { - JAMI_WARN("PUPnP: Did not find matching ctrl URL [%s] for %s", - ctrlURL.c_str(), - mapToRemove.toString().c_str()); - } + runOnUpnpContextQueue([w = weak(), map] { + if (auto upnpThis = w.lock()) { + JAMI_DBG("PUPnP: Failed to request mapping %s", map.toString().c_str()); + if (upnpThis->observer_) + upnpThis->observer_->onMappingRequestFailed(map); + } + }); } void -PUPnP::requestMappingRemove(const Mapping& mapping) +PUPnP::processRemoveMapAction(const Mapping& map) { - auto igd = std::dynamic_pointer_cast<UPnPIGD>(mapping.getIgd()); + CHECK_VALID_THREAD(); - if (not igd) + if (observer_ == nullptr) return; - std::lock_guard<std::mutex> lock(validIgdListMutex_); - for (auto const& it : validIgdList_) { - if (std::dynamic_pointer_cast<UPnPIGD>(it) == igd) { - // Send remove request using the matching IGD - dht::ThreadPool::io().run([w = weak(), igd, mapping] { - if (auto upnpThis = w.lock()) { - upnpThis->actionDeletePortMapping(igd, mapping); - } - }); - break; - } - } + runOnUpnpContextQueue([map, obs = observer_] { + JAMI_DBG("PUPnP: Closed mapping %s", map.toString().c_str()); + obs->onMappingRemoved(map.getIgd(), std::move(map)); + }); } const char* @@ -833,23 +786,29 @@ PUPnP::processDiscoverySearchResult(const std::string& cpDeviceId, const std::string& igdLocationUrl, const IpAddr& dstAddr) { - // Must first have a valid local address. - if (not updateAndCheckHostAddress()) { + CHECK_VALID_THREAD(); + + // Update host address if needed. + if (not hasValidHostAddress()) + updateHostAddress(); + + // The host address must be valid to proceed. + if (not hasValidHostAddress()) { JAMI_WARN("PUPnP: Local address is invalid. Ignore search result for now!"); return; } dht::http::Url url(igdLocationUrl); - std::lock_guard<std::mutex> lk(igdListMutex_); - // Use the device ID and the URL as ID. This is necessary as some // IGDs may have the same device ID but different URLs. auto igdId = cpDeviceId + " url: " + igdLocationUrl; - if (not discoveredIgdList_.emplace(igdId).second) + if (not discoveredIgdList_.emplace(igdId).second) { + // JAMI_WARN("PUPnP: IGD [%s] already in the list", igdId.c_str()); return; + } JAMI_DBG("PUPnP: Discovered a new IGD [%s]", igdId.c_str()); @@ -866,48 +825,52 @@ PUPnP::processDiscoverySearchResult(const std::string& cpDeviceId, return; } - dwnldlXmlList_.emplace_back(dht::ThreadPool::io().get<pIGDInfo>([this, - location = std::move( - igdLocationUrl)] { - IXML_Document* doc_container_ptr = nullptr; - XMLDocument doc_desc(nullptr, ixmlDocument_free); - int upnp_err = UpnpDownloadXmlDoc(location.c_str(), &doc_container_ptr); - doc_desc.reset(doc_container_ptr); + // Run a separate thread to prevent blocking this thread + // if the IGD HTTP server is not responsive. + dht::ThreadPool::io().run([w = weak(), igdLocationUrl] { + if (auto upnpThis = w.lock()) { + upnpThis->downLoadIgdDescription(igdLocationUrl); + } + }); +} - pupnpCv_.notify_all(); +void +PUPnP::downLoadIgdDescription(const std::string& locationUrl) +{ + IXML_Document* doc_container_ptr = nullptr; + int upnp_err = UpnpDownloadXmlDoc(locationUrl.c_str(), &doc_container_ptr); - if (upnp_err != UPNP_E_SUCCESS or not doc_desc) { - JAMI_WARN("PUPnP: Error downloading device XML document from %s -> %s", - location.c_str(), - UpnpGetErrorMessage(upnp_err)); - return std::make_unique<IGDInfo>( - IGDInfo {std::move(location), XMLDocument(nullptr, ixmlDocument_free)}); - } else { - JAMI_DBG("PUPnP: Succeeded to download device XML document from %s", location.c_str()); - return std::make_unique<IGDInfo>(IGDInfo {std::move(location), std::move(doc_desc)}); - } - })); + if (upnp_err != UPNP_E_SUCCESS or not doc_container_ptr) { + JAMI_WARN("PUPnP: Error downloading device XML document from %s -> %s", + locationUrl.c_str(), + UpnpGetErrorMessage(upnp_err)); + } else { + JAMI_DBG("PUPnP: Succeeded to download device XML document from %s", locationUrl.c_str()); + runOnPUPnPQueue([w = weak(), url = locationUrl, doc_container_ptr] { + if (auto upnpThis = w.lock()) { + upnpThis->validateIgd(url, doc_container_ptr); + } + }); + } } void PUPnP::processDiscoveryAdvertisementByebye(const std::string& cpDeviceId) { - // Remove device Id from list. - { - std::lock_guard<std::mutex> lk(igdListMutex_); - discoveredIgdList_.erase(cpDeviceId); - } + CHECK_VALID_THREAD(); + + discoveredIgdList_.erase(cpDeviceId); std::shared_ptr<IGD> igd; { - std::lock_guard<std::mutex> lk(validIgdListMutex_); + std::lock_guard<std::mutex> lk(pupnpMutex_); for (auto it = validIgdList_.begin(); it != validIgdList_.end();) { if ((*it)->getUID() == cpDeviceId) { igd = *it; JAMI_DBG("PUPnP: Received [%s] for IGD [%s] %s. Will be removed.", PUPnP::eventTypeToString(UPNP_DISCOVERY_ADVERTISEMENT_BYEBYE), igd->getUID().c_str(), - igd->getLocalIp().toString().c_str()); + igd->toString().c_str()); igd->setValid(false); // Remove the IGD. it = validIgdList_.erase(it); @@ -927,15 +890,16 @@ PUPnP::processDiscoveryAdvertisementByebye(const std::string& cpDeviceId) void PUPnP::processDiscoverySubscriptionExpired(Upnp_EventType event_type, const std::string& eventSubUrl) { - std::lock_guard<std::mutex> lk(validIgdListMutex_); + CHECK_VALID_THREAD(); + + std::lock_guard<std::mutex> lk(pupnpMutex_); for (auto& it : validIgdList_) { if (auto igd = std::dynamic_pointer_cast<UPnPIGD>(it)) { if (igd->getEventSubURL() == eventSubUrl) { JAMI_DBG("PUPnP: Received [%s] event for IGD [%s] %s. Request a new subscribe.", PUPnP::eventTypeToString(event_type), igd->getUID().c_str(), - igd->getLocalIp().toString().c_str()); - std::lock_guard<std::mutex> lk1(ctrlptMutex_); + igd->toString().c_str()); UpnpSubscribeAsync(ctrlptHandle_, eventSubUrl.c_str(), UPNP_INFINITE, @@ -969,10 +933,10 @@ PUPnP::handleCtrlPtUPnPEvents(Upnp_EventType event_type, const void* event) std::string deviceId {UpnpDiscovery_get_DeviceID_cstr(d_event)}; std::string location {UpnpDiscovery_get_Location_cstr(d_event)}; IpAddr dstAddr(*(const pj_sockaddr*) (UpnpDiscovery_get_DestAddr(d_event))); - runOnUpnpContextThread([w = weak(), - deviceId = std::move(deviceId), - location = std::move(location), - dstAddr = std::move(dstAddr)] { + runOnPUPnPQueue([w = weak(), + deviceId = std::move(deviceId), + location = std::move(location), + dstAddr = std::move(dstAddr)] { if (auto upnpThis = w.lock()) { upnpThis->processDiscoverySearchResult(deviceId, location, dstAddr); } @@ -985,7 +949,7 @@ PUPnP::handleCtrlPtUPnPEvents(Upnp_EventType event_type, const void* event) std::string deviceId(UpnpDiscovery_get_DeviceID_cstr(d_event)); // Process the response on the main thread. - runOnUpnpContextThread([w = weak(), deviceId = std::move(deviceId)] { + runOnPUPnPQueue([w = weak(), deviceId = std::move(deviceId)] { if (auto upnpThis = w.lock()) { upnpThis->processDiscoveryAdvertisementByebye(deviceId); } @@ -1016,7 +980,7 @@ PUPnP::handleCtrlPtUPnPEvents(Upnp_EventType event_type, const void* event) std::string publisherUrl(UpnpEventSubscribe_get_PublisherUrl_cstr(es_event)); // Process the response on the main thread. - runOnUpnpContextThread([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] { + runOnPUPnPQueue([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] { if (auto upnpThis = w.lock()) { upnpThis->processDiscoverySubscriptionExpired(event_type, publisherUrl); } @@ -1057,52 +1021,6 @@ PUPnP::handleCtrlPtUPnPEvents(Upnp_EventType event_type, const void* event) } else { JAMI_WARN("PUPnP: Action Result document not found"); } - - auto upnpString = UpnpActionComplete_get_CtrlUrl(a_event); - - std::string ctrlUrl {UpnpString_get_String(upnpString), - UpnpString_get_Length(upnpString)}; - - char* xmlbuff = ixmlPrintNode((IXML_Node*) actionRequest); - if (xmlbuff != nullptr) { - auto ctrlAction = getAction(xmlbuff); - ixmlFreeDOMString(xmlbuff); - - // Parse the response. - std::string ePortStr(getFirstDocItem(actionRequest, "NewExternalPort")); - std::string iPortStr(getFirstDocItem(actionRequest, "NewInternalPort")); - std::string portTypeStr(getFirstDocItem(actionRequest, "NewProtocol")); - - ixmlDocument_free(actionRequest); - - uint16_t ePort = ePortStr.empty() ? 0 : std::stoi(ePortStr); - uint16_t iPort = iPortStr.empty() ? 0 : std::stoi(iPortStr); - PortType portType = portTypeStr == "UDP" ? upnp::PortType::UDP - : upnp::PortType::TCP; - - // Process the response on the main thread. - runOnUpnpContextThread([w = weak(), - ctrlAction = std::move(ctrlAction), - ctrlUrl = std::move(ctrlUrl), - ePort, - iPort, - portType] { - auto upnpThis = w.lock(); - if (not upnpThis) - return; - switch (ctrlAction) { - case CtrlAction::ADD_PORT_MAPPING: - upnpThis->processAddMapAction(ctrlUrl, ePort, iPort, portType); - break; - case CtrlAction::DELETE_PORT_MAPPING: - upnpThis->processRemoveMapAction(ctrlUrl, ePort, iPort, portType); - break; - default: - // All other control actions are ignored. - break; - } - }); - } } break; } @@ -1125,7 +1043,7 @@ PUPnP::subEventCallback(Upnp_EventType event_type, const void* event, void* user } int -PUPnP::handleSubscriptionUPnPEvent(Upnp_EventType event_type, const void* event) +PUPnP::handleSubscriptionUPnPEvent(Upnp_EventType, const void* event) { UpnpEventSubscribe* es_event = static_cast<UpnpEventSubscribe*>(const_cast<void*>(event)); @@ -1157,7 +1075,7 @@ PUPnP::parseIgd(IXML_Document* doc, std::string locationUrl) JAMI_WARN("PUPnP: could not find UDN in description document of device"); return nullptr; } else { - std::lock_guard<std::mutex> lk(validIgdListMutex_); + std::lock_guard<std::mutex> lk(pupnpMutex_); for (auto& it : validIgdList_) { if (it->getUID() == UDN) { // We already have this device in our list. @@ -1380,7 +1298,7 @@ PUPnP::getMappingsListByDescr(const std::shared_ptr<IGD>& igd, const std::string std::map<Mapping::key_t, Mapping> mapList; - if (not(clientRegistered_ and upnpIgd->getLocalIp())) + if (not clientRegistered_ or not upnpIgd->isValid() or not upnpIgd->getLocalIp()) return mapList; // Set action name. @@ -1477,7 +1395,7 @@ PUPnP::getMappingsListByDescr(const std::shared_ptr<IGD>& igd, const std::string JAMI_DBG("PUPnP: Found %lu allocated mappings on IGD %s", mapList.size(), - upnpIgd->getLocalIp().toString().c_str()); + upnpIgd->toString().c_str()); return mapList; } @@ -1489,7 +1407,7 @@ PUPnP::deleteMappingsByDescription(const std::shared_ptr<IGD>& igd, const std::s return; JAMI_DBG("PUPnP: Remove all mappings (if any) on IGD %s matching descr prefix %s", - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), Mapping::UPNP_MAPPING_DESCRIPTION_PREFIX); auto mapList = getMappingsListByDescr(igd, description); @@ -1500,11 +1418,23 @@ PUPnP::deleteMappingsByDescription(const std::shared_ptr<IGD>& igd, const std::s } bool -PUPnP::actionAddPortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mapping& mapping) +PUPnP::actionAddPortMapping(const Mapping& mapping) { + CHECK_VALID_THREAD(); + if (not clientRegistered_) return false; + auto igdIn = std::dynamic_pointer_cast<UPnPIGD>(mapping.getIgd()); + if (not igdIn) + return false; + + // The requested IGD must be present in the list of local valid IGDs. + auto igd = findMatchingIgd(igdIn->getControlURL()); + + if (not igd or not igd->isValid()) + return false; + // Action and response pointers. XMLDocument action(nullptr, ixmlDocument_free); IXML_Document* action_container_ptr = nullptr; @@ -1563,99 +1493,57 @@ PUPnP::actionAddPortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mapping& &response_container_ptr); response.reset(response_container_ptr); + JAMI_DBG("PUPnP: Sent request to open port %s", mapping.toString().c_str()); + + bool success = true; + if (upnp_err != UPNP_E_SUCCESS) { - JAMI_WARN("PUPnP: Failed to send action %s from: %s, %d: %s", + JAMI_WARN("PUPnP: Failed to send action %s for mapping %s. %d: %s", ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), + mapping.toString().c_str(), upnp_err, UpnpGetErrorMessage(upnp_err)); - return false; - } + JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str()); + JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str()); - if (not response) { - JAMI_WARN("PUPnP: Failed to get response from %s", ACTION_ADD_PORT_MAPPING); - return false; + success = false; } - // Check if there is an error code. + // Check if an error has occurred. std::string errorCode = getFirstDocItem(response.get(), "errorCode"); if (not errorCode.empty()) { - std::string errorDescription = getFirstDocItem(response.get(), "errorDescription"); - JAMI_WARN("PUPnP: %s returned with error: %s: %s", + success = false; + // Try to get the error description. + std::string errorDescription; + if (response) { + errorDescription = getFirstDocItem(response.get(), "errorDescription"); + } + + JAMI_WARN("PUPnP: %s returned with error: %s %s", ACTION_ADD_PORT_MAPPING, errorCode.c_str(), errorDescription.c_str()); - return false; } - - JAMI_DBG("PUPnP: Sent request to open port %s", mapping.toString().c_str()); - - // Process the response on the main thread. - runOnUpnpContextThread([w = weak(), mapping, igd] { - auto upnpThis = w.lock(); - if (not upnpThis) - return; - upnpThis->processAddMapAction(igd->getControlURL(), - mapping.getExternalPort(), - mapping.getInternalPort(), - mapping.getType()); - }); - - return true; + return success; } bool -PUPnP::actionDeletePortMappingAsync(const UPnPIGD& igd, - const std::string& port_external, - const std::string& protocol) +PUPnP::actionDeletePortMapping(const Mapping& mapping) { - if (not clientRegistered_) { + CHECK_VALID_THREAD(); + + if (not clientRegistered_) return false; - } - XMLDocument action(nullptr, ixmlDocument_free); // Action pointer. - IXML_Document* action_container_ptr = nullptr; - // Set action sequence. - UpnpAddToAction(&action_container_ptr, - ACTION_DELETE_PORT_MAPPING, - igd.getServiceType().c_str(), - "NewRemoteHost", - ""); - UpnpAddToAction(&action_container_ptr, - ACTION_DELETE_PORT_MAPPING, - igd.getServiceType().c_str(), - "NewExternalPort", - port_external.c_str()); - UpnpAddToAction(&action_container_ptr, - ACTION_DELETE_PORT_MAPPING, - igd.getServiceType().c_str(), - "NewProtocol", - protocol.c_str()); - action.reset(action_container_ptr); - int upnp_err = UpnpSendActionAsync(ctrlptHandle_, - igd.getControlURL().c_str(), - igd.getServiceType().c_str(), - nullptr, - action.get(), - ctrlPtCallback, - this); - if (upnp_err != UPNP_E_SUCCESS) { - JAMI_WARN("PUPnP: Failed to send async action %s from: %s, %d: %s", - ACTION_DELETE_PORT_MAPPING, - igd.getServiceType().c_str(), - upnp_err, - UpnpGetErrorMessage(upnp_err)); + + auto igdIn = std::dynamic_pointer_cast<UPnPIGD>(mapping.getIgd()); + if (not igdIn) return false; - } - JAMI_DBG("PUPnP: Sent request to close port %s %s", port_external.c_str(), protocol.c_str()); - return true; -} -bool -PUPnP::actionDeletePortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mapping& mapping) -{ - if (not clientRegistered_) { + // The requested IGD must be present in the list of local valid IGDs. + auto igd = findMatchingIgd(igdIn->getControlURL()); + + if (not igd or not igd->isValid()) return false; - } // Action and response pointers. XMLDocument action(nullptr, ixmlDocument_free); @@ -1690,18 +1578,23 @@ PUPnP::actionDeletePortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mappin &response_container_ptr); response.reset(response_container_ptr); + bool success = true; + if (upnp_err != UPNP_E_SUCCESS) { - JAMI_WARN("PUPnP: Failed to send action %s from: %s, %d: %s", + JAMI_WARN("PUPnP: Failed to send action %s for mapping from %s. %d: %s", ACTION_DELETE_PORT_MAPPING, - igd->getServiceType().c_str(), + mapping.toString().c_str(), upnp_err, UpnpGetErrorMessage(upnp_err)); - return false; + JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str()); + JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str()); + + success = false; } if (not response) { - JAMI_WARN("PUPnP: Failed to get response from %s", ACTION_DELETE_PORT_MAPPING); - return false; + JAMI_WARN("PUPnP: Failed to get response for %s", ACTION_DELETE_PORT_MAPPING); + success = false; } // Check if there is an error code. @@ -1712,23 +1605,12 @@ PUPnP::actionDeletePortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mappin ACTION_DELETE_PORT_MAPPING, errorCode.c_str(), errorDescription.c_str()); - return false; + success = false; } JAMI_DBG("PUPnP: Sent request to close port %s", mapping.toString().c_str()); - // Process the response on the main thread. - runOnUpnpContextThread([w = weak(), mapping, igd] { - auto upnpThis = w.lock(); - if (not upnpThis) - return; - upnpThis->processRemoveMapAction(igd->getControlURL(), - mapping.getExternalPort(), - mapping.getInternalPort(), - mapping.getType()); - }); - - return true; + return success; } } // namespace upnp diff --git a/src/upnp/protocol/pupnp/pupnp.h b/src/upnp/protocol/pupnp/pupnp.h index e490f5ffd6..a5aa6940bb 100644 --- a/src/upnp/protocol/pupnp/pupnp.h +++ b/src/upnp/protocol/pupnp/pupnp.h @@ -3,6 +3,7 @@ * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -67,22 +68,18 @@ namespace upnp { constexpr static int ARRAY_IDX_INVALID = 713; constexpr static int CONFLICT_IN_MAPPING = 718; -// Timeout values (in seconds). +// IGD search timeout (in seconds). constexpr static unsigned int SEARCH_TIMEOUT {60}; // Max number of IGD search attempts before failure. constexpr static unsigned int PUPNP_MAX_RESTART_SEARCH_RETRIES {5}; -// Time-out between two successive IGD search. -constexpr static auto PUPNP_TIMEOUT_BEFORE_IGD_SEARCH_RETRY {std::chrono::seconds(60)}; +// Base unit for the timeout between two successive IGD search. +constexpr static auto PUPNP_SEARCH_RETRY_UNIT {std::chrono::seconds(15)}; class PUPnP : public UPnPProtocol { public: using XMLDocument = std::unique_ptr<IXML_Document, decltype(ixmlDocument_free)&>; - struct IGDInfo - { - std::string location; - XMLDocument document; - }; + enum class CtrlAction { UNKNOWN, ADD_PORT_MAPPING, @@ -92,8 +89,6 @@ public: GET_EXTERNAL_IP_ADDRESS }; - using pIGDInfo = std::unique_ptr<IGDInfo>; - PUPnP(); ~PUPnP(); @@ -113,20 +108,17 @@ public: void searchForIgd() override; // Get the IGD list. - void getIgdList(std::list<std::shared_ptr<IGD>>& igdList) const override; + std::list<std::shared_ptr<IGD>> getIgdList() const override; // Return true if the it's fully setup. bool isReady() const override; - // Increment IGD errors counter. - void incrementErrorsCounter(const std::shared_ptr<IGD>& igd) override; - // Get from the IGD the list of already allocated mappings if any. std::map<Mapping::key_t, Mapping> getMappingsListByDescr( const std::shared_ptr<IGD>& igd, const std::string& descr) const override; // Request a new mapping. - void requestMappingAdd(const std::shared_ptr<IGD>& igd, const Mapping& mapping) override; + void requestMappingAdd(const Mapping& mapping) override; // Renew an allocated mapping. // Not implemented. Currently, UPNP allocations do not have expiration time. @@ -135,50 +127,75 @@ public: // Removes a mapping. void requestMappingRemove(const Mapping& igdMapping) override; + // Get the host (local) address. + const IpAddr getHostAddress() const override; + + // Terminate the instance. void terminate() override; private: NON_COPYABLE(PUPnP); + // Helpers to run tasks on PUPNP private execution queue. + ScheduledExecutor* getPUPnPScheduler() { return &pupnpScheduler_; } + template<typename Callback> + void runOnPUPnPQueue(Callback&& cb) + { + pupnpScheduler_.run([cb = std::forward<Callback>(cb)]() mutable { cb(); }); + } + + // Helper to run tasks on UPNP context execution queue. ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); } // Init lib-upnp void initUpnpLib(); + // Block until shutdown is complete or time-out. + void waitForShutdown(); + + // Return true if running. + bool isRunning() const; + // Register the client void registerClient(); - // Start the internal thread. - void startPUPnP(); - // Start search for UPNP devices void searchForDevices(); // Return true if it has at least one valid IGD. bool hasValidIgd() const; - // Update and check the host (local) address. Returns true - // if the address is valid. - bool updateAndCheckHostAddress(); + // Update the host (local) address. + void updateHostAddress(); + + // Check the host (local) address. + // Returns true if the address is valid. + bool hasValidHostAddress(); // Delete mappings matching the description void deleteMappingsByDescription(const std::shared_ptr<IGD>& igd, const std::string& description); + // Search for the IGD in the local list of known IGDs. + std::shared_ptr<UPnPIGD> findMatchingIgd(const std::string& ctrlURL) const; + // Process the reception of an add mapping action answer. - void processAddMapAction(const std::string& ctrlURL, - uint16_t ePort, - uint16_t iPort, - PortType portType); + void processAddMapAction(const Mapping& map); + + // Process the a mapping request failure. + void processRequestMappingFailure(const Mapping& map); // Process the reception of a remove mapping action answer. - void processRemoveMapAction(const std::string& ctrlURL, - uint16_t ePort, - uint16_t iPort, - PortType portType); + void processRemoveMapAction(const Mapping& map); + + // Increment IGD errors counter. + void incrementErrorsCounter(const std::shared_ptr<IGD>& igd); + + // Download XML document. + void downLoadIgdDescription(const std::string& url); // Validate IGD from the xml document received from the router. - bool validateIgd(const IGDInfo&); + bool validateIgd(const std::string& location, IXML_Document* doc_container_ptr); // Returns control point action callback based on xml node. static CtrlAction getAction(const char* xmlNode); @@ -217,46 +234,54 @@ private: // Parses the IGD candidate. std::unique_ptr<UPnPIGD> parseIgd(IXML_Document* doc, std::string locationUrl); - // These functions directly create UPnP actions and make synchronous UPnP control point calls. - // Assumes mutex is already locked. + // These functions directly create UPnP actions and make synchronous UPnP + // control point calls. Must be run on the PUPNP internal execution queue. bool actionIsIgdConnected(const UPnPIGD& igd); IpAddr actionGetExternalIP(const UPnPIGD& igd); + bool actionAddPortMapping(const Mapping& mapping); + bool actionDeletePortMapping(const Mapping& mapping); - bool actionAddPortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mapping& mapping); - bool actionAddPortMappingAsync(const std::shared_ptr<UPnPIGD>& igd, const Mapping& mapping); - bool actionDeletePortMappingAsync(const UPnPIGD& igd, - const std::string& port_external, - const std::string& protocol); - bool actionDeletePortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mapping& mapping); // Event type to string static const char* eventTypeToString(Upnp_EventType eventType); std::weak_ptr<PUPnP> weak() { return std::static_pointer_cast<PUPnP>(shared_from_this()); } + // Execution queue to run lib upnp actions + ScheduledExecutor pupnpScheduler_ {}; + // Initialization status. std::atomic_bool initialized_ {false}; // Client registration status. std::atomic_bool clientRegistered_ {false}; - std::condition_variable pupnpCv_ {}; // Condition variable for thread-safe signaling. - std::atomic_bool pupnpRun_ {false}; // Variable to allow the thread to run. - std::thread pupnpThread_ {}; // PUPnP thread for non-blocking client registration. std::shared_ptr<Task> searchForIgdTimer_ {}; unsigned int igdSearchCounter_ {0}; - mutable std::mutex validIgdListMutex_; - std::list<std::shared_ptr<IGD>> validIgdList_; // List of valid IGDs. + // List of discovered IGDs. + std::set<std::string> discoveredIgdList_; + + // Control point handle. + UpnpClient_Handle ctrlptHandle_ {-1}; + + // Observer to report the results. + UpnpMappingObserver* observer_ {nullptr}; + + // List of valid IGDs. + std::list<std::shared_ptr<IGD>> validIgdList_; - std::set<std::string> discoveredIgdList_; // UDN list of discovered IGDs. - std::list<std::future<pIGDInfo>> - dwnldlXmlList_; // List of futures for blocking xml download function calls. - std::list<std::future<pIGDInfo>> cancelXmlList_; // List of abandoned documents + // Current host address. + IpAddr hostAddress_ {}; - mutable std::mutex igdListMutex_; // Mutex used to protect IGD instances. - std::mutex ctrlptMutex_; // Mutex for client handle protection. - UpnpClient_Handle ctrlptHandle_ {-1}; // Control point handle. + // Calls from other threads that does not need synchronous access are + // rescheduled on the UPNP private queue. This will avoid the need to + // protect most of the data members of this class. + // For some internal members (namely the validIgdList and the hostAddress) + // that need to be synchronously accessed, are protected by this mutex. + mutable std::mutex pupnpMutex_; - std::atomic_bool searchForIgd_ {false}; // Variable to signal thread for a search. + // Shutdown synchronization + std::condition_variable shutdownCv_ {}; + bool shutdownComplete_ {false}; }; } // namespace upnp diff --git a/src/upnp/protocol/pupnp/upnp_igd.cpp b/src/upnp/protocol/pupnp/upnp_igd.cpp index 1b5f24bd1a..b1d02e3ce5 100644 --- a/src/upnp/protocol/pupnp/upnp_igd.cpp +++ b/src/upnp/protocol/pupnp/upnp_igd.cpp @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by diff --git a/src/upnp/protocol/pupnp/upnp_igd.h b/src/upnp/protocol/pupnp/upnp_igd.h index e8b3352a62..cfb13a0ed5 100644 --- a/src/upnp/protocol/pupnp/upnp_igd.h +++ b/src/upnp/protocol/pupnp/upnp_igd.h @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -52,7 +53,12 @@ public: std::string&& eventSubURL, IpAddr&& localIp = {}, IpAddr&& publicIp = {}); + ~UPnPIGD() {} + + bool operator==(IGD& other) const; + bool operator==(UPnPIGD& other) const; + const std::string& getBaseURL() const { std::lock_guard<std::mutex> lock(mutex_); @@ -89,8 +95,7 @@ public: return eventSubURL_; } - bool operator==(IGD& other) const; - bool operator==(UPnPIGD& other) const; + const std::string toString() const override { return controlURL_; } private: std::string baseURL_ {}; diff --git a/src/upnp/protocol/upnp_protocol.h b/src/upnp/protocol/upnp_protocol.h index ce229b2f13..a14a77f1a1 100644 --- a/src/upnp/protocol/upnp_protocol.h +++ b/src/upnp/protocol/upnp_protocol.h @@ -1,7 +1,8 @@ /* * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -58,6 +59,11 @@ constexpr static const char* UPNP_WANPPP_SERVICE enum class UpnpIgdEvent { ADDED, REMOVED, INVALID_STATE }; +// Interface used to report mapping event from the protocol implementations. +// This interface is meant to be implemented only by UPnPConext class. Sincce +// this class is a singleton, it's assumed that it out-lives the protocol +// implementations. In other words, the observer is always assumed to point to a +// valid instance. class UpnpMappingObserver { public: @@ -66,6 +72,7 @@ public: virtual void onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) = 0; virtual void onMappingAdded(const std::shared_ptr<IGD>& igd, const Mapping& map) = 0; + virtual void onMappingRequestFailed(const Mapping& map) = 0; #if HAVE_LIBNATPMP virtual void onMappingRenewed(const std::shared_ptr<IGD>& igd, const Mapping& map) = 0; #endif @@ -94,14 +101,11 @@ public: virtual void searchForIgd() = 0; // Get the IGD instance. - virtual void getIgdList(std::list<std::shared_ptr<IGD>>& igdList) const = 0; + virtual std::list<std::shared_ptr<IGD>> getIgdList() const = 0; // Return true if it has at least one valid IGD. virtual bool isReady() const = 0; - // Increment IGD errors counter. - virtual void incrementErrorsCounter(const std::shared_ptr<IGD>& igd) = 0; - // Get the list of already allocated mappings if any. virtual std::map<Mapping::key_t, Mapping> getMappingsListByDescr(const std::shared_ptr<IGD>&, const std::string&) const @@ -110,7 +114,7 @@ public: } // Sends a request to add a mapping. - virtual void requestMappingAdd(const std::shared_ptr<IGD>& igd, const Mapping& map) = 0; + virtual void requestMappingAdd(const Mapping& map) = 0; // Renew an allocated mapping. virtual void requestMappingRenew(const Mapping& mapping) = 0; @@ -121,16 +125,11 @@ public: // Set the user callbacks. virtual void setObserver(UpnpMappingObserver* obs) = 0; - // Get the host (local) address. - const IpAddr& getHostAddress() const { return hostAddress_; } + // Get the current host (local) address + virtual const IpAddr getHostAddress() const = 0; // Terminate virtual void terminate() = 0; - -protected: - // The host (local) address. Must be fully set before making any request. - IpAddr hostAddress_ {}; - UpnpMappingObserver* observer_ {nullptr}; }; } // namespace upnp diff --git a/src/upnp/upnp_context.cpp b/src/upnp/upnp_context.cpp index f4031a92db..cbb3c6d1e5 100644 --- a/src/upnp/upnp_context.cpp +++ b/src/upnp/upnp_context.cpp @@ -3,6 +3,7 @@ * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -24,8 +25,6 @@ namespace jami { namespace upnp { -constexpr static auto NAT_MAP_REQUEST_TIMEOUT_UNIT = std::chrono::seconds(1); -constexpr static auto PUPNP_MAP_REQUEST_TIMEOUT_UNIT = std::chrono::seconds(5); constexpr static auto MAP_UPDATE_INTERVAL = std::chrono::seconds(30); constexpr static int MAX_REQUEST_RETRIES = 20; constexpr static int MAX_REQUEST_REMOVE_COUNT = 5; @@ -44,7 +43,7 @@ UPnPContext::UPnPContext() portRange_.emplace(PortType::UDP, std::make_pair(UPNP_UDP_PORT_MIN, UPNP_UDP_PORT_MAX)); if (not isValidThread()) { - runOnUpnpContextThread([this] { init(); }); + runOnUpnpContextQueue([this] { init(); }); return; } } @@ -61,7 +60,8 @@ void UPnPContext::shutdown() { if (not isValidThread()) { - runOnUpnpContextThread([this] { shutdown(); }); + runOnUpnpContextQueue([this] { shutdown(); }); + waitForShutdown(); return; } @@ -72,12 +72,29 @@ UPnPContext::shutdown() for (auto const& [_, proto] : protocolList_) proto->terminate(); - std::lock_guard<std::mutex> lock(mappingMutex_); - mappingList_->clear(); - if (mappingListUpdateTimer_) - mappingListUpdateTimer_->cancel(); - controllerList_.clear(); - protocolList_.clear(); + { + std::lock_guard<std::mutex> lock(mappingMutex_); + mappingList_->clear(); + if (mappingListUpdateTimer_) + mappingListUpdateTimer_->cancel(); + controllerList_.clear(); + protocolList_.clear(); + shutdownComplete_ = true; + } + + shutdownCv_.notify_one(); +} + +void +UPnPContext::waitForShutdown() +{ + JAMI_DBG("Waiting for shutdown ..."); + std::unique_lock<std::mutex> lk(mappingMutex_); + if (shutdownCv_.wait_for(lk, std::chrono::seconds(30), [this] { return shutdownComplete_; })) { + JAMI_DBG("Shutdown completed"); + } else { + JAMI_ERR("Shutdown timed-out"); + } } UPnPContext::~UPnPContext() @@ -125,7 +142,7 @@ void UPnPContext::stopUpnp(bool forceRelease) { if (not isValidThread()) { - runOnUpnpContextThread([this] { stopUpnp(); }); + runOnUpnpContextQueue([this] { stopUpnp(); }); return; } @@ -147,6 +164,7 @@ UPnPContext::stopUpnp(bool forceRelease) } } // Invalidate the current IGDs. + preferredIgd_.reset(); validIgdList_.clear(); } @@ -198,18 +216,43 @@ void UPnPContext::connectivityChanged() { if (not isValidThread()) { - runOnUpnpContextThread([this] { connectivityChanged(); }); + runOnUpnpContextQueue([this] { connectivityChanged(); }); return; } + auto hostAddr = ip_utils::getLocalAddr(AF_INET); + + JAMI_DBG("Connectivity change check: host address %s", hostAddr.toString().c_str()); + + auto addrChanged = false; + + // Check if the host address changed. + for (auto const& [_, protocol] : protocolList_) { + if (protocol->isReady() and hostAddr != protocol->getHostAddress()) { + JAMI_WARN("Host address changed from %s to %s", + protocol->getHostAddress().toString().c_str(), + hostAddr.toString().c_str()); + protocol->clearIgds(); + addrChanged = true; + } + } + + // Address did not change, nothing to do. + if (not addrChanged) { + return; + } + + // No registered controller. New search will be performed when + // a controller is registered. if (controllerList_.empty()) return; - JAMI_DBG("Connectivity changed. Reset IGDs and restart."); + JAMI_DBG("Connectivity changed. Clear the IGDs and restart"); stopUpnp(); startUpnp(); + // Mapping with auto update enabled must be processed first. processMappingWithAutoUpdate(); } @@ -266,9 +309,10 @@ UPnPContext::reserveMapping(Mapping& requestedMap) // the caller to use it or not. for (auto const& [_, map] : mappingList) { // If the desired port is null, we pick the first available port. - if ((desiredPort == 0 or map->getExternalPort() == desiredPort) and map->isAvailable()) { - // Considere the first available mapping regardless of - // its state, if we dont have one yet. + if (map->isValid() and (desiredPort == 0 or map->getExternalPort() == desiredPort) + and map->isAvailable()) { + // Considere the first available mapping regardless of its + // state. A mapping with OPEN state will be used if found. if (not mapRes) mapRes = map; @@ -307,7 +351,7 @@ void UPnPContext::releaseMapping(const Mapping& map) { if (not isValidThread()) { - runOnUpnpContextThread([this, map] { releaseMapping(map); }); + runOnUpnpContextQueue([this, map] { releaseMapping(map); }); return; } @@ -332,8 +376,16 @@ UPnPContext::releaseMapping(const Mapping& map) void UPnPContext::registerController(void* controller) { + { + std::lock_guard<std::mutex> lock(mappingMutex_); + if (shutdownComplete_) { + JAMI_WARN("UPnPContext already shut down"); + return; + } + } + if (not isValidThread()) { - runOnUpnpContextThread([this, controller] { registerController(controller); }); + runOnUpnpContextQueue([this, controller] { registerController(controller); }); return; } @@ -352,7 +404,7 @@ void UPnPContext::unregisterController(void* controller) { if (not isValidThread()) { - runOnUpnpContextThread([this, controller] { unregisterController(controller); }); + runOnUpnpContextQueue([this, controller] { unregisterController(controller); }); return; } @@ -394,7 +446,7 @@ UPnPContext::requestMapping(const Mapping::sharedPtr_t& map) assert(map); if (not isValidThread()) { - runOnUpnpContextThread([this, map] { requestMapping(map); }); + runOnUpnpContextQueue([this, map] { requestMapping(map); }); return; } @@ -410,24 +462,16 @@ UPnPContext::requestMapping(const Mapping::sharedPtr_t& map) map->setIgd(igd); - JAMI_DBG("Request mapping %s using protocol [%s] IGD [%s %s]", + JAMI_DBG("Request mapping %s using protocol [%s] IGD [%s]", map->toString().c_str(), igd->getProtocolName(), - igd->getUID().c_str(), - igd->getLocalIp().toString().c_str()); + igd->toString().c_str()); - // Register mapping timeout callback and update the state if needed. - registerAddMappingTimeout(map); if (map->getState() != MappingState::IN_PROGRESS) updateMappingState(map, MappingState::IN_PROGRESS); - // Request the mapping. - if (not igd) { - JAMI_ERR("No valid IGD!"); - return; - } auto const& protocol = protocolList_.at(igd->getProtocol()); - protocol->requestMappingAdd(igd, *map); + protocol->requestMappingAdd(*map); } bool @@ -494,27 +538,48 @@ UPnPContext::deleteUnneededMappings(PortType type, int portCount) return true; } -std::shared_ptr<IGD> -UPnPContext::getPreferredIgd() const +void +UPnPContext::updatePreferredIgd() { CHECK_VALID_THREAD(); - std::shared_ptr<IGD> prefIgd; + if (preferredIgd_ and preferredIgd_->isValid()) + return; + + // Reset and search for the best IGD. + preferredIgd_.reset(); + for (auto const& [_, protocol] : protocolList_) { if (protocol->isReady()) { - std::list<std::shared_ptr<IGD>> igdList; - protocol->getIgdList(igdList); + auto igdList = protocol->getIgdList(); assert(not igdList.empty()); auto const& igd = igdList.front(); + if (not igd->isValid()) + continue; - // Perefer IGD with the lowest error count, if equal, prefer NAT-PMP. - if (not prefIgd or igd->getErrorsCount() < prefIgd->getErrorsCount() - or protocol->getProtocol() == NatProtocolType::NAT_PMP) { - prefIgd = igd; - } + // Prefer NAT-PMP over PUPNP. + if (preferredIgd_ and igd->getProtocol() != NatProtocolType::NAT_PMP) + continue; + + // Update. + preferredIgd_ = igd; } } - return prefIgd; + + if (preferredIgd_ and preferredIgd_->isValid()) { + JAMI_DBG("Preferred IGD updated to [%s] IGD [%s %s] ", + preferredIgd_->getProtocolName(), + preferredIgd_->getUID().c_str(), + preferredIgd_->toString().c_str()); + } +} + +std::shared_ptr<IGD> +UPnPContext::getPreferredIgd() const +{ + CHECK_VALID_THREAD(); + + return preferredIgd_; } void @@ -522,12 +587,15 @@ UPnPContext::updateMappingList(bool async) { // Run async if requested. if (async) { - runOnUpnpContextThread([this] { updateMappingList(false); }); + runOnUpnpContextQueue([this] { updateMappingList(false); }); return; } CHECK_VALID_THREAD(); + // Update the preferred IGD. + updatePreferredIgd(); + // Skip if no controller registered. if (controllerList_.empty()) return; @@ -543,8 +611,6 @@ UPnPContext::updateMappingList(bool async) if (not prefIgd) { JAMI_DBG("UPNP/NAT-PMP enabled, but no valid IGDs available"); std::lock_guard<std::mutex> lock(mappingMutex_); - // Invalidate the current IGDs. - validIgdList_.clear(); // No valid IGD. Nothing to do. return; } @@ -552,7 +618,7 @@ UPnPContext::updateMappingList(bool async) JAMI_DBG("Current preferred protocol [%s] IGD [%s %s] ", prefIgd->getProtocolName(), prefIgd->getUID().c_str(), - prefIgd->getLocalIp().toString().c_str()); + prefIgd->toString().c_str()); // Process pending requests if any. processPendingRequests(prefIgd); @@ -645,8 +711,12 @@ UPnPContext::pruneMappingList() auto remoteMapList = protocol->getMappingsListByDescr(igd, Mapping::UPNP_MAPPING_DESCRIPTION_PREFIX); - if (remoteMapList.empty()) - return; + if (remoteMapList.empty()) { + std::lock_guard<std::mutex> lock(mappingMutex_); + if (not getMappingList(PortType::TCP).empty() or getMappingList(PortType::TCP).empty()) { + JAMI_WARN("We have provisionned mappings but the PUPNP IGD returned an empty list!"); + } + } pruneUnMatchedMappings(igd, remoteMapList); pruneUnTrackedMappings(igd, remoteMapList); @@ -681,7 +751,7 @@ UPnPContext::pruneUnMatchedMappings(const std::shared_ptr<IGD>& igd, JAMI_WARN("Mapping %s (IGD %s) marked as \"OPEN\" but not found in the " "remote list. Mark as failed!", map->toString().c_str(), - igd->getLocalIp().toString().c_str()); + igd->toString().c_str()); } } } @@ -712,7 +782,7 @@ UPnPContext::pruneUnTrackedMappings(const std::shared_ptr<IGD>& igd, // Not present, request mapping remove. JAMI_DBG("Sending a remove request for un-tracked mapping %s on IGD %s", map.toString().c_str(), - igd->getLocalIp().toString().c_str()); + igd->toString().c_str()); // Add to the list. toRemoveList.emplace_back(std::move(map)); // Make only few remove requests at once. @@ -753,9 +823,8 @@ UPnPContext::pruneMappingsWithInvalidIgds(const std::shared_ptr<IGD>& igd) for (auto const& map : toRemoveList) { JAMI_DBG("Remove mapping %s (has an invalid IGD %s [%s])", map->toString().c_str(), - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), igd->getProtocolName()); - map->cancelTimeoutTimer(); updateMappingState(map, MappingState::FAILED); unregisterMapping(map); } @@ -778,9 +847,9 @@ UPnPContext::processPendingRequests(const std::shared_ptr<IGD>& igd) auto& mappingList = getMappingList(type); for (auto& [_, map] : mappingList) { if (map->getState() == MappingState::PENDING) { - JAMI_DBG("Send request for pending mapping %s to IGD %s", + JAMI_DBG("Send pending request for mapping %s to IGD %s", map->toString().c_str(), - igd->getLocalIp().toString(true).c_str()); + igd->toString().c_str()); requestsList.emplace_back(map); } } @@ -823,11 +892,11 @@ UPnPContext::processMappingWithAutoUpdate() // Reserve a new mapping. Mapping newMapping(oldMap->getType()); + newMapping.enableAutoUpdate(true); + newMapping.setNotifyCallback(oldMap->getNotifyCallback()); + auto const& mapPtr = reserveMapping(newMapping); assert(mapPtr); - mapPtr->setAvailable(false); - mapPtr->enableAutoUpdate(true); - mapPtr->setNotifyCallback(oldMap->getNotifyCallback()); // Release the old one. oldMap->setAvailable(true); @@ -843,10 +912,13 @@ UPnPContext::onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) assert(igd); if (not isValidThread()) { - runOnUpnpContextThread([this, igd, event] { onIgdUpdated(igd, event); }); + runOnUpnpContextQueue([this, igd, event] { onIgdUpdated(igd, event); }); return; } + // Reset to start search for a new best IGD. + preferredIgd_.reset(); + char const* IgdState = event == UpnpIgdEvent::ADDED ? "ADDED" : event == UpnpIgdEvent::REMOVED ? "REMOVED" : "INVALID"; @@ -854,9 +926,13 @@ UPnPContext::onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) auto const& igdLocalAddr = igd->getLocalIp(); auto protocolName = igd->getProtocolName(); - JAMI_DBG("New event for IGD [%s] [%s]: [%s]", igd->getUID().c_str(), protocolName, IgdState); + JAMI_DBG("New event for IGD [%s %s] [%s]: [%s]", + igd->getUID().c_str(), + igd->toString().c_str(), + protocolName, + IgdState); - // Check if IGD has a valid addresses. + // Check if the IGD has valid addresses. if (not igdLocalAddr) { JAMI_WARN("[%s] IGD has an invalid local address", protocolName); return; @@ -879,7 +955,7 @@ UPnPContext::onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) if (event == UpnpIgdEvent::REMOVED or event == UpnpIgdEvent::INVALID_STATE) { JAMI_WARN("State of IGD [%s %s] [%s] changed to [%s]. Pruning the mapping list", igd->getUID().c_str(), - igdLocalAddr.toString(true, true).c_str(), + igd->toString().c_str(), protocolName, IgdState); @@ -907,7 +983,7 @@ UPnPContext::onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) } } - // Update. + // Update the provisionned mappings. updateMappingList(false); } @@ -922,50 +998,22 @@ UPnPContext::onMappingAdded(const std::shared_ptr<IGD>& igd, const Mapping& mapR // We may receive a response for a canceled request. Just ignore it. JAMI_DBG("Response for mapping %s [IGD %s] [%s] does not have a local match", mapRes.toString().c_str(), - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), mapRes.getProtocolName()); return; } - // The mapping pointer must be valid at his point. - assert(map); - - // We have a response, so cancel the timer. - map->cancelTimeoutTimer(); - - // Update the mapping. - map->setState(mapRes.getState()); - map->setIgd(mapRes.getIgd()); + // The mapping request is new and successful. Update. + map->setIgd(igd); map->setInternalAddress(mapRes.getInternalAddress()); map->setExternalPort(mapRes.getExternalPort()); - // Clean-up if not valid. - if (not map->isValid()) { - JAMI_WARN("Mapping request %s on IGD %s [%s] failed!", - map->toString(true).c_str(), - igd->getLocalIp().toString().c_str(), - igd->getProtocolName()); - updateMappingState(map, MappingState::FAILED); - unregisterMapping(map); - return; - } - - // Ignore if already open. - if (map->getState() == MappingState::OPEN) { - assert(map->getIgd()); - JAMI_DBG("Mapping %s is already open on IGD %s [%s]", - map->toString().c_str(), - map->getIgd()->getLocalIp().toString().c_str(), - map->getIgd()->getProtocolName()); - return; - } - - // The mapping request is new and successful. Update. + // Update the state and report to the owner. updateMappingState(map, MappingState::OPEN); JAMI_DBG("Mapping %s (on IGD %s [%s]) successfully performed", map->toString().c_str(), - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), map->getProtocolName()); // Call setValid() to reset the errors counter. We need @@ -983,7 +1031,7 @@ UPnPContext::onMappingRenewed(const std::shared_ptr<IGD>& igd, const Mapping& ma // We may receive a notification for a canceled request. Ignore it. JAMI_WARN("Renewed mapping %s from IGD %s [%s] does not have a match in local list", map.toString().c_str(), - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), map.getProtocolName()); return; } @@ -991,7 +1039,7 @@ UPnPContext::onMappingRenewed(const std::shared_ptr<IGD>& igd, const Mapping& ma or mapPtr->getState() != MappingState::OPEN) { JAMI_WARN("Renewed mapping %s from IGD %s [%s] is in unexpected state", mapPtr->toString().c_str(), - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), mapPtr->getProtocolName()); return; } @@ -1011,12 +1059,7 @@ UPnPContext::requestRemoveMapping(const Mapping::sharedPtr_t& map) } if (not map->isValid()) { - JAMI_WARN("Mapping [%s] is invalid! Ignore.", map->toString().c_str()); - return; - } - - if (not map->getIgd()->isValid()) { - JAMI_WARN("Mapping [%s] has an invalid IGD! Ignore.", map->toString().c_str()); + // Silently ignore if the mapping is invalid return; } @@ -1028,7 +1071,7 @@ void UPnPContext::deleteAllMappings(PortType type) { if (not isValidThread()) { - runOnUpnpContextThread([this, type] { deleteAllMappings(type); }); + runOnUpnpContextQueue([this, type] { deleteAllMappings(type); }); return; } @@ -1047,7 +1090,7 @@ UPnPContext::onMappingRemoved(const std::shared_ptr<IGD>& igd, const Mapping& ma return; if (not isValidThread()) { - runOnUpnpContextThread([this, igd, mapRes] { onMappingRemoved(igd, mapRes); }); + runOnUpnpContextQueue([this, igd, mapRes] { onMappingRemoved(igd, mapRes); }); return; } @@ -1119,8 +1162,6 @@ UPnPContext::unregisterMapping(const Mapping::sharedPtr_t& map) return; } - map->cancelTimeoutTimer(); - if (map->getAutoUpdate()) { // Dont unregister mappings with auto-update enabled. return; @@ -1130,7 +1171,10 @@ UPnPContext::unregisterMapping(const Mapping::sharedPtr_t& map) if (mappingList.erase(map->getMapKey()) == 1) { JAMI_DBG("Unregistered mapping %s", map->toString().c_str()); } else { - JAMI_ERR("Failed to unregister mapping %s", map->toString().c_str()); + // The mapping may already be un-registered. Just ignore it. + JAMI_DBG("Mapping %s [%s] does not have a local match", + map->toString().c_str(), + map->getProtocolName()); } } @@ -1195,44 +1239,16 @@ UPnPContext::getMappingStatus(MappingStatus& status) } void -UPnPContext::registerAddMappingTimeout(const Mapping::sharedPtr_t& map) -{ - if (not map) { - JAMI_ERR("Invalid mapping pointer"); - return; - } - - auto const& igd = map->getIgd(); - if (not igd) { - JAMI_ERR("Invalid igd pointer"); - return; - } - - MappingStatus status; - getMappingStatus(status); - - // Schedule the timer and hold a pointer on the task. - auto timeout = igd->getProtocol() == NatProtocolType::NAT_PMP ? NAT_MAP_REQUEST_TIMEOUT_UNIT - : PUPNP_MAP_REQUEST_TIMEOUT_UNIT; - map->setTimeoutTimer( - // The time-out is set proportional to the number of "in-progress" - // requests plus a bias. - // This rule is empirical and based on experimenting with few - // implementations. - Manager::instance() - .scheduler() - .scheduleIn([this, key = map->getMapKey()] { onRequestTimeOut(key); }, - timeout * (status.inProgressCount_ + 5))); -} - -void -UPnPContext::onRequestTimeOut(Mapping::key_t key) +UPnPContext::onMappingRequestFailed(const Mapping& mapRes) { CHECK_VALID_THREAD(); - auto const& map = getMappingWithKey(key); + auto const& map = getMappingWithKey(mapRes.getMapKey()); if (not map) { - JAMI_ERR("Mapping pointer is null"); + // We may receive a response for a removed request. Just ignore it. + JAMI_DBG("Mapping %s [IGD %s] does not have a local match", + mapRes.toString().c_str(), + mapRes.getProtocolName()); return; } @@ -1242,31 +1258,13 @@ UPnPContext::onRequestTimeOut(Mapping::key_t key) return; } - if (not getMappingWithKey(map->getMapKey())) { - JAMI_ERR("Mapping [%s] does not exist", map->toString().c_str()); - return; - } - - // Ignore time-out if the request is not in-progress state. - // Should not occur. - if (map->getState() != MappingState::IN_PROGRESS) { - JAMI_ERR("Mapping %s timed-out but is not in IN-PROGRESS state (curr %s)", - map->toString().c_str(), - map->getStateStr()); - return; - } + updateMappingState(map, MappingState::FAILED); + unregisterMapping(map); - JAMI_WARN("Mapping request for %s timed-out on IGD %s [%s]", + JAMI_WARN("Mapping request for %s failed on IGD %s [%s]", map->toString().c_str(), - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), igd->getProtocolName()); - - auto protocol = protocolList_.at(igd->getProtocol()); - protocol->incrementErrorsCounter(igd); - - // Considere time-out as failure. - updateMappingState(map, MappingState::FAILED); - unregisterMapping(map); } void diff --git a/src/upnp/upnp_context.h b/src/upnp/upnp_context.h index 6ce476a03e..fe12465ee6 100644 --- a/src/upnp/upnp_context.h +++ b/src/upnp/upnp_context.h @@ -3,6 +3,7 @@ * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -126,6 +127,9 @@ private: // Initialization void init(); + // Wait for Shutdown + void waitForShutdown(); + /** * @brief start the search for IGDs activate the mapping * list update. @@ -159,12 +163,6 @@ private: // Remove all mappings of the given type. void deleteAllMappings(PortType type); - // Schedule a time-out timer for a in-progress request. - void registerAddMappingTimeout(const Mapping::sharedPtr_t& map); - - // Callback invoked when a request times-out - void onRequestTimeOut(Mapping::key_t key); - // Update the state and notify the listener void updateMappingState(const Mapping::sharedPtr_t& map, MappingState newState, @@ -173,6 +171,9 @@ private: // Provision ports. uint16_t getAvailablePortNumber(PortType type); + // Update preferred IGD + void updatePreferredIgd(); + // Get preferred IGD std::shared_ptr<IGD> getPreferredIgd() const; @@ -243,6 +244,9 @@ private: void onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) override; // Callback used to report add request status. void onMappingAdded(const std::shared_ptr<IGD>& igd, const Mapping& map) override; + // Callback invoked when a request fails. Reported on failures for both + // new requests and renewal requests (if supported by the the protocol). + void onMappingRequestFailed(const Mapping& map) override; #if HAVE_LIBNATPMP // Callback used to report renew request status. void onMappingRenewed(const std::shared_ptr<IGD>& igd, const Mapping& map) override; @@ -275,12 +279,19 @@ private: std::shared_ptr<Task> mappingListUpdateTimer_ {}; + // Current preferred IGD. Can be null if there is no valid IGD. + std::shared_ptr<IGD> preferredIgd_; + // This mutex must lock only these two members. All other // members must be accessed only from the UPNP context thread. std::mutex mutable mappingMutex_; // List of mappings. std::map<Mapping::key_t, Mapping::sharedPtr_t> mappingList_[2] {}; std::set<std::shared_ptr<IGD>> validIgdList_ {}; + + // Shutdown synchronization + std::condition_variable shutdownCv_ {}; + bool shutdownComplete_ {false}; }; } // namespace upnp diff --git a/src/upnp/upnp_control.cpp b/src/upnp/upnp_control.cpp index 702da5ff0f..0aecf23942 100644 --- a/src/upnp/upnp_control.cpp +++ b/src/upnp/upnp_control.cpp @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by diff --git a/src/upnp/upnp_control.h b/src/upnp/upnp_control.h index c24579aa62..d19ac8e4bd 100644 --- a/src/upnp/upnp_control.h +++ b/src/upnp/upnp_control.h @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by diff --git a/src/upnp/upnp_thread_util.h b/src/upnp/upnp_thread_util.h index 1fcc3c9cee..e0320c4d8f 100644 --- a/src/upnp/upnp_thread_util.h +++ b/src/upnp/upnp_thread_util.h @@ -22,12 +22,11 @@ protected: bool isValidThread() const { return threadId_ == getCurrentThread(); } - // Upnp scheduler (same as manager's thread) + // Upnp context execution queue (same as manager's scheduler) + // Helpers to run tasks on upnp context queue. static ScheduledExecutor* getScheduler() { return &Manager::instance().scheduler(); } - - // Helper to run tasks on upnp thread. template<typename Callback> - static void runOnUpnpContextThread(Callback&& cb) + static void runOnUpnpContextQueue(Callback&& cb) { getScheduler()->run([cb = std::forward<Callback>(cb)]() mutable { cb(); }); } -- GitLab