From f59ff335d3567043687bfba86b6aac2841df7af9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Blin?= <sebastien.blin@savoirfairelinux.com> Date: Wed, 26 Aug 2020 10:54:38 -0400 Subject: [PATCH] filetransfer: do not use DHT anymore This breaks support with old versions from <=2019 Change-Id: Ie56e3db19dd73dfd7668c373caaeaac60e708c9c --- src/data_transfer.cpp | 29 +- src/jamidht/jamiaccount.cpp | 24 +- src/jamidht/jamiaccount.h | 4 +- src/jamidht/p2p.cpp | 882 +++--------------------------------- src/jamidht/p2p.h | 5 +- src/peer_connection.cpp | 289 ------------ src/peer_connection.h | 33 -- 7 files changed, 79 insertions(+), 1187 deletions(-) diff --git a/src/data_transfer.cpp b/src/data_transfer.cpp index 5312cb4d72..bacb166550 100644 --- a/src/data_transfer.cpp +++ b/src/data_transfer.cpp @@ -287,7 +287,7 @@ public: void cancel() override { if (auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId)) - account->closePeerConnection(peerUri_, id); + account->closePeerConnection(id); } void setOnRecv(std::function<void(std::vector<uint8_t>&&)>&& cb) override @@ -597,7 +597,7 @@ public: { auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId); if (account) - account->closePeerConnection(info_.peer, internalId_); + account->closePeerConnection(internalId_); } private: @@ -734,7 +734,6 @@ public: InternalCompletionCb cb); std::shared_ptr<DataTransfer> getTransfer(const DRing::DataTransferId&); void cancel(DataTransfer&); - void onConnectionRequestReply(const DRing::DataTransferId&, PeerConnection*); }; void @@ -783,23 +782,6 @@ DataTransferFacade::Impl::createIncomingFileTransfer(const DRing::DataTransferIn return transfer; } -void -DataTransferFacade::Impl::onConnectionRequestReply(const DRing::DataTransferId& id, - PeerConnection* connection) -{ - if (auto transfer = getTransfer(id)) { - if (connection) { - connection->attachInputStream( - std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)->startNewOutgoing( - connection->getPeerUri())); - } else if (std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)->cancel(false) - and not transfer->hasBeenStarted()) { - transfer->emit(DRing::DataTransferEventCode::unjoinable_peer); - cancel(*transfer); - } - } -} - //============================================================================== DataTransferFacade::DataTransferFacade() @@ -846,16 +828,11 @@ DataTransferFacade::sendFile(const DRing::DataTransferInfo& info, try { // IMPLEMENTATION NOTE: requestPeerConnection() may call the given callback a multiple time. // This happen when multiple agents handle communications of the given peer for the given - // account. Example: Ring account supports multi-devices, each can answer to the request. - // NOTE: this will create a PeerConnection for each files. This connection need to be shut - // when finished + // account. Example: Jami account supports multi-devices, each can answer to the request. account->requestPeerConnection( info.peer, tid, static_cast<bool>(cb), - [this, tid](PeerConnection* connection) { - pimpl_->onConnectionRequestReply(tid, connection); - }, [this, tid](const std::shared_ptr<ChanneledOutgoingTransfer>& out) { if (auto transfer = pimpl_->getTransfer(tid)) if (out) diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 49ef168bf5..f662826978 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -1178,11 +1178,10 @@ JamiAccount::loadAccount(const std::string& archive_password, auto label = d.second.name.empty() ? id.substr(0, 8) : d.second.name; ids.emplace(std::move(id), std::move(label)); } - dht::ThreadPool::computation().run([id=getAccountID(), devices=std::move(ids)] { + dht::ThreadPool::computation().run([id = getAccountID(), devices = std::move(ids)] { emitSignal<DRing::ConfigurationSignal::KnownDevicesChanged>(id, devices); }); - } - }; + }}; try { auto onAsync = [w = weak()](AccountManager::AsyncUser&& cb) { @@ -2367,7 +2366,6 @@ JamiAccount::doRegister_() if (!dhtPeerConnector_) dhtPeerConnector_ = std::make_unique<DhtPeerConnector>(*this); - dhtPeerConnector_->onDhtConnected(accountManager_->getInfo()->deviceId); std::lock_guard<std::mutex> bLock(buddyInfoMtx); for (auto& buddy : trackedBuddies_) { @@ -2977,12 +2975,13 @@ JamiAccount::removeContact(const std::string& uri, bool ban) std::set<std::string> devices; { std::unique_lock<std::mutex> lk(sipConnectionsMtx_); - for (const auto& deviceConn: sipConnections_[uri]) { + for (const auto& deviceConn : sipConnections_[uri]) { devices.emplace(deviceConn.first); } sipConnections_.erase(uri); - for (auto pendingIt = pendingSipConnections_.begin(); pendingIt != pendingSipConnections_.end();) { + for (auto pendingIt = pendingSipConnections_.begin(); + pendingIt != pendingSipConnections_.end();) { if (uri == pendingIt->first) { devices.emplace(pendingIt->second); pendingIt = pendingSipConnections_.erase(pendingIt); @@ -2992,7 +2991,7 @@ JamiAccount::removeContact(const std::string& uri, bool ban) } } - for (const auto& device: devices) { + for (const auto& device : devices) { if (connectionManager_) connectionManager_->closeConnectionsWith(device); } @@ -3022,7 +3021,8 @@ std::vector<std::map<std::string, std::string>> JamiAccount::getTrustRequests() const { std::lock_guard<std::mutex> lock(configurationMutex_); - return accountManager_ ? accountManager_->getTrustRequests() : std::vector<std::map<std::string, std::string>>{}; + return accountManager_ ? accountManager_->getTrustRequests() + : std::vector<std::map<std::string, std::string>> {}; } bool @@ -3358,28 +3358,26 @@ JamiAccount::requestPeerConnection( const std::string& peer_id, const DRing::DataTransferId& tid, bool isVCard, - const std::function<void(PeerConnection*)>& connect_cb, const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>& channeledConnectedCb, const std::function<void()>& onChanneledCancelled) { if (not dhtPeerConnector_) { - runOnMainThread([onChanneledCancelled]{ onChanneledCancelled(); }); + runOnMainThread([onChanneledCancelled] { onChanneledCancelled(); }); return; } dhtPeerConnector_->requestConnection(peer_id, tid, isVCard, - connect_cb, channeledConnectedCb, onChanneledCancelled); } void -JamiAccount::closePeerConnection(const std::string& peer, const DRing::DataTransferId& tid) +JamiAccount::closePeerConnection(const DRing::DataTransferId& tid) { if (dhtPeerConnector_) - dhtPeerConnector_->closeConnection(peer, tid); + dhtPeerConnector_->closeConnection(tid); } void diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index e72456b387..caca25bd87 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -72,7 +72,6 @@ struct DeviceSync; struct AccountArchive; class ConnectionManager; class DhtPeerConnector; -class PeerConnection; class ContactList; class AccountManager; struct AccountInfo; @@ -366,7 +365,6 @@ public: const std::string& peer, const DRing::DataTransferId& tid, bool isVCard, - const std::function<void(PeerConnection*)>& connect_cb, const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>& channeledConnectedCb, const std::function<void()>& onChanneledCancelled); @@ -377,7 +375,7 @@ public: /// /// \param[in] peer RingID on request's recipient /// /// \param[in] tid linked outgoing data transfer /// - void closePeerConnection(const std::string& peer, const DRing::DataTransferId& tid); + void closePeerConnection(const DRing::DataTransferId& tid); std::vector<std::string> publicAddresses(); diff --git a/src/jamidht/p2p.cpp b/src/jamidht/p2p.cpp index e7b7de1c71..e0d5312807 100644 --- a/src/jamidht/p2p.cpp +++ b/src/jamidht/p2p.cpp @@ -61,85 +61,6 @@ using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>; //============================================================================== -// This namespace prevents a nasty ODR violation with definitions in peer_connection.cpp -inline namespace { - -template<typename CT> -class Timeout -{ -public: - using clock = CT; - using duration = typename CT::duration; - using time_point = typename CT::time_point; - - explicit Timeout(const duration& delay) - : delay {delay} - {} - - void start() { start_ = clock::now(); } - - explicit operator bool() const { return (clock::now() - start_) >= delay; } - - const duration delay {duration::zero()}; - -private: - time_point start_ {}; -}; - -//============================================================================== - -/** - * DHT message to convey a end2end connection request to a peer - */ -class PeerConnectionMsg : public dht::EncryptedValue<PeerConnectionMsg> -{ -public: - static constexpr const dht::ValueType& TYPE = dht::ValueType::USER_DATA; - static constexpr uint32_t protocol_version = 0x01000002; ///< Supported protocol - static constexpr const char* key_prefix = "peer:"; ///< base to compute the DHT listen key - - dht::Value::Id id = dht::Value::INVALID_ID; - uint32_t protocol {protocol_version}; ///< Protocol identification. First bit reserved to - ///< indicate a request (0) or a response (1) - std::vector<std::string> addresses; ///< Request: public addresses for TURN permission. Response: - ///< TURN relay addresses (only 1 in current implementation) - uint64_t tid {0}; - MSGPACK_DEFINE_MAP(id, protocol, addresses, tid) - - PeerConnectionMsg() = default; - PeerConnectionMsg(dht::Value::Id id, - uint32_t aprotocol, - const std::string& arelay, - uint64_t transfer_id) - : id {id} - , protocol {aprotocol} - , addresses {{arelay}} - , tid {transfer_id} - {} - PeerConnectionMsg(dht::Value::Id id, - uint32_t aprotocol, - const std::vector<std::string>& asrelay, - uint64_t transfer_id) - : id {id} - , protocol {aprotocol} - , addresses {asrelay} - , tid {transfer_id} - {} - bool isRequest() const noexcept { return (protocol & 1) == 0; } - PeerConnectionMsg respond(const IpAddr& relay) const - { - return {id, protocol | 1, relay.toString(true, true), tid}; - } - PeerConnectionMsg respond(const std::vector<std::string>& addresses) const - { - return {id, protocol | 1, addresses, tid}; - } -}; - -} // namespace - -//============================================================================== - class DhtPeerConnector::Impl : public std::enable_shared_from_this<DhtPeerConnector::Impl> { public: @@ -151,16 +72,6 @@ public: ~Impl() { - for (auto& thread : answer_threads_) - thread.join(); - { - std::lock_guard<std::mutex> lk(serversMutex_); - servers_.clear(); - } - { - std::lock_guard<std::mutex> lk(clientsMutex_); - clients_.clear(); - } std::lock_guard<std::mutex> lk(waitForReadyMtx_); waitForReadyEndpoints_.clear(); } @@ -185,40 +96,9 @@ public: certMap_; std::map<IpAddr, dht::InfoHash> connectedPeers_; - std::map<std::pair<dht::InfoHash, IpAddr>, std::unique_ptr<PeerConnection>> servers_; - std::mutex serversMutex_; - std::map<IpAddr, std::unique_ptr<TlsTurnEndpoint>> tls_turn_ep_; - - std::map<std::pair<dht::InfoHash, DRing::DataTransferId>, std::unique_ptr<ClientConnector>> - clients_; - std::mutex clientsMutex_; - - void cancel(const std::string& peer_id, const DRing::DataTransferId& tid); - void cancelChanneled(const std::string& peer_id, const DRing::DataTransferId& tid); - - void onRequestMsg(PeerConnectionMsg&&); - void onTrustedRequestMsg(PeerConnectionMsg&&, - const std::shared_ptr<dht::crypto::Certificate>&, - const dht::InfoHash&); - void answerToRequest(PeerConnectionMsg&&, - const std::shared_ptr<dht::crypto::Certificate>&, - const dht::InfoHash&); - void onResponseMsg(PeerConnectionMsg&&); - void onAddDevice(const dht::InfoHash&, - const DRing::DataTransferId&, - const std::shared_ptr<dht::crypto::Certificate>&, - const std::vector<std::string>&, - const std::function<void(PeerConnection*)>&); - bool turnConnect(); bool validatePeerCertificate(const dht::crypto::Certificate&, dht::InfoHash&); - void stateChanged(const std::string& peer_id, - const DRing::DataTransferId& tid, - const DRing::DataTransferEventCode& code); - void closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid); - - std::future<void> loopFut_; // keep it last member - - std::vector<std::thread> answer_threads_; + void closeConnection(const DRing::DataTransferId& tid); + void stateChanged(const DRing::DataTransferId& tid, const DRing::DataTransferEventCode& code); std::shared_ptr<DhtPeerConnector::Impl> shared() { @@ -246,279 +126,6 @@ public: std::mutex incomingTransfersMtx_; std::set<DRing::DataTransferId> incomingTransfers_; }; - -//============================================================================== - -/// This class is responsible of connection to a specific peer. -/// The connected peer acting as server (responsible of the TURN session). -/// When the TURN session is created and your IP is permited, we'll connect it -/// using a system socket. Later the TLS session is negotiated on this socket. -class DhtPeerConnector::Impl::ClientConnector -{ -public: - using ListenerFunction = std::function<void(PeerConnection*)>; - - ClientConnector(Impl& parent, - const DRing::DataTransferId& tid, - const dht::InfoHash& peer_h, - const std::shared_ptr<dht::crypto::Certificate>& peer_cert, - const std::vector<std::string>& public_addresses, - const ListenerFunction& connect_cb) - : tid_ {tid} - , parent_ {parent} - , peer_ {peer_h} - , publicAddresses_ {public_addresses} - , peerCertificate_ {peer_cert} - { - auto shared = parent_.account.lock(); - if (!shared) - return; - waitId_ = ValueIdDist()(shared->rand); - addListener(connect_cb); - processTask_ = std::async(std::launch::async, [this] { - try { - process(); - } catch (const std::exception& e) { - JAMI_ERR() << "[CNX] exception during client processing: " << e.what(); - cancel(); - } - }); - } - - ~ClientConnector() - { - decltype(listeners_) listeners; - { - std::lock_guard<std::mutex> lk {listenersMutex_}; - listeners = listeners_; - } - for (auto& cb : listeners) - cb(nullptr); - connection_.reset(); - } - - bool hasAlreadyAResponse() { return responseReceived_; } - - bool waitId(uint64_t id) { return waitId_ == id; } - - void addListener(const ListenerFunction& cb) - { - if (!connected_) { - std::lock_guard<std::mutex> lk {listenersMutex_}; - listeners_.emplace_back(cb); - } else { - cb(connection_.get()); - } - } - - void cancel() { parent_.cancel(peer_.toString(), tid_); } - - void onDhtResponse(PeerConnectionMsg&& response) - { - if (responseReceived_) - return; - response_ = std::move(response); - responseReceived_ = true; - responseCV_.notify_all(); - } - - const DRing::DataTransferId tid_; - -private: - void process() - { - // Add ice msg into the addresses - // TODO remove publicAddresses in the future and only use the iceMsg - // For now it's here for compability with old version - auto& iceTransportFactory = Manager::instance().getIceTransportFactory(); - auto acc = parent_.account.lock(); - if (!acc) - return; - auto ice_config = acc->getIceOptions(); - ice_config.tcpEnable = true; - auto ice = iceTransportFactory.createTransport(acc->getAccountID().c_str(), - 1, - false, - ice_config); - - if (ice->waitForInitialization(ICE_INIT_TIMEOUT) <= 0) { - JAMI_ERR("Cannot initialize ICE session."); - cancel(); - return; - } - - acc->registerDhtAddress(*ice); - - auto iceAttributes = ice->getLocalAttributes(); - std::stringstream icemsg; - icemsg << iceAttributes.ufrag << "\n"; - icemsg << iceAttributes.pwd << "\n"; - for (const auto& addr : ice->getLocalCandidates(0)) { - icemsg << addr << "\n"; - } - - // Prepare connection request as a DHT message - PeerConnectionMsg request; - request.id = waitId_; /* Random id for the message unicity */ - request.addresses = {icemsg.str()}; - request.addresses.insert(request.addresses.end(), - publicAddresses_.begin(), - publicAddresses_.end()); - request.tid = tid_; - - // Send connection request through DHT - JAMI_DBG() << acc << "[CNX] request connection to " << peer_; - acc->dht()->putEncrypted(dht::InfoHash::get(PeerConnectionMsg::key_prefix - + peer_.toString()), - peer_, - request, - [](bool ok) { - if (ok) - JAMI_DBG("[CNX] successfully put CNX request on DHT"); - else - JAMI_ERR("[CNX] error putting CNX request on DHT"); - }); - - // Wait for call to onResponse() operated by DHT - std::mutex mtx; - std::unique_lock<std::mutex> lk {mtx}; - responseCV_.wait_for(lk, DHT_MSG_TIMEOUT); - if (!responseReceived_) { - JAMI_ERR("no response from DHT to E2E request. Cancel transfer"); - cancel(); - return; - } - - // Check response validity - std::unique_ptr<AbstractSocketEndpoint> peer_ep; - if (response_.from != peer_ or response_.id != request.id or response_.addresses.empty()) - throw std::runtime_error("invalid connection reply"); - - IpAddr relay_addr; - for (const auto& address : response_.addresses) { - if (!(address.size() <= PJ_MAX_HOSTNAME && (relay_addr = address))) { - // Should be ICE SDP - // P2P File transfer. We received an ice SDP message: - auto sdp = IceTransport::parse_SDP(address, *ice); - // NOTE: hasPubIp is used for compability (because ICE is waiting for a certain - // state in old versions) This can be removed when old versions will be unsupported. - auto hasPubIp = parent_.hasPublicIp(sdp); - if (!hasPubIp) - ice->setInitiatorSession(); - if (not ice->start({sdp.rem_ufrag, sdp.rem_pwd}, sdp.rem_candidates)) { - JAMI_WARN("[Account:%s] start ICE failed - fallback to TURN", - acc->getAccountID().c_str()); - break; - } - - ice->waitForNegotiation(ICE_NEGOTIATION_TIMEOUT); - if (ice->isRunning()) { - peer_ep = std::make_unique<IceSocketEndpoint>(ice, true); - JAMI_DBG("[Account:%s] ICE negotiation succeed. Starting file transfer", - acc->getAccountID().c_str()); - if (hasPubIp) - ice->setInitiatorSession(); - break; - } else { - JAMI_ERR("[Account:%s] ICE negotation failed", acc->getAccountID().c_str()); - } - } else { - try { - // Connect to TURN peer using a raw socket - JAMI_DBG() << acc << "[CNX] connecting to TURN relay " - << relay_addr.toString(true, true); - peer_ep = std::make_unique<TcpSocketEndpoint>(relay_addr); - try { - peer_ep->connect(SOCK_TIMEOUT); - } catch (const std::logic_error& e) { - // In case of a timeout - JAMI_WARN() << "TcpSocketEndpoint timeout for addr " - << relay_addr.toString(true, true) << ": " << e.what(); - cancel(); - return; - } catch (...) { - JAMI_WARN() << "TcpSocketEndpoint failure for addr " - << relay_addr.toString(true, true); - cancel(); - return; - } - break; - } catch (std::system_error&) { - JAMI_DBG() << acc << "[CNX] Failed to connect to TURN relay " - << relay_addr.toString(true, true); - } - } - } - - if (!peer_ep) { - cancel(); - return; - } - - // Negotiate a TLS session - JAMI_DBG() << acc << "[CNX] start TLS session"; - tls_ep_ = std::make_unique<TlsSocketEndpoint>(std::move(peer_ep), - acc->identity(), - acc->dhParams(), - *peerCertificate_, - ice->isRunning()); - tls_ep_->setOnStateChange([this, ice = std::move(ice)](tls::TlsSessionState state) { - if (state == tls::TlsSessionState::SHUTDOWN) { - if (!connected_) - JAMI_WARN() << "TLS connection failure from peer " << peer_.toString(); - ice->cancelOperations(); // This will stop current PeerChannel operations - cancel(); - return false; - } else if (state == tls::TlsSessionState::ESTABLISHED) { - // Connected! - connected_ = true; - connection_ = std::make_unique<PeerConnection>([this] { cancel(); }, - peer_.toString(), - std::move(tls_ep_)); - connection_->setOnStateChangedCb( - [p = parent_.weak(), - peer = peer_.toString()](const DRing::DataTransferId& id, - const DRing::DataTransferEventCode& code) { - // NOTE: this callback is shared by all potential inputs/output, not - // only used by connection_, weak pointers MUST be used. - auto parent = p.lock(); - if (!parent) - return; - parent->stateChanged(peer, id, code); - }); - - decltype(listeners_) listeners; - { - std::lock_guard<std::mutex> lk {listenersMutex_}; - listeners = listeners_; - } - for (auto& cb : listeners) { - cb(connection_.get()); - } - } - return true; - }); - } - - Impl& parent_; - const dht::InfoHash peer_; - - std::vector<std::string> publicAddresses_; - std::atomic_bool responseReceived_ {false}; - std::condition_variable responseCV_ {}; - PeerConnectionMsg response_; - uint64_t waitId_ {0}; - std::shared_ptr<dht::crypto::Certificate> peerCertificate_; - std::unique_ptr<PeerConnection> connection_; - std::unique_ptr<TlsSocketEndpoint> tls_ep_; - - std::atomic_bool connected_ {false}; - std::mutex listenersMutex_; - std::vector<ListenerFunction> listeners_; - - std::future<void> processTask_; -}; - //============================================================================== /// Find who is connected by using connection certificate @@ -537,320 +144,17 @@ DhtPeerConnector::Impl::validatePeerCertificate(const dht::crypto::Certificate& } void -DhtPeerConnector::Impl::onRequestMsg(PeerConnectionMsg&& request) -{ - auto acc = account.lock(); - if (!acc) - return; - JAMI_DBG() << acc << "[CNX] rx DHT request from " << request.from; - - // Asynch certificate checking -> trig onTrustedRequestMsg when trusted certificate is found - acc->findCertificate(request.from, - [this, request = std::move(request)]( - const std::shared_ptr<dht::crypto::Certificate>& cert) mutable { - auto acc = account.lock(); - if (!acc) - return; - dht::InfoHash peer_h; - if (AccountManager::foundPeerDevice(cert, peer_h)) - onTrustedRequestMsg(std::move(request), cert, peer_h); - else - JAMI_WARN() - << acc << "[CNX] rejected untrusted connection request from " - << request.from; - }); -} - -void -DhtPeerConnector::Impl::onTrustedRequestMsg(PeerConnectionMsg&& request, - const std::shared_ptr<dht::crypto::Certificate>& cert, - const dht::InfoHash& peer_h) -{ - answer_threads_.emplace_back(&DhtPeerConnector::Impl::answerToRequest, - this, - request, - cert, - peer_h); -} - -void -DhtPeerConnector::Impl::answerToRequest(PeerConnectionMsg&& request, - const std::shared_ptr<dht::crypto::Certificate>& cert, - const dht::InfoHash& peer_h) -{ - auto acc = account.lock(); - if (!acc) - return; - - if (request.tid != 0) { - std::lock_guard<std::mutex> lk(incomingTransfersMtx_); - if (incomingTransfers_.find(request.tid) != incomingTransfers_.end()) { - JAMI_INFO("Incoming request for id(%lu) is already treated via channeled socket", - request.tid); - return; - } - incomingTransfers_.emplace(request.tid); - } - - // Save peer certificate for later TLS session (MUST BE DONE BEFORE TURN PEER AUTHORIZATION) - certMap_.emplace(cert->getId(), std::make_pair(cert, peer_h)); - - auto sendIce = false, hasPubIp = false; - - struct IceReady - { - std::mutex mtx {}; - std::condition_variable cv {}; - bool ready {false}; - }; - auto iceReady = std::make_shared<IceReady>(); - std::shared_ptr<IceTransport> ice; - for (auto& ip : request.addresses) { - try { - if (ip.size() <= PJ_MAX_HOSTNAME) { - IpAddr addr(ip); - if (addr.isIpv4() || addr.isIpv6()) { - JAMI_WARN() << "Deprecated TURN connection. Ignore"; - continue; - } - } - - // P2P File transfer. We received an ice SDP message: - JAMI_DBG() << acc << "[CNX] receiving ICE session request"; - auto& iceTransportFactory = Manager::instance().getIceTransportFactory(); - auto ice_config = acc->getIceOptions(); - ice_config.tcpEnable = true; - ice_config.onRecvReady = [iceReady]() { - auto& ir = *iceReady; - std::lock_guard<std::mutex> lk {ir.mtx}; - ir.ready = true; - ir.cv.notify_one(); - }; - ice = iceTransportFactory.createTransport(acc->getAccountID().c_str(), - 1, - true, - ice_config); - - if (ice->waitForInitialization(ICE_INIT_TIMEOUT) <= 0) { - JAMI_ERR("Cannot initialize ICE session."); - continue; - } - - acc->registerDhtAddress(*ice); - - auto sdp = IceTransport::parse_SDP(ip, *ice); - // NOTE: hasPubIp is used for compability (because ICE is waiting for a certain state in - // old versions) This can be removed when old versions will be unsupported (version - // before this patch) - hasPubIp = hasPublicIp(sdp); - if (not ice->start({sdp.rem_ufrag, sdp.rem_pwd}, sdp.rem_candidates)) { - JAMI_WARN("[Account:%s] start ICE failed - fallback to TURN", - acc->getAccountID().c_str()); - continue; - } - - if (!hasPubIp) { - ice->waitForNegotiation(ICE_NEGOTIATION_TIMEOUT); - if (ice->isRunning()) { - sendIce = true; - JAMI_DBG("[Account:%s] ICE negotiation succeed. Answering with local SDP", - acc->getAccountID().c_str()); - } else { - JAMI_WARN("[Account:%s] ICE negotation failed", acc->getAccountID().c_str()); - ice->cancelOperations(); - } - } else - sendIce = true; // Ice started with success, we can use it. - } catch (const std::exception& e) { - JAMI_WARN() << acc << "[CNX] ignored peer connection '" << ip << "', " << e.what(); - } - } - - // Prepare connection request as a DHT message - std::vector<std::string> addresses; - - if (sendIce) { - // NOTE: This is a shortest version of a real SDP message to save some bits - auto iceAttributes = ice->getLocalAttributes(); - std::stringstream icemsg; - icemsg << iceAttributes.ufrag << "\n"; - icemsg << iceAttributes.pwd << "\n"; - for (const auto& addr : ice->getLocalCandidates(0)) { - icemsg << addr << "\n"; - } - addresses = {icemsg.str()}; - } - - if (addresses.empty()) { - JAMI_DBG() << acc << "[CNX] connection aborted, no family address found"; - return; - } - - JAMI_DBG() << acc << "[CNX] connection accepted, DHT reply to " << request.from; - acc->dht()->putEncrypted(dht::InfoHash::get(PeerConnectionMsg::key_prefix - + request.from.toString()), - request.from, - request.respond(addresses)); - - if (sendIce) { - if (hasPubIp) { - ice->waitForNegotiation(ICE_NEGOTIATION_TIMEOUT); - if (ice->isRunning()) { - JAMI_DBG("[Account:%s] ICE negotiation succeed. Answering with local SDP", - acc->getAccountID().c_str()); - } else { - JAMI_WARN("[Account:%s] ICE negotation failed - Fallbacking to TURN", - acc->getAccountID().c_str()); - return; // wait for onTurnPeerConnection - } - } - - if (not iceReady->ready) { - if (!hasPubIp) - ice->setSlaveSession(); - std::unique_lock<std::mutex> lk {iceReady->mtx}; - if (not iceReady->cv.wait_for(lk, ICE_READY_TIMEOUT, [&] { return iceReady->ready; })) { - // This will fallback on TURN if ICE is not ready - return; - } - } - - std::unique_ptr<AbstractSocketEndpoint> peer_ep = std::make_unique<IceSocketEndpoint>(ice, - false); - JAMI_DBG() << acc << "[CNX] start TLS session"; - if (hasPubIp) - ice->setSlaveSession(); - - auto idx = std::make_pair(peer_h, ice->getRemoteAddress(0)); - std::lock_guard<std::mutex> lk(waitForReadyMtx_); - auto it = waitForReadyEndpoints_.emplace( - idx, - std::make_unique<TlsSocketEndpoint>(std::move(peer_ep), - acc->identity(), - acc->dhParams(), - [peer_h, - this](const dht::crypto::Certificate& cert) { - dht::InfoHash peer_h_found; - return validatePeerCertificate(cert, - peer_h_found) - and peer_h_found == peer_h; - })); - - it.first->second->setOnStateChange([this, - accountId = acc->getAccountID(), - idx = std::move(idx)](tls::TlsSessionState state) { - std::lock_guard<std::mutex> lk(waitForReadyMtx_); - if (waitForReadyEndpoints_.find(idx) == waitForReadyEndpoints_.end()) - return false; - if (state == tls::TlsSessionState::SHUTDOWN) { - JAMI_WARN() << "TLS connection failure"; - waitForReadyEndpoints_.erase(idx); - return false; - } else if (state == tls::TlsSessionState::ESTABLISHED) { - // Connected! - auto peer_h = idx.first.toString(); - auto connection = std::make_unique<PeerConnection>([] {}, - peer_h, - std::move( - waitForReadyEndpoints_[idx])); - connection->setOnStateChangedCb( - [w = weak(), peer_h](const DRing::DataTransferId& id, - const DRing::DataTransferEventCode& code) { - if (auto sthis = w.lock()) { - sthis->stateChanged(peer_h, id, code); - } - }); - connection->attachOutputStream(std::make_shared<FtpServer>(accountId, peer_h)); - { - std::lock_guard<std::mutex> lk(serversMutex_); - servers_.emplace(idx, std::move(connection)); - } - waitForReadyEndpoints_.erase(idx); - return false; - } - return true; - }); - } else { - JAMI_WARN() << "No connection negotiated. Abort file transfer"; - } -} - -void -DhtPeerConnector::Impl::onResponseMsg(PeerConnectionMsg&& response) -{ - auto acc = account.lock(); - if (!acc) - return; - JAMI_DBG() << acc << "[CNX] rx DHT reply from " << response.from; - std::lock_guard<std::mutex> lock(clientsMutex_); - for (auto& client : clients_) { - // NOTE We can receives multiple files from one peer. So fill unanswered clients with linked id. - if (client.first.first == response.from && client.second - && !client.second->hasAlreadyAResponse() && client.second->waitId(response.id)) { - client.second->onDhtResponse(std::move(response)); - break; - } - } -} - -void -DhtPeerConnector::Impl::onAddDevice(const dht::InfoHash& dev_h, - const DRing::DataTransferId& tid, - const std::shared_ptr<dht::crypto::Certificate>& peer_cert, - const std::vector<std::string>& public_addresses, - const std::function<void(PeerConnection*)>& connect_cb) -{ - auto client = std::make_pair(dev_h, tid); - std::lock_guard<std::mutex> lock(clientsMutex_); - const auto& iter = clients_.find(client); - if (iter == std::end(clients_)) { - clients_.emplace(client, - std::make_unique<Impl::ClientConnector>(*this, - tid, - dev_h, - peer_cert, - public_addresses, - connect_cb)); - } else { - iter->second->addListener(connect_cb); - } -} - -void -DhtPeerConnector::Impl::cancel(const std::string& peer_id, const DRing::DataTransferId& tid) +DhtPeerConnector::Impl::stateChanged(const DRing::DataTransferId& tid, + const DRing::DataTransferEventCode& code) { - dht::ThreadPool::io().run([w = weak(), dev_h = dht::InfoHash(peer_id), tid] { - auto shared = w.lock(); - if (!shared) - return; - // Cancel outgoing files - { - std::lock_guard<std::mutex> lock(shared->clientsMutex_); - shared->clients_.erase(std::make_pair(dev_h, tid)); - } - // Cancel incoming files - std::unique_lock<std::mutex> lk(shared->serversMutex_); - auto it = std::find_if(shared->servers_.begin(), - shared->servers_.end(), - [&dev_h, &tid](const auto& element) { - return (element.first.first == dev_h && element.second - && element.second->hasStreamWithId(tid)); - }); - if (it == shared->servers_.end()) { - Manager::instance().dataTransfers->close(tid); - return; - } - auto peer = it->first.second; // tmp copy to prevent use-after-free below - shared->servers_.erase(it); - lk.unlock(); - // Remove the file transfer if p2p - shared->connectedPeers_.erase(peer); - Manager::instance().dataTransfers->close(tid); - }); + if (code == DRing::DataTransferEventCode::finished + or code == DRing::DataTransferEventCode::closed_by_peer + or code == DRing::DataTransferEventCode::timeout_expired) + closeConnection(tid); } void -DhtPeerConnector::Impl::cancelChanneled(const std::string& peerId, const DRing::DataTransferId& tid) +DhtPeerConnector::Impl::closeConnection(const DRing::DataTransferId& tid) { dht::ThreadPool::io().run([w = weak(), tid] { auto shared = w.lock(); @@ -868,24 +172,6 @@ DhtPeerConnector::Impl::cancelChanneled(const std::string& peerId, const DRing:: }); } -void -DhtPeerConnector::Impl::stateChanged(const std::string& peer_id, - const DRing::DataTransferId& tid, - const DRing::DataTransferEventCode& code) -{ - if (code == DRing::DataTransferEventCode::finished - or code == DRing::DataTransferEventCode::closed_by_peer - or code == DRing::DataTransferEventCode::timeout_expired) - closeConnection(peer_id, tid); -} - -void -DhtPeerConnector::Impl::closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid) -{ - cancel(peer_id, tid); - cancelChanneled(peer_id, tid); -} - //============================================================================== DhtPeerConnector::DhtPeerConnector(JamiAccount& account) @@ -894,43 +180,11 @@ DhtPeerConnector::DhtPeerConnector(JamiAccount& account) DhtPeerConnector::~DhtPeerConnector() = default; -/// Called by a JamiAccount when it's DHT is connected -/// Install a DHT LISTEN operation on given device to receive data connection requests and replies -/// The DHT key is Hash(PeerConnectionMsg::key_prefix + device_id), where '+' is the string concatenation. -void -DhtPeerConnector::onDhtConnected(const std::string& device_id) -{ - auto acc = pimpl_->account.lock(); - if (!acc) - return; - acc->dht()->listen<PeerConnectionMsg>( - dht::InfoHash::get(PeerConnectionMsg::key_prefix + device_id), - [this](PeerConnectionMsg&& msg) { - auto acc = pimpl_->account.lock(); - if (!acc) - return false; - if (msg.from == acc->dht()->getId()) - return true; - if (!acc->isMessageTreated(to_hex_string(msg.id))) { - if (msg.isRequest()) - pimpl_->onRequestMsg(std::move(msg)); - else - pimpl_->onResponseMsg(std::move(msg)); - } - return true; - }, - [](const dht::Value& v) { - // Avoid to answer for peer requests - return v.user_type != "peer_request"; - }); -} - void DhtPeerConnector::requestConnection( const std::string& peer_id, const DRing::DataTransferId& tid, bool isVCard, - const std::function<void(PeerConnection*)>& connect_cb, const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>& channeledConnectedCb, const std::function<void()>& onChanneledCancelled) @@ -941,62 +195,59 @@ DhtPeerConnector::requestConnection( const auto peer_h = dht::InfoHash(peer_id); - auto channelReadyCb = - [this, tid, peer_id, channeledConnectedCb, onChanneledCancelled, connect_cb]( - const std::shared_ptr<ChannelSocket>& channel) { - auto shared = pimpl_->account.lock(); - if (!channel) { - onChanneledCancelled(); - return; - } - if (!shared) - return; - JAMI_INFO("New file channel for outgoing transfer with id(%lu)", tid); + auto channelReadyCb = [this, tid, peer_id, channeledConnectedCb, onChanneledCancelled]( + const std::shared_ptr<ChannelSocket>& channel) { + auto shared = pimpl_->account.lock(); + if (!channel) { + onChanneledCancelled(); + return; + } + if (!shared) + return; + JAMI_INFO("New file channel for outgoing transfer with id(%lu)", tid); - auto outgoingFile = std::make_shared<ChanneledOutgoingTransfer>( - channel, - [this, peer_id](const DRing::DataTransferId& id, - const DRing::DataTransferEventCode& code) { - pimpl_->stateChanged(peer_id, id, code); - }); - if (!outgoingFile) - return; - { - std::lock_guard<std::mutex> lk(pimpl_->channeledOutgoingMtx_); - pimpl_->channeledOutgoing_[tid].emplace_back(outgoingFile); - } + auto outgoingFile = std::make_shared<ChanneledOutgoingTransfer>( + channel, + [this](const DRing::DataTransferId& id, const DRing::DataTransferEventCode& code) { + pimpl_->stateChanged(id, code); + }); + if (!outgoingFile) + return; + { + std::lock_guard<std::mutex> lk(pimpl_->channeledOutgoingMtx_); + pimpl_->channeledOutgoing_[tid].emplace_back(outgoingFile); + } - channel->onShutdown([this, tid, onChanneledCancelled, peer = outgoingFile->peer()]() { - JAMI_INFO("Channel down for outgoing transfer with id(%lu)", tid); - onChanneledCancelled(); - dht::ThreadPool::io().run([w = pimpl_->weak(), tid, peer] { - auto shared = w.lock(); - if (!shared) - return; - // Cancel outgoing files - { - std::lock_guard<std::mutex> lk(shared->channeledOutgoingMtx_); - auto outgoingTransfers = shared->channeledOutgoing_.find(tid); - if (outgoingTransfers != shared->channeledOutgoing_.end()) { - auto& currentTransfers = outgoingTransfers->second; - auto it = currentTransfers.begin(); - while (it != currentTransfers.end()) { - auto& transfer = *it; - if (transfer && transfer->peer() == peer) - it = currentTransfers.erase(it); - else - ++it; - } - if (currentTransfers.empty()) - shared->channeledOutgoing_.erase(outgoingTransfers); + channel->onShutdown([this, tid, onChanneledCancelled, peer = outgoingFile->peer()]() { + JAMI_INFO("Channel down for outgoing transfer with id(%lu)", tid); + onChanneledCancelled(); + dht::ThreadPool::io().run([w = pimpl_->weak(), tid, peer] { + auto shared = w.lock(); + if (!shared) + return; + // Cancel outgoing files + { + std::lock_guard<std::mutex> lk(shared->channeledOutgoingMtx_); + auto outgoingTransfers = shared->channeledOutgoing_.find(tid); + if (outgoingTransfers != shared->channeledOutgoing_.end()) { + auto& currentTransfers = outgoingTransfers->second; + auto it = currentTransfers.begin(); + while (it != currentTransfers.end()) { + auto& transfer = *it; + if (transfer && transfer->peer() == peer) + it = currentTransfers.erase(it); + else + ++it; } + if (currentTransfers.empty()) + shared->channeledOutgoing_.erase(outgoingTransfers); } - }); + } + Manager::instance().dataTransfers->close(tid); }); - // Cancel via DHT because we will use the channeled path - connect_cb(nullptr); - channeledConnectedCb(outgoingFile); - }; + }); + channeledConnectedCb(outgoingFile); + }; if (isVCard) { acc->connectionManager().connectDevice(peer_id, @@ -1016,7 +267,7 @@ DhtPeerConnector::requestConnection( acc->forEachDevice( peer_h, - [this, addresses, connect_cb, tid, channelReadyCb = std::move(channelReadyCb)]( + [this, addresses, tid, channelReadyCb = std::move(channelReadyCb)]( const dht::InfoHash& dev_h) { auto acc = pimpl_->account.lock(); if (!acc) @@ -1029,27 +280,20 @@ DhtPeerConnector::requestConnection( acc->connectionManager().connectDevice(dev_h.toString(), "file://" + std::to_string(tid), channelReadyCb); - - acc->findCertificate(dev_h, - [this, dev_h, addresses, connect_cb, tid]( - const std::shared_ptr<dht::crypto::Certificate>& cert) { - pimpl_->onAddDevice(dev_h, tid, cert, addresses, connect_cb); - }); }, - [this, peer_h, connect_cb, onChanneledCancelled, accId = acc->getAccountID()](bool found) { + [this, peer_h, onChanneledCancelled, accId = acc->getAccountID()](bool found) { if (!found) { JAMI_WARN() << accId << "[CNX] aborted, no devices for " << peer_h; - connect_cb(nullptr); onChanneledCancelled(); } }); } void -DhtPeerConnector::closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid) +DhtPeerConnector::closeConnection(const DRing::DataTransferId& tid) { - pimpl_->closeConnection(peer_id, tid); + pimpl_->closeConnection(tid); } bool @@ -1079,8 +323,8 @@ DhtPeerConnector::onIncomingConnection(const std::string& peer_id, auto incomingFile = std::make_unique<ChanneledIncomingTransfer>( channel, std::make_shared<FtpServer>(acc->getAccountID(), peer_id, tid, std::move(cb)), - [this, peer_id](const DRing::DataTransferId& id, const DRing::DataTransferEventCode& code) { - pimpl_->stateChanged(peer_id, id, code); + [this](const DRing::DataTransferId& id, const DRing::DataTransferEventCode& code) { + pimpl_->stateChanged(id, code); }); { std::lock_guard<std::mutex> lk(pimpl_->channeledIncomingMtx_); diff --git a/src/jamidht/p2p.h b/src/jamidht/p2p.h index 47a337317a..cde1191ccc 100644 --- a/src/jamidht/p2p.h +++ b/src/jamidht/p2p.h @@ -32,7 +32,6 @@ namespace jami { class JamiAccount; -class PeerConnection; class DhtPeerConnector { @@ -40,16 +39,14 @@ public: DhtPeerConnector(JamiAccount& account); ~DhtPeerConnector(); - void onDhtConnected(const std::string& device_id); void requestConnection( const std::string& peer_id, const DRing::DataTransferId& tid, bool isVCard, - const std::function<void(PeerConnection*)>& connect_cb, const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>& channeledConnectedCb, const std::function<void()>& onChanneledCancelled); - void closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid); + void closeConnection(const DRing::DataTransferId& tid); bool onIncomingChannelRequest(const DRing::DataTransferId& tid); void onIncomingConnection(const std::string& peer_id, const DRing::DataTransferId& tid, diff --git a/src/peer_connection.cpp b/src/peer_connection.cpp index ad00ce9e73..ba4ccdd0cf 100644 --- a/src/peer_connection.cpp +++ b/src/peer_connection.cpp @@ -678,293 +678,4 @@ TlsSocketEndpoint::underlyingICE() const return {}; } -//============================================================================== - -// following namespace prevents an ODR violation with definitions in p2p.cpp -namespace { - -enum class CtrlMsgType { - STOP, - ATTACH_INPUT, - ATTACH_OUTPUT, -}; - -struct CtrlMsg -{ - virtual CtrlMsgType type() const = 0; - virtual ~CtrlMsg() = default; -}; - -struct StopCtrlMsg final : CtrlMsg -{ - explicit StopCtrlMsg() {} - CtrlMsgType type() const override { return CtrlMsgType::STOP; } -}; - -struct AttachInputCtrlMsg final : CtrlMsg -{ - explicit AttachInputCtrlMsg(const std::shared_ptr<Stream>& stream) - : stream {stream} - {} - CtrlMsgType type() const override { return CtrlMsgType::ATTACH_INPUT; } - const std::shared_ptr<Stream> stream; -}; - -struct AttachOutputCtrlMsg final : CtrlMsg -{ - explicit AttachOutputCtrlMsg(const std::shared_ptr<Stream>& stream) - : stream {stream} - {} - CtrlMsgType type() const override { return CtrlMsgType::ATTACH_OUTPUT; } - const std::shared_ptr<Stream> stream; -}; - -} // namespace - -//============================================================================== - -class PeerConnection::PeerConnectionImpl -{ -public: - PeerConnectionImpl(std::function<void()>&& done, - const std::string& peer_uri, - std::unique_ptr<SocketType> endpoint) - : peer_uri {peer_uri} - , endpoint_ {std::move(endpoint)} - , eventLoopFut_ {std::async(std::launch::async, [this, done = std::move(done)] { - try { - eventLoop(); - } catch (const std::exception& e) { - JAMI_ERR() << "[CNX] peer connection event loop failure: " << e.what(); - done(); - } - })} - {} - - ~PeerConnectionImpl() - { - ctrlChannel << std::make_unique<StopCtrlMsg>(); - endpoint_->shutdown(); - } - - bool hasStreamWithId(const DRing::DataTransferId& id) - { - auto isInInput = std::any_of(inputs_.begin(), - inputs_.end(), - [&id](const std::shared_ptr<Stream>& str) { - return str && str->getId() == id; - }); - if (isInInput) - return true; - auto isInOutput = std::any_of(outputs_.begin(), - outputs_.end(), - [&id](const std::shared_ptr<Stream>& str) { - return str && str->getId() == id; - }); - return isInOutput; - } - - const std::string peer_uri; - Channel<std::unique_ptr<CtrlMsg>> ctrlChannel; - OnStateChangedCb stateChangedCb_; - std::vector<std::shared_ptr<Stream>> inputs_; - std::vector<std::shared_ptr<Stream>> outputs_; - -private: - std::unique_ptr<SocketType> endpoint_; - std::future<void> eventLoopFut_; - std::vector<uint8_t> bufferPool_; // will store non rattached buffers - - void eventLoop(); - - template<typename L, typename C> - void handle_stream_list(L& stream_list, const C& callable) - { - if (stream_list.empty()) - return; - const auto& item = std::begin(stream_list); - auto& stream = *item; - try { - if (callable(stream)) - return; - JAMI_DBG() << "EOF on stream #" << stream->getId(); - } catch (const std::system_error& e) { - JAMI_WARN() << "Stream #" << stream->getId() << " IO failed with code = " << e.code(); - } catch (const std::exception& e) { - JAMI_ERR() << "Unexpected exception during IO with stream #" << stream->getId() << ": " - << e.what(); - } - stream->close(); - stream_list.erase(item); - } -}; - -void -PeerConnection::PeerConnectionImpl::eventLoop() -{ - JAMI_DBG() << "[CNX] Peer connection to " << peer_uri << " ready"; - while (true) { - // Process ctrl orders first - while (true) { - std::unique_ptr<CtrlMsg> msg; - if (outputs_.empty() and inputs_.empty()) { - if (!ctrlChannel.empty()) { - msg = ctrlChannel.receive(); - } else { - std::error_code ec; - if (endpoint_->waitForData(std::chrono::milliseconds(100), ec) > 0) { - std::vector<uint8_t> buf(IO_BUFFER_SIZE); - JAMI_DBG("A good buffer arrived before any input or output attachment"); - auto size = endpoint_->read(buf, ec); - if (ec) - throw std::system_error(ec); - // If it's a good read, we should store the buffer somewhere - // and give it to the next input or output. - if (size < IO_BUFFER_SIZE) - bufferPool_.insert(bufferPool_.end(), buf.begin(), buf.begin() + size); - } - break; - } - } else if (!ctrlChannel.empty()) { - msg = ctrlChannel.receive(); - } else - break; - - switch (msg->type()) { - case CtrlMsgType::ATTACH_INPUT: { - auto& input_msg = static_cast<AttachInputCtrlMsg&>(*msg); - input_msg.stream->setOnStateChangedCb(stateChangedCb_); - inputs_.emplace_back(std::move(input_msg.stream)); - } break; - - case CtrlMsgType::ATTACH_OUTPUT: { - auto& output_msg = static_cast<AttachOutputCtrlMsg&>(*msg); - output_msg.stream->setOnStateChangedCb(stateChangedCb_); - outputs_.emplace_back(std::move(output_msg.stream)); - } break; - - case CtrlMsgType::STOP: - return; - - default: - JAMI_ERR("BUG: got unhandled control msg!"); - break; - } - } - - // Then handles IO streams - std::vector<uint8_t> buf; - std::error_code ec; - - bool sleep = true; - - // sending loop - handle_stream_list(inputs_, [&](auto& stream) { - if (!stream) - return false; - buf.resize(IO_BUFFER_SIZE); - if (stream->read(buf)) { - if (not buf.empty()) { - endpoint_->write(buf, ec); - if (ec) - throw std::system_error(ec); - sleep = false; - } - } else { - // EOF on outgoing stream => finished - return false; - } - if (!bufferPool_.empty()) { - stream->write(bufferPool_); - bufferPool_.clear(); - } else if (endpoint_->waitForData(std::chrono::milliseconds(0), ec) > 0) { - buf.resize(IO_BUFFER_SIZE); - endpoint_->read(buf, ec); - if (ec) - throw std::system_error(ec); - return stream->write(buf); - } else if (ec) - throw std::system_error(ec); - return true; - }); - - // receiving loop - handle_stream_list(outputs_, [&](auto& stream) { - if (!stream) - return false; - buf.resize(IO_BUFFER_SIZE); - auto eof = stream->read(buf); - // if eof we let a chance to send a reply before leaving - if (not buf.empty()) { - endpoint_->write(buf, ec); - if (ec) - throw std::system_error(ec); - } - if (not eof) - return false; - - if (!bufferPool_.empty()) { - stream->write(bufferPool_); - bufferPool_.clear(); - } else if (endpoint_->waitForData(std::chrono::milliseconds(0), ec) > 0) { - buf.resize(IO_BUFFER_SIZE); - endpoint_->read(buf, ec); - if (ec) - throw std::system_error(ec); - sleep = false; - return stream->write(buf); - } else if (ec) - throw std::system_error(ec); - return true; - }); - - if (sleep) - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } -} - -//============================================================================== - -PeerConnection::PeerConnection(std::function<void()>&& done, - const std::string& peer_uri, - std::unique_ptr<GenericSocket<uint8_t>> endpoint) - : pimpl_(std::make_unique<PeerConnectionImpl>(std::move(done), peer_uri, std::move(endpoint))) -{} - -PeerConnection::~PeerConnection() {} - -void -PeerConnection::attachInputStream(const std::shared_ptr<Stream>& stream) -{ - pimpl_->ctrlChannel << std::make_unique<AttachInputCtrlMsg>(stream); -} - -void -PeerConnection::attachOutputStream(const std::shared_ptr<Stream>& stream) -{ - pimpl_->ctrlChannel << std::make_unique<AttachOutputCtrlMsg>(stream); -} - -bool -PeerConnection::hasStreamWithId(const DRing::DataTransferId& id) -{ - return pimpl_->hasStreamWithId(id); -} - -std::string -PeerConnection::getPeerUri() const -{ - return pimpl_->peer_uri; -} - -void -PeerConnection::setOnStateChangedCb(const OnStateChangedCb& cb) -{ - pimpl_->stateChangedCb_ = cb; - for (auto& input : pimpl_->inputs_) - input->setOnStateChangedCb(pimpl_->stateChangedCb_); - for (auto& output : pimpl_->outputs_) - output->setOnStateChangedCb(pimpl_->stateChangedCb_); -} - } // namespace jami diff --git a/src/peer_connection.h b/src/peer_connection.h index 744afd1129..6b9059b286 100644 --- a/src/peer_connection.h +++ b/src/peer_connection.h @@ -246,37 +246,4 @@ private: std::unique_ptr<Impl> pimpl_; }; -//============================================================================== - -class PeerConnection -{ -public: - using SocketType = GenericSocket<uint8_t>; - PeerConnection(std::function<void()>&& done, - const std::string& peer_uri, - std::unique_ptr<SocketType> endpoint); - - ~PeerConnection(); - - void attachOutputStream(const std::shared_ptr<Stream>& stream); - - void attachInputStream(const std::shared_ptr<Stream>& stream); - - /** - * Check if an input or output stream got the given id. - * NOTE: used by p2p to know which PeerConnection to close - * @param id to check - * @return if id is found - */ - bool hasStreamWithId(const DRing::DataTransferId& id); - - std::string getPeerUri() const; - - void setOnStateChangedCb(const OnStateChangedCb&); - -private: - class PeerConnectionImpl; - std::unique_ptr<PeerConnectionImpl> pimpl_; -}; - } // namespace jami -- GitLab