diff --git a/src/manager.cpp b/src/manager.cpp index eb6a3f8d2ccefda4317089d2aa01e4c670c51e2e..800dd4d79e7f1ef924be9caef2fbbe009fbccce9 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -890,6 +890,8 @@ Manager::finish() noexcept pimpl_->audiodriver_.reset(); } + JAMI_DBG("Stopping schedulers and worker threads"); + // Flush remaining tasks (free lambda' with capture) pimpl_->scheduler_.stop(); dht::ThreadPool::io().join(); diff --git a/src/upnp/protocol/igd.cpp b/src/upnp/protocol/igd.cpp index 8159a89367bea4d1a8ea72d67a5d3ab5df3785b8..88cc72b68b45a6f508b4baab91c25dcdbc878937 100644 --- a/src/upnp/protocol/igd.cpp +++ b/src/upnp/protocol/igd.cpp @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -44,7 +45,7 @@ IGD::setValid(bool valid) // Reset errors counter. errorsCounter_ = 0; } else { - JAMI_WARN("IGD %s [%s] was disabled", localIp_.toString().c_str(), getProtocolName()); + JAMI_WARN("IGD %s [%s] was disabled", toString().c_str(), getProtocolName()); } } @@ -56,7 +57,7 @@ IGD::incrementErrorsCounter() if (++errorsCounter_ >= MAX_ERRORS_COUNT) { JAMI_WARN("IGD %s [%s] has too many errors, it will be disabled", - localIp_.toString().c_str(), + toString().c_str(), getProtocolName()); setValid(false); return false; diff --git a/src/upnp/protocol/igd.h b/src/upnp/protocol/igd.h index ceb523bcbe6b2f8caf5baa231eed69660fbeac8d..b38b391dbd6f0be57e9814afae50e6a0b82cb2a0 100644 --- a/src/upnp/protocol/igd.h +++ b/src/upnp/protocol/igd.h @@ -2,6 +2,7 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -95,6 +96,8 @@ public: bool incrementErrorsCounter(); int getErrorsCount() const; + virtual const std::string toString() const = 0; + protected: const NatProtocolType protocol_ {NatProtocolType::UNKNOWN}; std::atomic_bool valid_ {false}; diff --git a/src/upnp/protocol/mapping.cpp b/src/upnp/protocol/mapping.cpp index 58a5a367f5594b7f894d9a7d44993d1c0caada20..0773065ff9c9f606f2cf513df296b53d9ca6bb98 100644 --- a/src/upnp/protocol/mapping.cpp +++ b/src/upnp/protocol/mapping.cpp @@ -3,6 +3,7 @@ * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -34,7 +35,6 @@ Mapping::Mapping(PortType type, uint16_t portExternal, uint16_t portInternal, bo , available_(available) , state_(MappingState::PENDING) , notifyCb_(nullptr) - , timeoutTimer_() , autoUpdate_(false) #if HAVE_LIBNATPMP , renewalTime_(sys_clock::now()) @@ -171,27 +171,6 @@ Mapping::hasPublicAddress() const return igd_ and igd_->getPublicIp() and not igd_->getPublicIp().isPrivate(); } -void -Mapping::setTimeoutTimer(std::shared_ptr<Task> timer) -{ - // Cancel current timer if any. - cancelTimeoutTimer(); - - std::lock_guard<std::mutex> lock(mutex_); - timeoutTimer_ = std::move(timer); -} - -void -Mapping::cancelTimeoutTimer() -{ - std::lock_guard<std::mutex> lock(mutex_); - - if (timeoutTimer_) { - timeoutTimer_->cancel(); - timeoutTimer_.reset(); - } -} - Mapping::key_t Mapping::getMapKey() const { diff --git a/src/upnp/protocol/mapping.h b/src/upnp/protocol/mapping.h index 43885917609960a3d39348d72bee2f021a21e5e9..44006cd6504df81c8bb085e1f682c38ccb95964c 100644 --- a/src/upnp/protocol/mapping.h +++ b/src/upnp/protocol/mapping.h @@ -2,6 +2,7 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -124,8 +125,6 @@ private: void setAvailable(bool val); void setState(const MappingState& state); void updateDescription(); - void setTimeoutTimer(std::shared_ptr<Task> timer); - void cancelTimeoutTimer(); #if HAVE_LIBNATPMP void setRenewalTime(sys_clock::time_point time); #endif @@ -142,7 +141,6 @@ private: // Track the state of the mapping MappingState state_; NotifyCallback notifyCb_; - std::shared_ptr<Task> timeoutTimer_ {}; // If true, a new mapping will be requested on behave of the mapping // owner when the mapping state changes from "OPEN" to "FAILED". bool autoUpdate_; diff --git a/src/upnp/protocol/natpmp/nat_pmp.cpp b/src/upnp/protocol/natpmp/nat_pmp.cpp index c7d20f890549359d92701f6a76d79f6793b6c3fa..328f8f3b869ee95859678b49b890f6205771aa76 100644 --- a/src/upnp/protocol/natpmp/nat_pmp.cpp +++ b/src/upnp/protocol/natpmp/nat_pmp.cpp @@ -2,6 +2,7 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -28,7 +29,7 @@ namespace upnp { NatPmp::NatPmp() { JAMI_DBG("NAT-PMP: Instance [%p] created", this); - getNatpmpScheduler()->run([this] { + runOnNatPmpQueue([this] { threadId_ = getCurrentThread(); igd_ = std::make_shared<PMPIGD>(); }); @@ -43,12 +44,20 @@ void NatPmp::initNatPmp() { if (not isValidThread()) { - getNatpmpScheduler()->run([this] { initNatPmp(); }); + runOnNatPmpQueue([w = weak()] { + if (auto pmpThis = w.lock()) { + pmpThis->initNatPmp(); + } + }); return; } initialized_ = false; - hostAddress_ = ip_utils::getLocalAddr(pj_AF_INET()); + + { + std::lock_guard<std::mutex> lock(natpmpMutex_); + hostAddress_ = ip_utils::getLocalAddr(AF_INET); + } // Local address must be valid. if (not getHostAddress() or getHostAddress().isLoopback()) { @@ -112,11 +121,26 @@ NatPmp::initNatPmp() }; } +void +NatPmp::waitForShutdown() +{ + std::unique_lock<std::mutex> lk(natpmpMutex_); + if (shutdownCv_.wait_for(lk, std::chrono::seconds(10), [this] { return shutdownComplete_; })) { + JAMI_DBG("NAT-PMP: Shutdown completed"); + } else { + JAMI_ERR("NAT-PMP: Shutdown timed-out"); + } +} + void NatPmp::setObserver(UpnpMappingObserver* obs) { if (not isValidThread()) { - getNatpmpScheduler()->run([this, obs] { setObserver(obs); }); + runOnNatPmpQueue([w = weak(), obs] { + if (auto pmpThis = w.lock()) { + pmpThis->setObserver(obs); + } + }); return; } @@ -129,21 +153,48 @@ void NatPmp::terminate() { if (not isValidThread()) { - getNatpmpScheduler()->run([this] { terminate(); }); + runOnNatPmpQueue([w = weak()] { + if (auto pmpThis = w.lock()) { + pmpThis->terminate(); + } + }); + waitForShutdown(); return; } + initialized_ = false; observer_ = nullptr; + + { + std::lock_guard<std::mutex> lock(natpmpMutex_); + shutdownComplete_ = true; + } + + shutdownCv_.notify_one(); +} + +const IpAddr +NatPmp::getHostAddress() const +{ + std::lock_guard<std::mutex> lock(natpmpMutex_); + return hostAddress_; } void NatPmp::clearIgds() { if (not isValidThread()) { - getNatpmpScheduler()->run([this] { clearIgds(); }); + runOnNatPmpQueue([w = weak()] { + if (auto pmpThis = w.lock()) { + pmpThis->clearIgds(); + } + }); return; } + if (igd_) + igd_->setValid(false); + initialized_ = false; if (searchForIgdTimer_) searchForIgdTimer_->cancel(); @@ -158,7 +209,11 @@ void NatPmp::searchForIgd() { if (not isValidThread()) { - getNatpmpScheduler()->run([this] { searchForIgd(); }); + runOnNatPmpQueue([w = weak()] { + if (auto pmpThis = w.lock()) { + pmpThis->searchForIgd(); + } + }); return; } @@ -169,12 +224,15 @@ NatPmp::searchForIgd() // Schedule a retry in case init failed. if (not initialized_) { if (igdSearchCounter_++ < MAX_RESTART_SEARCH_RETRIES) { + JAMI_DBG("NAT-PMP: Start search for IGDs. Attempt %i", igdSearchCounter_); + // Cancel the current timer (if any) and re-schedule. if (searchForIgdTimer_) searchForIgdTimer_->cancel(); searchForIgdTimer_ = getNatpmpScheduler()->scheduleIn([this] { searchForIgd(); }, - TIMEOUT_BEFORE_IGD_SEARCH_RETRY); + NATPMP_SEARCH_RETRY_UNIT + * igdSearchCounter_); } else { JAMI_WARN("NAT-PMP: Setup failed after %u trials. NAT-PMP will be disabled!", MAX_RESTART_SEARCH_RETRIES); @@ -182,11 +240,14 @@ NatPmp::searchForIgd() } } -void -NatPmp::getIgdList(std::list<std::shared_ptr<IGD>>& igdList) const +std::list<std::shared_ptr<IGD>> +NatPmp::getIgdList() const { + std::lock_guard<std::mutex> lock(natpmpMutex_); + std::list<std::shared_ptr<IGD>> igdList; if (igd_->isValid()) igdList.emplace_back(igd_); + return igdList; } bool @@ -200,6 +261,7 @@ NatPmp::isReady() const // Must at least have a valid local address. if (not getHostAddress() or getHostAddress().isLoopback()) return false; + return igd_ and igd_->isValid(); } @@ -226,37 +288,78 @@ NatPmp::incrementErrorsCounter(const std::shared_ptr<IGD>& igdIn) } void -NatPmp::requestMappingAdd(const std::shared_ptr<IGD>& igd, const Mapping& mapping) +NatPmp::requestMappingAdd(const Mapping& mapping) { // Process on nat-pmp thread. - getNatpmpScheduler()->run([this, igd, mapping] { - JAMI_DBG("NAT-PMP: Request mapping %s on %s", - mapping.toString().c_str(), - igd_->getLocalIp().toString().c_str()); + if (not isValidThread()) { + runOnNatPmpQueue([w = weak(), mapping] { + if (auto pmpThis = w.lock()) { + pmpThis->requestMappingAdd(mapping); + } + }); + return; + } - Mapping map {mapping}; - addPortMapping(igd, map, false); - }); + Mapping map(mapping); + assert(map.getIgd()); + auto err = addPortMapping(map); + if (err < 0) { + JAMI_WARN("NAT-PMP: Request for mapping %s on %s failed with error %i: %s", + map.toString().c_str(), + igd_->toString().c_str(), + err, + getNatPmpErrorStr(err)); + + if (isErrorFatal(err)) { + // Fatal error, increment the counter. + incrementErrorsCounter(igd_); + } + // Notify the listener. + processMappingRequestFailed(std::move(map)); + } else { + JAMI_DBG("NAT-PMP: Request for mapping %s on %s succeeded", + map.toString().c_str(), + igd_->toString().c_str()); + // Notify the listener. + processMappingAdded(std::move(map)); + } } void NatPmp::requestMappingRenew(const Mapping& mapping) { // Process on nat-pmp thread. - getNatpmpScheduler()->run([this, mapping] { - if (not mapping.getIgd() or not mapping.getIgd()->isValid()) { - JAMI_WARN("NAT-PMP: Mapping %s has an invalid IGD. Ignoring.", - mapping.toString().c_str()); - return; - } + if (not isValidThread()) { + runOnNatPmpQueue([w = weak(), mapping] { + if (auto pmpThis = w.lock()) { + pmpThis->requestMappingRenew(mapping); + } + }); + return; + } - JAMI_DBG("NAT-PMP: Renew mapping %s on %s", - mapping.toString().c_str(), - mapping.getIgd()->getLocalIp().toString().c_str()); + Mapping map(mapping); + auto err = addPortMapping(map); + if (err < 0) { + JAMI_WARN("NAT-PMP: Renewal request for mapping %s on %s failed with error %i: %s", + map.toString().c_str(), + igd_->toString().c_str(), + err, + getNatPmpErrorStr(err)); + // Notify the listener. + processMappingRequestFailed(std::move(map)); - Mapping map {mapping}; - addPortMapping(mapping.getIgd(), map, true); - }); + if (isErrorFatal(err)) { + // Fatal error, increment the counter. + incrementErrorsCounter(igd_); + } + } else { + JAMI_DBG("NAT-PMP: Renewal request for mapping %s on %s succeeded", + map.toString().c_str(), + igd_->toString().c_str()); + // Notify the listener. + processMappingRenewed(map); + } } int @@ -323,14 +426,14 @@ NatPmp::sendMappingRequest(const Mapping& mapping, uint32_t& lifetime) if (err < 0) { JAMI_WARN("NAT-PMP: Read response on IGD %s failed with error %s", - igd_->getLocalIp().toString().c_str(), + igd_->toString().c_str(), getNatPmpErrorStr(err)); } else if (response.type != NATPMP_RESPTYPE_TCPPORTMAPPING and response.type != NATPMP_RESPTYPE_UDPPORTMAPPING) { JAMI_ERR("NAT-PMP: Unexpected response type (%i) for mapping %s from IGD %s.", response.type, mapping.toString().c_str(), - igd_->getLocalIp().toString().c_str()); + igd_->toString().c_str()); // Try to read again. continue; } @@ -343,67 +446,48 @@ NatPmp::sendMappingRequest(const Mapping& mapping, uint32_t& lifetime) return err; } -void -NatPmp::addPortMapping(const std::shared_ptr<IGD>& igdIn, Mapping& mapping, bool renew) +int +NatPmp::addPortMapping(Mapping& mapping) { + auto const& igdIn = mapping.getIgd(); assert(igdIn); assert(igdIn->getProtocol() == NatProtocolType::NAT_PMP); - if (not igdIn->isValid()) - return; - - if (not validIgdInstance(igdIn)) { - return; + if (not igdIn->isValid() or not validIgdInstance(igdIn)) { + mapping.setState(MappingState::FAILED); + return NATPMP_ERR_INVALIDARGS; } - Mapping mapToAdd(mapping); - mapToAdd.setInternalAddress(getHostAddress().toString()); - mapToAdd.setIgd(igd_); + mapping.setInternalAddress(getHostAddress().toString()); uint32_t lifetime = MAPPING_ALLOCATION_LIFETIME; int err = sendMappingRequest(mapping, lifetime); if (err < 0) { - JAMI_WARN("NAT-PMP: Add mapping request failed with error %s %i", - getNatPmpErrorStr(err), - errno); - - if (isErrorFatal(err)) { - // Fatal error, increment the counter. - incrementErrorsCounter(igd_); - } - // Mark as failed and notify. - mapToAdd.setState(MappingState::FAILED); - processMappingAdded(std::move(mapToAdd)); - } else { - // Success! Set renewal and update. - // Renewal time is set before the allocation expires. - mapToAdd.setRenewalTime(sys_clock::now() + std::chrono::seconds(lifetime * 4 / 5)); - mapToAdd.setState(MappingState::OPEN); - if (not renew) { - JAMI_DBG("NAT-PMP: Allocated mapping %s on %s", - mapToAdd.toString().c_str(), - igd_->getLocalIp().toString().c_str()); - // Notify the listener. - processMappingAdded(std::move(mapToAdd)); - } else { - JAMI_DBG("NAT-PMP: Renewed mapping %s on %s", - mapToAdd.toString().c_str(), - igd_->getLocalIp().toString().c_str()); - // Notify. - processMappingRenewed(std::move(mapToAdd)); - } + mapping.setState(MappingState::FAILED); + return err; } + + // Set the renewal time and update. + mapping.setRenewalTime(sys_clock::now() + std::chrono::seconds(lifetime * 4 / 5)); + mapping.setState(MappingState::OPEN); + + return 0; } void NatPmp::requestMappingRemove(const Mapping& mapping) { // Process on nat-pmp thread. - getNatpmpScheduler()->run([this, mapping] { - Mapping map {mapping}; - removePortMapping(map); - }); + if (not isValidThread()) { + runOnNatPmpQueue([w = weak(), mapping] { + if (auto pmpThis = w.lock()) { + Mapping map {mapping}; + pmpThis->removePortMapping(map); + } + }); + return; + } } void @@ -425,6 +509,7 @@ NatPmp::removePortMapping(Mapping& mapping) int err = sendMappingRequest(mapping, lifetime); if (err < 0) { + // Nothing to do if the request fails, just log the error. JAMI_WARN("NAT-PMP: Send remove request failed with error %s. Ignoring", getNatPmpErrorStr(err)); } @@ -443,7 +528,7 @@ NatPmp::getIgdPublicAddress() // have one already. if (igd_->getPublicIp()) { JAMI_WARN("NAT-PMP: IGD %s already have a public address (%s)", - igd_->getLocalIp().toString().c_str(), + igd_->toString().c_str(), igd_->getPublicIp().toString().c_str()); return; } @@ -453,7 +538,7 @@ NatPmp::getIgdPublicAddress() if (err < 0) { JAMI_ERR("NAT-PMP: send public address request on IGD %s failed with error: %s", - igd_->getLocalIp().toString().c_str(), + igd_->toString().c_str(), getNatPmpErrorStr(err)); if (isErrorFatal(err)) { @@ -468,7 +553,7 @@ NatPmp::getIgdPublicAddress() if (err < 0) { JAMI_ERR("NAT-PMP: read response on IGD %s failed with error %s", - igd_->getLocalIp().toString().c_str(), + igd_->toString().c_str(), getNatPmpErrorStr(err)); return; } @@ -476,7 +561,7 @@ NatPmp::getIgdPublicAddress() if (response.type != NATPMP_RESPTYPE_PUBLICADDRESS) { JAMI_ERR("NAT-PMP: Unexpected response type (%i) for public address request from IGD %s.", response.type, - igd_->getLocalIp().toString().c_str()); + igd_->toString().c_str()); return; } @@ -484,7 +569,7 @@ NatPmp::getIgdPublicAddress() if (not publicAddr) { JAMI_ERR("NAT-PMP: IGD %s returned an invalid public address %s", - igd_->getLocalIp().toString().c_str(), + igd_->toString().c_str(), publicAddr.toString().c_str()); } @@ -493,7 +578,7 @@ NatPmp::getIgdPublicAddress() igd_->setValid(true); JAMI_DBG("NAT-PMP: Setting IGD %s public address to %s", - igd_->getLocalIp().toString().c_str(), + igd_->toString().c_str(), igd_->getPublicIp().toString().c_str()); } @@ -503,7 +588,7 @@ NatPmp::removeAllMappings() CHECK_VALID_THREAD(); JAMI_WARN("NAT-PMP: Send request to close all existing mappings to IGD %s", - igd_->getLocalIp().toString().c_str()); + igd_->toString().c_str()); int err = sendnewportmappingrequest(&natpmpHdl_, NATPMP_PROTOCOL_TCP, 0, 0, 0); if (err < 0) { @@ -616,9 +701,9 @@ bool NatPmp::validIgdInstance(const std::shared_ptr<IGD>& igdIn) { if (igd_.get() != igdIn.get()) { - JAMI_ERR("NAT-PMP: IGD (%s) does not match in local instance (%s)", - igdIn->getLocalIp().toString().c_str(), - igd_->getLocalIp().toString().c_str()); + JAMI_ERR("NAT-PMP: IGD (%s) does not match local instance (%s)", + igdIn->toString().c_str(), + igd_->toString().c_str()); return false; } @@ -636,7 +721,7 @@ NatPmp::processIgdUpdate(UpnpIgdEvent event) if (observer_ == nullptr) return; // Process the response on the context thread. - runOnUpnpContextThread([obs = observer_, igd = igd_, event] { obs->onIgdUpdated(igd, event); }); + runOnUpnpContextQueue([obs = observer_, igd = igd_, event] { obs->onIgdUpdated(igd, event); }); } void @@ -646,7 +731,17 @@ NatPmp::processMappingAdded(const Mapping& map) return; // Process the response on the context thread. - runOnUpnpContextThread([obs = observer_, igd = igd_, map] { obs->onMappingAdded(igd, map); }); + runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingAdded(igd, map); }); +} + +void +NatPmp::processMappingRequestFailed(const Mapping& map) +{ + if (observer_ == nullptr) + return; + + // Process the response on the context thread. + runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRequestFailed(map); }); } void @@ -656,7 +751,7 @@ NatPmp::processMappingRenewed(const Mapping& map) return; // Process the response on the context thread. - runOnUpnpContextThread([obs = observer_, igd = igd_, map] { obs->onMappingRenewed(igd, map); }); + runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRenewed(igd, map); }); } void @@ -666,7 +761,7 @@ NatPmp::processMappingRemoved(const Mapping& map) return; // Process the response on the context thread. - runOnUpnpContextThread([obs = observer_, igd = igd_, map] { obs->onMappingRemoved(igd, map); }); + runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRemoved(igd, map); }); } } // namespace upnp diff --git a/src/upnp/protocol/natpmp/nat_pmp.h b/src/upnp/protocol/natpmp/nat_pmp.h index fe2bf22956771f54db6a64f2621f9bc7346e9336..66b2ab58167b770e6515ff4b7f13a7809cfa7055 100644 --- a/src/upnp/protocol/natpmp/nat_pmp.h +++ b/src/upnp/protocol/natpmp/nat_pmp.h @@ -2,6 +2,7 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -24,8 +25,8 @@ #include "config.h" #endif -#include "../upnp_protocol.h" -#include "../igd.h" +#include "upnp/protocol/upnp_protocol.h" +#include "upnp/protocol/igd.h" #include "pmp_igd.h" #include "logger.h" @@ -55,8 +56,8 @@ constexpr static unsigned int MAX_RESTART_SEARCH_RETRIES {5}; constexpr static auto TIMEOUT_BEFORE_READ_RETRY {std::chrono::milliseconds(300)}; // Max number of read attempts before failure. constexpr static unsigned int MAX_READ_RETRIES {5}; -// Time-out between two successive IGD search. -constexpr static auto TIMEOUT_BEFORE_IGD_SEARCH_RETRY {std::chrono::seconds(60)}; +// Base unit for the timeout between two successive IGD search. +constexpr static auto NATPMP_SEARCH_RETRY_UNIT {std::chrono::seconds(15)}; class NatPmp : public UPnPProtocol { @@ -80,58 +81,75 @@ public: void searchForIgd() override; // Get the IGD list. - void getIgdList(std::list<std::shared_ptr<IGD>>& igdList) const override; + std::list<std::shared_ptr<IGD>> getIgdList() const override; // Return true if it has at least one valid IGD. bool isReady() const override; - // Increment errors counter. - void incrementErrorsCounter(const std::shared_ptr<IGD>& igd) override; - // Request a new mapping. - void requestMappingAdd(const std::shared_ptr<IGD>& igd, const Mapping& mapping) override; + void requestMappingAdd(const Mapping& mapping) override; // Renew an allocated mapping. void requestMappingRenew(const Mapping& mapping) override; // Removes a mapping. - void requestMappingRemove(const Mapping& igdMapping) override; + void requestMappingRemove(const Mapping& mapping) override; + + // Get the host (local) address. + const IpAddr getHostAddress() const override; // Terminate. Nothing to do here, the clean-up is done when // the IGD is cleared. void terminate() override; private: + NON_COPYABLE(NatPmp); + + std::weak_ptr<NatPmp> weak() { return std::static_pointer_cast<NatPmp>(shared_from_this()); } + + // Helpers to run tasks on NAT-PMP internal execution queue. + ScheduledExecutor* getNatpmpScheduler() { return &natpmpScheduler_; } + template<typename Callback> + void runOnNatPmpQueue(Callback&& cb) + { + natpmpScheduler_.run([cb = std::forward<Callback>(cb)]() mutable { cb(); }); + } + + // Helpers to run tasks on UPNP context execution queue. + ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); } + void initNatPmp(); + void waitForShutdown(); void getIgdPublicAddress(); void removeAllMappings(); int readResponse(natpmp_t& handle, natpmpresp_t& response); int sendMappingRequest(const Mapping& mapping, uint32_t& lifetime); // Adds a port mapping. - void addPortMapping(const std::shared_ptr<IGD>& igd, Mapping& mapping, bool renew); + int addPortMapping(Mapping& mapping); // Removes a port mapping. void removePortMapping(Mapping& mapping); + // True if the error is fatal. bool isErrorFatal(int error); + // Gets NAT-PMP error code string. + const char* getNatPmpErrorStr(int errorCode) const; // Get local getaway. std::unique_ptr<IpAddr> getLocalGateway() const; - // Helpers to process user callbacks + // Helpers to process user's callbacks void processIgdUpdate(UpnpIgdEvent event); void processMappingAdded(const Mapping& map); + void processMappingRequestFailed(const Mapping& map); void processMappingRenewed(const Mapping& map); void processMappingRemoved(const Mapping& map); -private: - NON_COPYABLE(NatPmp); + // Check if the IGD has a local match + bool validIgdInstance(const std::shared_ptr<IGD>& igdIn); - // Gets NAT-PMP error code string. - const char* getNatPmpErrorStr(int errorCode) const; + // Increment errors counter. + void incrementErrorsCounter(const std::shared_ptr<IGD>& igd); - ScheduledExecutor* getNatpmpScheduler() { return &natpmpScheduler_; } - ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); } - bool validIgdInstance(const std::shared_ptr<IGD>& igdIn); std::atomic_bool initialized_ {false}; // Data members @@ -140,6 +158,20 @@ private: ScheduledExecutor natpmpScheduler_ {}; std::shared_ptr<Task> searchForIgdTimer_ {}; unsigned int igdSearchCounter_ {0}; + UpnpMappingObserver* observer_ {nullptr}; + IpAddr hostAddress_ {}; + + // Calls from other threads that does not need synchronous access are + // rescheduled on the NatPmp private queue. This will avoid the need to + // protect most of the data members of this class. + // For some internal members (such as the igd instance and the host + // address) that need to be synchronously accessed, are protected by + // this mutex. + mutable std::mutex natpmpMutex_; + + // Shutdown synchronization + std::condition_variable shutdownCv_ {}; + bool shutdownComplete_ {false}; }; } // namespace upnp diff --git a/src/upnp/protocol/natpmp/pmp_igd.cpp b/src/upnp/protocol/natpmp/pmp_igd.cpp index cfb6cd2bd3b8cf8afdc6e48ffaacd1d8fe14bcfe..5942da388bec255193d18755b590c44374a1606d 100644 --- a/src/upnp/protocol/natpmp/pmp_igd.cpp +++ b/src/upnp/protocol/natpmp/pmp_igd.cpp @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -52,5 +53,11 @@ PMPIGD::operator==(PMPIGD& other) const return getPublicIp() == other.getPublicIp() and getLocalIp() == other.getLocalIp(); } +const std::string +PMPIGD::toString() const +{ + return getLocalIp().toString(); +} + } // namespace upnp } // namespace jami diff --git a/src/upnp/protocol/natpmp/pmp_igd.h b/src/upnp/protocol/natpmp/pmp_igd.h index d987979ea8711b2fb5a859697dfd8c86ac58169f..bf49b2480bfd63c67bd523a1fa4063de53c6cea9 100644 --- a/src/upnp/protocol/natpmp/pmp_igd.h +++ b/src/upnp/protocol/natpmp/pmp_igd.h @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -49,6 +50,8 @@ public: bool operator==(IGD& other) const; bool operator==(PMPIGD& other) const; + + const std::string toString() const override; }; } // namespace upnp diff --git a/src/upnp/protocol/pupnp/pupnp.cpp b/src/upnp/protocol/pupnp/pupnp.cpp index 20aef2bede6dd725f73f1941c10f8ef8491c3f68..0d43d00a73368c0a3c7b303f68ad913111741862 100644 --- a/src/upnp/protocol/pupnp/pupnp.cpp +++ b/src/upnp/protocol/pupnp/pupnp.cpp @@ -4,6 +4,7 @@ * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -98,8 +99,11 @@ errorOnResponse(IXML_Document* doc) PUPnP::PUPnP() { - JAMI_DBG("PUPnP: Instance [%p] created", this); - threadId_ = getCurrentThread(); + JAMI_DBG("PUPnP: Creating instance [%p] ...", this); + runOnPUPnPQueue([this] { + threadId_ = getCurrentThread(); + JAMI_DBG("PUPnP: Instance [%p] created", this); + }); } PUPnP::~PUPnP() @@ -151,6 +155,26 @@ PUPnP::initUpnpLib() initialized_ = true; } +void +PUPnP::waitForShutdown() +{ + std::unique_lock<std::mutex> lk(pupnpMutex_); + if (shutdownCv_.wait_for(lk, std::chrono::seconds(10), [this] { return shutdownComplete_; })) { + JAMI_DBG("PUPnP: Shutdown completed"); + } else { + JAMI_ERR("PUPnP: Shutdown timed-out"); + // Force stop if the shutdown take too much time. + shutdownComplete_ = true; + } +} + +bool +PUPnP::isRunning() const +{ + std::unique_lock<std::mutex> lk(pupnpMutex_); + return not shutdownComplete_; +} + void PUPnP::registerClient() { @@ -159,7 +183,6 @@ PUPnP::registerClient() CHECK_VALID_THREAD(); // Register Upnp control point. - std::unique_lock<std::mutex> lk(ctrlptMutex_); int upnp_err = UpnpRegisterClient(ctrlPtCallback, this, &ctrlptHandle_); if (upnp_err != UPNP_E_SUCCESS) { JAMI_ERR("PUPnP: Can't register client: %s", UpnpGetErrorMessage(upnp_err)); @@ -170,143 +193,117 @@ PUPnP::registerClient() } void -PUPnP::startPUPnP() +PUPnP::setObserver(UpnpMappingObserver* obs) { - JAMI_DBG("PUPnP: Starting PUPNP internal thread"); - - pupnpThread_ = std::thread([this] { - JAMI_DBG("PUPnP: Internal thread started"); - pupnpRun_ = true; - while (pupnpRun_) { - { - std::unique_lock<std::mutex> lk(igdListMutex_); - pupnpCv_.wait(lk, [this] { - return not pupnpRun_ or searchForIgd_ or not dwnldlXmlList_.empty(); - }); + if (not isValidThread()) { + runOnPUPnPQueue([w = weak(), obs] { + if (auto upnpThis = w.lock()) { + upnpThis->setObserver(obs); } + }); + return; + } - if (not pupnpRun_) - break; - - if (clientRegistered_) { - if (searchForIgd_.exchange(false)) { - searchForDevices(); - } - - std::unique_lock<std::mutex> lk(igdListMutex_); - if (not dwnldlXmlList_.empty()) { - auto xmlList = std::move(dwnldlXmlList_); - decltype(xmlList) finished {}; - - // Wait on futures asynchronously - lk.unlock(); - for (auto it = xmlList.begin(); it != xmlList.end();) { - if (it->wait_for(std::chrono::seconds(1)) == std::future_status::ready) { - finished.splice(finished.end(), xmlList, it++); - } else { - ++it; - } - } - lk.lock(); - - // Move back timed-out items to list - dwnldlXmlList_.splice(dwnldlXmlList_.begin(), xmlList); - // Handle successful downloads - for (auto& item : finished) { - auto result = item.get(); - if (not result->document or not validateIgd(*result)) { - discoveredIgdList_.erase(result->location); - } - } - } - for (auto it = cancelXmlList_.begin(); it != cancelXmlList_.end();) { - if (it->wait_for(std::chrono::seconds(0)) == std::future_status::ready) { - it = cancelXmlList_.erase(it); - } else { - ++it; - } - } - } - } + JAMI_DBG("PUPnP: Setting observer to %p", obs); - JAMI_DBG("PUPnP: Internal thread stopped"); - }); + observer_ = obs; } -void -PUPnP::setObserver(UpnpMappingObserver* obs) +const IpAddr +PUPnP::getHostAddress() const { - CHECK_VALID_THREAD(); - - JAMI_DBG("PUPnP: Setting observer to %p", obs); - - observer_ = obs; + std::lock_guard<std::mutex> lock(pupnpMutex_); + return hostAddress_; } void PUPnP::terminate() { - CHECK_VALID_THREAD(); + if (not isValidThread()) { + runOnPUPnPQueue([w = weak()] { + if (auto upnpThis = w.lock()) { + upnpThis->terminate(); + } + }); + waitForShutdown(); + return; + } JAMI_DBG("PUPnP: Terminate instance %p", this); clientRegistered_ = false; observer_ = nullptr; - { - std::lock_guard<std::mutex> lock(validIgdListMutex_); - std::lock_guard<std::mutex> lk(ctrlptMutex_); - - UpnpUnRegisterClient(ctrlptHandle_); - } + UpnpUnRegisterClient(ctrlptHandle_); if (UpnpFinish() != UPNP_E_SUCCESS) { JAMI_ERR("PUPnP: Failed to properly close lib-upnp"); } - pupnpRun_ = false; - // Notify thread to terminate. - pupnpCv_.notify_all(); - if (pupnpThread_.joinable()) - pupnpThread_.join(); - // Clear all the lists. + discoveredIgdList_.clear(); + { - std::lock_guard<std::mutex> lock(validIgdListMutex_); + std::lock_guard<std::mutex> lock(pupnpMutex_); validIgdList_.clear(); + shutdownComplete_ = true; } - { - std::lock_guard<std::mutex> lk2(igdListMutex_); - discoveredIgdList_.clear(); - dwnldlXmlList_.clear(); - cancelXmlList_.clear(); - } + shutdownCv_.notify_one(); } void PUPnP::searchForDevices() { - std::unique_lock<std::mutex> lk(ctrlptMutex_); + CHECK_VALID_THREAD(); JAMI_DBG("PUPnP: Send IGD search request"); // Send out search for multiple types of devices, as some routers may possibly // only reply to one. - UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_ROOT_DEVICE, this); - UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_IGD_DEVICE, this); - UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_WANIP_SERVICE, this); - UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_WANPPP_SERVICE, this); + + auto err = UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_ROOT_DEVICE, this); + if (err != UPNP_E_SUCCESS) { + JAMI_WARN("PUPnP: Send search for UPNP_ROOT_DEVICE failed. Error %d: %s", + err, + UpnpGetErrorMessage(err)); + } + + err = UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_IGD_DEVICE, this); + if (err != UPNP_E_SUCCESS) { + JAMI_WARN("PUPnP: Send search for UPNP_IGD_DEVICE failed. Error %d: %s", + err, + UpnpGetErrorMessage(err)); + } + + err = UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_WANIP_SERVICE, this); + if (err != UPNP_E_SUCCESS) { + JAMI_WARN("PUPnP: Send search for UPNP_WANIP_SERVICE failed. Error %d: %s", + err, + UpnpGetErrorMessage(err)); + } + + err = UpnpSearchAsync(ctrlptHandle_, SEARCH_TIMEOUT, UPNP_WANPPP_SERVICE, this); + if (err != UPNP_E_SUCCESS) { + JAMI_WARN("PUPnP: Send search for UPNP_WANPPP_SERVICE failed. Error %d: %s", + err, + UpnpGetErrorMessage(err)); + } } void PUPnP::clearIgds() { - JAMI_DBG("PUPnP: clearing IGDs and devices lists"); - - CHECK_VALID_THREAD(); + if (not isValidThread()) { + runOnPUPnPQueue([w = weak()] { + if (auto upnpThis = w.lock()) { + upnpThis->clearIgds(); + } + }); + return; + } - hostAddress_ = {}; + JAMI_DBG("PUPnP: clearing IGDs and devices lists"); if (searchForIgdTimer_) searchForIgdTimer_->cancel(); @@ -314,29 +311,34 @@ PUPnP::clearIgds() igdSearchCounter_ = 0; { - std::lock_guard<std::mutex> lock(validIgdListMutex_); + std::lock_guard<std::mutex> lock(pupnpMutex_); + for (auto const& igd : validIgdList_) { + igd->setValid(false); + } validIgdList_.clear(); + hostAddress_ = {}; } - { - std::lock_guard<std::mutex> lk(igdListMutex_); - - // Clear all internal lists. - cancelXmlList_.splice(cancelXmlList_.end(), dwnldlXmlList_); - discoveredIgdList_.clear(); - } + discoveredIgdList_.clear(); } void PUPnP::searchForIgd() { - CHECK_VALID_THREAD(); + if (not isValidThread()) { + runOnPUPnPQueue([w = weak()] { + if (auto upnpThis = w.lock()) { + upnpThis->searchForIgd(); + } + }); + return; + } // Update local address before searching. - hostAddress_ = ip_utils::getLocalAddr(pj_AF_INET()); + updateHostAddress(); if (isReady()) { - JAMI_WARN("PUPnP: Already have a valid IGD. Skipping the search request"); + JAMI_DBG("PUPnP: Already have a valid IGD. Skip the search request"); return; } @@ -348,14 +350,10 @@ PUPnP::searchForIgd() JAMI_DBG("PUPnP: Start search for IGD: attempt %u", igdSearchCounter_); - if (not pupnpRun_) { - startPUPnP(); - } - // Do not init if the host is not valid. Otherwise, the init will fail // anyway and may put libupnp in an unstable state (mainly deadlocks) // even if the UpnpFinish() method is called. - if (not hostAddress_ or hostAddress_.isLoopback()) { + if (not hasValidHostAddress()) { JAMI_WARN("PUPnP: Host address is invalid. Skipping the IGD search"); } else { // Init and register if needed @@ -365,19 +363,19 @@ PUPnP::searchForIgd() if (initialized_ and not clientRegistered_) { registerClient(); } - } - - // Start search - if (clientRegistered_ and pupnpRun_) { - assert(initialized_); - JAMI_DBG("PUPnP: Start search for IGD"); - searchForIgd_ = true; - pupnpCv_.notify_one(); - } else { - JAMI_WARN("PUPnP: PUPNP not fully setup. Skipping the IGD search"); + // Start searching + if (clientRegistered_) { + assert(initialized_); + searchForDevices(); + } else { + JAMI_WARN("PUPnP: PUPNP not fully setup. Skipping the IGD search"); + } } // Cancel the current timer (if any) and re-schedule. + // The connectivity change may be received while the the local + // interface is not fully setup. The rescheduling typically + // usefull to mitigate this race. if (searchForIgdTimer_) searchForIgdTimer_->cancel(); @@ -386,19 +384,21 @@ PUPnP::searchForIgd() if (auto upnpThis = w.lock()) upnpThis->searchForIgd(); }, - PUPNP_TIMEOUT_BEFORE_IGD_SEARCH_RETRY); + PUPNP_SEARCH_RETRY_UNIT * igdSearchCounter_); } -void -PUPnP::getIgdList(std::list<std::shared_ptr<IGD>>& igdList) const +std::list<std::shared_ptr<IGD>> +PUPnP::getIgdList() const { - std::lock_guard<std::mutex> lock(validIgdListMutex_); + std::lock_guard<std::mutex> lock(pupnpMutex_); + std::list<std::shared_ptr<IGD>> igdList; for (auto& it : validIgdList_) { // Return only active IGDs. if (it->isValid()) { igdList.emplace_back(it); } } + return igdList; } bool @@ -414,6 +414,7 @@ PUPnP::isReady() const bool PUPnP::hasValidIgd() const { + std::lock_guard<std::mutex> lock(pupnpMutex_); for (auto& it : validIgdList_) { if (it->isValid()) { return true; @@ -422,21 +423,24 @@ PUPnP::hasValidIgd() const return false; } -bool -PUPnP::updateAndCheckHostAddress() +void +PUPnP::updateHostAddress() { - if (hostAddress_ and not hostAddress_.isLoopback()) - return true; - - hostAddress_ = ip_utils::getLocalAddr(pj_AF_INET()); + std::lock_guard<std::mutex> lock(pupnpMutex_); + hostAddress_ = ip_utils::getLocalAddr(AF_INET); +} +bool +PUPnP::hasValidHostAddress() +{ + std::lock_guard<std::mutex> lock(pupnpMutex_); return hostAddress_ and not hostAddress_.isLoopback(); } void PUPnP::incrementErrorsCounter(const std::shared_ptr<IGD>& igd) { - if (not igd->isValid()) + if (not igd or not igd->isValid()) return; if (not igd->incrementErrorsCounter()) { // Disable this IGD. @@ -448,9 +452,14 @@ PUPnP::incrementErrorsCounter(const std::shared_ptr<IGD>& igd) } bool -PUPnP::validateIgd(const IGDInfo& info) +PUPnP::validateIgd(const std::string& location, IXML_Document* doc_container_ptr) { - auto descDoc = info.document.get(); + CHECK_VALID_THREAD(); + + assert(doc_container_ptr != nullptr); + + XMLDocument document(doc_container_ptr, ixmlDocument_free); + auto descDoc = document.get(); // Check device type. std::string deviceType = getFirstDocItem(descDoc, "deviceType"); if (deviceType.empty()) { @@ -463,7 +472,7 @@ PUPnP::validateIgd(const IGDInfo& info) return false; } - std::shared_ptr<UPnPIGD> igd_candidate = parseIgd(descDoc, info.location); + std::shared_ptr<UPnPIGD> igd_candidate = parseIgd(descDoc, location); if (not igd_candidate) { // No valid IGD candidate. return false; @@ -523,7 +532,7 @@ PUPnP::validateIgd(const IGDInfo& info) { // Add the IGD if not already present in the list. - std::lock_guard<std::mutex> lock(validIgdListMutex_); + std::lock_guard<std::mutex> lock(pupnpMutex_); for (auto& igd : validIgdList_) { // Must not be a null pointer assert(igd.get() != nullptr); @@ -531,7 +540,7 @@ PUPnP::validateIgd(const IGDInfo& info) JAMI_DBG("PUPnP: Device [%s] with int/ext addresses [%s:%s] is already in the list " "of valid IGDs", igd_candidate->getUID().c_str(), - igd_candidate->getLocalIp().toString().c_str(), + igd_candidate->toString().c_str(), igd_candidate->getPublicIp().toString().c_str()); return true; } @@ -545,22 +554,9 @@ PUPnP::validateIgd(const IGDInfo& info) igd_candidate->getUID().c_str()); JAMI_DBG("PUPnP: New IGD addresses [int: %s - ext: %s]", - igd_candidate->getLocalIp().toString().c_str(), + igd_candidate->toString().c_str(), igd_candidate->getPublicIp().toString().c_str()); - { - // This is a new IGD, move it the list. - std::lock_guard<std::mutex> lock(validIgdListMutex_); - validIgdList_.emplace_back(igd_candidate); - } - - // Clear mappings from previous instances. - deleteMappingsByDescription(igd_candidate, Mapping::UPNP_MAPPING_DESCRIPTION_PREFIX); - - // Report to the listener. - if (observer_) - observer_->onIgdUpdated(igd_candidate, UpnpIgdEvent::ADDED); - // Subscribe to IGD events. int upnp_err = UpnpSubscribeAsync(ctrlptHandle_, eventSub.c_str(), @@ -572,180 +568,137 @@ PUPnP::validateIgd(const IGDInfo& info) igd_candidate->getUID().c_str(), upnp_err, UpnpGetErrorMessage(upnp_err)); + // return false; } else { - JAMI_DBG("PUPnP: Successfully sent subscribe request to %s", - igd_candidate->getUID().c_str()); + JAMI_DBG("PUPnP: Successfully subscribed to IGD %s", igd_candidate->getUID().c_str()); } + { + // This is a new (and hopefully valid) IGD. + std::lock_guard<std::mutex> lock(pupnpMutex_); + validIgdList_.emplace_back(igd_candidate); + } + + // Report to the listener. + runOnUpnpContextQueue([w = weak(), igd_candidate] { + if (auto upnpThis = w.lock()) { + if (upnpThis->observer_) + upnpThis->observer_->onIgdUpdated(igd_candidate, UpnpIgdEvent::ADDED); + } + }); + return true; } void -PUPnP::requestMappingAdd(const std::shared_ptr<IGD>& igd, const Mapping& mapping) +PUPnP::requestMappingAdd(const Mapping& mapping) { - if (auto pupnp_igd = std::dynamic_pointer_cast<UPnPIGD>(igd)) { - dht::ThreadPool::io().run([w = weak(), pupnp_igd, mapping] { - if (auto upnpThis = w.lock()) - upnpThis->actionAddPortMapping(pupnp_igd, mapping); - }); - } + runOnPUPnPQueue([w = weak(), mapping] { + if (auto upnpThis = w.lock()) { + if (not upnpThis->isRunning()) + return; + Mapping mapRes(mapping); + if (upnpThis->actionAddPortMapping(mapRes)) { + mapRes.setState(MappingState::OPEN); + mapRes.setInternalAddress(upnpThis->getHostAddress().toString()); + upnpThis->processAddMapAction(mapRes); + } else { + upnpThis->incrementErrorsCounter(mapRes.getIgd()); + mapRes.setState(MappingState::FAILED); + upnpThis->processRequestMappingFailure(mapRes); + } + } + }); } -bool -PUPnP::actionAddPortMappingAsync(const std::shared_ptr<UPnPIGD>& igd, const Mapping& mapping) +void +PUPnP::requestMappingRemove(const Mapping& mapping) { - if (not clientRegistered_) { - return false; - } - XMLDocument action(nullptr, ixmlDocument_free); // Action pointer. - IXML_Document* action_container_ptr = nullptr; - // Set action sequence. - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewRemoteHost", - ""); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewExternalPort", - mapping.getExternalPortStr().c_str()); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewProtocol", - mapping.getTypeStr()); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewInternalPort", - mapping.getInternalPortStr().c_str()); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewInternalClient", - getHostAddress().toString().c_str()); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewEnabled", - "1"); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewPortMappingDescription", - mapping.toString().c_str()); - UpnpAddToAction(&action_container_ptr, - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - "NewLeaseDuration", - "0"); - action.reset(action_container_ptr); + // Send remove request using the matching IGD + runOnPUPnPQueue([w = weak(), mapping] { + if (auto upnpThis = w.lock()) { + // Abort if we are shutting down. + if (not upnpThis->isRunning()) + return; + if (upnpThis->actionDeletePortMapping(mapping)) { + upnpThis->processRemoveMapAction(mapping); + } else { + assert(mapping.getIgd()); + // Dont need to report in case of failure. + upnpThis->incrementErrorsCounter(mapping.getIgd()); + } + } + }); +} - int upnp_err = UpnpSendActionAsync(ctrlptHandle_, - igd->getControlURL().c_str(), - igd->getServiceType().c_str(), - nullptr, - action.get(), - ctrlPtCallback, - this); - if (upnp_err != UPNP_E_SUCCESS) { - JAMI_WARN("PUPnP: Failed to send async action %s from: %s, %d: %s", - ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), - upnp_err, - UpnpGetErrorMessage(upnp_err)); - return false; +std::shared_ptr<UPnPIGD> +PUPnP::findMatchingIgd(const std::string& ctrlURL) const +{ + std::lock_guard<std::mutex> lock(pupnpMutex_); + + auto iter = std::find_if(validIgdList_.begin(), + validIgdList_.end(), + [&ctrlURL](const std::shared_ptr<IGD>& igd) { + if (auto upnpIgd = std::dynamic_pointer_cast<UPnPIGD>(igd)) { + return upnpIgd->getControlURL() == ctrlURL; + } + return false; + }); + + if (iter == validIgdList_.end()) { + JAMI_WARN("PUPnP: Did not find the IGD matching ctrl URL [%s]", ctrlURL.c_str()); + return {}; } - JAMI_DBG("PUPnP: Sent request to open port %s", mapping.toString().c_str()); - - return true; + return std::dynamic_pointer_cast<UPnPIGD>(*iter); } void -PUPnP::processAddMapAction(const std::string& ctrlURL, - uint16_t ePort, - uint16_t iPort, - PortType portType) +PUPnP::processAddMapAction(const Mapping& map) { - Mapping mapToAdd(portType, ePort, iPort); + CHECK_VALID_THREAD(); - { - std::lock_guard<std::mutex> lock(validIgdListMutex_); - for (auto const& it : validIgdList_) { - if (auto igd = std::dynamic_pointer_cast<UPnPIGD>(it)) { - if (igd->getControlURL() == ctrlURL) { - mapToAdd.setInternalAddress(getHostAddress().toString()); - mapToAdd.setIgd(igd); - break; - } - } - } - } + if (observer_ == nullptr) + return; - if (mapToAdd.getIgd()) { - JAMI_DBG("PUPnP: Opened port %s", mapToAdd.toString().c_str()); - if (observer_) - observer_->onMappingAdded(mapToAdd.getIgd(), std::move(mapToAdd)); - } else { - JAMI_WARN("PUPnP: Did not find matching ctrl URL [%s] for %s", - ctrlURL.c_str(), - mapToAdd.toString().c_str()); - } + runOnUpnpContextQueue([w = weak(), map] { + if (auto upnpThis = w.lock()) { + JAMI_DBG("PUPnP: Opened mapping %s", map.toString().c_str()); + if (upnpThis->observer_) + upnpThis->observer_->onMappingAdded(map.getIgd(), std::move(map)); + } + }); } void -PUPnP::processRemoveMapAction(const std::string& ctrlURL, - uint16_t ePort, - uint16_t iPort, - PortType portType) +PUPnP::processRequestMappingFailure(const Mapping& map) { - Mapping mapToRemove(portType, ePort, iPort); + CHECK_VALID_THREAD(); - { - std::lock_guard<std::mutex> lock(validIgdListMutex_); - for (auto const& it : validIgdList_) { - if (auto igd = std::dynamic_pointer_cast<UPnPIGD>(it)) { - if (igd->getControlURL() == ctrlURL) { - mapToRemove.setIgd(igd); - break; - } - } - } - } + if (observer_ == nullptr) + return; - if (mapToRemove.getIgd()) { - JAMI_DBG("PUPnP: Closed port %s", mapToRemove.toString().c_str()); - if (observer_) - observer_->onMappingRemoved(mapToRemove.getIgd(), std::move(mapToRemove)); - } else { - JAMI_WARN("PUPnP: Did not find matching ctrl URL [%s] for %s", - ctrlURL.c_str(), - mapToRemove.toString().c_str()); - } + runOnUpnpContextQueue([w = weak(), map] { + if (auto upnpThis = w.lock()) { + JAMI_DBG("PUPnP: Failed to request mapping %s", map.toString().c_str()); + if (upnpThis->observer_) + upnpThis->observer_->onMappingRequestFailed(map); + } + }); } void -PUPnP::requestMappingRemove(const Mapping& mapping) +PUPnP::processRemoveMapAction(const Mapping& map) { - auto igd = std::dynamic_pointer_cast<UPnPIGD>(mapping.getIgd()); + CHECK_VALID_THREAD(); - if (not igd) + if (observer_ == nullptr) return; - std::lock_guard<std::mutex> lock(validIgdListMutex_); - for (auto const& it : validIgdList_) { - if (std::dynamic_pointer_cast<UPnPIGD>(it) == igd) { - // Send remove request using the matching IGD - dht::ThreadPool::io().run([w = weak(), igd, mapping] { - if (auto upnpThis = w.lock()) { - upnpThis->actionDeletePortMapping(igd, mapping); - } - }); - break; - } - } + runOnUpnpContextQueue([map, obs = observer_] { + JAMI_DBG("PUPnP: Closed mapping %s", map.toString().c_str()); + obs->onMappingRemoved(map.getIgd(), std::move(map)); + }); } const char* @@ -833,23 +786,29 @@ PUPnP::processDiscoverySearchResult(const std::string& cpDeviceId, const std::string& igdLocationUrl, const IpAddr& dstAddr) { - // Must first have a valid local address. - if (not updateAndCheckHostAddress()) { + CHECK_VALID_THREAD(); + + // Update host address if needed. + if (not hasValidHostAddress()) + updateHostAddress(); + + // The host address must be valid to proceed. + if (not hasValidHostAddress()) { JAMI_WARN("PUPnP: Local address is invalid. Ignore search result for now!"); return; } dht::http::Url url(igdLocationUrl); - std::lock_guard<std::mutex> lk(igdListMutex_); - // Use the device ID and the URL as ID. This is necessary as some // IGDs may have the same device ID but different URLs. auto igdId = cpDeviceId + " url: " + igdLocationUrl; - if (not discoveredIgdList_.emplace(igdId).second) + if (not discoveredIgdList_.emplace(igdId).second) { + // JAMI_WARN("PUPnP: IGD [%s] already in the list", igdId.c_str()); return; + } JAMI_DBG("PUPnP: Discovered a new IGD [%s]", igdId.c_str()); @@ -866,48 +825,52 @@ PUPnP::processDiscoverySearchResult(const std::string& cpDeviceId, return; } - dwnldlXmlList_.emplace_back(dht::ThreadPool::io().get<pIGDInfo>([this, - location = std::move( - igdLocationUrl)] { - IXML_Document* doc_container_ptr = nullptr; - XMLDocument doc_desc(nullptr, ixmlDocument_free); - int upnp_err = UpnpDownloadXmlDoc(location.c_str(), &doc_container_ptr); - doc_desc.reset(doc_container_ptr); + // Run a separate thread to prevent blocking this thread + // if the IGD HTTP server is not responsive. + dht::ThreadPool::io().run([w = weak(), igdLocationUrl] { + if (auto upnpThis = w.lock()) { + upnpThis->downLoadIgdDescription(igdLocationUrl); + } + }); +} - pupnpCv_.notify_all(); +void +PUPnP::downLoadIgdDescription(const std::string& locationUrl) +{ + IXML_Document* doc_container_ptr = nullptr; + int upnp_err = UpnpDownloadXmlDoc(locationUrl.c_str(), &doc_container_ptr); - if (upnp_err != UPNP_E_SUCCESS or not doc_desc) { - JAMI_WARN("PUPnP: Error downloading device XML document from %s -> %s", - location.c_str(), - UpnpGetErrorMessage(upnp_err)); - return std::make_unique<IGDInfo>( - IGDInfo {std::move(location), XMLDocument(nullptr, ixmlDocument_free)}); - } else { - JAMI_DBG("PUPnP: Succeeded to download device XML document from %s", location.c_str()); - return std::make_unique<IGDInfo>(IGDInfo {std::move(location), std::move(doc_desc)}); - } - })); + if (upnp_err != UPNP_E_SUCCESS or not doc_container_ptr) { + JAMI_WARN("PUPnP: Error downloading device XML document from %s -> %s", + locationUrl.c_str(), + UpnpGetErrorMessage(upnp_err)); + } else { + JAMI_DBG("PUPnP: Succeeded to download device XML document from %s", locationUrl.c_str()); + runOnPUPnPQueue([w = weak(), url = locationUrl, doc_container_ptr] { + if (auto upnpThis = w.lock()) { + upnpThis->validateIgd(url, doc_container_ptr); + } + }); + } } void PUPnP::processDiscoveryAdvertisementByebye(const std::string& cpDeviceId) { - // Remove device Id from list. - { - std::lock_guard<std::mutex> lk(igdListMutex_); - discoveredIgdList_.erase(cpDeviceId); - } + CHECK_VALID_THREAD(); + + discoveredIgdList_.erase(cpDeviceId); std::shared_ptr<IGD> igd; { - std::lock_guard<std::mutex> lk(validIgdListMutex_); + std::lock_guard<std::mutex> lk(pupnpMutex_); for (auto it = validIgdList_.begin(); it != validIgdList_.end();) { if ((*it)->getUID() == cpDeviceId) { igd = *it; JAMI_DBG("PUPnP: Received [%s] for IGD [%s] %s. Will be removed.", PUPnP::eventTypeToString(UPNP_DISCOVERY_ADVERTISEMENT_BYEBYE), igd->getUID().c_str(), - igd->getLocalIp().toString().c_str()); + igd->toString().c_str()); igd->setValid(false); // Remove the IGD. it = validIgdList_.erase(it); @@ -927,15 +890,16 @@ PUPnP::processDiscoveryAdvertisementByebye(const std::string& cpDeviceId) void PUPnP::processDiscoverySubscriptionExpired(Upnp_EventType event_type, const std::string& eventSubUrl) { - std::lock_guard<std::mutex> lk(validIgdListMutex_); + CHECK_VALID_THREAD(); + + std::lock_guard<std::mutex> lk(pupnpMutex_); for (auto& it : validIgdList_) { if (auto igd = std::dynamic_pointer_cast<UPnPIGD>(it)) { if (igd->getEventSubURL() == eventSubUrl) { JAMI_DBG("PUPnP: Received [%s] event for IGD [%s] %s. Request a new subscribe.", PUPnP::eventTypeToString(event_type), igd->getUID().c_str(), - igd->getLocalIp().toString().c_str()); - std::lock_guard<std::mutex> lk1(ctrlptMutex_); + igd->toString().c_str()); UpnpSubscribeAsync(ctrlptHandle_, eventSubUrl.c_str(), UPNP_INFINITE, @@ -969,10 +933,10 @@ PUPnP::handleCtrlPtUPnPEvents(Upnp_EventType event_type, const void* event) std::string deviceId {UpnpDiscovery_get_DeviceID_cstr(d_event)}; std::string location {UpnpDiscovery_get_Location_cstr(d_event)}; IpAddr dstAddr(*(const pj_sockaddr*) (UpnpDiscovery_get_DestAddr(d_event))); - runOnUpnpContextThread([w = weak(), - deviceId = std::move(deviceId), - location = std::move(location), - dstAddr = std::move(dstAddr)] { + runOnPUPnPQueue([w = weak(), + deviceId = std::move(deviceId), + location = std::move(location), + dstAddr = std::move(dstAddr)] { if (auto upnpThis = w.lock()) { upnpThis->processDiscoverySearchResult(deviceId, location, dstAddr); } @@ -985,7 +949,7 @@ PUPnP::handleCtrlPtUPnPEvents(Upnp_EventType event_type, const void* event) std::string deviceId(UpnpDiscovery_get_DeviceID_cstr(d_event)); // Process the response on the main thread. - runOnUpnpContextThread([w = weak(), deviceId = std::move(deviceId)] { + runOnPUPnPQueue([w = weak(), deviceId = std::move(deviceId)] { if (auto upnpThis = w.lock()) { upnpThis->processDiscoveryAdvertisementByebye(deviceId); } @@ -1016,7 +980,7 @@ PUPnP::handleCtrlPtUPnPEvents(Upnp_EventType event_type, const void* event) std::string publisherUrl(UpnpEventSubscribe_get_PublisherUrl_cstr(es_event)); // Process the response on the main thread. - runOnUpnpContextThread([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] { + runOnPUPnPQueue([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] { if (auto upnpThis = w.lock()) { upnpThis->processDiscoverySubscriptionExpired(event_type, publisherUrl); } @@ -1057,52 +1021,6 @@ PUPnP::handleCtrlPtUPnPEvents(Upnp_EventType event_type, const void* event) } else { JAMI_WARN("PUPnP: Action Result document not found"); } - - auto upnpString = UpnpActionComplete_get_CtrlUrl(a_event); - - std::string ctrlUrl {UpnpString_get_String(upnpString), - UpnpString_get_Length(upnpString)}; - - char* xmlbuff = ixmlPrintNode((IXML_Node*) actionRequest); - if (xmlbuff != nullptr) { - auto ctrlAction = getAction(xmlbuff); - ixmlFreeDOMString(xmlbuff); - - // Parse the response. - std::string ePortStr(getFirstDocItem(actionRequest, "NewExternalPort")); - std::string iPortStr(getFirstDocItem(actionRequest, "NewInternalPort")); - std::string portTypeStr(getFirstDocItem(actionRequest, "NewProtocol")); - - ixmlDocument_free(actionRequest); - - uint16_t ePort = ePortStr.empty() ? 0 : std::stoi(ePortStr); - uint16_t iPort = iPortStr.empty() ? 0 : std::stoi(iPortStr); - PortType portType = portTypeStr == "UDP" ? upnp::PortType::UDP - : upnp::PortType::TCP; - - // Process the response on the main thread. - runOnUpnpContextThread([w = weak(), - ctrlAction = std::move(ctrlAction), - ctrlUrl = std::move(ctrlUrl), - ePort, - iPort, - portType] { - auto upnpThis = w.lock(); - if (not upnpThis) - return; - switch (ctrlAction) { - case CtrlAction::ADD_PORT_MAPPING: - upnpThis->processAddMapAction(ctrlUrl, ePort, iPort, portType); - break; - case CtrlAction::DELETE_PORT_MAPPING: - upnpThis->processRemoveMapAction(ctrlUrl, ePort, iPort, portType); - break; - default: - // All other control actions are ignored. - break; - } - }); - } } break; } @@ -1125,7 +1043,7 @@ PUPnP::subEventCallback(Upnp_EventType event_type, const void* event, void* user } int -PUPnP::handleSubscriptionUPnPEvent(Upnp_EventType event_type, const void* event) +PUPnP::handleSubscriptionUPnPEvent(Upnp_EventType, const void* event) { UpnpEventSubscribe* es_event = static_cast<UpnpEventSubscribe*>(const_cast<void*>(event)); @@ -1157,7 +1075,7 @@ PUPnP::parseIgd(IXML_Document* doc, std::string locationUrl) JAMI_WARN("PUPnP: could not find UDN in description document of device"); return nullptr; } else { - std::lock_guard<std::mutex> lk(validIgdListMutex_); + std::lock_guard<std::mutex> lk(pupnpMutex_); for (auto& it : validIgdList_) { if (it->getUID() == UDN) { // We already have this device in our list. @@ -1380,7 +1298,7 @@ PUPnP::getMappingsListByDescr(const std::shared_ptr<IGD>& igd, const std::string std::map<Mapping::key_t, Mapping> mapList; - if (not(clientRegistered_ and upnpIgd->getLocalIp())) + if (not clientRegistered_ or not upnpIgd->isValid() or not upnpIgd->getLocalIp()) return mapList; // Set action name. @@ -1477,7 +1395,7 @@ PUPnP::getMappingsListByDescr(const std::shared_ptr<IGD>& igd, const std::string JAMI_DBG("PUPnP: Found %lu allocated mappings on IGD %s", mapList.size(), - upnpIgd->getLocalIp().toString().c_str()); + upnpIgd->toString().c_str()); return mapList; } @@ -1489,7 +1407,7 @@ PUPnP::deleteMappingsByDescription(const std::shared_ptr<IGD>& igd, const std::s return; JAMI_DBG("PUPnP: Remove all mappings (if any) on IGD %s matching descr prefix %s", - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), Mapping::UPNP_MAPPING_DESCRIPTION_PREFIX); auto mapList = getMappingsListByDescr(igd, description); @@ -1500,11 +1418,23 @@ PUPnP::deleteMappingsByDescription(const std::shared_ptr<IGD>& igd, const std::s } bool -PUPnP::actionAddPortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mapping& mapping) +PUPnP::actionAddPortMapping(const Mapping& mapping) { + CHECK_VALID_THREAD(); + if (not clientRegistered_) return false; + auto igdIn = std::dynamic_pointer_cast<UPnPIGD>(mapping.getIgd()); + if (not igdIn) + return false; + + // The requested IGD must be present in the list of local valid IGDs. + auto igd = findMatchingIgd(igdIn->getControlURL()); + + if (not igd or not igd->isValid()) + return false; + // Action and response pointers. XMLDocument action(nullptr, ixmlDocument_free); IXML_Document* action_container_ptr = nullptr; @@ -1563,99 +1493,57 @@ PUPnP::actionAddPortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mapping& &response_container_ptr); response.reset(response_container_ptr); + JAMI_DBG("PUPnP: Sent request to open port %s", mapping.toString().c_str()); + + bool success = true; + if (upnp_err != UPNP_E_SUCCESS) { - JAMI_WARN("PUPnP: Failed to send action %s from: %s, %d: %s", + JAMI_WARN("PUPnP: Failed to send action %s for mapping %s. %d: %s", ACTION_ADD_PORT_MAPPING, - igd->getServiceType().c_str(), + mapping.toString().c_str(), upnp_err, UpnpGetErrorMessage(upnp_err)); - return false; - } + JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str()); + JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str()); - if (not response) { - JAMI_WARN("PUPnP: Failed to get response from %s", ACTION_ADD_PORT_MAPPING); - return false; + success = false; } - // Check if there is an error code. + // Check if an error has occurred. std::string errorCode = getFirstDocItem(response.get(), "errorCode"); if (not errorCode.empty()) { - std::string errorDescription = getFirstDocItem(response.get(), "errorDescription"); - JAMI_WARN("PUPnP: %s returned with error: %s: %s", + success = false; + // Try to get the error description. + std::string errorDescription; + if (response) { + errorDescription = getFirstDocItem(response.get(), "errorDescription"); + } + + JAMI_WARN("PUPnP: %s returned with error: %s %s", ACTION_ADD_PORT_MAPPING, errorCode.c_str(), errorDescription.c_str()); - return false; } - - JAMI_DBG("PUPnP: Sent request to open port %s", mapping.toString().c_str()); - - // Process the response on the main thread. - runOnUpnpContextThread([w = weak(), mapping, igd] { - auto upnpThis = w.lock(); - if (not upnpThis) - return; - upnpThis->processAddMapAction(igd->getControlURL(), - mapping.getExternalPort(), - mapping.getInternalPort(), - mapping.getType()); - }); - - return true; + return success; } bool -PUPnP::actionDeletePortMappingAsync(const UPnPIGD& igd, - const std::string& port_external, - const std::string& protocol) +PUPnP::actionDeletePortMapping(const Mapping& mapping) { - if (not clientRegistered_) { + CHECK_VALID_THREAD(); + + if (not clientRegistered_) return false; - } - XMLDocument action(nullptr, ixmlDocument_free); // Action pointer. - IXML_Document* action_container_ptr = nullptr; - // Set action sequence. - UpnpAddToAction(&action_container_ptr, - ACTION_DELETE_PORT_MAPPING, - igd.getServiceType().c_str(), - "NewRemoteHost", - ""); - UpnpAddToAction(&action_container_ptr, - ACTION_DELETE_PORT_MAPPING, - igd.getServiceType().c_str(), - "NewExternalPort", - port_external.c_str()); - UpnpAddToAction(&action_container_ptr, - ACTION_DELETE_PORT_MAPPING, - igd.getServiceType().c_str(), - "NewProtocol", - protocol.c_str()); - action.reset(action_container_ptr); - int upnp_err = UpnpSendActionAsync(ctrlptHandle_, - igd.getControlURL().c_str(), - igd.getServiceType().c_str(), - nullptr, - action.get(), - ctrlPtCallback, - this); - if (upnp_err != UPNP_E_SUCCESS) { - JAMI_WARN("PUPnP: Failed to send async action %s from: %s, %d: %s", - ACTION_DELETE_PORT_MAPPING, - igd.getServiceType().c_str(), - upnp_err, - UpnpGetErrorMessage(upnp_err)); + + auto igdIn = std::dynamic_pointer_cast<UPnPIGD>(mapping.getIgd()); + if (not igdIn) return false; - } - JAMI_DBG("PUPnP: Sent request to close port %s %s", port_external.c_str(), protocol.c_str()); - return true; -} -bool -PUPnP::actionDeletePortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mapping& mapping) -{ - if (not clientRegistered_) { + // The requested IGD must be present in the list of local valid IGDs. + auto igd = findMatchingIgd(igdIn->getControlURL()); + + if (not igd or not igd->isValid()) return false; - } // Action and response pointers. XMLDocument action(nullptr, ixmlDocument_free); @@ -1690,18 +1578,23 @@ PUPnP::actionDeletePortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mappin &response_container_ptr); response.reset(response_container_ptr); + bool success = true; + if (upnp_err != UPNP_E_SUCCESS) { - JAMI_WARN("PUPnP: Failed to send action %s from: %s, %d: %s", + JAMI_WARN("PUPnP: Failed to send action %s for mapping from %s. %d: %s", ACTION_DELETE_PORT_MAPPING, - igd->getServiceType().c_str(), + mapping.toString().c_str(), upnp_err, UpnpGetErrorMessage(upnp_err)); - return false; + JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str()); + JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str()); + + success = false; } if (not response) { - JAMI_WARN("PUPnP: Failed to get response from %s", ACTION_DELETE_PORT_MAPPING); - return false; + JAMI_WARN("PUPnP: Failed to get response for %s", ACTION_DELETE_PORT_MAPPING); + success = false; } // Check if there is an error code. @@ -1712,23 +1605,12 @@ PUPnP::actionDeletePortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mappin ACTION_DELETE_PORT_MAPPING, errorCode.c_str(), errorDescription.c_str()); - return false; + success = false; } JAMI_DBG("PUPnP: Sent request to close port %s", mapping.toString().c_str()); - // Process the response on the main thread. - runOnUpnpContextThread([w = weak(), mapping, igd] { - auto upnpThis = w.lock(); - if (not upnpThis) - return; - upnpThis->processRemoveMapAction(igd->getControlURL(), - mapping.getExternalPort(), - mapping.getInternalPort(), - mapping.getType()); - }); - - return true; + return success; } } // namespace upnp diff --git a/src/upnp/protocol/pupnp/pupnp.h b/src/upnp/protocol/pupnp/pupnp.h index e490f5ffd6fdd19662f1773d6b2485c427f5e62a..a5aa6940bb462197ae48f459b2713f186cc66477 100644 --- a/src/upnp/protocol/pupnp/pupnp.h +++ b/src/upnp/protocol/pupnp/pupnp.h @@ -3,6 +3,7 @@ * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -67,22 +68,18 @@ namespace upnp { constexpr static int ARRAY_IDX_INVALID = 713; constexpr static int CONFLICT_IN_MAPPING = 718; -// Timeout values (in seconds). +// IGD search timeout (in seconds). constexpr static unsigned int SEARCH_TIMEOUT {60}; // Max number of IGD search attempts before failure. constexpr static unsigned int PUPNP_MAX_RESTART_SEARCH_RETRIES {5}; -// Time-out between two successive IGD search. -constexpr static auto PUPNP_TIMEOUT_BEFORE_IGD_SEARCH_RETRY {std::chrono::seconds(60)}; +// Base unit for the timeout between two successive IGD search. +constexpr static auto PUPNP_SEARCH_RETRY_UNIT {std::chrono::seconds(15)}; class PUPnP : public UPnPProtocol { public: using XMLDocument = std::unique_ptr<IXML_Document, decltype(ixmlDocument_free)&>; - struct IGDInfo - { - std::string location; - XMLDocument document; - }; + enum class CtrlAction { UNKNOWN, ADD_PORT_MAPPING, @@ -92,8 +89,6 @@ public: GET_EXTERNAL_IP_ADDRESS }; - using pIGDInfo = std::unique_ptr<IGDInfo>; - PUPnP(); ~PUPnP(); @@ -113,20 +108,17 @@ public: void searchForIgd() override; // Get the IGD list. - void getIgdList(std::list<std::shared_ptr<IGD>>& igdList) const override; + std::list<std::shared_ptr<IGD>> getIgdList() const override; // Return true if the it's fully setup. bool isReady() const override; - // Increment IGD errors counter. - void incrementErrorsCounter(const std::shared_ptr<IGD>& igd) override; - // Get from the IGD the list of already allocated mappings if any. std::map<Mapping::key_t, Mapping> getMappingsListByDescr( const std::shared_ptr<IGD>& igd, const std::string& descr) const override; // Request a new mapping. - void requestMappingAdd(const std::shared_ptr<IGD>& igd, const Mapping& mapping) override; + void requestMappingAdd(const Mapping& mapping) override; // Renew an allocated mapping. // Not implemented. Currently, UPNP allocations do not have expiration time. @@ -135,50 +127,75 @@ public: // Removes a mapping. void requestMappingRemove(const Mapping& igdMapping) override; + // Get the host (local) address. + const IpAddr getHostAddress() const override; + + // Terminate the instance. void terminate() override; private: NON_COPYABLE(PUPnP); + // Helpers to run tasks on PUPNP private execution queue. + ScheduledExecutor* getPUPnPScheduler() { return &pupnpScheduler_; } + template<typename Callback> + void runOnPUPnPQueue(Callback&& cb) + { + pupnpScheduler_.run([cb = std::forward<Callback>(cb)]() mutable { cb(); }); + } + + // Helper to run tasks on UPNP context execution queue. ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); } // Init lib-upnp void initUpnpLib(); + // Block until shutdown is complete or time-out. + void waitForShutdown(); + + // Return true if running. + bool isRunning() const; + // Register the client void registerClient(); - // Start the internal thread. - void startPUPnP(); - // Start search for UPNP devices void searchForDevices(); // Return true if it has at least one valid IGD. bool hasValidIgd() const; - // Update and check the host (local) address. Returns true - // if the address is valid. - bool updateAndCheckHostAddress(); + // Update the host (local) address. + void updateHostAddress(); + + // Check the host (local) address. + // Returns true if the address is valid. + bool hasValidHostAddress(); // Delete mappings matching the description void deleteMappingsByDescription(const std::shared_ptr<IGD>& igd, const std::string& description); + // Search for the IGD in the local list of known IGDs. + std::shared_ptr<UPnPIGD> findMatchingIgd(const std::string& ctrlURL) const; + // Process the reception of an add mapping action answer. - void processAddMapAction(const std::string& ctrlURL, - uint16_t ePort, - uint16_t iPort, - PortType portType); + void processAddMapAction(const Mapping& map); + + // Process the a mapping request failure. + void processRequestMappingFailure(const Mapping& map); // Process the reception of a remove mapping action answer. - void processRemoveMapAction(const std::string& ctrlURL, - uint16_t ePort, - uint16_t iPort, - PortType portType); + void processRemoveMapAction(const Mapping& map); + + // Increment IGD errors counter. + void incrementErrorsCounter(const std::shared_ptr<IGD>& igd); + + // Download XML document. + void downLoadIgdDescription(const std::string& url); // Validate IGD from the xml document received from the router. - bool validateIgd(const IGDInfo&); + bool validateIgd(const std::string& location, IXML_Document* doc_container_ptr); // Returns control point action callback based on xml node. static CtrlAction getAction(const char* xmlNode); @@ -217,46 +234,54 @@ private: // Parses the IGD candidate. std::unique_ptr<UPnPIGD> parseIgd(IXML_Document* doc, std::string locationUrl); - // These functions directly create UPnP actions and make synchronous UPnP control point calls. - // Assumes mutex is already locked. + // These functions directly create UPnP actions and make synchronous UPnP + // control point calls. Must be run on the PUPNP internal execution queue. bool actionIsIgdConnected(const UPnPIGD& igd); IpAddr actionGetExternalIP(const UPnPIGD& igd); + bool actionAddPortMapping(const Mapping& mapping); + bool actionDeletePortMapping(const Mapping& mapping); - bool actionAddPortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mapping& mapping); - bool actionAddPortMappingAsync(const std::shared_ptr<UPnPIGD>& igd, const Mapping& mapping); - bool actionDeletePortMappingAsync(const UPnPIGD& igd, - const std::string& port_external, - const std::string& protocol); - bool actionDeletePortMapping(const std::shared_ptr<UPnPIGD>& igd, const Mapping& mapping); // Event type to string static const char* eventTypeToString(Upnp_EventType eventType); std::weak_ptr<PUPnP> weak() { return std::static_pointer_cast<PUPnP>(shared_from_this()); } + // Execution queue to run lib upnp actions + ScheduledExecutor pupnpScheduler_ {}; + // Initialization status. std::atomic_bool initialized_ {false}; // Client registration status. std::atomic_bool clientRegistered_ {false}; - std::condition_variable pupnpCv_ {}; // Condition variable for thread-safe signaling. - std::atomic_bool pupnpRun_ {false}; // Variable to allow the thread to run. - std::thread pupnpThread_ {}; // PUPnP thread for non-blocking client registration. std::shared_ptr<Task> searchForIgdTimer_ {}; unsigned int igdSearchCounter_ {0}; - mutable std::mutex validIgdListMutex_; - std::list<std::shared_ptr<IGD>> validIgdList_; // List of valid IGDs. + // List of discovered IGDs. + std::set<std::string> discoveredIgdList_; + + // Control point handle. + UpnpClient_Handle ctrlptHandle_ {-1}; + + // Observer to report the results. + UpnpMappingObserver* observer_ {nullptr}; + + // List of valid IGDs. + std::list<std::shared_ptr<IGD>> validIgdList_; - std::set<std::string> discoveredIgdList_; // UDN list of discovered IGDs. - std::list<std::future<pIGDInfo>> - dwnldlXmlList_; // List of futures for blocking xml download function calls. - std::list<std::future<pIGDInfo>> cancelXmlList_; // List of abandoned documents + // Current host address. + IpAddr hostAddress_ {}; - mutable std::mutex igdListMutex_; // Mutex used to protect IGD instances. - std::mutex ctrlptMutex_; // Mutex for client handle protection. - UpnpClient_Handle ctrlptHandle_ {-1}; // Control point handle. + // Calls from other threads that does not need synchronous access are + // rescheduled on the UPNP private queue. This will avoid the need to + // protect most of the data members of this class. + // For some internal members (namely the validIgdList and the hostAddress) + // that need to be synchronously accessed, are protected by this mutex. + mutable std::mutex pupnpMutex_; - std::atomic_bool searchForIgd_ {false}; // Variable to signal thread for a search. + // Shutdown synchronization + std::condition_variable shutdownCv_ {}; + bool shutdownComplete_ {false}; }; } // namespace upnp diff --git a/src/upnp/protocol/pupnp/upnp_igd.cpp b/src/upnp/protocol/pupnp/upnp_igd.cpp index 1b5f24bd1af6e8cc167ed9ab342154c09a3a7f75..b1d02e3ce5ed33f24cf6d819231a9fd886d02241 100644 --- a/src/upnp/protocol/pupnp/upnp_igd.cpp +++ b/src/upnp/protocol/pupnp/upnp_igd.cpp @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by diff --git a/src/upnp/protocol/pupnp/upnp_igd.h b/src/upnp/protocol/pupnp/upnp_igd.h index e8b3352a623a97accbeaecde634c7dfca5a3c5e8..cfb13a0ed552f47e575c5402638c8eebac590cca 100644 --- a/src/upnp/protocol/pupnp/upnp_igd.h +++ b/src/upnp/protocol/pupnp/upnp_igd.h @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -52,7 +53,12 @@ public: std::string&& eventSubURL, IpAddr&& localIp = {}, IpAddr&& publicIp = {}); + ~UPnPIGD() {} + + bool operator==(IGD& other) const; + bool operator==(UPnPIGD& other) const; + const std::string& getBaseURL() const { std::lock_guard<std::mutex> lock(mutex_); @@ -89,8 +95,7 @@ public: return eventSubURL_; } - bool operator==(IGD& other) const; - bool operator==(UPnPIGD& other) const; + const std::string toString() const override { return controlURL_; } private: std::string baseURL_ {}; diff --git a/src/upnp/protocol/upnp_protocol.h b/src/upnp/protocol/upnp_protocol.h index ce229b2f13c195268f5c13a16480e8fc9807f433..a14a77f1a1dd21235be9db7ff7d180c8a28a7c57 100644 --- a/src/upnp/protocol/upnp_protocol.h +++ b/src/upnp/protocol/upnp_protocol.h @@ -1,7 +1,8 @@ /* * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -58,6 +59,11 @@ constexpr static const char* UPNP_WANPPP_SERVICE enum class UpnpIgdEvent { ADDED, REMOVED, INVALID_STATE }; +// Interface used to report mapping event from the protocol implementations. +// This interface is meant to be implemented only by UPnPConext class. Sincce +// this class is a singleton, it's assumed that it out-lives the protocol +// implementations. In other words, the observer is always assumed to point to a +// valid instance. class UpnpMappingObserver { public: @@ -66,6 +72,7 @@ public: virtual void onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) = 0; virtual void onMappingAdded(const std::shared_ptr<IGD>& igd, const Mapping& map) = 0; + virtual void onMappingRequestFailed(const Mapping& map) = 0; #if HAVE_LIBNATPMP virtual void onMappingRenewed(const std::shared_ptr<IGD>& igd, const Mapping& map) = 0; #endif @@ -94,14 +101,11 @@ public: virtual void searchForIgd() = 0; // Get the IGD instance. - virtual void getIgdList(std::list<std::shared_ptr<IGD>>& igdList) const = 0; + virtual std::list<std::shared_ptr<IGD>> getIgdList() const = 0; // Return true if it has at least one valid IGD. virtual bool isReady() const = 0; - // Increment IGD errors counter. - virtual void incrementErrorsCounter(const std::shared_ptr<IGD>& igd) = 0; - // Get the list of already allocated mappings if any. virtual std::map<Mapping::key_t, Mapping> getMappingsListByDescr(const std::shared_ptr<IGD>&, const std::string&) const @@ -110,7 +114,7 @@ public: } // Sends a request to add a mapping. - virtual void requestMappingAdd(const std::shared_ptr<IGD>& igd, const Mapping& map) = 0; + virtual void requestMappingAdd(const Mapping& map) = 0; // Renew an allocated mapping. virtual void requestMappingRenew(const Mapping& mapping) = 0; @@ -121,16 +125,11 @@ public: // Set the user callbacks. virtual void setObserver(UpnpMappingObserver* obs) = 0; - // Get the host (local) address. - const IpAddr& getHostAddress() const { return hostAddress_; } + // Get the current host (local) address + virtual const IpAddr getHostAddress() const = 0; // Terminate virtual void terminate() = 0; - -protected: - // The host (local) address. Must be fully set before making any request. - IpAddr hostAddress_ {}; - UpnpMappingObserver* observer_ {nullptr}; }; } // namespace upnp diff --git a/src/upnp/upnp_context.cpp b/src/upnp/upnp_context.cpp index f4031a92dbdcf51718b3c2fb6d5097c6c1d4f054..cbb3c6d1e5a36e4a62ae544974cddc77105fe11f 100644 --- a/src/upnp/upnp_context.cpp +++ b/src/upnp/upnp_context.cpp @@ -3,6 +3,7 @@ * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -24,8 +25,6 @@ namespace jami { namespace upnp { -constexpr static auto NAT_MAP_REQUEST_TIMEOUT_UNIT = std::chrono::seconds(1); -constexpr static auto PUPNP_MAP_REQUEST_TIMEOUT_UNIT = std::chrono::seconds(5); constexpr static auto MAP_UPDATE_INTERVAL = std::chrono::seconds(30); constexpr static int MAX_REQUEST_RETRIES = 20; constexpr static int MAX_REQUEST_REMOVE_COUNT = 5; @@ -44,7 +43,7 @@ UPnPContext::UPnPContext() portRange_.emplace(PortType::UDP, std::make_pair(UPNP_UDP_PORT_MIN, UPNP_UDP_PORT_MAX)); if (not isValidThread()) { - runOnUpnpContextThread([this] { init(); }); + runOnUpnpContextQueue([this] { init(); }); return; } } @@ -61,7 +60,8 @@ void UPnPContext::shutdown() { if (not isValidThread()) { - runOnUpnpContextThread([this] { shutdown(); }); + runOnUpnpContextQueue([this] { shutdown(); }); + waitForShutdown(); return; } @@ -72,12 +72,29 @@ UPnPContext::shutdown() for (auto const& [_, proto] : protocolList_) proto->terminate(); - std::lock_guard<std::mutex> lock(mappingMutex_); - mappingList_->clear(); - if (mappingListUpdateTimer_) - mappingListUpdateTimer_->cancel(); - controllerList_.clear(); - protocolList_.clear(); + { + std::lock_guard<std::mutex> lock(mappingMutex_); + mappingList_->clear(); + if (mappingListUpdateTimer_) + mappingListUpdateTimer_->cancel(); + controllerList_.clear(); + protocolList_.clear(); + shutdownComplete_ = true; + } + + shutdownCv_.notify_one(); +} + +void +UPnPContext::waitForShutdown() +{ + JAMI_DBG("Waiting for shutdown ..."); + std::unique_lock<std::mutex> lk(mappingMutex_); + if (shutdownCv_.wait_for(lk, std::chrono::seconds(30), [this] { return shutdownComplete_; })) { + JAMI_DBG("Shutdown completed"); + } else { + JAMI_ERR("Shutdown timed-out"); + } } UPnPContext::~UPnPContext() @@ -125,7 +142,7 @@ void UPnPContext::stopUpnp(bool forceRelease) { if (not isValidThread()) { - runOnUpnpContextThread([this] { stopUpnp(); }); + runOnUpnpContextQueue([this] { stopUpnp(); }); return; } @@ -147,6 +164,7 @@ UPnPContext::stopUpnp(bool forceRelease) } } // Invalidate the current IGDs. + preferredIgd_.reset(); validIgdList_.clear(); } @@ -198,18 +216,43 @@ void UPnPContext::connectivityChanged() { if (not isValidThread()) { - runOnUpnpContextThread([this] { connectivityChanged(); }); + runOnUpnpContextQueue([this] { connectivityChanged(); }); return; } + auto hostAddr = ip_utils::getLocalAddr(AF_INET); + + JAMI_DBG("Connectivity change check: host address %s", hostAddr.toString().c_str()); + + auto addrChanged = false; + + // Check if the host address changed. + for (auto const& [_, protocol] : protocolList_) { + if (protocol->isReady() and hostAddr != protocol->getHostAddress()) { + JAMI_WARN("Host address changed from %s to %s", + protocol->getHostAddress().toString().c_str(), + hostAddr.toString().c_str()); + protocol->clearIgds(); + addrChanged = true; + } + } + + // Address did not change, nothing to do. + if (not addrChanged) { + return; + } + + // No registered controller. New search will be performed when + // a controller is registered. if (controllerList_.empty()) return; - JAMI_DBG("Connectivity changed. Reset IGDs and restart."); + JAMI_DBG("Connectivity changed. Clear the IGDs and restart"); stopUpnp(); startUpnp(); + // Mapping with auto update enabled must be processed first. processMappingWithAutoUpdate(); } @@ -266,9 +309,10 @@ UPnPContext::reserveMapping(Mapping& requestedMap) // the caller to use it or not. for (auto const& [_, map] : mappingList) { // If the desired port is null, we pick the first available port. - if ((desiredPort == 0 or map->getExternalPort() == desiredPort) and map->isAvailable()) { - // Considere the first available mapping regardless of - // its state, if we dont have one yet. + if (map->isValid() and (desiredPort == 0 or map->getExternalPort() == desiredPort) + and map->isAvailable()) { + // Considere the first available mapping regardless of its + // state. A mapping with OPEN state will be used if found. if (not mapRes) mapRes = map; @@ -307,7 +351,7 @@ void UPnPContext::releaseMapping(const Mapping& map) { if (not isValidThread()) { - runOnUpnpContextThread([this, map] { releaseMapping(map); }); + runOnUpnpContextQueue([this, map] { releaseMapping(map); }); return; } @@ -332,8 +376,16 @@ UPnPContext::releaseMapping(const Mapping& map) void UPnPContext::registerController(void* controller) { + { + std::lock_guard<std::mutex> lock(mappingMutex_); + if (shutdownComplete_) { + JAMI_WARN("UPnPContext already shut down"); + return; + } + } + if (not isValidThread()) { - runOnUpnpContextThread([this, controller] { registerController(controller); }); + runOnUpnpContextQueue([this, controller] { registerController(controller); }); return; } @@ -352,7 +404,7 @@ void UPnPContext::unregisterController(void* controller) { if (not isValidThread()) { - runOnUpnpContextThread([this, controller] { unregisterController(controller); }); + runOnUpnpContextQueue([this, controller] { unregisterController(controller); }); return; } @@ -394,7 +446,7 @@ UPnPContext::requestMapping(const Mapping::sharedPtr_t& map) assert(map); if (not isValidThread()) { - runOnUpnpContextThread([this, map] { requestMapping(map); }); + runOnUpnpContextQueue([this, map] { requestMapping(map); }); return; } @@ -410,24 +462,16 @@ UPnPContext::requestMapping(const Mapping::sharedPtr_t& map) map->setIgd(igd); - JAMI_DBG("Request mapping %s using protocol [%s] IGD [%s %s]", + JAMI_DBG("Request mapping %s using protocol [%s] IGD [%s]", map->toString().c_str(), igd->getProtocolName(), - igd->getUID().c_str(), - igd->getLocalIp().toString().c_str()); + igd->toString().c_str()); - // Register mapping timeout callback and update the state if needed. - registerAddMappingTimeout(map); if (map->getState() != MappingState::IN_PROGRESS) updateMappingState(map, MappingState::IN_PROGRESS); - // Request the mapping. - if (not igd) { - JAMI_ERR("No valid IGD!"); - return; - } auto const& protocol = protocolList_.at(igd->getProtocol()); - protocol->requestMappingAdd(igd, *map); + protocol->requestMappingAdd(*map); } bool @@ -494,27 +538,48 @@ UPnPContext::deleteUnneededMappings(PortType type, int portCount) return true; } -std::shared_ptr<IGD> -UPnPContext::getPreferredIgd() const +void +UPnPContext::updatePreferredIgd() { CHECK_VALID_THREAD(); - std::shared_ptr<IGD> prefIgd; + if (preferredIgd_ and preferredIgd_->isValid()) + return; + + // Reset and search for the best IGD. + preferredIgd_.reset(); + for (auto const& [_, protocol] : protocolList_) { if (protocol->isReady()) { - std::list<std::shared_ptr<IGD>> igdList; - protocol->getIgdList(igdList); + auto igdList = protocol->getIgdList(); assert(not igdList.empty()); auto const& igd = igdList.front(); + if (not igd->isValid()) + continue; - // Perefer IGD with the lowest error count, if equal, prefer NAT-PMP. - if (not prefIgd or igd->getErrorsCount() < prefIgd->getErrorsCount() - or protocol->getProtocol() == NatProtocolType::NAT_PMP) { - prefIgd = igd; - } + // Prefer NAT-PMP over PUPNP. + if (preferredIgd_ and igd->getProtocol() != NatProtocolType::NAT_PMP) + continue; + + // Update. + preferredIgd_ = igd; } } - return prefIgd; + + if (preferredIgd_ and preferredIgd_->isValid()) { + JAMI_DBG("Preferred IGD updated to [%s] IGD [%s %s] ", + preferredIgd_->getProtocolName(), + preferredIgd_->getUID().c_str(), + preferredIgd_->toString().c_str()); + } +} + +std::shared_ptr<IGD> +UPnPContext::getPreferredIgd() const +{ + CHECK_VALID_THREAD(); + + return preferredIgd_; } void @@ -522,12 +587,15 @@ UPnPContext::updateMappingList(bool async) { // Run async if requested. if (async) { - runOnUpnpContextThread([this] { updateMappingList(false); }); + runOnUpnpContextQueue([this] { updateMappingList(false); }); return; } CHECK_VALID_THREAD(); + // Update the preferred IGD. + updatePreferredIgd(); + // Skip if no controller registered. if (controllerList_.empty()) return; @@ -543,8 +611,6 @@ UPnPContext::updateMappingList(bool async) if (not prefIgd) { JAMI_DBG("UPNP/NAT-PMP enabled, but no valid IGDs available"); std::lock_guard<std::mutex> lock(mappingMutex_); - // Invalidate the current IGDs. - validIgdList_.clear(); // No valid IGD. Nothing to do. return; } @@ -552,7 +618,7 @@ UPnPContext::updateMappingList(bool async) JAMI_DBG("Current preferred protocol [%s] IGD [%s %s] ", prefIgd->getProtocolName(), prefIgd->getUID().c_str(), - prefIgd->getLocalIp().toString().c_str()); + prefIgd->toString().c_str()); // Process pending requests if any. processPendingRequests(prefIgd); @@ -645,8 +711,12 @@ UPnPContext::pruneMappingList() auto remoteMapList = protocol->getMappingsListByDescr(igd, Mapping::UPNP_MAPPING_DESCRIPTION_PREFIX); - if (remoteMapList.empty()) - return; + if (remoteMapList.empty()) { + std::lock_guard<std::mutex> lock(mappingMutex_); + if (not getMappingList(PortType::TCP).empty() or getMappingList(PortType::TCP).empty()) { + JAMI_WARN("We have provisionned mappings but the PUPNP IGD returned an empty list!"); + } + } pruneUnMatchedMappings(igd, remoteMapList); pruneUnTrackedMappings(igd, remoteMapList); @@ -681,7 +751,7 @@ UPnPContext::pruneUnMatchedMappings(const std::shared_ptr<IGD>& igd, JAMI_WARN("Mapping %s (IGD %s) marked as \"OPEN\" but not found in the " "remote list. Mark as failed!", map->toString().c_str(), - igd->getLocalIp().toString().c_str()); + igd->toString().c_str()); } } } @@ -712,7 +782,7 @@ UPnPContext::pruneUnTrackedMappings(const std::shared_ptr<IGD>& igd, // Not present, request mapping remove. JAMI_DBG("Sending a remove request for un-tracked mapping %s on IGD %s", map.toString().c_str(), - igd->getLocalIp().toString().c_str()); + igd->toString().c_str()); // Add to the list. toRemoveList.emplace_back(std::move(map)); // Make only few remove requests at once. @@ -753,9 +823,8 @@ UPnPContext::pruneMappingsWithInvalidIgds(const std::shared_ptr<IGD>& igd) for (auto const& map : toRemoveList) { JAMI_DBG("Remove mapping %s (has an invalid IGD %s [%s])", map->toString().c_str(), - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), igd->getProtocolName()); - map->cancelTimeoutTimer(); updateMappingState(map, MappingState::FAILED); unregisterMapping(map); } @@ -778,9 +847,9 @@ UPnPContext::processPendingRequests(const std::shared_ptr<IGD>& igd) auto& mappingList = getMappingList(type); for (auto& [_, map] : mappingList) { if (map->getState() == MappingState::PENDING) { - JAMI_DBG("Send request for pending mapping %s to IGD %s", + JAMI_DBG("Send pending request for mapping %s to IGD %s", map->toString().c_str(), - igd->getLocalIp().toString(true).c_str()); + igd->toString().c_str()); requestsList.emplace_back(map); } } @@ -823,11 +892,11 @@ UPnPContext::processMappingWithAutoUpdate() // Reserve a new mapping. Mapping newMapping(oldMap->getType()); + newMapping.enableAutoUpdate(true); + newMapping.setNotifyCallback(oldMap->getNotifyCallback()); + auto const& mapPtr = reserveMapping(newMapping); assert(mapPtr); - mapPtr->setAvailable(false); - mapPtr->enableAutoUpdate(true); - mapPtr->setNotifyCallback(oldMap->getNotifyCallback()); // Release the old one. oldMap->setAvailable(true); @@ -843,10 +912,13 @@ UPnPContext::onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) assert(igd); if (not isValidThread()) { - runOnUpnpContextThread([this, igd, event] { onIgdUpdated(igd, event); }); + runOnUpnpContextQueue([this, igd, event] { onIgdUpdated(igd, event); }); return; } + // Reset to start search for a new best IGD. + preferredIgd_.reset(); + char const* IgdState = event == UpnpIgdEvent::ADDED ? "ADDED" : event == UpnpIgdEvent::REMOVED ? "REMOVED" : "INVALID"; @@ -854,9 +926,13 @@ UPnPContext::onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) auto const& igdLocalAddr = igd->getLocalIp(); auto protocolName = igd->getProtocolName(); - JAMI_DBG("New event for IGD [%s] [%s]: [%s]", igd->getUID().c_str(), protocolName, IgdState); + JAMI_DBG("New event for IGD [%s %s] [%s]: [%s]", + igd->getUID().c_str(), + igd->toString().c_str(), + protocolName, + IgdState); - // Check if IGD has a valid addresses. + // Check if the IGD has valid addresses. if (not igdLocalAddr) { JAMI_WARN("[%s] IGD has an invalid local address", protocolName); return; @@ -879,7 +955,7 @@ UPnPContext::onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) if (event == UpnpIgdEvent::REMOVED or event == UpnpIgdEvent::INVALID_STATE) { JAMI_WARN("State of IGD [%s %s] [%s] changed to [%s]. Pruning the mapping list", igd->getUID().c_str(), - igdLocalAddr.toString(true, true).c_str(), + igd->toString().c_str(), protocolName, IgdState); @@ -907,7 +983,7 @@ UPnPContext::onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) } } - // Update. + // Update the provisionned mappings. updateMappingList(false); } @@ -922,50 +998,22 @@ UPnPContext::onMappingAdded(const std::shared_ptr<IGD>& igd, const Mapping& mapR // We may receive a response for a canceled request. Just ignore it. JAMI_DBG("Response for mapping %s [IGD %s] [%s] does not have a local match", mapRes.toString().c_str(), - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), mapRes.getProtocolName()); return; } - // The mapping pointer must be valid at his point. - assert(map); - - // We have a response, so cancel the timer. - map->cancelTimeoutTimer(); - - // Update the mapping. - map->setState(mapRes.getState()); - map->setIgd(mapRes.getIgd()); + // The mapping request is new and successful. Update. + map->setIgd(igd); map->setInternalAddress(mapRes.getInternalAddress()); map->setExternalPort(mapRes.getExternalPort()); - // Clean-up if not valid. - if (not map->isValid()) { - JAMI_WARN("Mapping request %s on IGD %s [%s] failed!", - map->toString(true).c_str(), - igd->getLocalIp().toString().c_str(), - igd->getProtocolName()); - updateMappingState(map, MappingState::FAILED); - unregisterMapping(map); - return; - } - - // Ignore if already open. - if (map->getState() == MappingState::OPEN) { - assert(map->getIgd()); - JAMI_DBG("Mapping %s is already open on IGD %s [%s]", - map->toString().c_str(), - map->getIgd()->getLocalIp().toString().c_str(), - map->getIgd()->getProtocolName()); - return; - } - - // The mapping request is new and successful. Update. + // Update the state and report to the owner. updateMappingState(map, MappingState::OPEN); JAMI_DBG("Mapping %s (on IGD %s [%s]) successfully performed", map->toString().c_str(), - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), map->getProtocolName()); // Call setValid() to reset the errors counter. We need @@ -983,7 +1031,7 @@ UPnPContext::onMappingRenewed(const std::shared_ptr<IGD>& igd, const Mapping& ma // We may receive a notification for a canceled request. Ignore it. JAMI_WARN("Renewed mapping %s from IGD %s [%s] does not have a match in local list", map.toString().c_str(), - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), map.getProtocolName()); return; } @@ -991,7 +1039,7 @@ UPnPContext::onMappingRenewed(const std::shared_ptr<IGD>& igd, const Mapping& ma or mapPtr->getState() != MappingState::OPEN) { JAMI_WARN("Renewed mapping %s from IGD %s [%s] is in unexpected state", mapPtr->toString().c_str(), - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), mapPtr->getProtocolName()); return; } @@ -1011,12 +1059,7 @@ UPnPContext::requestRemoveMapping(const Mapping::sharedPtr_t& map) } if (not map->isValid()) { - JAMI_WARN("Mapping [%s] is invalid! Ignore.", map->toString().c_str()); - return; - } - - if (not map->getIgd()->isValid()) { - JAMI_WARN("Mapping [%s] has an invalid IGD! Ignore.", map->toString().c_str()); + // Silently ignore if the mapping is invalid return; } @@ -1028,7 +1071,7 @@ void UPnPContext::deleteAllMappings(PortType type) { if (not isValidThread()) { - runOnUpnpContextThread([this, type] { deleteAllMappings(type); }); + runOnUpnpContextQueue([this, type] { deleteAllMappings(type); }); return; } @@ -1047,7 +1090,7 @@ UPnPContext::onMappingRemoved(const std::shared_ptr<IGD>& igd, const Mapping& ma return; if (not isValidThread()) { - runOnUpnpContextThread([this, igd, mapRes] { onMappingRemoved(igd, mapRes); }); + runOnUpnpContextQueue([this, igd, mapRes] { onMappingRemoved(igd, mapRes); }); return; } @@ -1119,8 +1162,6 @@ UPnPContext::unregisterMapping(const Mapping::sharedPtr_t& map) return; } - map->cancelTimeoutTimer(); - if (map->getAutoUpdate()) { // Dont unregister mappings with auto-update enabled. return; @@ -1130,7 +1171,10 @@ UPnPContext::unregisterMapping(const Mapping::sharedPtr_t& map) if (mappingList.erase(map->getMapKey()) == 1) { JAMI_DBG("Unregistered mapping %s", map->toString().c_str()); } else { - JAMI_ERR("Failed to unregister mapping %s", map->toString().c_str()); + // The mapping may already be un-registered. Just ignore it. + JAMI_DBG("Mapping %s [%s] does not have a local match", + map->toString().c_str(), + map->getProtocolName()); } } @@ -1195,44 +1239,16 @@ UPnPContext::getMappingStatus(MappingStatus& status) } void -UPnPContext::registerAddMappingTimeout(const Mapping::sharedPtr_t& map) -{ - if (not map) { - JAMI_ERR("Invalid mapping pointer"); - return; - } - - auto const& igd = map->getIgd(); - if (not igd) { - JAMI_ERR("Invalid igd pointer"); - return; - } - - MappingStatus status; - getMappingStatus(status); - - // Schedule the timer and hold a pointer on the task. - auto timeout = igd->getProtocol() == NatProtocolType::NAT_PMP ? NAT_MAP_REQUEST_TIMEOUT_UNIT - : PUPNP_MAP_REQUEST_TIMEOUT_UNIT; - map->setTimeoutTimer( - // The time-out is set proportional to the number of "in-progress" - // requests plus a bias. - // This rule is empirical and based on experimenting with few - // implementations. - Manager::instance() - .scheduler() - .scheduleIn([this, key = map->getMapKey()] { onRequestTimeOut(key); }, - timeout * (status.inProgressCount_ + 5))); -} - -void -UPnPContext::onRequestTimeOut(Mapping::key_t key) +UPnPContext::onMappingRequestFailed(const Mapping& mapRes) { CHECK_VALID_THREAD(); - auto const& map = getMappingWithKey(key); + auto const& map = getMappingWithKey(mapRes.getMapKey()); if (not map) { - JAMI_ERR("Mapping pointer is null"); + // We may receive a response for a removed request. Just ignore it. + JAMI_DBG("Mapping %s [IGD %s] does not have a local match", + mapRes.toString().c_str(), + mapRes.getProtocolName()); return; } @@ -1242,31 +1258,13 @@ UPnPContext::onRequestTimeOut(Mapping::key_t key) return; } - if (not getMappingWithKey(map->getMapKey())) { - JAMI_ERR("Mapping [%s] does not exist", map->toString().c_str()); - return; - } - - // Ignore time-out if the request is not in-progress state. - // Should not occur. - if (map->getState() != MappingState::IN_PROGRESS) { - JAMI_ERR("Mapping %s timed-out but is not in IN-PROGRESS state (curr %s)", - map->toString().c_str(), - map->getStateStr()); - return; - } + updateMappingState(map, MappingState::FAILED); + unregisterMapping(map); - JAMI_WARN("Mapping request for %s timed-out on IGD %s [%s]", + JAMI_WARN("Mapping request for %s failed on IGD %s [%s]", map->toString().c_str(), - igd->getLocalIp().toString().c_str(), + igd->toString().c_str(), igd->getProtocolName()); - - auto protocol = protocolList_.at(igd->getProtocol()); - protocol->incrementErrorsCounter(igd); - - // Considere time-out as failure. - updateMappingState(map, MappingState::FAILED); - unregisterMapping(map); } void diff --git a/src/upnp/upnp_context.h b/src/upnp/upnp_context.h index 6ce476a03e0fcf399cf5e2aa7bad2beed953d69d..fe12465ee60ed77d2a489c648aa0ea733b2e1e60 100644 --- a/src/upnp/upnp_context.h +++ b/src/upnp/upnp_context.h @@ -3,6 +3,7 @@ * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -126,6 +127,9 @@ private: // Initialization void init(); + // Wait for Shutdown + void waitForShutdown(); + /** * @brief start the search for IGDs activate the mapping * list update. @@ -159,12 +163,6 @@ private: // Remove all mappings of the given type. void deleteAllMappings(PortType type); - // Schedule a time-out timer for a in-progress request. - void registerAddMappingTimeout(const Mapping::sharedPtr_t& map); - - // Callback invoked when a request times-out - void onRequestTimeOut(Mapping::key_t key); - // Update the state and notify the listener void updateMappingState(const Mapping::sharedPtr_t& map, MappingState newState, @@ -173,6 +171,9 @@ private: // Provision ports. uint16_t getAvailablePortNumber(PortType type); + // Update preferred IGD + void updatePreferredIgd(); + // Get preferred IGD std::shared_ptr<IGD> getPreferredIgd() const; @@ -243,6 +244,9 @@ private: void onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) override; // Callback used to report add request status. void onMappingAdded(const std::shared_ptr<IGD>& igd, const Mapping& map) override; + // Callback invoked when a request fails. Reported on failures for both + // new requests and renewal requests (if supported by the the protocol). + void onMappingRequestFailed(const Mapping& map) override; #if HAVE_LIBNATPMP // Callback used to report renew request status. void onMappingRenewed(const std::shared_ptr<IGD>& igd, const Mapping& map) override; @@ -275,12 +279,19 @@ private: std::shared_ptr<Task> mappingListUpdateTimer_ {}; + // Current preferred IGD. Can be null if there is no valid IGD. + std::shared_ptr<IGD> preferredIgd_; + // This mutex must lock only these two members. All other // members must be accessed only from the UPNP context thread. std::mutex mutable mappingMutex_; // List of mappings. std::map<Mapping::key_t, Mapping::sharedPtr_t> mappingList_[2] {}; std::set<std::shared_ptr<IGD>> validIgdList_ {}; + + // Shutdown synchronization + std::condition_variable shutdownCv_ {}; + bool shutdownComplete_ {false}; }; } // namespace upnp diff --git a/src/upnp/upnp_control.cpp b/src/upnp/upnp_control.cpp index 702da5ff0fe8e6b1a9b4a56fa9be8ed4a9fefd25..0aecf239423333f5074360a86acbb037db3ec9db 100644 --- a/src/upnp/upnp_control.cpp +++ b/src/upnp/upnp_control.cpp @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by diff --git a/src/upnp/upnp_control.h b/src/upnp/upnp_control.h index c24579aa626220da02e23d6d64a05498cb3ce0c3..d19ac8e4bd8f1b927e6810ac5a50b7ada8f84607 100644 --- a/src/upnp/upnp_control.h +++ b/src/upnp/upnp_control.h @@ -2,7 +2,8 @@ * Copyright (C) 2004-2021 Savoir-faire Linux Inc. * * Author: Stepan Salenikovich <stepan.salenikovich@savoirfairelinux.com> - * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Eden Abitbol <eden.abitbol@savoirfairelinux.com> + * Author: Mohamed Chibani <mohamed.chibani@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by diff --git a/src/upnp/upnp_thread_util.h b/src/upnp/upnp_thread_util.h index 1fcc3c9ceee5f261bffd321c7821490bcddb9018..e0320c4d8f869dd66bcb94c87abff18ed1d1c1ff 100644 --- a/src/upnp/upnp_thread_util.h +++ b/src/upnp/upnp_thread_util.h @@ -22,12 +22,11 @@ protected: bool isValidThread() const { return threadId_ == getCurrentThread(); } - // Upnp scheduler (same as manager's thread) + // Upnp context execution queue (same as manager's scheduler) + // Helpers to run tasks on upnp context queue. static ScheduledExecutor* getScheduler() { return &Manager::instance().scheduler(); } - - // Helper to run tasks on upnp thread. template<typename Callback> - static void runOnUpnpContextThread(Callback&& cb) + static void runOnUpnpContextQueue(Callback&& cb) { getScheduler()->run([cb = std::forward<Callback>(cb)]() mutable { cb(); }); }