diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 6c1798ce74318111826ade81077789200ee57c1b..92becca1b96bb2eb896d5cf17f13fca9c1e8e131 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -114,18 +114,6 @@ static constexpr const char MIME_TYPE_INVITE_JSON[] {"application/invite+json"}; static constexpr const char DEVICE_ID_PATH[] {"ring_device"}; static constexpr auto TREATED_PATH = "treatedImMessages"sv; -// Used to pass info to a pjsip callback (pjsip_endpt_send_request) -struct TextMessageCtx -{ - std::weak_ptr<JamiAccount> acc; - std::string to; - DeviceId deviceId; - uint64_t id; - bool retryOnTimeout; - std::shared_ptr<dhtnet::ChannelSocket> channel; - bool onlyConnected; -}; - struct VCardMessageCtx { std::shared_ptr<std::atomic_int> success; @@ -3103,8 +3091,6 @@ JamiAccount::sendMessage(const std::string& to, return; } - auto devices = std::make_shared<std::set<DeviceId>>(); - // Use the Message channel if available std::shared_lock clk(connManagerMtx_); auto* handler = static_cast<MessageChannelHandler*>( @@ -3115,41 +3101,121 @@ JamiAccount::sendMessage(const std::string& to, messageEngine_.onMessageSent(to, token, false, deviceId); return; } + + /** + * Track sending state for a single message to one or more devices. + */ + class SendMessageContext { + public: + using OnComplete = std::function<void(bool, bool)>; + SendMessageContext(OnComplete onComplete) : onComplete(std::move(onComplete)) {} + /** Track new pending message for device */ + bool add(const DeviceId& device) { + std::lock_guard lk(mtx); + return devices.insert(device).second; + } + /** Call after all messages are sent */ + void start() { + std::unique_lock lk(mtx); + started = true; + checkComplete(lk); + } + /** Complete pending message for device */ + bool complete(const DeviceId& device, bool success) { + std::unique_lock lk(mtx); + if (devices.erase(device) == 0) + return false; + ++completeCount; + if (success) + ++successCount; + checkComplete(lk); + return true; + } + bool empty() const { + std::lock_guard lk(mtx); + return devices.empty(); + } + bool pending(const DeviceId& device) const { + std::lock_guard lk(mtx); + return devices.find(device) != devices.end(); + } + private: + mutable std::mutex mtx; + OnComplete onComplete; + std::set<DeviceId> devices; + unsigned completeCount = 0; + unsigned successCount = 0; + bool started {false}; + + void checkComplete(std::unique_lock<std::mutex>& lk) { + if (started && (devices.empty() || successCount)) { + if (onComplete) { + auto cb = std::move(onComplete); + onComplete = {}; + lk.unlock(); + cb(successCount != 0, completeCount != 0); + } + } + } + }; + auto devices = std::make_shared<SendMessageContext>([ + w= weak(), + to, + token, + deviceId, + onlyConnected, + retryOnTimeout + ](bool success, bool sent) { + if (auto acc = w.lock()) + acc->onMessageSent(to, token, deviceId, success, onlyConnected, sent && retryOnTimeout); + }); + + struct TextMessageCtx { + std::weak_ptr<JamiAccount> acc; + std::string peerId; + DeviceId deviceId; + std::shared_ptr<SendMessageContext> devices; + std::shared_ptr<dhtnet::ChannelSocket> sipChannel; + }; + + auto completed = [w = weak(), to, devices](const DeviceId& device, std::shared_ptr<dhtnet::ChannelSocket> conn, bool success) { + if (!success) + if (auto acc = w.lock()) { + std::shared_lock clk(acc->connManagerMtx_); + if (auto* handler = static_cast<MessageChannelHandler*>( + acc->channelHandlers_[Uri::Scheme::MESSAGE].get())) { + handler->closeChannel(to, device, conn); + } + } + devices->complete(device, success); + }; + const auto& payload = *payloads.begin(); - MessageChannelHandler::Message msg; - msg.id = token; - msg.t = payload.first; - msg.c = payload.second; + auto msg = std::make_shared<MessageChannelHandler::Message>(); + msg->id = token; + msg->t = payload.first; + msg->c = payload.second; auto device = deviceId.empty() ? DeviceId() : DeviceId(deviceId); if (deviceId.empty()) { - bool sent = false; auto conns = handler->getChannels(toUri); clk.unlock(); for (const auto& conn : conns) { - if (MessageChannelHandler::sendMessage(conn, msg)) { - devices->emplace(conn->deviceId()); - sent = true; - } - } - if (sent) { - if (!onlyConnected) - runOnMainThread([w = weak(), to, token, deviceId]() { - if (auto acc = w.lock()) - acc->messageEngine_.onMessageSent(to, token, true, deviceId); - }); - return; + auto connDevice = conn->deviceId(); + if (!devices->add(connDevice)) + continue; + dht::ThreadPool::io().run([completed, connDevice, conn, msg] { + completed(connDevice, conn, MessageChannelHandler::sendMessage(conn, *msg)); + }); } } else { if (auto conn = handler->getChannel(toUri, device)) { clk.unlock(); - if (MessageChannelHandler::sendMessage(conn, msg)) { - if (!onlyConnected) - runOnMainThread([w = weak(), to, token, deviceId]() { - if (auto acc = w.lock()) - acc->messageEngine_.onMessageSent(to, token, true, deviceId); - }); - return; - } + devices->add(device); + dht::ThreadPool::io().run([completed, device, conn, msg] { + completed(device, conn, MessageChannelHandler::sendMessage(conn, *msg)); + }); + devices->start(); + return; } } if (clk) @@ -3161,7 +3227,7 @@ JamiAccount::sendMessage(const std::string& to, continue; if (!deviceId.empty() && key.second != device) continue; - if (!devices->emplace(key.second).second) + if (!devices->add(key.second)) continue; auto& conn = value.back(); @@ -3170,12 +3236,10 @@ JamiAccount::sendMessage(const std::string& to, // Set input token into callback auto ctx = std::make_unique<TextMessageCtx>(); ctx->acc = weak(); - ctx->to = to; - ctx->deviceId = device; - ctx->id = token; - ctx->onlyConnected = onlyConnected; - ctx->retryOnTimeout = retryOnTimeout; - ctx->channel = channel; + ctx->peerId = to; + ctx->deviceId = key.second; + ctx->devices = devices; + ctx->sipChannel = channel; try { auto res = sendSIPMessage(conn, @@ -3184,42 +3248,47 @@ JamiAccount::sendMessage(const std::string& to, token, payloads, [](void* token, pjsip_event* event) { - std::shared_ptr<TextMessageCtx> c { - (TextMessageCtx*) token}; - auto code = event->body.tsx_state.tsx->status_code; - runOnMainThread([c = std::move(c), code]() { - if (c) { - if (auto acc = c->acc.lock()) - acc->onSIPMessageSent(std::move(c), code); - } - }); + if (auto c = std::shared_ptr<TextMessageCtx>{(TextMessageCtx*) token}) + runOnMainThread([ + c = std::move(c), + code = event->body.tsx_state.tsx->status_code + ] { + bool success = code == PJSIP_SC_OK; + // Note: This can be called from PJSIP's eventloop while + // sipConnsMtx_ is locked. So we should retrigger the shutdown. + if (!success) { + JAMI_WARNING("Timeout when send a message, close current connection"); + if (auto acc = c->acc.lock()) + acc->shutdownSIPConnection(c->sipChannel, + c->peerId, + c->deviceId); + } + c->devices->complete(c->deviceId, code == PJSIP_SC_OK); + }); }); if (!res) { - if (!onlyConnected) - messageEngine_.onMessageSent(to, token, false, deviceId); - devices->erase(key.second); + devices->complete(key.second, false); continue; } } catch (const std::runtime_error& ex) { JAMI_WARNING("{}", ex.what()); - if (!onlyConnected) - messageEngine_.onMessageSent(to, token, false, deviceId); - devices->erase(key.second); // Remove connection in incorrect state shutdownSIPConnection(channel, to, key.second); + devices->complete(key.second, false); continue; } if (key.second == device) { + devices->start(); return; } } lk.unlock(); + devices->start(); if (onlyConnected) return; // We are unable to send the message directly, try connecting - messageEngine_.onMessageSent(to, token, false, deviceId); // Get conversation id, which will be used by the iOS notification extension // to load the conversation. @@ -3237,9 +3306,9 @@ JamiAccount::sendMessage(const std::string& to, }; // get request type - auto payload_type = msg.t; + auto payload_type = msg->t; if (payload_type == MIME_TYPE_GIT) { - std::string id = extractIdFromJson(msg.c); + std::string id = extractIdFromJson(msg->c); if (!id.empty()) { payload_type += "/" + id; } @@ -3257,8 +3326,8 @@ JamiAccount::sendMessage(const std::string& to, const std::shared_ptr<dht::crypto::PublicKey>& dev) { // Test if already sent auto deviceId = dev->getLongId(); - if (!devices->emplace(deviceId).second - || deviceId == currentDevice) { + if (deviceId == currentDevice + || devices->pending(deviceId)) { return; } @@ -3271,35 +3340,17 @@ JamiAccount::sendMessage(const std::string& to, } void -JamiAccount::onSIPMessageSent(const std::shared_ptr<TextMessageCtx>& ctx, int code) -{ - if (code == PJSIP_SC_OK) { - if (!ctx->onlyConnected) - messageEngine_.onMessageSent(ctx->to, - ctx->id, - true, - ctx->deviceId ? ctx->deviceId.toString() : ""); - } else { - // Note: This can be called from PJSIP's eventloop while - // sipConnsMtx_ is locked. So we should retrigger the shutdown. - auto acc = ctx->acc.lock(); - if (not acc) - return; - JAMI_WARN("Timeout when send a message, close current connection"); - shutdownSIPConnection(ctx->channel, ctx->to, ctx->deviceId); - // This MUST be done after closing the connection to avoid race condition - // with messageEngine_ - if (!ctx->onlyConnected) - messageEngine_.onMessageSent(ctx->to, - ctx->id, - false, - ctx->deviceId ? ctx->deviceId.toString() : ""); - - // In that case, the peer typically changed its connectivity. - // After closing sockets with that peer, we try to re-connect to - // that peer one time. - if (ctx->retryOnTimeout) - messageEngine_.onPeerOnline(ctx->to, ctx->deviceId ? ctx->deviceId.toString() : ""); +JamiAccount::onMessageSent(const std::string& to, uint64_t id, const std::string& deviceId, bool success, bool onlyConnected, bool retry) +{ + if (!onlyConnected) + messageEngine_.onMessageSent(to, + id, + success, + deviceId); + + if (!success) { + if (retry) + messageEngine_.onPeerOnline(to, deviceId); } } diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 1e5fd5b9a4f8a8c784f1c3237091a0f1f5aacdcb..f52cc136ffe0e5468e113a9940294097cdb85e01 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -84,7 +84,6 @@ struct AccountInfo; class SipTransport; class ChanneledOutgoingTransfer; class SyncModule; -struct TextMessageCtx; using SipConnectionKey = std::pair<std::string /* uri */, DeviceId>; @@ -891,7 +890,7 @@ private: uint64_t token, const std::map<std::string, std::string>& data, pjsip_endpt_send_callback cb); - void onSIPMessageSent(const std::shared_ptr<TextMessageCtx>& ctx, int code); + void onMessageSent(const std::string& to, uint64_t id, const std::string& deviceId, bool success, bool onlyConnected, bool retry); std::mutex gitServersMtx_ {}; std::map<dht::Value::Id, std::unique_ptr<GitServer>> gitServers_ {}; diff --git a/src/jamidht/message_channel_handler.cpp b/src/jamidht/message_channel_handler.cpp index 616f76f8c5e1586a8b932c0151cfd7cecfd377e7..d7c2766acd51345e885507cf7c7048b3687baa8e 100644 --- a/src/jamidht/message_channel_handler.cpp +++ b/src/jamidht/message_channel_handler.cpp @@ -189,6 +189,30 @@ MessageChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>& }); } +void +MessageChannelHandler::closeChannel(const std::string& peer, const DeviceId& device, const std::shared_ptr<dhtnet::ChannelSocket>& conn) +{ + if (!conn) + return; + std::unique_lock lk(pimpl_->connectionsMtx_); + auto it = pimpl_->connections_.find(peer); + if (it != pimpl_->connections_.end()) { + auto deviceIt = it->second.find(device); + if (deviceIt != it->second.end()) { + auto& channels = deviceIt->second; + channels.erase(std::remove(channels.begin(), channels.end(), conn), channels.end()); + if (channels.empty()) { + it->second.erase(deviceIt); + if (it->second.empty()) { + pimpl_->connections_.erase(it); + } + } + } + } + lk.unlock(); + conn->stop(); +} + bool MessageChannelHandler::sendMessage(const std::shared_ptr<dhtnet::ChannelSocket>& socket, const Message& message) diff --git a/src/jamidht/message_channel_handler.h b/src/jamidht/message_channel_handler.h index d6fd46a67a2049835230564c29a6dc7fccc4c5e3..06953aaa1714a30e21b3c11fea2449236bd3e5d9 100644 --- a/src/jamidht/message_channel_handler.h +++ b/src/jamidht/message_channel_handler.h @@ -70,6 +70,8 @@ public: const std::string& name, std::shared_ptr<dhtnet::ChannelSocket> channel) override; + void closeChannel(const std::string& peer, const DeviceId& device, const std::shared_ptr<dhtnet::ChannelSocket>& conn); + struct Message { uint64_t id {0}; /* Message ID */