diff --git a/src/jamidht/p2p.cpp b/src/jamidht/p2p.cpp index 700012263a22713610a8309d6b08ef011c02542f..5a0c53312834e9688d8313def2de1a0ce48ec003 100644 --- a/src/jamidht/p2p.cpp +++ b/src/jamidht/p2p.cpp @@ -172,7 +172,7 @@ public: std::mutex clientsMutex_; void cancel(const std::string& peer_id, const DRing::DataTransferId& tid); - void cancelChanneled(const DRing::DataTransferId& tid); + void cancelChanneled(const std::string& peer_id, const DRing::DataTransferId& tid); void onRequestMsg(PeerConnectionMsg&&); void onTrustedRequestMsg(PeerConnectionMsg&&, const std::shared_ptr<dht::crypto::Certificate>&, @@ -210,7 +210,7 @@ public: std::mutex channeledIncomingMtx_; std::map<DRing::DataTransferId, std::unique_ptr<ChanneledIncomingTransfer>> channeledIncoming_; std::mutex channeledOutgoingMtx_; - std::map<DRing::DataTransferId, std::shared_ptr<ChanneledOutgoingTransfer>> channeledOutgoing_; + std::map<DRing::DataTransferId, std::vector<std::shared_ptr<ChanneledOutgoingTransfer>>> channeledOutgoing_; std::mutex incomingTransfersMtx_; std::set<DRing::DataTransferId> incomingTransfers_; }; @@ -745,7 +745,7 @@ DhtPeerConnector::Impl::cancel(const std::string& peer_id, const DRing::DataTran } void -DhtPeerConnector::Impl::cancelChanneled(const DRing::DataTransferId& tid) { +DhtPeerConnector::Impl::cancelChanneled(const std::string& peerId, const DRing::DataTransferId& tid) { dht::ThreadPool::io().run([w=weak(), tid] { auto shared = w.lock(); if (!shared) return; @@ -846,19 +846,32 @@ DhtPeerConnector::requestConnection(const std::string& peer_id, auto outgoingFile = std::make_shared<ChanneledOutgoingTransfer>(channel); { std::lock_guard<std::mutex> lk(pimpl_->channeledOutgoingMtx_); - pimpl_->channeledOutgoing_.emplace(tid, outgoingFile); + pimpl_->channeledOutgoing_[tid].emplace_back(outgoingFile); } - channel->onShutdown([this, tid, onChanneledCancelled]() { + channel->onShutdown([this, tid, onChanneledCancelled, peer=outgoingFile->peer()]() { JAMI_INFO("Channel down for outgoing transfer with id(%lu)", tid); onChanneledCancelled(); - dht::ThreadPool::io().run([w=pimpl_->weak(), tid] { + dht::ThreadPool::io().run([w=pimpl_->weak(), tid, peer] { auto shared = w.lock(); if (!shared) return; // Cancel outgoing files { std::lock_guard<std::mutex> lk(shared->channeledOutgoingMtx_); - shared->channeledOutgoing_.erase(tid); + auto outgoingTransfers = shared->channeledOutgoing_.find(tid); + if (outgoingTransfers != shared->channeledOutgoing_.end()) { + auto& currentTransfers = outgoingTransfers->second; + auto it = currentTransfers.begin(); + while (it != currentTransfers.end()) { + auto& transfer = *it; + if (transfer && transfer->peer() == peer) + it = currentTransfers.erase(it); + else + ++it; + } + if (currentTransfers.empty()) + shared->channeledOutgoing_.erase(outgoingTransfers); + } } Manager::instance().dataTransfers->close(tid); }); @@ -888,7 +901,7 @@ void DhtPeerConnector::closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid) { pimpl_->cancel(peer_id, tid); - pimpl_->cancelChanneled(tid); + pimpl_->cancelChanneled(peer_id, tid); } bool