diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 3ee4196a08f6a50546c253485e0227f4f52f9f96..3a74d79cad7660c83f4cee364935f95181a41d6f 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -1857,6 +1857,7 @@ JamiAccount::onTrackedBuddyOffline(const dht::InfoHash& contactId) JAMI_WARNING("[Account {:s}] Buddy {} is not present on the DHT, but P2P connected", getAccountID(), id); + return; } state = PresenceState::DISCONNECTED; emitSignal<libjami::PresenceSignal::NewBuddyNotification>(getAccountID(), @@ -3111,6 +3112,7 @@ JamiAccount::sendMessage(const std::string& to, } const auto& payload = *payloads.begin(); MessageChannelHandler::Message msg; + msg.id = token; msg.t = payload.first; msg.c = payload.second; auto device = deviceId.empty() ? DeviceId() : DeviceId(deviceId); @@ -3756,10 +3758,21 @@ JamiAccount::requestMessageConnection(const std::string& peerId, [w = weak(), peerId](std::shared_ptr<dhtnet::ChannelSocket> socket, const DeviceId& deviceId) { if (socket) - if (auto acc = w.lock()) { - acc->messageEngine_.onPeerOnline(peerId); - acc->messageEngine_.onPeerOnline(peerId, deviceId.toString(), true); - } + dht::ThreadPool::io().run([w, peerId, deviceId] { + if (auto acc = w.lock()) { + acc->messageEngine_.onPeerOnline(peerId); + acc->messageEngine_.onPeerOnline(peerId, deviceId.toString(), true); + if (!acc->presenceNote_.empty()) { + // If a presence note is set, send it to this device. + auto token = std::uniform_int_distribution<uint64_t> {1, JAMI_ID_MAX_VAL}(acc->rand); + std::map<std::string, std::string> msg = { + {MIME_TYPE_PIDF, getPIDF(acc->presenceNote_)} + }; + acc->sendMessage(peerId, deviceId.toString(), msg, token, false, true); + } + acc->convModule()->syncConversations(peerId, deviceId.toString()); + } + }); }, connectionType); } @@ -3845,11 +3858,19 @@ JamiAccount::sendPresenceNote(const std::string& note) return; presenceNote_ = note; auto contacts = info->contacts->getContacts(); - std::vector<SipConnectionKey> keys; + std::vector<std::pair<std::string, DeviceId>> keys; { - std::lock_guard lk(sipConnsMtx_); - for (auto& [key, conns] : sipConns_) { - keys.push_back(key); + std::shared_lock lkCM(connManagerMtx_); + auto* handler = static_cast<MessageChannelHandler*>( + channelHandlers_[Uri::Scheme::MESSAGE].get()); + if (!handler) + return; + for (const auto& contact : contacts) { + auto peerId = contact.first.toString(); + auto channels = handler->getChannels(peerId); + for (const auto& channel : channels) { + keys.emplace_back(peerId, channel->deviceId()); + } } } auto token = std::uniform_int_distribution<uint64_t> {1, JAMI_ID_MAX_VAL}(rand); @@ -4068,20 +4089,6 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket, JAMI_WARNING("[Account {:s}] [device {}] New SIP channel opened", getAccountID(), deviceId); lk.unlock(); - dht::ThreadPool::io().run([w = weak(), peerId, deviceId] { - if (auto shared = w.lock()) { - if (shared->presenceNote_ != "") { - // If a presence note is set, send it to this device. - auto token = std::uniform_int_distribution<uint64_t> {1, JAMI_ID_MAX_VAL}( - shared->rand); - std::map<std::string, std::string> msg = { - {MIME_TYPE_PIDF, getPIDF(shared->presenceNote_)}}; - shared->sendMessage(peerId, deviceId.toString(), msg, token, false, true); - } - shared->convModule()->syncConversations(peerId, deviceId.toString()); - } - }); - // Retry messages messageEngine_.onPeerOnline(peerId); messageEngine_.onPeerOnline(peerId, deviceId.toString(), true); @@ -4105,16 +4112,6 @@ JamiAccount::cacheSIPConnection(std::shared_ptr<dhtnet::ChannelSocket>&& socket, } } }); - - auto& state = presenceState_[peerId]; - if (state != PresenceState::CONNECTED) { - state = PresenceState::CONNECTED; - emitSignal<libjami::PresenceSignal::NewBuddyNotification>(getAccountID(), - peerId, - static_cast<int>( - PresenceState::CONNECTED), - ""); - } } void @@ -4133,27 +4130,6 @@ JamiAccount::shutdownSIPConnection(const std::shared_ptr<dhtnet::ChannelSocket>& conns.end()); if (conns.empty()) { sipConns_.erase(it); - // If all devices of an account are disconnected, we need to update the presence state - auto it = std::find_if(sipConns_.begin(), sipConns_.end(), [&](const auto& v) { - return v.first.first == peerId; - }); - if (it == sipConns_.end()) { - auto& state = presenceState_[peerId]; - if (state == PresenceState::CONNECTED) { - std::lock_guard lock(buddyInfoMtx); - auto buddy = trackedBuddies_.find(dht::InfoHash(peerId)); - if (buddy == trackedBuddies_.end()) - state = PresenceState::DISCONNECTED; - else - state = buddy->second.devices_cnt > 0 ? PresenceState::AVAILABLE - : PresenceState::DISCONNECTED; - emitSignal<libjami::PresenceSignal::NewBuddyNotification>(getAccountID(), - peerId, - static_cast<int>( - state), - ""); - } - } } } lk.unlock(); @@ -4462,6 +4438,24 @@ JamiAccount::askForProfile(const std::string& conversationId, false); } +void +JamiAccount::onPeerConnected(const std::string& peerId, bool connected) +{ + std::unique_lock lock(buddyInfoMtx); + auto& state = presenceState_[peerId]; + auto it = trackedBuddies_.find(dht::InfoHash(peerId)); + auto isOnline = it != trackedBuddies_.end() && it->second.devices_cnt > 0; + auto newState = connected ? PresenceState::CONNECTED : (isOnline ? PresenceState::AVAILABLE : PresenceState::DISCONNECTED); + if (state != newState) { + state = newState; + lock.unlock(); + emitSignal<libjami::PresenceSignal::NewBuddyNotification>(getAccountID(), + peerId, + static_cast<int>(newState), + ""); + } +} + void JamiAccount::initConnectionManager() { @@ -4500,7 +4494,16 @@ JamiAccount::initConnectionManager() channelHandlers_[Uri::Scheme::DATA_TRANSFER] = std::make_unique<TransferChannelHandler>(shared(), *connectionManager_.get()); channelHandlers_[Uri::Scheme::MESSAGE] - = std::make_unique<MessageChannelHandler>(shared(), *connectionManager_.get()); + = std::make_unique<MessageChannelHandler>(*connectionManager_.get(), + [this](const auto& cert, std::string& type, const std::string& content) { + onTextMessage("", cert->issuer->getId().toString(), cert, {{type, content}}); + }, + [w = weak()](const std::string& peer, bool connected) { + Manager::instance().ioContext()->post([w, peer, connected] { + if (auto acc = w.lock()) + acc->onPeerConnected(peer, connected); + }); + }); channelHandlers_[Uri::Scheme::AUTH] = std::make_unique<AuthChannelHandler>(shared(), *connectionManager_.get()); diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 1ba574d1d751f84ea748731ef77a673ca305b158..d0049f6adde159b3de06f83ade93f4c440a33191 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -648,6 +648,7 @@ private: */ void trackPresence(const dht::InfoHash& h, BuddyInfo& buddy); + void onPeerConnected(const std::string& peerId, bool connected); void doRegister_(); diff --git a/src/jamidht/message_channel_handler.cpp b/src/jamidht/message_channel_handler.cpp index ca0e892b15fc222ff7a2ebccbe143b138d1c90ca..616f76f8c5e1586a8b932c0151cfd7cecfd377e7 100644 --- a/src/jamidht/message_channel_handler.cpp +++ b/src/jamidht/message_channel_handler.cpp @@ -24,14 +24,16 @@ using Key = std::pair<std::string, DeviceId>; struct MessageChannelHandler::Impl : public std::enable_shared_from_this<Impl> { - std::weak_ptr<JamiAccount> account_; dhtnet::ConnectionManager& connectionManager_; + OnMessage onMessage_; + OnPeerStateChanged onPeerStateChanged_; std::recursive_mutex connectionsMtx_; - std::map<Key, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>> connections_; + std::map<std::string, std::map<DeviceId, std::vector<std::shared_ptr<dhtnet::ChannelSocket>>>> connections_; - Impl(const std::shared_ptr<JamiAccount>& acc, dhtnet::ConnectionManager& cm) - : account_(acc) - , connectionManager_(cm) + Impl(dhtnet::ConnectionManager& cm, OnMessage onMessage, OnPeerStateChanged onPeer) + : connectionManager_(cm) + , onMessage_(std::move(onMessage)) + , onPeerStateChanged_(std::move(onPeer)) {} void onChannelShutdown(const std::shared_ptr<dhtnet::ChannelSocket>& socket, @@ -39,10 +41,10 @@ struct MessageChannelHandler::Impl : public std::enable_shared_from_this<Impl> const DeviceId& device); }; -MessageChannelHandler::MessageChannelHandler(const std::shared_ptr<JamiAccount>& acc, - dhtnet::ConnectionManager& cm) +MessageChannelHandler::MessageChannelHandler(dhtnet::ConnectionManager& cm, + OnMessage onMessage, OnPeerStateChanged onPeer) : ChannelHandlerInterface() - , pimpl_(std::make_shared<Impl>(acc, cm)) + , pimpl_(std::make_shared<Impl>(cm, std::move(onMessage), std::move(onPeer))) {} MessageChannelHandler::~MessageChannelHandler() {} @@ -56,7 +58,7 @@ MessageChannelHandler::connect(const DeviceId& deviceId, { auto channelName = MESSAGE_SCHEME + deviceId.toString(); if (pimpl_->connectionManager_.isConnecting(deviceId, channelName)) { - JAMI_INFO("Already connecting to %s", deviceId.to_c_str()); + JAMI_LOG("Already connecting to {}", deviceId); return; } pimpl_->connectionManager_.connectDevice(deviceId, @@ -73,27 +75,38 @@ MessageChannelHandler::Impl::onChannelShutdown(const std::shared_ptr<dhtnet::Cha const DeviceId& device) { std::lock_guard lk(connectionsMtx_); - auto connectionsIt = connections_.find({peerId, device}); - if (connectionsIt == connections_.end()) + auto peerIt = connections_.find(peerId); + if (peerIt == connections_.end()) + return; + auto connectionsIt = peerIt->second.find(device); + if (connectionsIt == peerIt->second.end()) return; auto& connections = connectionsIt->second; auto conn = std::find(connections.begin(), connections.end(), socket); if (conn != connections.end()) connections.erase(conn); - if (connections.empty()) - connections_.erase(connectionsIt); + if (connections.empty()) { + peerIt->second.erase(connectionsIt); + } + if (peerIt->second.empty()) { + connections_.erase(peerIt); + onPeerStateChanged_(peerId, false); + } } std::shared_ptr<dhtnet::ChannelSocket> MessageChannelHandler::getChannel(const std::string& peer, const DeviceId& deviceId) const { std::lock_guard lk(pimpl_->connectionsMtx_); - auto it = pimpl_->connections_.find({peer, deviceId}); + auto it = pimpl_->connections_.find(peer); if (it == pimpl_->connections_.end()) return nullptr; - if (it->second.empty()) + auto deviceIt = it->second.find(deviceId); + if (deviceIt == it->second.end()) + return nullptr; + if (deviceIt->second.empty()) return nullptr; - return it->second.front(); + return deviceIt->second.back(); } std::vector<std::shared_ptr<dhtnet::ChannelSocket>> @@ -101,9 +114,15 @@ MessageChannelHandler::getChannels(const std::string& peer) const { std::vector<std::shared_ptr<dhtnet::ChannelSocket>> sockets; std::lock_guard lk(pimpl_->connectionsMtx_); - auto lower = pimpl_->connections_.lower_bound({peer, DeviceId()}); - for (auto it = lower; it != pimpl_->connections_.end() && it->first.first == peer; ++it) - sockets.insert(sockets.end(), it->second.begin(), it->second.end()); + auto it = pimpl_->connections_.find(peer); + if (it == pimpl_->connections_.end()) + return sockets; + sockets.reserve(it->second.size()); + for (auto& [deviceId, channels] : it->second) { + for (auto& channel : channels) { + sockets.push_back(channel); + } + } return sockets; } @@ -111,11 +130,9 @@ bool MessageChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert, const std::string& /* name */) { - auto acc = pimpl_->account_.lock(); - if (!cert || !cert->issuer || !acc) + if (!cert || !cert->issuer) return false; return true; - // return cert->issuer->getId().toString() == acc->getUsername(); } void @@ -123,13 +140,17 @@ MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& const std::string&, std::shared_ptr<dhtnet::ChannelSocket> socket) { - auto acc = pimpl_->account_.lock(); - if (!cert || !cert->issuer || !acc) + if (!cert || !cert->issuer) return; auto peerId = cert->issuer->getId().toString(); auto device = cert->getLongId(); std::lock_guard lk(pimpl_->connectionsMtx_); - pimpl_->connections_[{peerId, device}].emplace_back(socket); + auto& connections = pimpl_->connections_[peerId]; + bool newPeerConnection = connections.empty(); + auto& deviceConnections = connections[device]; + deviceConnections.insert(deviceConnections.begin(), socket); + if (newPeerConnection) + pimpl_->onPeerStateChanged_(peerId, true); socket->onShutdown([w = pimpl_->weak_from_this(), peerId, device, s = std::weak_ptr(socket)]() { if (auto shared = w.lock()) @@ -143,11 +164,11 @@ MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& 1500}; }; - socket->setOnRecv([acc = pimpl_->account_.lock(), + socket->setOnRecv([onMessage = pimpl_->onMessage_, peerId, cert, ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) { - if (!buf || !acc) + if (!buf) return len; ctx->pac.reserve_buffer(len); @@ -159,7 +180,7 @@ MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& while (ctx->pac.next(oh)) { Message msg; oh.get().convert(msg); - acc->onTextMessage("", peerId, cert, {{msg.t, msg.c}}); + onMessage(cert, msg.t, msg.c); } } catch (const std::exception& e) { JAMI_WARNING("[convInfo] error on sync: {:s}", e.what()); diff --git a/src/jamidht/message_channel_handler.h b/src/jamidht/message_channel_handler.h index 0d94d9a7a19f1850c1e074a21af70874a0135e11..d6fd46a67a2049835230564c29a6dc7fccc4c5e3 100644 --- a/src/jamidht/message_channel_handler.h +++ b/src/jamidht/message_channel_handler.h @@ -28,7 +28,9 @@ namespace jami { class MessageChannelHandler : public ChannelHandlerInterface { public: - MessageChannelHandler(const std::shared_ptr<JamiAccount>& acc, dhtnet::ConnectionManager& cm); + using OnMessage = std::function<void(const std::shared_ptr<dht::crypto::Certificate>&, std::string&, const std::string&)>; + using OnPeerStateChanged = std::function<void(const std::string&, bool)>; + MessageChannelHandler(dhtnet::ConnectionManager& cm, OnMessage onMessage, OnPeerStateChanged onPeer); ~MessageChannelHandler(); /** @@ -70,10 +72,11 @@ public: struct Message { + uint64_t id {0}; /* Message ID */ std::string t; /* Message type */ std::string c; /* Message content */ std::unique_ptr<ConversationRequest> req; /* Conversation request */ - MSGPACK_DEFINE_MAP(t, c, req) + MSGPACK_DEFINE_MAP(id, t, c, req) }; static bool sendMessage(const std::shared_ptr<dhtnet::ChannelSocket>&, const Message& message);