Commit f59ff335 authored by Sébastien Blin's avatar Sébastien Blin Committed by Adrien Béraud

filetransfer: do not use DHT anymore

This breaks support with old versions from <=2019

Change-Id: Ie56e3db19dd73dfd7668c373caaeaac60e708c9c
parent 66336ccb
......@@ -287,7 +287,7 @@ public:
void cancel() override
{
if (auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId))
account->closePeerConnection(peerUri_, id);
account->closePeerConnection(id);
}
void setOnRecv(std::function<void(std::vector<uint8_t>&&)>&& cb) override
......@@ -597,7 +597,7 @@ public:
{
auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId);
if (account)
account->closePeerConnection(info_.peer, internalId_);
account->closePeerConnection(internalId_);
}
private:
......@@ -734,7 +734,6 @@ public:
InternalCompletionCb cb);
std::shared_ptr<DataTransfer> getTransfer(const DRing::DataTransferId&);
void cancel(DataTransfer&);
void onConnectionRequestReply(const DRing::DataTransferId&, PeerConnection*);
};
void
......@@ -783,23 +782,6 @@ DataTransferFacade::Impl::createIncomingFileTransfer(const DRing::DataTransferIn
return transfer;
}
void
DataTransferFacade::Impl::onConnectionRequestReply(const DRing::DataTransferId& id,
PeerConnection* connection)
{
if (auto transfer = getTransfer(id)) {
if (connection) {
connection->attachInputStream(
std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)->startNewOutgoing(
connection->getPeerUri()));
} else if (std::dynamic_pointer_cast<OutgoingFileTransfer>(transfer)->cancel(false)
and not transfer->hasBeenStarted()) {
transfer->emit(DRing::DataTransferEventCode::unjoinable_peer);
cancel(*transfer);
}
}
}
//==============================================================================
DataTransferFacade::DataTransferFacade()
......@@ -846,16 +828,11 @@ DataTransferFacade::sendFile(const DRing::DataTransferInfo& info,
try {
// IMPLEMENTATION NOTE: requestPeerConnection() may call the given callback a multiple time.
// This happen when multiple agents handle communications of the given peer for the given
// account. Example: Ring account supports multi-devices, each can answer to the request.
// NOTE: this will create a PeerConnection for each files. This connection need to be shut
// when finished
// account. Example: Jami account supports multi-devices, each can answer to the request.
account->requestPeerConnection(
info.peer,
tid,
static_cast<bool>(cb),
[this, tid](PeerConnection* connection) {
pimpl_->onConnectionRequestReply(tid, connection);
},
[this, tid](const std::shared_ptr<ChanneledOutgoingTransfer>& out) {
if (auto transfer = pimpl_->getTransfer(tid))
if (out)
......
......@@ -1178,11 +1178,10 @@ JamiAccount::loadAccount(const std::string& archive_password,
auto label = d.second.name.empty() ? id.substr(0, 8) : d.second.name;
ids.emplace(std::move(id), std::move(label));
}
dht::ThreadPool::computation().run([id=getAccountID(), devices=std::move(ids)] {
dht::ThreadPool::computation().run([id = getAccountID(), devices = std::move(ids)] {
emitSignal<DRing::ConfigurationSignal::KnownDevicesChanged>(id, devices);
});
}
};
}};
try {
auto onAsync = [w = weak()](AccountManager::AsyncUser&& cb) {
......@@ -2367,7 +2366,6 @@ JamiAccount::doRegister_()
if (!dhtPeerConnector_)
dhtPeerConnector_ = std::make_unique<DhtPeerConnector>(*this);
dhtPeerConnector_->onDhtConnected(accountManager_->getInfo()->deviceId);
std::lock_guard<std::mutex> bLock(buddyInfoMtx);
for (auto& buddy : trackedBuddies_) {
......@@ -2977,12 +2975,13 @@ JamiAccount::removeContact(const std::string& uri, bool ban)
std::set<std::string> devices;
{
std::unique_lock<std::mutex> lk(sipConnectionsMtx_);
for (const auto& deviceConn: sipConnections_[uri]) {
for (const auto& deviceConn : sipConnections_[uri]) {
devices.emplace(deviceConn.first);
}
sipConnections_.erase(uri);
for (auto pendingIt = pendingSipConnections_.begin(); pendingIt != pendingSipConnections_.end();) {
for (auto pendingIt = pendingSipConnections_.begin();
pendingIt != pendingSipConnections_.end();) {
if (uri == pendingIt->first) {
devices.emplace(pendingIt->second);
pendingIt = pendingSipConnections_.erase(pendingIt);
......@@ -2992,7 +2991,7 @@ JamiAccount::removeContact(const std::string& uri, bool ban)
}
}
for (const auto& device: devices) {
for (const auto& device : devices) {
if (connectionManager_)
connectionManager_->closeConnectionsWith(device);
}
......@@ -3022,7 +3021,8 @@ std::vector<std::map<std::string, std::string>>
JamiAccount::getTrustRequests() const
{
std::lock_guard<std::mutex> lock(configurationMutex_);
return accountManager_ ? accountManager_->getTrustRequests() : std::vector<std::map<std::string, std::string>>{};
return accountManager_ ? accountManager_->getTrustRequests()
: std::vector<std::map<std::string, std::string>> {};
}
bool
......@@ -3358,28 +3358,26 @@ JamiAccount::requestPeerConnection(
const std::string& peer_id,
const DRing::DataTransferId& tid,
bool isVCard,
const std::function<void(PeerConnection*)>& connect_cb,
const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>&
channeledConnectedCb,
const std::function<void()>& onChanneledCancelled)
{
if (not dhtPeerConnector_) {
runOnMainThread([onChanneledCancelled]{ onChanneledCancelled(); });
runOnMainThread([onChanneledCancelled] { onChanneledCancelled(); });
return;
}
dhtPeerConnector_->requestConnection(peer_id,
tid,
isVCard,
connect_cb,
channeledConnectedCb,
onChanneledCancelled);
}
void
JamiAccount::closePeerConnection(const std::string& peer, const DRing::DataTransferId& tid)
JamiAccount::closePeerConnection(const DRing::DataTransferId& tid)
{
if (dhtPeerConnector_)
dhtPeerConnector_->closeConnection(peer, tid);
dhtPeerConnector_->closeConnection(tid);
}
void
......
......@@ -72,7 +72,6 @@ struct DeviceSync;
struct AccountArchive;
class ConnectionManager;
class DhtPeerConnector;
class PeerConnection;
class ContactList;
class AccountManager;
struct AccountInfo;
......@@ -366,7 +365,6 @@ public:
const std::string& peer,
const DRing::DataTransferId& tid,
bool isVCard,
const std::function<void(PeerConnection*)>& connect_cb,
const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>&
channeledConnectedCb,
const std::function<void()>& onChanneledCancelled);
......@@ -377,7 +375,7 @@ public:
/// /// \param[in] peer RingID on request's recipient
/// /// \param[in] tid linked outgoing data transfer
///
void closePeerConnection(const std::string& peer, const DRing::DataTransferId& tid);
void closePeerConnection(const DRing::DataTransferId& tid);
std::vector<std::string> publicAddresses();
......
This diff is collapsed.
......@@ -32,7 +32,6 @@
namespace jami {
class JamiAccount;
class PeerConnection;
class DhtPeerConnector
{
......@@ -40,16 +39,14 @@ public:
DhtPeerConnector(JamiAccount& account);
~DhtPeerConnector();
void onDhtConnected(const std::string& device_id);
void requestConnection(
const std::string& peer_id,
const DRing::DataTransferId& tid,
bool isVCard,
const std::function<void(PeerConnection*)>& connect_cb,
const std::function<void(const std::shared_ptr<ChanneledOutgoingTransfer>&)>&
channeledConnectedCb,
const std::function<void()>& onChanneledCancelled);
void closeConnection(const std::string& peer_id, const DRing::DataTransferId& tid);
void closeConnection(const DRing::DataTransferId& tid);
bool onIncomingChannelRequest(const DRing::DataTransferId& tid);
void onIncomingConnection(const std::string& peer_id,
const DRing::DataTransferId& tid,
......
......@@ -678,293 +678,4 @@ TlsSocketEndpoint::underlyingICE() const
return {};
}
//==============================================================================
// following namespace prevents an ODR violation with definitions in p2p.cpp
namespace {
enum class CtrlMsgType {
STOP,
ATTACH_INPUT,
ATTACH_OUTPUT,
};
struct CtrlMsg
{
virtual CtrlMsgType type() const = 0;
virtual ~CtrlMsg() = default;
};
struct StopCtrlMsg final : CtrlMsg
{
explicit StopCtrlMsg() {}
CtrlMsgType type() const override { return CtrlMsgType::STOP; }
};
struct AttachInputCtrlMsg final : CtrlMsg
{
explicit AttachInputCtrlMsg(const std::shared_ptr<Stream>& stream)
: stream {stream}
{}
CtrlMsgType type() const override { return CtrlMsgType::ATTACH_INPUT; }
const std::shared_ptr<Stream> stream;
};
struct AttachOutputCtrlMsg final : CtrlMsg
{
explicit AttachOutputCtrlMsg(const std::shared_ptr<Stream>& stream)
: stream {stream}
{}
CtrlMsgType type() const override { return CtrlMsgType::ATTACH_OUTPUT; }
const std::shared_ptr<Stream> stream;
};
} // namespace
//==============================================================================
class PeerConnection::PeerConnectionImpl
{
public:
PeerConnectionImpl(std::function<void()>&& done,
const std::string& peer_uri,
std::unique_ptr<SocketType> endpoint)
: peer_uri {peer_uri}
, endpoint_ {std::move(endpoint)}
, eventLoopFut_ {std::async(std::launch::async, [this, done = std::move(done)] {
try {
eventLoop();
} catch (const std::exception& e) {
JAMI_ERR() << "[CNX] peer connection event loop failure: " << e.what();
done();
}
})}
{}
~PeerConnectionImpl()
{
ctrlChannel << std::make_unique<StopCtrlMsg>();
endpoint_->shutdown();
}
bool hasStreamWithId(const DRing::DataTransferId& id)
{
auto isInInput = std::any_of(inputs_.begin(),
inputs_.end(),
[&id](const std::shared_ptr<Stream>& str) {
return str && str->getId() == id;
});
if (isInInput)
return true;
auto isInOutput = std::any_of(outputs_.begin(),
outputs_.end(),
[&id](const std::shared_ptr<Stream>& str) {
return str && str->getId() == id;
});
return isInOutput;
}
const std::string peer_uri;
Channel<std::unique_ptr<CtrlMsg>> ctrlChannel;
OnStateChangedCb stateChangedCb_;
std::vector<std::shared_ptr<Stream>> inputs_;
std::vector<std::shared_ptr<Stream>> outputs_;
private:
std::unique_ptr<SocketType> endpoint_;
std::future<void> eventLoopFut_;
std::vector<uint8_t> bufferPool_; // will store non rattached buffers
void eventLoop();
template<typename L, typename C>
void handle_stream_list(L& stream_list, const C& callable)
{
if (stream_list.empty())
return;
const auto& item = std::begin(stream_list);
auto& stream = *item;
try {
if (callable(stream))
return;
JAMI_DBG() << "EOF on stream #" << stream->getId();
} catch (const std::system_error& e) {
JAMI_WARN() << "Stream #" << stream->getId() << " IO failed with code = " << e.code();
} catch (const std::exception& e) {
JAMI_ERR() << "Unexpected exception during IO with stream #" << stream->getId() << ": "
<< e.what();
}
stream->close();
stream_list.erase(item);
}
};
void
PeerConnection::PeerConnectionImpl::eventLoop()
{
JAMI_DBG() << "[CNX] Peer connection to " << peer_uri << " ready";
while (true) {
// Process ctrl orders first
while (true) {
std::unique_ptr<CtrlMsg> msg;
if (outputs_.empty() and inputs_.empty()) {
if (!ctrlChannel.empty()) {
msg = ctrlChannel.receive();
} else {
std::error_code ec;
if (endpoint_->waitForData(std::chrono::milliseconds(100), ec) > 0) {
std::vector<uint8_t> buf(IO_BUFFER_SIZE);
JAMI_DBG("A good buffer arrived before any input or output attachment");
auto size = endpoint_->read(buf, ec);
if (ec)
throw std::system_error(ec);
// If it's a good read, we should store the buffer somewhere
// and give it to the next input or output.
if (size < IO_BUFFER_SIZE)
bufferPool_.insert(bufferPool_.end(), buf.begin(), buf.begin() + size);
}
break;
}
} else if (!ctrlChannel.empty()) {
msg = ctrlChannel.receive();
} else
break;
switch (msg->type()) {
case CtrlMsgType::ATTACH_INPUT: {
auto& input_msg = static_cast<AttachInputCtrlMsg&>(*msg);
input_msg.stream->setOnStateChangedCb(stateChangedCb_);
inputs_.emplace_back(std::move(input_msg.stream));
} break;
case CtrlMsgType::ATTACH_OUTPUT: {
auto& output_msg = static_cast<AttachOutputCtrlMsg&>(*msg);
output_msg.stream->setOnStateChangedCb(stateChangedCb_);
outputs_.emplace_back(std::move(output_msg.stream));
} break;
case CtrlMsgType::STOP:
return;
default:
JAMI_ERR("BUG: got unhandled control msg!");
break;
}
}
// Then handles IO streams
std::vector<uint8_t> buf;
std::error_code ec;
bool sleep = true;
// sending loop
handle_stream_list(inputs_, [&](auto& stream) {
if (!stream)
return false;
buf.resize(IO_BUFFER_SIZE);
if (stream->read(buf)) {
if (not buf.empty()) {
endpoint_->write(buf, ec);
if (ec)
throw std::system_error(ec);
sleep = false;
}
} else {
// EOF on outgoing stream => finished
return false;
}
if (!bufferPool_.empty()) {
stream->write(bufferPool_);
bufferPool_.clear();
} else if (endpoint_->waitForData(std::chrono::milliseconds(0), ec) > 0) {
buf.resize(IO_BUFFER_SIZE);
endpoint_->read(buf, ec);
if (ec)
throw std::system_error(ec);
return stream->write(buf);
} else if (ec)
throw std::system_error(ec);
return true;
});
// receiving loop
handle_stream_list(outputs_, [&](auto& stream) {
if (!stream)
return false;
buf.resize(IO_BUFFER_SIZE);
auto eof = stream->read(buf);
// if eof we let a chance to send a reply before leaving
if (not buf.empty()) {
endpoint_->write(buf, ec);
if (ec)
throw std::system_error(ec);
}
if (not eof)
return false;
if (!bufferPool_.empty()) {
stream->write(bufferPool_);
bufferPool_.clear();
} else if (endpoint_->waitForData(std::chrono::milliseconds(0), ec) > 0) {
buf.resize(IO_BUFFER_SIZE);
endpoint_->read(buf, ec);
if (ec)
throw std::system_error(ec);
sleep = false;
return stream->write(buf);
} else if (ec)
throw std::system_error(ec);
return true;
});
if (sleep)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
//==============================================================================
PeerConnection::PeerConnection(std::function<void()>&& done,
const std::string& peer_uri,
std::unique_ptr<GenericSocket<uint8_t>> endpoint)
: pimpl_(std::make_unique<PeerConnectionImpl>(std::move(done), peer_uri, std::move(endpoint)))
{}
PeerConnection::~PeerConnection() {}
void
PeerConnection::attachInputStream(const std::shared_ptr<Stream>& stream)
{
pimpl_->ctrlChannel << std::make_unique<AttachInputCtrlMsg>(stream);
}
void
PeerConnection::attachOutputStream(const std::shared_ptr<Stream>& stream)
{
pimpl_->ctrlChannel << std::make_unique<AttachOutputCtrlMsg>(stream);
}
bool
PeerConnection::hasStreamWithId(const DRing::DataTransferId& id)
{
return pimpl_->hasStreamWithId(id);
}
std::string
PeerConnection::getPeerUri() const
{
return pimpl_->peer_uri;
}
void
PeerConnection::setOnStateChangedCb(const OnStateChangedCb& cb)
{
pimpl_->stateChangedCb_ = cb;
for (auto& input : pimpl_->inputs_)
input->setOnStateChangedCb(pimpl_->stateChangedCb_);
for (auto& output : pimpl_->outputs_)
output->setOnStateChangedCb(pimpl_->stateChangedCb_);
}
} // namespace jami
......@@ -246,37 +246,4 @@ private:
std::unique_ptr<Impl> pimpl_;
};
//==============================================================================
class PeerConnection
{
public:
using SocketType = GenericSocket<uint8_t>;
PeerConnection(std::function<void()>&& done,
const std::string& peer_uri,
std::unique_ptr<SocketType> endpoint);
~PeerConnection();
void attachOutputStream(const std::shared_ptr<Stream>& stream);
void attachInputStream(const std::shared_ptr<Stream>& stream);
/**
* Check if an input or output stream got the given id.
* NOTE: used by p2p to know which PeerConnection to close
* @param id to check
* @return if id is found
*/
bool hasStreamWithId(const DRing::DataTransferId& id);
std::string getPeerUri() const;
void setOnStateChangedCb(const OnStateChangedCb&);
private:
class PeerConnectionImpl;
std::unique_ptr<PeerConnectionImpl> pimpl_;
};
} // namespace jami
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment