diff --git a/src/im/message_engine.cpp b/src/im/message_engine.cpp index b39fb326d695b6c787a47d83bb5b82401ae90de4..143a9a0cdbfff3183672030caefcdfd0c0da8eeb 100644 --- a/src/im/message_engine.cpp +++ b/src/im/message_engine.cpp @@ -62,7 +62,6 @@ MessageEngine::sendMessage(const std::string& to, const std::map<std::string, st void MessageEngine::reschedule() { - std::lock_guard<std::mutex> lock(messagesMutex_); if (messages_.empty()) return; std::weak_ptr<Account> w = std::static_pointer_cast<Account>(account_.shared_from_this()); @@ -114,6 +113,7 @@ MessageEngine::retrySend() } // avoid locking while calling callback for (auto& p : pending) { + RING_DBG("[message %" PRIx64 "] retrying sending", p.token); emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>( account_.getAccountID(), p.token, @@ -134,43 +134,44 @@ MessageEngine::getStatus(MessageToken t) const void MessageEngine::onMessageSent(MessageToken token, bool ok) { - RING_DBG("Message %" PRIu64 ": %s", token, ok ? "success" : "failure"); + RING_DBG("[message %" PRIx64 "] message sent: %s", token, ok ? "success" : "failure"); std::lock_guard<std::mutex> lock(messagesMutex_); auto f = messages_.find(token); if (f != messages_.end()) { if (f->second.status == MessageStatus::SENDING) { if (ok) { f->second.status = MessageStatus::SENT; - RING_DBG("Status SENT for message %" PRIu64, token); + RING_DBG("[message %" PRIx64 "] status changed to SENT", token); emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(account_.getAccountID(), token, f->second.to, static_cast<int>(DRing::Account::MessageStates::SENT)); - } else if (f->second.retried == MAX_RETRIES) { + } else if (f->second.retried >= MAX_RETRIES) { f->second.status = MessageStatus::FAILURE; - RING_DBG("Status FAILURE for message %" PRIu64, token); + RING_WARN("[message %" PRIx64 "] status changed to FAILURE", token); emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>(account_.getAccountID(), token, f->second.to, static_cast<int>(DRing::Account::MessageStates::FAILURE)); } else { f->second.status = MessageStatus::IDLE; - RING_DBG("Status IDLE for message %" PRIu64, token); - // TODO: reschedule sending + RING_DBG("[message %" PRIx64 "] status changed to IDLE", token); + reschedule(); } } else { - RING_DBG("Message %" PRIu64 " not SENDING", token); + RING_DBG("[message %" PRIx64 "] state is not SENDING", token); } } else { - RING_DBG("Can't find message %" PRIu64, token); + RING_DBG("[message %" PRIx64 "] can't find message", token); } } void MessageEngine::load() { + std::lock_guard<std::mutex> lock(messagesMutex_); try { std::ifstream file; file.exceptions(std::ifstream::failbit | std::ifstream::badbit); @@ -181,8 +182,7 @@ MessageEngine::load() if (!reader.parse(file, root)) throw std::runtime_error("can't parse JSON."); - std::lock_guard<std::mutex> lock(messagesMutex_); - + long unsigned loaded {0}; for (auto i = root.begin(); i != root.end(); ++i) { const auto& jmsg = *i; MessageToken token; @@ -198,12 +198,14 @@ MessageEngine::load() for (auto p = pl.begin(); p != pl.end(); ++p) msg.payloads[p.key().asString()] = p->asString(); messages_.emplace(token, std::move(msg)); + loaded++; } + RING_DBG("[Account %s] loaded %lu messages from %s", account_.getAccountID().c_str(), loaded, savePath_.c_str()); // everything whent fine, removing the file std::remove(savePath_.c_str()); } catch (const std::exception& e) { - RING_ERR("Could not load messages from %s: %s", savePath_.c_str(), e.what()); + RING_ERR("[Account %s] couldn't load messages from %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what()); } reschedule(); } @@ -235,8 +237,9 @@ MessageEngine::save() const file.exceptions(std::ifstream::failbit | std::ifstream::badbit); file.open(savePath_, std::ios::trunc); file << fastWriter.write(root); + RING_DBG("[Account %s] saved %lu messages to %s", account_.getAccountID().c_str(), messages_.size(), savePath_.c_str()); } catch (const std::exception& e) { - RING_ERR("Could not save messages to %s: %s", savePath_.c_str(), e.what()); + RING_ERR("[Account %s] couldn't save messages to %s: %s", account_.getAccountID().c_str(), savePath_.c_str(), e.what()); } } diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp index 632cbbf581be6022fea7f14742374e7c9a96642f..894c03453571cdb8b3332d707505b7d05d436b67 100644 --- a/src/ringdht/ringaccount.cpp +++ b/src/ringdht/ringaccount.cpp @@ -2158,7 +2158,7 @@ RingAccount::doRegister_() std::map<std::string, std::string> payloads = {{"text/plain", utf8_make_valid(v.msg)}}; shared->onTextMessage(peer_account.toString(), payloads); - RING_DBG("Sending message confirmation %" PRIu64, v.id); + RING_DBG("Sending message confirmation %" PRIx64, v.id); shared->dht_.putEncrypted(inboxDeviceKey, v.from, dht::ImMessage(v.id, std::string(), now)); @@ -3124,43 +3124,34 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, e.first->second.to = dev; auto h = dht::InfoHash::get("inbox:"+dev.toString()); - RING_DBG("Found device to send message %s -> %s", dev.toString().c_str(), h.toString().c_str()); - std::weak_ptr<RingAccount> wshared = shared; auto list_token = shared->dht_.listen<dht::ImMessage>(h, [h,wshared,token,confirm](dht::ImMessage&& msg) { - if (auto this_ = wshared.lock()) { + if (auto sthis = wshared.lock()) { + auto& this_ = *sthis; // check expected message confirmation if (msg.id != token) return true; - auto e = this_->sentMessages_.find(msg.id); - if (e == this_->sentMessages_.end()) { - RING_DBG("Message not found for %" PRIu64, token); - return true; - } - if (e->second.to != msg.from) { - RING_DBG("Unrelated text message : from %s != second %s", - msg.from.toString().c_str(), e->second.to.toString().c_str()); - } - if (e == this_->sentMessages_.end() || e->second.to != msg.from) { - RING_DBG("Unrelated text message reply for %" PRIu64, token); + auto e = this_.sentMessages_.find(msg.id); + if (e == this_.sentMessages_.end() or e->second.to != msg.from) { + RING_DBG("[Account %s] [message %" PRIx64 "] message not found", this_.getAccountID().c_str(), token); return true; } - this_->sentMessages_.erase(e); - RING_DBG("Relevant text message reply for %" PRIu64, token); + this_.sentMessages_.erase(e); + RING_DBG("[Account %s] [message %" PRIx64 "] received text message reply", this_.getAccountID().c_str(), token); // add treated message - auto res = this_->treatedMessages_.insert(msg.id); + auto res = this_.treatedMessages_.insert(msg.id); if (!res.second) return true; - this_->saveTreatedMessages(); + this_.saveTreatedMessages(); // report message as confirmed received for (auto& t : confirm->listenTokens) - this_->dht_.cancelListen(t.first, t.second.get()); + this_.dht_.cancelListen(t.first, t.second.get()); confirm->listenTokens.clear(); confirm->replied = true; - this_->messageEngine_.onMessageSent(token, true); + this_.messageEngine_.onMessageSent(token, true); } return false; }); @@ -3169,15 +3160,36 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, dht::ImMessage(token, std::string(payloads.begin()->second), now), [wshared,token,confirm,h](bool ok) { if (auto this_ = wshared.lock()) { + RING_DBG("[Account %s] [message %" PRIx64 "] put encrypted %s", this_->getAccountID().c_str(), token, ok ? "ok" : "failed"); if (not ok) { - confirm->listenTokens.erase(h); + auto lt = confirm->listenTokens.find(h); + if (lt != confirm->listenTokens.end()) { + this_->dht_.cancelListen(h, lt->second.get()); + confirm->listenTokens.erase(lt); + } if (confirm->listenTokens.empty() and not confirm->replied) this_->messageEngine_.onMessageSent(token, false); } } }); - RING_DBG("Put encrypted message at %s for %s", h.toString().c_str(), dev.toString().c_str()); + + RING_DBG("[Account %s] [message %" PRIx64 "] sending message for device %s", shared->getAccountID().c_str(), token, dev.toString().c_str()); }); + + // Timeout cleanup + std::weak_ptr<RingAccount> wshared = shared(); + Manager::instance().scheduleTask([wshared, confirm, token]() { + if (not confirm->replied and not confirm->listenTokens.empty()) { + if (auto this_ = wshared.lock()) { + RING_DBG("[Account %s] [message %" PRIx64 "] timeout", this_->getAccountID().c_str(), token); + for (auto& t : confirm->listenTokens) + this_->dht_.cancelListen(t.first, t.second.get()); + confirm->listenTokens.clear(); + confirm->replied = true; + this_->messageEngine_.onMessageSent(token, false); + } + } + }, std::chrono::steady_clock::now() + std::chrono::minutes(1)); } void