diff --git a/include/upnp/upnp_context.h b/include/upnp/upnp_context.h index 57f6d826453f7925d6f463799c99f050ee2acd5d..a1014cc717296d7784cf2bd8566bc54ec43be2dd 100644 --- a/include/upnp/upnp_context.h +++ b/include/upnp/upnp_context.h @@ -84,7 +84,10 @@ public: UPnPContext(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger); ~UPnPContext(); - std::shared_ptr<asio::io_context> createIoContext(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger); + static std::shared_ptr<asio::io_context> createIoContext( + const std::shared_ptr<asio::io_context>& ctx, + std::unique_ptr<std::thread>& ioContextRunner, + const std::shared_ptr<dht::log::Logger>& logger); // Terminate the instance. void shutdown(); @@ -122,12 +125,12 @@ public: template <typename T> inline void dispatch(T&& f) { - ctx->dispatch(std::move(f)); + stateCtx->dispatch(std::move(f)); } void restart() { - ctx->dispatch([this]{ + dispatch([this]{ stopUpnp(); startUpnp(); }); @@ -279,7 +282,8 @@ private: void _connectivityChanged(const asio::error_code& ec); - // Thread (io_context), destroyed last + // Thread for the state management, destroyed last + std::unique_ptr<std::thread> stateContextRunner_ {}; std::unique_ptr<std::thread> ioContextRunner_ {}; bool started_ {false}; @@ -308,7 +312,9 @@ private: return maxAvailableMappings_[index]; } - std::shared_ptr<asio::io_context> ctx; + std::shared_ptr<asio::io_context> stateCtx; + /** Context dedicated to run blocking IO calls */ + std::shared_ptr<asio::io_context> ioCtx; std::shared_ptr<dht::log::Logger> logger_; asio::steady_timer connectivityChangedTimer_; asio::system_timer mappingRenewalTimer_; diff --git a/src/upnp/protocol/natpmp/nat_pmp.cpp b/src/upnp/protocol/natpmp/nat_pmp.cpp index 3459981a456014892dd65d0ebcbb71aeb0f9d9e7..01ebc9536ac46ad808dd2e09e552dae912b7d424 100644 --- a/src/upnp/protocol/natpmp/nat_pmp.cpp +++ b/src/upnp/protocol/natpmp/nat_pmp.cpp @@ -753,80 +753,41 @@ NatPmp::processIgdUpdate(UpnpIgdEvent event) removeAllMappings(); } - if (observer_ == nullptr) - return; - // Process the response on the context thread. - ioContext->post([w = weak(), event] { - if (auto shared = w.lock()) { - if (!shared->shutdownComplete_) { - shared->observer_->onIgdUpdated(shared->igd_, event); - } - } - }); + if (observer_ && !shutdownComplete_) { + observer_->onIgdUpdated(igd_, event); + } } void NatPmp::processMappingAdded(const Mapping& map) { - if (observer_ == nullptr) - return; - - // Process the response on the context thread. - ioContext->post([w=weak(), map] { - if (auto shared = w.lock()) { - if (!shared->shutdownComplete_) { - shared->observer_->onMappingAdded(shared->igd_, map); - } - } - }); + if (observer_ && !shutdownComplete_) { + observer_->onMappingAdded(igd_, map); + } } void NatPmp::processMappingRequestFailed(const Mapping& map) { - if (observer_ == nullptr) - return; - - // Process the response on the context thread. - ioContext->post([w=weak(), map] { - if (auto shared = w.lock()) { - if (!shared->shutdownComplete_) { - shared->observer_->onMappingRequestFailed(map); - } - } - }); + if (observer_ && !shutdownComplete_) { + observer_->onMappingRequestFailed(map); + } } void NatPmp::processMappingRenewed(const Mapping& map) { - if (observer_ == nullptr) - return; - - // Process the response on the context thread. - ioContext->post([w=weak(), map] { - if (auto shared = w.lock()) { - if (!shared->shutdownComplete_) { - shared->observer_->onMappingRenewed(shared->igd_, map); - } - } - }); + if (observer_ && !shutdownComplete_) { + observer_->onMappingRenewed(igd_, map); + } } void NatPmp::processMappingRemoved(const Mapping& map) { - if (observer_ == nullptr) - return; - - // Process the response on the context thread. - ioContext->post([w=weak(), map] { - if (auto shared = w.lock()) { - if (!shared->shutdownComplete_) { - shared->observer_->onMappingRemoved(shared->igd_, map); - } - } - }); + if (observer_ && !shutdownComplete_) { + observer_->onMappingRemoved(igd_, map); + } } } // namespace upnp diff --git a/src/upnp/protocol/pupnp/pupnp.cpp b/src/upnp/protocol/pupnp/pupnp.cpp index e26e7be54116522aa51a074f47b1bb2184099afd..c702d4047f2f5bd742e8471be92dd76c2f55ce05 100644 --- a/src/upnp/protocol/pupnp/pupnp.cpp +++ b/src/upnp/protocol/pupnp/pupnp.cpp @@ -549,12 +549,8 @@ PUPnP::validateIgd(const std::string& location, IXML_Document* doc_container_ptr } // Report to the listener. - ioContext->post([w = weak(), igd_candidate] { - if (auto upnpThis = w.lock()) { - if (upnpThis->observer_) - upnpThis->observer_->onIgdUpdated(igd_candidate, UpnpIgdEvent::ADDED); - } - }); + if (observer_) + observer_->onIgdUpdated(igd_candidate, UpnpIgdEvent::ADDED); return true; } @@ -660,55 +656,29 @@ PUPnP::findMatchingIgd(const std::string& ctrlURL) const void PUPnP::processAddMapAction(const Mapping& map) { - if (observer_ == nullptr) - return; - - ioContext->post([w = weak(), map] { - if (auto upnpThis = w.lock()) { - if (upnpThis->observer_) - upnpThis->observer_->onMappingAdded(map.getIgd(), std::move(map)); - } - }); + if (observer_) + observer_->onMappingAdded(map.getIgd(), std::move(map)); } void PUPnP::processMappingRenewed(const Mapping& map) { - if (observer_ == nullptr) - return; - - ioContext->post([w = weak(), map] { - if (auto upnpThis = w.lock()) { - if (upnpThis->observer_) - upnpThis->observer_->onMappingRenewed(map.getIgd(), std::move(map)); - } - }); + if (observer_) + observer_->onMappingRenewed(map.getIgd(), std::move(map)); } void PUPnP::processRequestMappingFailure(const Mapping& map) { - if (observer_ == nullptr) - return; - - ioContext->post([w = weak(), map] { - if (auto upnpThis = w.lock()) { - if (upnpThis->observer_) - upnpThis->observer_->onMappingRequestFailed(map); - } - }); + if (observer_) + observer_->onMappingRequestFailed(map); } void PUPnP::processRemoveMapAction(const Mapping& map) { - if (observer_ == nullptr) - return; - - if (logger_) logger_->warn("PUPnP: Closed mapping {}", map.toString()); - ioContext->post([map, obs = observer_] { - obs->onMappingRemoved(map.getIgd(), std::move(map)); - }); + if (observer_) + observer_->onMappingRemoved(map.getIgd(), map); } const char* diff --git a/src/upnp/upnp_context.cpp b/src/upnp/upnp_context.cpp index 4c209ae50cd0a291a55627b6eb27383c6c0ed1b0..756ec1ecafdcc7b4aedab8e0b5b71834d71b8439 100644 --- a/src/upnp/upnp_context.cpp +++ b/src/upnp/upnp_context.cpp @@ -45,13 +45,14 @@ constexpr static uint16_t UPNP_UDP_PORT_MIN {20000}; constexpr static uint16_t UPNP_UDP_PORT_MAX {UPNP_UDP_PORT_MIN + 5000}; UPnPContext::UPnPContext(const std::shared_ptr<asio::io_context>& ioContext, const std::shared_ptr<dht::log::Logger>& logger) - : ctx(createIoContext(ioContext, logger)) + : stateCtx(createIoContext(ioContext, stateContextRunner_, logger)) + , ioCtx(createIoContext(nullptr, ioContextRunner_, logger)) , logger_(logger) - , connectivityChangedTimer_(*ctx) - , mappingRenewalTimer_(*ctx) - , renewalSchedulingTimer_(*ctx) - , syncTimer_(*ctx) - , igdDiscoveryTimer_(*ctx) + , connectivityChangedTimer_(*stateCtx) + , mappingRenewalTimer_(*stateCtx) + , renewalSchedulingTimer_(*stateCtx) + , syncTimer_(*stateCtx) + , igdDiscoveryTimer_(*stateCtx) { if (logger_) logger_->debug("Creating UPnPContext instance [{}]", fmt::ptr(this)); @@ -60,17 +61,19 @@ UPnPContext::UPnPContext(const std::shared_ptr<asio::io_context>& ioContext, con portRange_.emplace(PortType::TCP, std::make_pair(UPNP_TCP_PORT_MIN, UPNP_TCP_PORT_MAX)); portRange_.emplace(PortType::UDP, std::make_pair(UPNP_UDP_PORT_MIN, UPNP_UDP_PORT_MAX)); - ctx->post([this] { init(); }); + stateCtx->post([this] { init(); }); } std::shared_ptr<asio::io_context> -UPnPContext::createIoContext(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger) { +UPnPContext::createIoContext(const std::shared_ptr<asio::io_context>& ctx, + std::unique_ptr<std::thread>& ioContextRunner, + const std::shared_ptr<dht::log::Logger>& logger) { if (ctx) { return ctx; } else { if (logger) logger->debug("UPnPContext: Starting dedicated io_context thread"); auto ioCtx = std::make_shared<asio::io_context>(); - ioContextRunner_ = std::make_unique<std::thread>([ioCtx, l=logger]() { + ioContextRunner = std::make_unique<std::thread>([ioCtx, l=logger]() { try { auto work = asio::make_work_guard(*ioCtx); ioCtx->run(); @@ -113,7 +116,7 @@ UPnPContext::shutdown() std::unique_lock lk(mappingMutex_); std::condition_variable cv; - ctx->post([&, this] { shutdown(cv); }); + stateCtx->post([&, this] { shutdown(cv); }); if (logger_) logger_->debug("Waiting for shutdown…"); @@ -130,11 +133,18 @@ UPnPContext::shutdown() if (ioContextRunner_) { if (logger_) logger_->debug("UPnPContext: Stopping io_context thread {}", fmt::ptr(this)); - ctx->stop(); + ioCtx->stop(); ioContextRunner_->join(); ioContextRunner_.reset(); if (logger_) logger_->debug("UPnPContext: Stopping io_context thread - finished {}", fmt::ptr(this)); } + if (stateContextRunner_) { + if (logger_) logger_->debug("Stopping io runner for UPnPContext instance {}", fmt::ptr(this)); + stateCtx->stop(); + stateContextRunner_->join(); + stateContextRunner_.reset(); + if (logger_) logger_->debug("Stopped io runner for UPnPContext instance {}", fmt::ptr(this)); + } } UPnPContext::~UPnPContext() @@ -146,13 +156,13 @@ void UPnPContext::init() { #if HAVE_LIBNATPMP - auto natPmp = std::make_shared<NatPmp>(ctx, logger_); + auto natPmp = std::make_shared<NatPmp>(ioCtx, logger_); natPmp->setObserver(this); protocolList_.emplace(NatProtocolType::NAT_PMP, std::move(natPmp)); #endif #if HAVE_LIBUPNP - auto pupnp = std::make_shared<PUPnP>(ctx, logger_); + auto pupnp = std::make_shared<PUPnP>(ioCtx, logger_); pupnp->setObserver(this); protocolList_.emplace(NatProtocolType::PUPNP, std::move(pupnp)); #endif @@ -167,7 +177,7 @@ UPnPContext::startUpnp() // Request a new IGD search. for (auto const& [_, protocol] : protocolList_) { - ctx->dispatch([p=protocol] { p->searchForIgd(); }); + ioCtx->dispatch([p=protocol] { p->searchForIgd(); }); } started_ = true; @@ -215,7 +225,7 @@ UPnPContext::stopUpnp(bool forceRelease) // Clear all current IGDs. for (auto const& [_, protocol] : protocolList_) { - ctx->dispatch([p=protocol]{ p->clearIgds(); }); + ioCtx->dispatch([p=protocol]{ p->clearIgds(); }); } started_ = false; @@ -383,7 +393,7 @@ UPnPContext::reserveMapping(Mapping& requestedMap) void UPnPContext::releaseMapping(const Mapping& map) { - ctx->dispatch([this, map] { + stateCtx->dispatch([this, map] { if (shutdownComplete_) return; auto mapPtr = getMappingWithKey(map.getMapKey()); @@ -964,6 +974,10 @@ UPnPContext::processPendingRequests() void UPnPContext::onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) { + if (!stateCtx->get_executor().running_in_this_thread()) { + stateCtx->post([this, igd, event = std::move(event)] { onIgdUpdated(igd, event); }); + return; + } assert(igd); char const* IgdState = event == UpnpIgdEvent::ADDED ? "ADDED" @@ -1026,6 +1040,11 @@ UPnPContext::onIgdUpdated(const std::shared_ptr<IGD>& igd, UpnpIgdEvent event) void UPnPContext::onMappingAdded(const std::shared_ptr<IGD>& igd, const Mapping& mapRes) { + if (!stateCtx->get_executor().running_in_this_thread()) { + stateCtx->post([this, igd, mapRes] { onMappingAdded(igd, mapRes); }); + return; + } + // Check if we have a pending request for this response. auto map = getMappingWithKey(mapRes.getMapKey()); if (not map) { @@ -1062,6 +1081,11 @@ UPnPContext::onMappingAdded(const std::shared_ptr<IGD>& igd, const Mapping& mapR void UPnPContext::onMappingRenewed(const std::shared_ptr<IGD>& igd, const Mapping& map) { + if (!stateCtx->get_executor().running_in_this_thread()) { + stateCtx->post([this, igd, map] { onMappingRenewed(igd, map); }); + return; + } + auto mapPtr = getMappingWithKey(map.getMapKey()); if (not mapPtr) { @@ -1100,6 +1124,11 @@ UPnPContext::requestRemoveMapping(const Mapping::sharedPtr_t& map) void UPnPContext::onMappingRemoved(const std::shared_ptr<IGD>& igd, const Mapping& mapRes) { + if (!stateCtx->get_executor().running_in_this_thread()) { + stateCtx->post([this, igd, mapRes] { onMappingRemoved(igd, mapRes); }); + return; + } + if (not mapRes.isValid()) return; @@ -1111,6 +1140,10 @@ UPnPContext::onMappingRemoved(const std::shared_ptr<IGD>& igd, const Mapping& ma void UPnPContext::onIgdDiscoveryStarted() { + if (!stateCtx->get_executor().running_in_this_thread()) { + stateCtx->post([this] { onIgdDiscoveryStarted(); }); + return; + } std::lock_guard lock(igdDiscoveryMutex_); igdDiscoveryInProgress_ = true; if (logger_) logger_->debug("IGD Discovery started"); @@ -1260,6 +1293,11 @@ UPnPContext::getMappingWithKey(Mapping::key_t key) void UPnPContext::onMappingRequestFailed(const Mapping& mapRes) { + if (!stateCtx->get_executor().running_in_this_thread()) { + stateCtx->post([this, mapRes] { onMappingRequestFailed(mapRes); }); + return; + } + auto igd = mapRes.getIgd(); auto const& map = getMappingWithKey(mapRes.getMapKey()); if (not map) {