Skip to content
Snippets Groups Projects
Commit 68fc552f authored by Adrien Béraud's avatar Adrien Béraud Committed by Adrien Béraud
Browse files

jamiaccount: use MessageChannelHandler for connection status

Change-Id: Icb3d5de46b45c8097a0e6080bbb1bb3b60503dce
parent 2c615dbc
No related branches found
No related tags found
No related merge requests found
......@@ -1857,6 +1857,7 @@ JamiAccount::onTrackedBuddyOffline(const dht::InfoHash& contactId)
JAMI_WARNING("[Account {:s}] Buddy {} is not present on the DHT, but P2P connected",
getAccountID(),
id);
return;
}
state = PresenceState::DISCONNECTED;
emitSignal<libjami::PresenceSignal::NewBuddyNotification>(getAccountID(),
......@@ -3111,6 +3112,7 @@ JamiAccount::sendMessage(const std::string& to,
}
const auto& payload = *payloads.begin();
MessageChannelHandler::Message msg;
msg.id = token;
msg.t = payload.first;
msg.c = payload.second;
auto device = deviceId.empty() ? DeviceId() : DeviceId(deviceId);
......@@ -3756,10 +3758,21 @@ JamiAccount::requestMessageConnection(const std::string& peerId,
[w = weak(), peerId](std::shared_ptr<dhtnet::ChannelSocket> socket,
const DeviceId& deviceId) {
if (socket)
dht::ThreadPool::io().run([w, peerId, deviceId] {
if (auto acc = w.lock()) {
acc->messageEngine_.onPeerOnline(peerId);
acc->messageEngine_.onPeerOnline(peerId, deviceId.toString(), true);
if (!acc->presenceNote_.empty()) {
// If a presence note is set, send it to this device.
auto token = std::uniform_int_distribution<uint64_t> {1, JAMI_ID_MAX_VAL}(acc->rand);
std::map<std::string, std::string> msg = {
{MIME_TYPE_PIDF, getPIDF(acc->presenceNote_)}
};
acc->sendMessage(peerId, deviceId.toString(), msg, token, false, true);
}
acc->convModule()->syncConversations(peerId, deviceId.toString());
}
});
},
connectionType);
}
......@@ -3845,11 +3858,19 @@ JamiAccount::sendPresenceNote(const std::string& note)
return;
presenceNote_ = note;
auto contacts = info->contacts->getContacts();
std::vector<SipConnectionKey> keys;
std::vector<std::pair<std::string, DeviceId>> keys;
{
std::lock_guard lk(sipConnsMtx_);
for (auto& [key, conns] : sipConns_) {
keys.push_back(key);
std::shared_lock lkCM(connManagerMtx_);
auto* handler = static_cast<MessageChannelHandler*>(
channelHandlers_[Uri::Scheme::MESSAGE].get());
if (!handler)
return;
for (const auto& contact : contacts) {
auto peerId = contact.first.toString();
auto channels = handler->getChannels(peerId);
for (const auto& channel : channels) {
keys.emplace_back(peerId, channel->deviceId());
}
}
}
auto token = std::uniform_int_distribution<uint64_t> {1, JAMI_ID_MAX_VAL}(rand);
......@@ -4068,20 +4089,6 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
JAMI_WARNING("[Account {:s}] [device {}] New SIP channel opened", getAccountID(), deviceId);
lk.unlock();
dht::ThreadPool::io().run([w = weak(), peerId, deviceId] {
if (auto shared = w.lock()) {
if (shared->presenceNote_ != "") {
// If a presence note is set, send it to this device.
auto token = std::uniform_int_distribution<uint64_t> {1, JAMI_ID_MAX_VAL}(
shared->rand);
std::map<std::string, std::string> msg = {
{MIME_TYPE_PIDF, getPIDF(shared->presenceNote_)}};
shared->sendMessage(peerId, deviceId.toString(), msg, token, false, true);
}
shared->convModule()->syncConversations(peerId, deviceId.toString());
}
});
// Retry messages
messageEngine_.onPeerOnline(peerId);
messageEngine_.onPeerOnline(peerId, deviceId.toString(), true);
......@@ -4105,16 +4112,6 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket,
}
}
});
auto& state = presenceState_[peerId];
if (state != PresenceState::CONNECTED) {
state = PresenceState::CONNECTED;
emitSignal<libjami::PresenceSignal::NewBuddyNotification>(getAccountID(),
peerId,
static_cast<int>(
PresenceState::CONNECTED),
"");
}
}
void
......@@ -4133,27 +4130,6 @@ JamiAccount::shutdownSIPConnection(const std::shared_ptr<dhtnet::ChannelSocket>&
conns.end());
if (conns.empty()) {
sipConns_.erase(it);
// If all devices of an account are disconnected, we need to update the presence state
auto it = std::find_if(sipConns_.begin(), sipConns_.end(), [&](const auto& v) {
return v.first.first == peerId;
});
if (it == sipConns_.end()) {
auto& state = presenceState_[peerId];
if (state == PresenceState::CONNECTED) {
std::lock_guard lock(buddyInfoMtx);
auto buddy = trackedBuddies_.find(dht::InfoHash(peerId));
if (buddy == trackedBuddies_.end())
state = PresenceState::DISCONNECTED;
else
state = buddy->second.devices_cnt > 0 ? PresenceState::AVAILABLE
: PresenceState::DISCONNECTED;
emitSignal<libjami::PresenceSignal::NewBuddyNotification>(getAccountID(),
peerId,
static_cast<int>(
state),
"");
}
}
}
}
lk.unlock();
......@@ -4462,6 +4438,24 @@ JamiAccount::askForProfile(const std::string& conversationId,
false);
}
void
JamiAccount::onPeerConnected(const std::string& peerId, bool connected)
{
std::unique_lock lock(buddyInfoMtx);
auto& state = presenceState_[peerId];
auto it = trackedBuddies_.find(dht::InfoHash(peerId));
auto isOnline = it != trackedBuddies_.end() && it->second.devices_cnt > 0;
auto newState = connected ? PresenceState::CONNECTED : (isOnline ? PresenceState::AVAILABLE : PresenceState::DISCONNECTED);
if (state != newState) {
state = newState;
lock.unlock();
emitSignal<libjami::PresenceSignal::NewBuddyNotification>(getAccountID(),
peerId,
static_cast<int>(newState),
"");
}
}
void
JamiAccount::initConnectionManager()
{
......@@ -4500,7 +4494,16 @@ JamiAccount::initConnectionManager()
channelHandlers_[Uri::Scheme::DATA_TRANSFER]
= std::make_unique<TransferChannelHandler>(shared(), *connectionManager_.get());
channelHandlers_[Uri::Scheme::MESSAGE]
= std::make_unique<MessageChannelHandler>(shared(), *connectionManager_.get());
= std::make_unique<MessageChannelHandler>(*connectionManager_.get(),
[this](const auto& cert, std::string& type, const std::string& content) {
onTextMessage("", cert->issuer->getId().toString(), cert, {{type, content}});
},
[w = weak()](const std::string& peer, bool connected) {
Manager::instance().ioContext()->post([w, peer, connected] {
if (auto acc = w.lock())
acc->onPeerConnected(peer, connected);
});
});
channelHandlers_[Uri::Scheme::AUTH]
= std::make_unique<AuthChannelHandler>(shared(), *connectionManager_.get());
......
......@@ -648,6 +648,7 @@ private:
*/
void trackPresence(const dht::InfoHash& h, BuddyInfo& buddy);
void onPeerConnected(const std::string& peerId, bool connected);
void doRegister_();
......
......@@ -24,14 +24,16 @@ using Key = std::pair<std::string, DeviceId>;
struct MessageChannelHandler::Impl : public std::enable_shared_from_this<Impl>
{
std::weak_ptr<JamiAccount> account_;
dhtnet::ConnectionManager& connectionManager_;
OnMessage onMessage_;
OnPeerStateChanged onPeerStateChanged_;
std::recursive_mutex connectionsMtx_;
std::map<Key, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> connections_;
std::map<std::string, std::map<DeviceId, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>> connections_;
Impl(const std::shared_ptr<JamiAccount>& acc, dhtnet::ConnectionManager& cm)
: account_(acc)
, connectionManager_(cm)
Impl(dhtnet::ConnectionManager& cm, OnMessage onMessage, OnPeerStateChanged onPeer)
: connectionManager_(cm)
, onMessage_(std::move(onMessage))
, onPeerStateChanged_(std::move(onPeer))
{}
void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket,
......@@ -39,10 +41,10 @@ struct MessageChannelHandler::Impl : public std::enable_shared_from_this<Impl>
const DeviceId& device);
};
MessageChannelHandler::MessageChannelHandler(const std::shared_ptr<JamiAccount>& acc,
dhtnet::ConnectionManager& cm)
MessageChannelHandler::MessageChannelHandler(dhtnet::ConnectionManager& cm,
OnMessage onMessage, OnPeerStateChanged onPeer)
: ChannelHandlerInterface()
, pimpl_(std::make_shared<Impl>(acc, cm))
, pimpl_(std::make_shared<Impl>(cm, std::move(onMessage), std::move(onPeer)))
{}
MessageChannelHandler::~MessageChannelHandler() {}
......@@ -56,7 +58,7 @@ MessageChannelHandler::connect(const DeviceId& deviceId,
{
auto channelName = MESSAGE_SCHEME + deviceId.toString();
if (pimpl_->connectionManager_.isConnecting(deviceId, channelName)) {
JAMI_INFO("Already connecting to %s", deviceId.to_c_str());
JAMI_LOG("Already connecting to {}", deviceId);
return;
}
pimpl_->connectionManager_.connectDevice(deviceId,
......@@ -73,27 +75,38 @@ MessageChannelHandler::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::Cha
const DeviceId& device)
{
std::lock_guard lk(connectionsMtx_);
auto connectionsIt = connections_.find({peerId, device});
if (connectionsIt == connections_.end())
auto peerIt = connections_.find(peerId);
if (peerIt == connections_.end())
return;
auto connectionsIt = peerIt->second.find(device);
if (connectionsIt == peerIt->second.end())
return;
auto& connections = connectionsIt->second;
auto conn = std::find(connections.begin(), connections.end(), socket);
if (conn != connections.end())
connections.erase(conn);
if (connections.empty())
connections_.erase(connectionsIt);
if (connections.empty()) {
peerIt->second.erase(connectionsIt);
}
if (peerIt->second.empty()) {
connections_.erase(peerIt);
onPeerStateChanged_(peerId, false);
}
}
std::shared_ptr<dhtnet::ChannelSocket>
MessageChannelHandler::getChannel(const std::string& peer, const DeviceId& deviceId) const
{
std::lock_guard lk(pimpl_->connectionsMtx_);
auto it = pimpl_->connections_.find({peer, deviceId});
auto it = pimpl_->connections_.find(peer);
if (it == pimpl_->connections_.end())
return nullptr;
if (it->second.empty())
auto deviceIt = it->second.find(deviceId);
if (deviceIt == it->second.end())
return nullptr;
if (deviceIt->second.empty())
return nullptr;
return it->second.front();
return deviceIt->second.back();
}
std::vector<std::shared_ptr<dhtnet::ChannelSocket>>
......@@ -101,9 +114,15 @@ MessageChannelHandler::getChannels(const std::string& peer) const
{
std::vector<std::shared_ptr<dhtnet::ChannelSocket>> sockets;
std::lock_guard lk(pimpl_->connectionsMtx_);
auto lower = pimpl_->connections_.lower_bound({peer, DeviceId()});
for (auto it = lower; it != pimpl_->connections_.end() && it->first.first == peer; ++it)
sockets.insert(sockets.end(), it->second.begin(), it->second.end());
auto it = pimpl_->connections_.find(peer);
if (it == pimpl_->connections_.end())
return sockets;
sockets.reserve(it->second.size());
for (auto& [deviceId, channels] : it->second) {
for (auto& channel : channels) {
sockets.push_back(channel);
}
}
return sockets;
}
......@@ -111,11 +130,9 @@ bool
MessageChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert,
const std::string& /* name */)
{
auto acc = pimpl_->account_.lock();
if (!cert || !cert->issuer || !acc)
if (!cert || !cert->issuer)
return false;
return true;
// return cert->issuer->getId().toString() == acc->getUsername();
}
void
......@@ -123,13 +140,17 @@ MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>&
const std::string&,
std::shared_ptr<dhtnet::ChannelSocket> socket)
{
auto acc = pimpl_->account_.lock();
if (!cert || !cert->issuer || !acc)
if (!cert || !cert->issuer)
return;
auto peerId = cert->issuer->getId().toString();
auto device = cert->getLongId();
std::lock_guard lk(pimpl_->connectionsMtx_);
pimpl_->connections_[{peerId, device}].emplace_back(socket);
auto& connections = pimpl_->connections_[peerId];
bool newPeerConnection = connections.empty();
auto& deviceConnections = connections[device];
deviceConnections.insert(deviceConnections.begin(), socket);
if (newPeerConnection)
pimpl_->onPeerStateChanged_(peerId, true);
socket->onShutdown([w = pimpl_->weak_from_this(), peerId, device, s = std::weak_ptr(socket)]() {
if (auto shared = w.lock())
......@@ -143,11 +164,11 @@ MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>&
1500};
};
socket->setOnRecv([acc = pimpl_->account_.lock(),
socket->setOnRecv([onMessage = pimpl_->onMessage_,
peerId,
cert,
ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) {
if (!buf || !acc)
if (!buf)
return len;
ctx->pac.reserve_buffer(len);
......@@ -159,7 +180,7 @@ MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>&
while (ctx->pac.next(oh)) {
Message msg;
oh.get().convert(msg);
acc->onTextMessage("", peerId, cert, {{msg.t, msg.c}});
onMessage(cert, msg.t, msg.c);
}
} catch (const std::exception& e) {
JAMI_WARNING("[convInfo] error on sync: {:s}", e.what());
......
......@@ -28,7 +28,9 @@ namespace jami {
class MessageChannelHandler : public ChannelHandlerInterface
{
public:
MessageChannelHandler(const std::shared_ptr<JamiAccount>& acc, dhtnet::ConnectionManager& cm);
using OnMessage = std::function<void(const std::shared_ptr<dht::crypto::Certificate>&, std::string&, const std::string&)>;
using OnPeerStateChanged = std::function<void(const std::string&, bool)>;
MessageChannelHandler(dhtnet::ConnectionManager& cm, OnMessage onMessage, OnPeerStateChanged onPeer);
~MessageChannelHandler();
/**
......@@ -70,10 +72,11 @@ public:
struct Message
{
uint64_t id {0}; /* Message ID */
std::string t; /* Message type */
std::string c; /* Message content */
std::unique_ptr<ConversationRequest> req; /* Conversation request */
MSGPACK_DEFINE_MAP(t, c, req)
MSGPACK_DEFINE_MAP(id, t, c, req)
};
static bool sendMessage(const std::shared_ptr<dhtnet::ChannelSocket>&, const Message& message);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment