diff --git a/src/ringdht/ringaccount.cpp b/src/ringdht/ringaccount.cpp index 3bc9f5bc69115e2b444767aa88e33c9bf1a8ef73..319b2213dbbf3f2c3df8968b31a5240a63d2bf07 100644 --- a/src/ringdht/ringaccount.cpp +++ b/src/ringdht/ringaccount.cpp @@ -2279,9 +2279,12 @@ RingAccount::doRegister_() dht_.listen<dht::ImMessage>( inboxDeviceKey, [this,inboxDeviceKey](dht::ImMessage&& v) { - auto res = treatedMessages_.insert(v.id); - if (!res.second) - return true; + { + std::lock_guard<std::mutex> lock(messageMutex_); + auto res = treatedMessages_.insert(v.id); + if (!res.second) + return true; + } saveTreatedMessages(); onPeerMessage(v.from, [this, v, inboxDeviceKey](const std::shared_ptr<dht::crypto::Certificate>&, const dht::InfoHash& peer_account) @@ -2689,19 +2692,27 @@ RingAccount::saveTreatedCalls() const void RingAccount::loadTreatedMessages() { + std::lock_guard<std::mutex> lock(messageMutex_); treatedMessages_ = loadIdList(cachePath_+DIR_SEPARATOR_STR "treatedMessages"); } void RingAccount::saveTreatedMessages() const { - fileutils::check_dir(cachePath_.c_str()); - saveIdList(cachePath_+DIR_SEPARATOR_STR "treatedMessages", treatedMessages_); + ThreadPool::instance().run([w = weak()](){ + if (auto sthis = w.lock()) { + auto& this_ = *sthis; + std::lock_guard<std::mutex> lock(this_.messageMutex_); + fileutils::check_dir(this_.cachePath_.c_str()); + saveIdList(this_.cachePath_+DIR_SEPARATOR_STR "treatedMessages", this_.treatedMessages_); + } + }); } bool RingAccount::isMessageTreated(unsigned int id) { + std::lock_guard<std::mutex> lock(messageMutex_); auto res = treatedMessages_.insert(id); if (res.second) { saveTreatedMessages(); @@ -3387,8 +3398,11 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, // Find listening devices for this account forEachDevice(toH, [confirm,token,payloads,now](const std::shared_ptr<RingAccount>& this_, const dht::InfoHash& dev) { - auto e = this_->sentMessages_.emplace(token, PendingMessage {}); - e.first->second.to = dev; + { + std::lock_guard<std::mutex> lock(this_->messageMutex_); + auto e = this_->sentMessages_.emplace(token, PendingMessage {}); + e.first->second.to = dev; + } auto h = dht::InfoHash::get("inbox:"+dev.toString()); std::weak_ptr<RingAccount> w = this_; @@ -3398,19 +3412,22 @@ RingAccount::sendTextMessage(const std::string& to, const std::map<std::string, // check expected message confirmation if (msg.id != token) return true; - auto e = this_.sentMessages_.find(msg.id); - if (e == this_.sentMessages_.end() or e->second.to != msg.from) { - RING_DBG() << "[Account " << this_.getAccountID() << "] [message " << token << "] Message not found"; - return true; - } - this_.sentMessages_.erase(e); - RING_DBG() << "[Account " << this_.getAccountID() << "] [message " << token << "] Received text message reply"; - // add treated message - auto res = this_.treatedMessages_.insert(msg.id); - if (!res.second) - return true; + { + std::lock_guard<std::mutex> lock(this_.messageMutex_); + auto e = this_.sentMessages_.find(msg.id); + if (e == this_.sentMessages_.end() or e->second.to != msg.from) { + RING_DBG() << "[Account " << this_.getAccountID() << "] [message " << token << "] Message not found"; + return true; + } + this_.sentMessages_.erase(e); + RING_DBG() << "[Account " << this_.getAccountID() << "] [message " << token << "] Received text message reply"; + // add treated message + auto res = this_.treatedMessages_.insert(msg.id); + if (!res.second) + return true; + } this_.saveTreatedMessages(); // report message as confirmed received diff --git a/src/ringdht/ringaccount.h b/src/ringdht/ringaccount.h index 6e33020d8e18d0c13bb1f8a104b76c2d0c93e030..454e4a4af23b99cbdad387d2755f5008895ef759 100644 --- a/src/ringdht/ringaccount.h +++ b/src/ringdht/ringaccount.h @@ -519,6 +519,7 @@ class RingAccount : public SIPAccountBase { std::set<dht::Value::Id> treatedCalls_ {}; mutable std::mutex callsMutex_ {}; + mutable std::mutex messageMutex_ {}; std::map<dht::Value::Id, PendingMessage> sentMessages_; std::set<dht::Value::Id> treatedMessages_ {};