diff --git a/src/data_transfer.cpp b/src/data_transfer.cpp index 0d0a9d7c3058117c5445d2145e65a620b539f7e2..6422644456febde45604f7576cb36260a858d62e 100644 --- a/src/data_transfer.cpp +++ b/src/data_transfer.cpp @@ -283,6 +283,9 @@ public: bool read(std::vector<uint8_t>&) const override; bool write(std::string_view) override; void emit(DRing::DataTransferEventCode code) const override; + const std::string& peer() { return peerUri_; } + + bool isFinished() const { return info_.lastEvent >= DRing::DataTransferEventCode::finished; } void cancel() override { @@ -345,6 +348,7 @@ private: JAMI_DBG() << "FTP#" << getId() << ": sent " << info_.bytesProgress << " bytes"; if (internalCompletionCb_) internalCompletionCb_(info_.path); + if (info_.bytesProgress != info_.totalSize) emit(DRing::DataTransferEventCode::closed_by_peer); else @@ -516,10 +520,16 @@ public: void close() noexcept override; - void cancel() override + bool cancelWithPeer(const std::string& peer) { - for (const auto& subtransfer : subtransfer_) - subtransfer->cancel(); + auto allFinished = true; + for (const auto& subtransfer : subtransfer_) { + if (subtransfer->peer() == peer) + subtransfer->cancel(); + else if (!subtransfer->isFinished()) + allFinished = false; + } + return allFinished; } private: @@ -823,12 +833,15 @@ DataTransferFacade::sendFile(const DRing::DataTransferInfo& info, out->linkTransfer(std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer) ->startNewOutgoing(out->peer())); }, - [this, tid](const std::string& device) { - if (auto transfer = pimpl_->getTransfer(tid)) - if (not transfer->hasBeenStarted()) { + [this, tid](const std::string& peer) { + if (auto transfer = std::dynamic_pointer_cast<OutgoingFileTransfer>( + pimpl_->getTransfer(tid))) { + auto allFinished = transfer->cancelWithPeer(peer); + if (allFinished and not transfer->hasBeenStarted()) { transfer->emit(DRing::DataTransferEventCode::unjoinable_peer); pimpl_->cancel(*transfer); } + } }); return DRing::DataTransferError::success; } catch (const std::exception& ex) { diff --git a/src/jamidht/p2p.cpp b/src/jamidht/p2p.cpp index 01e57c519881cc9856e53831e5172cac5d437902..a29f6b26f564d6b2256452054344d08ecaef600f 100644 --- a/src/jamidht/p2p.cpp +++ b/src/jamidht/p2p.cpp @@ -96,8 +96,10 @@ public: std::map<IpAddr, dht::InfoHash> connectedPeers_; bool validatePeerCertificate(const dht::crypto::Certificate&, dht::InfoHash&); - void closeConnection(const DRing::DataTransferId& tid); - void stateChanged(const DRing::DataTransferId& tid, const DRing::DataTransferEventCode& code); + void closeConnection(const DRing::DataTransferId& tid, const std::string& peer = ""); + void stateChanged(const DRing::DataTransferId& tid, + const DRing::DataTransferEventCode& code, + const std::string& peer); std::shared_ptr<DhtPeerConnector::Impl> shared() { @@ -120,6 +122,7 @@ public: std::mutex channeledIncomingMtx_; std::map<DRing::DataTransferId, std::unique_ptr<ChanneledIncomingTransfer>> channeledIncoming_; std::mutex channeledOutgoingMtx_; + // TODO change <<id, peer>, Channeled> std::map<DRing::DataTransferId, std::vector<std::shared_ptr<ChanneledOutgoingTransfer>>> channeledOutgoing_; std::mutex incomingTransfersMtx_; @@ -144,29 +147,43 @@ DhtPeerConnector::Impl::validatePeerCertificate(const dht::crypto::Certificate& void DhtPeerConnector::Impl::stateChanged(const DRing::DataTransferId& tid, - const DRing::DataTransferEventCode& code) + const DRing::DataTransferEventCode& code, + const std::string& peer) { if (code == DRing::DataTransferEventCode::finished or code == DRing::DataTransferEventCode::closed_by_peer or code == DRing::DataTransferEventCode::timeout_expired) - closeConnection(tid); + closeConnection(tid, peer); } void -DhtPeerConnector::Impl::closeConnection(const DRing::DataTransferId& tid) +DhtPeerConnector::Impl::closeConnection(const DRing::DataTransferId& tid, const std::string& peer) { - dht::ThreadPool::io().run([w = weak(), tid] { + dht::ThreadPool::io().run([w = weak(), tid, peer] { auto shared = w.lock(); if (!shared) return; - // Cancel outgoing files { std::lock_guard<std::mutex> lk(shared->channeledIncomingMtx_); - auto it = shared->channeledIncoming_.erase(tid); + shared->channeledIncoming_.erase(tid); } + // Cancel outgoing files + std::vector<std::shared_ptr<ChanneledOutgoingTransfer>> files; { std::lock_guard<std::mutex> lk(shared->channeledOutgoingMtx_); - shared->channeledOutgoing_.erase(tid); + auto it = shared->channeledOutgoing_.find(tid); + if (it != shared->channeledOutgoing_.end()) { + for (auto chanIt = it->second.begin(); chanIt != it->second.end();) { + if ((*chanIt)->peer() == peer) { + files.emplace_back(std::move(*chanIt)); + chanIt = it->second.erase(chanIt); + } else { + ++chanIt; + } + } + if (it->second.empty()) + shared->channeledOutgoing_.erase(it); + } } }); } @@ -196,13 +213,12 @@ DhtPeerConnector::requestConnection( auto channelReadyCb = [this, tid, - peer_id, channeledConnectedCb, onChanneledCancelled](const std::shared_ptr<ChannelSocket>& channel, - const DeviceId&) { + const DeviceId& deviceId) { auto shared = pimpl_->account.lock(); if (!channel) { - onChanneledCancelled(""); + onChanneledCancelled(deviceId.toString()); return; } if (!shared) @@ -211,8 +227,9 @@ DhtPeerConnector::requestConnection( auto outgoingFile = std::make_shared<ChanneledOutgoingTransfer>( channel, - [this](const DRing::DataTransferId& id, const DRing::DataTransferEventCode& code) { - pimpl_->stateChanged(id, code); + [this, deviceId](const DRing::DataTransferId& id, + const DRing::DataTransferEventCode& code) { + pimpl_->stateChanged(id, code, deviceId.toString()); }); if (!outgoingFile) return; @@ -223,7 +240,7 @@ DhtPeerConnector::requestConnection( channel->onShutdown([this, tid, onChanneledCancelled, peer = outgoingFile->peer()]() { JAMI_INFO("Channel down for outgoing transfer with id(%lu)", tid); - onChanneledCancelled(""); + onChanneledCancelled(peer); dht::ThreadPool::io().run([w = pimpl_->weak(), tid, peer] { auto shared = w.lock(); if (!shared) @@ -246,7 +263,6 @@ DhtPeerConnector::requestConnection( shared->channeledOutgoing_.erase(outgoingTransfers); } } - Manager::instance().dataTransfers->close(tid); }); }); channeledConnectedCb(outgoingFile); @@ -288,7 +304,7 @@ DhtPeerConnector::requestConnection( [peer_h, onChanneledCancelled, accId = acc->getAccountID()](bool found) { if (!found) { JAMI_WARN() << accId << "[CNX] aborted, no devices for " << peer_h; - onChanneledCancelled(""); + onChanneledCancelled(peer_h.toString()); } }); } @@ -326,8 +342,8 @@ DhtPeerConnector::onIncomingConnection(const std::string& peer_id, auto incomingFile = std::make_unique<ChanneledIncomingTransfer>( channel, std::make_shared<FtpServer>(acc->getAccountID(), peer_id, tid, std::move(cb)), - [this](const DRing::DataTransferId& id, const DRing::DataTransferEventCode& code) { - pimpl_->stateChanged(id, code); + [this, peer_id](const DRing::DataTransferId& id, const DRing::DataTransferEventCode& code) { + pimpl_->stateChanged(id, code, peer_id); }); { std::lock_guard<std::mutex> lk(pimpl_->channeledIncomingMtx_);