diff --git a/bin/dbus/cx.ring.Ring.ConfigurationManager.xml b/bin/dbus/cx.ring.Ring.ConfigurationManager.xml index 4ab94e824c9c401f239439d1cfa234a166e81c80..5e578d02acb4a213ee90534b2a1e4611cef0683e 100644 --- a/bin/dbus/cx.ring.Ring.ConfigurationManager.xml +++ b/bin/dbus/cx.ring.Ring.ConfigurationManager.xml @@ -1952,6 +1952,23 @@ </arg> </signal> + <signal name="conversationRemoved" tp:name-for-bindings="conversationRemoved"> + <tp:added version="10.0.0"/> + <tp:docstring> + Notify clients when a conversation is removed + </tp:docstring> + <arg type="s" name="account_id"> + <tp:docstring> + Account id related + </tp:docstring> + </arg> + <arg type="s" name="conversation_id"> + <tp:docstring> + Conversation id + </tp:docstring> + </arg> + </signal> + <signal name="debugMessageReceived" tp:name-for-bindings="debugMessageReceived"> <tp:added version="5.2.0"/> <tp:docstring> diff --git a/bin/dbus/dbusclient.cpp b/bin/dbus/dbusclient.cpp index a33ee8a1ed9cfc57060323de670d3de50cbcb2fd..8d7c9f4173b8ca75e7f3bcbaa9a334fa72ce93c4 100644 --- a/bin/dbus/dbusclient.cpp +++ b/bin/dbus/dbusclient.cpp @@ -280,6 +280,8 @@ DBusClient::initLibrary(int flags) bind(&DBusConfigurationManager::conversationRequestReceived, confM, _1, _2, _3)), exportable_callback<ConversationSignal::ConversationReady>( bind(&DBusConfigurationManager::conversationReady, confM, _1, _2)), + exportable_callback<ConversationSignal::ConversationRemoved>( + bind(&DBusConfigurationManager::conversationRemoved, confM, _1, _2)), }; #ifdef ENABLE_VIDEO diff --git a/bin/jni/conversation.i b/bin/jni/conversation.i index 3abc9cb32a1c7aa3be98996b306a243dc9da6380..36fed2c044ece2d2cd28ee18cffdb951f4c97130 100644 --- a/bin/jni/conversation.i +++ b/bin/jni/conversation.i @@ -29,6 +29,7 @@ public: virtual void messageReceived(const std::string& /*accountId*/, const std::string& /* conversationId */, std::map<std::string, std::string> /*message*/){} virtual void conversationRequestReceived(const std::string& /*accountId*/, const std::string& /* conversationId */, std::map<std::string, std::string> /*metadatas*/){} virtual void conversationReady(const std::string& /*accountId*/, const std::string& /* conversationId */){} + virtual void conversationRemoved(const std::string& /*accountId*/, const std::string& /* conversationId */){} }; %} @@ -62,4 +63,5 @@ public: virtual void messageReceived(const std::string& /*accountId*/, const std::string& /* conversationId */, std::map<std::string, std::string> /*message*/){} virtual void conversationRequestReceived(const std::string& /*accountId*/, const std::string& /* conversationId */, std::map<std::string, std::string> /*metadatas*/){} virtual void conversationReady(const std::string& /*accountId*/, const std::string& /* conversationId */){} + virtual void conversationRemoved(const std::string& /*accountId*/, const std::string& /* conversationId */){} }; \ No newline at end of file diff --git a/bin/jni/jni_interface.i b/bin/jni/jni_interface.i index 9d57ad8528d9e60cb75559e447162972377fd331..89ad82fae732f81db1798d9c08db04c39b1cdbb8 100644 --- a/bin/jni/jni_interface.i +++ b/bin/jni/jni_interface.i @@ -316,7 +316,8 @@ void init(ConfigurationCallback* confM, Callback* callM, PresenceCallback* presM exportable_callback<ConversationSignal::ConversationLoaded>(bind(&ConversationCallback::conversationLoaded, convM, _1, _2, _3, _4)), exportable_callback<ConversationSignal::MessageReceived>(bind(&ConversationCallback::messageReceived, convM, _1, _2, _3)), exportable_callback<ConversationSignal::ConversationRequestReceived>(bind(&ConversationCallback::conversationRequestReceived, convM, _1, _2, _3)), - exportable_callback<ConversationSignal::ConversationReady>(bind(&ConversationCallback::conversationReady, convM, _1, _2)) + exportable_callback<ConversationSignal::ConversationReady>(bind(&ConversationCallback::conversationReady, convM, _1, _2)), + exportable_callback<ConversationSignal::ConversationRemoved>(bind(&ConversationCallback::conversationRemoved, convM, _1, _2)) }; if (!DRing::init(static_cast<DRing::InitFlag>(DRing::DRING_FLAG_DEBUG))) diff --git a/bin/nodejs/conversation.i b/bin/nodejs/conversation.i index b246337d5fc0b5c49333c7f8c19e590de9323f64..2123934d38cda4710a55b0d41f4542f0aa701203 100644 --- a/bin/nodejs/conversation.i +++ b/bin/nodejs/conversation.i @@ -29,6 +29,7 @@ public: virtual void messageReceived(const std::string& /*accountId*/, const std::string& /* conversationId */, std::map<std::string, std::string> /*message*/){} virtual void conversationRequestReceived(const std::string& /*accountId*/, const std::string& /* conversationId */, std::map<std::string, std::string> /*metadatas*/){} virtual void conversationReady(const std::string& /*accountId*/, const std::string& /* conversationId */){} + virtual void conversationRemoved(const std::string& /*accountId*/, const std::string& /* conversationId */){} }; %} @@ -75,4 +76,5 @@ public: virtual void messageReceived(const std::string& /*accountId*/, const std::string& /* conversationId */, std::map<std::string, std::string> /*message*/){} virtual void conversationRequestReceived(const std::string& /*accountId*/, const std::string& /* conversationId */, std::map<std::string, std::string> /*metadatas*/){} virtual void conversationReady(const std::string& /*accountId*/, const std::string& /* conversationId */){} + virtual void conversationRemoved(const std::string& /*accountId*/, const std::string& /* conversationId */){} }; \ No newline at end of file diff --git a/src/client/ring_signal.cpp b/src/client/ring_signal.cpp index 689f7812947d83bdbd0487346ebb69829655cc5a..79a1d86cb5442fda7cd57de688ac0248e1288962 100644 --- a/src/client/ring_signal.cpp +++ b/src/client/ring_signal.cpp @@ -131,6 +131,7 @@ getSignalHandlers() exported_callback<DRing::ConversationSignal::MessageReceived>(), exported_callback<DRing::ConversationSignal::ConversationRequestReceived>(), exported_callback<DRing::ConversationSignal::ConversationReady>(), + exported_callback<DRing::ConversationSignal::ConversationRemoved>(), }; return handlers; diff --git a/src/dring/conversation_interface.h b/src/dring/conversation_interface.h index ac450fbf664e807a513bfffee788ddcfbf2822d7..ed7927e0cd6769c7415d5cb8e41ab3f166aaf3d5 100644 --- a/src/dring/conversation_interface.h +++ b/src/dring/conversation_interface.h @@ -93,6 +93,12 @@ struct DRING_PUBLIC ConversationSignal using cb_type = void(const std::string& /*accountId*/, const std::string& /* conversationId */); }; + struct DRING_PUBLIC ConversationRemoved + { + constexpr static const char* name = "ConversationRemoved"; + using cb_type = void(const std::string& /*accountId*/, + const std::string& /* conversationId */); + }; }; } // namespace DRing diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index 426af1b8df8d34b52982e963c1f1b6999ba90170..f10ff171ff7a001e5605c674658917c55122c96b 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -24,6 +24,9 @@ #include "conversationrepository.h" #include <json/json.h> +#include <string_view> +#include <opendht/thread_pool.h> +#include <tuple> namespace jami { @@ -60,9 +63,14 @@ public: std::unique_ptr<ConversationRepository> repository_; std::weak_ptr<JamiAccount> account_; + std::atomic_bool isRemoving_ {false}; std::vector<std::map<std::string, std::string>> loadMessages(const std::string& fromMessage = "", const std::string& toMessage = "", size_t n = 0); + + std::mutex pullcbsMtx_ {}; + std::set<std::string> fetchingRemotes_ {}; // store current remote in fetch + std::deque<std::tuple<std::string, std::string, OnPullCb>> pullcbs_ {}; }; std::string @@ -203,9 +211,7 @@ Conversation::getMembers(bool includeInvited) const } if (includeInvited) { for (const auto& uri : fileutils::readDirectory(invitedPath)) { - std::map<std::string, std::string> - details {{"uri", uri }, - {"role", "invited"}}; + std::map<std::string, std::string> details {{"uri", uri}, {"role", "invited"}}; result.emplace_back(details); } } @@ -267,16 +273,39 @@ Conversation::sendMessage(const std::string& message, return pimpl_->repository_->commitMessage(Json::writeString(wbuilder, json)); } -std::vector<std::map<std::string, std::string>> -Conversation::loadMessages(const std::string& fromMessage, size_t n) +void +Conversation::loadMessages(const OnLoadMessages& cb, const std::string& fromMessage, size_t n) { - return pimpl_->loadMessages(fromMessage, "", n); + if (!cb) + return; + dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), fromMessage, n] { + if (auto sthis = w.lock()) { + cb(sthis->pimpl_->loadMessages(fromMessage, "", n)); + } + }); } -std::vector<std::map<std::string, std::string>> -Conversation::loadMessages(const std::string& fromMessage, const std::string& toMessage) +void +Conversation::loadMessages(const OnLoadMessages& cb, + const std::string& fromMessage, + const std::string& toMessage) +{ + if (!cb) + return; + dht::ThreadPool::io().run([w = weak(), cb = std::move(cb), fromMessage, toMessage] { + if (auto sthis = w.lock()) { + cb(sthis->pimpl_->loadMessages(fromMessage, toMessage, 0)); + } + }); +} + +std::string +Conversation::lastCommitId() const { - return pimpl_->loadMessages(fromMessage, toMessage, 0); + auto messages = pimpl_->loadMessages("", "", 1); + if (messages.empty()) + return {}; + return messages.front().at("id"); } bool @@ -310,6 +339,80 @@ Conversation::mergeHistory(const std::string& uri) return true; } +void +Conversation::pull(const std::string& uri, OnPullCb&& cb, std::string commitId) +{ + std::lock_guard<std::mutex> lk(pimpl_->pullcbsMtx_); + auto isInProgress = not pimpl_->pullcbs_.empty(); + pimpl_->pullcbs_.emplace_back( + std::make_tuple<std::string, std::string, OnPullCb>(std::string(uri), + std::move(commitId), + std::move(cb))); + if (isInProgress) + return; + dht::ThreadPool::io().run([w = weak()] { + auto sthis_ = w.lock(); + if (!sthis_) + return; + + std::string deviceId, commitId; + OnPullCb cb; + while (true) { + decltype(sthis_->pimpl_->pullcbs_)::value_type pullcb; + decltype(sthis_->pimpl_->fetchingRemotes_.begin()) it; + { + std::lock_guard<std::mutex> lk(sthis_->pimpl_->pullcbsMtx_); + if (sthis_->pimpl_->pullcbs_.empty()) + return; + auto elem = sthis_->pimpl_->pullcbs_.front(); + deviceId = std::get<0>(elem); + commitId = std::get<1>(elem); + cb = std::move(std::get<2>(elem)); + sthis_->pimpl_->pullcbs_.pop_front(); + + // Check if already using this remote, if so, no need to pull yet + // One pull at a time to avoid any early EOF or fetch errors. + if (sthis_->pimpl_->fetchingRemotes_.find(deviceId) + != sthis_->pimpl_->fetchingRemotes_.end()) { + sthis_->pimpl_->pullcbs_.emplace_back( + std::make_tuple<std::string, std::string, OnPullCb>(std::string(deviceId), + std::move(commitId), + std::move(cb))); + // Go to next pull + continue; + } + auto itr = sthis_->pimpl_->fetchingRemotes_.emplace(deviceId); + if (!itr.second) { + cb(false, {}); + continue; + } + it = itr.first; + } + // If recently fetched, the commit can already be there, so no need to do complex operations + if (commitId != "" && sthis_->pimpl_->repository_->getCommit(commitId) != std::nullopt) { + cb(true, {}); + std::lock_guard<std::mutex> lk(sthis_->pimpl_->pullcbsMtx_); + sthis_->pimpl_->fetchingRemotes_.erase(it); + continue; + } + // Pull from remote + auto fetched = sthis_->fetchFrom(deviceId); + { + std::lock_guard<std::mutex> lk(sthis_->pimpl_->pullcbsMtx_); + sthis_->pimpl_->fetchingRemotes_.erase(it); + } + + if (!fetched) { + cb(false, {}); + continue; + } + // auto newCommits = sthis_->mergeHistory(deviceId); + // auto ok = newCommits.empty(); + // if (cb) cb(true, std::move(newCommits)); + } + }); +} + std::map<std::string, std::string> Conversation::generateInvitation() const { @@ -334,4 +437,29 @@ Conversation::generateInvitation() const return invite; } +std::string +Conversation::leave() +{ + setRemovingFlag(); + return pimpl_->repository_->leave(); +} + +void +Conversation::setRemovingFlag() +{ + pimpl_->isRemoving_ = true; +} + +bool +Conversation::isRemoving() +{ + return pimpl_->isRemoving_; +} + +void +Conversation::erase() +{ + pimpl_->repository_->erase(); +} + } // namespace jami \ No newline at end of file diff --git a/src/jamidht/conversation.h b/src/jamidht/conversation.h index 1bea272328fd4d95625874cb17453bc42d46b456..aab97c5a46576c880e8b488dbb911760f89a6194 100644 --- a/src/jamidht/conversation.h +++ b/src/jamidht/conversation.h @@ -19,6 +19,7 @@ */ #pragma once +#include <functional> #include <string> #include <vector> #include <map> @@ -29,7 +30,11 @@ namespace jami { class JamiAccount; class ConversationRepository; -class Conversation +using OnPullCb = std::function<void(bool fetchOk,std::vector<std::map<std::string, std::string>>&& newMessages)>; +using OnLoadMessages = std::function<void(std::vector<std::map<std::string, std::string>>&& messages)>; + + +class Conversation : public std::enable_shared_from_this<Conversation> { public: Conversation(const std::weak_ptr<JamiAccount>& account, const std::string& conversationId = ""); @@ -79,21 +84,23 @@ public: const std::string& parent = ""); /** * Get a range of messages + * @param cb The callback when loaded * @param from The most recent message ("" = last (default)) * @param n Number of messages to get (0 = no limit (default)) - * @return the range of messages */ - std::vector<std::map<std::string, std::string>> loadMessages(const std::string& fromMessage = "", - size_t n = 0); + void loadMessages(const OnLoadMessages& cb, const std::string& fromMessage = "", size_t n = 0); /** * Get a range of messages + * @param cb The callback when loaded * @param fromMessage The most recent message ("" = last (default)) * @param toMessage The oldest message ("" = last (default)), no limit - * @return the range of messages */ - std::vector<std::map<std::string, std::string>> loadMessages(const std::string& fromMessage = "", - const std::string& toMessage = ""); - + void loadMessages(const OnLoadMessages& cb, const std::string& fromMessage = "", const std::string& toMessage = ""); + /** + * Get last commit id + * @return last commit id + */ + std::string lastCommitId() const; /** * Get new messages from peer * @param uri the peer @@ -108,13 +115,63 @@ public: */ bool mergeHistory(const std::string& uri); + /** + * Fetch and merge from peer + * @param uri Peer + * @param cb On pulled callback + * @param commitId Commit id that triggered this fetch + */ + void pull(const std::string& uri, OnPullCb&& cb, std::string commitId = ""); + /** * Generate an invitation to send to new contacts * @return the invite to send */ std::map<std::string, std::string> generateInvitation() const; + /** + * Leave a conversation + * @return commit id to send + */ + std::string leave(); + + /** + * Set a conversation as removing (when loading convInfo and still not sync) + * @todo: not a big fan to see this here. can be set in the constructor + * cause it's used by jamiaccount when loading conversations + */ + void setRemovingFlag(); + + /** + * Check if we are removing the conversation + * @return true if left the room + */ + bool isRemoving(); + + /** + * Erase all related datas + */ + void erase(); + private: + + std::shared_ptr<Conversation> shared() + { + return std::static_pointer_cast<Conversation>(shared_from_this()); + } + std::shared_ptr<Conversation const> shared() const + { + return std::static_pointer_cast<Conversation const>(shared_from_this()); + } + std::weak_ptr<Conversation> weak() + { + return std::static_pointer_cast<Conversation>(shared_from_this()); + } + std::weak_ptr<Conversation const> weak() const + { + return std::static_pointer_cast<Conversation const>(shared_from_this()); + } + class Impl; std::unique_ptr<Impl> pimpl_; }; diff --git a/src/jamidht/conversationrepository.cpp b/src/jamidht/conversationrepository.cpp index ce0619240b5627862d9292eec80d0fd5832983bc..2ddf3044bcc1d838b8681457fee46939abf6cb6e 100644 --- a/src/jamidht/conversationrepository.cpp +++ b/src/jamidht/conversationrepository.cpp @@ -55,7 +55,11 @@ public: throw std::logic_error("Couldn't open " + path); repository_ = {std::move(repo), git_repository_free}; } - ~Impl() = default; + ~Impl() + { + if (repository_) + repository_.reset(); + } GitSignature signature(); bool mergeFastforward(const git_oid* target_oid, int is_unborn); @@ -710,7 +714,7 @@ ConversationRepository::Impl::log(const std::string& from, const std::string& to git_revwalk* walker_ptr = nullptr; if (git_revwalk_new(&walker_ptr, repository_.get()) < 0 || git_revwalk_push(walker_ptr, &oid) < 0) { - JAMI_ERR("Couldn't init revwalker for conversation %s", id_.c_str()); + JAMI_DBG("Couldn't init revwalker for conversation %s", id_.c_str()); return commits; } GitRevWalker walker {walker_ptr, git_revwalk_free}; @@ -1027,17 +1031,26 @@ ConversationRepository::commitMessage(const std::string& msg) } std::vector<ConversationCommit> -ConversationRepository::logN(const std::string& last, unsigned n) +ConversationRepository::logN(const std::string& last, unsigned n) const { return pimpl_->log(last, "", n); } std::vector<ConversationCommit> -ConversationRepository::log(const std::string& from, const std::string& to) +ConversationRepository::log(const std::string& from, const std::string& to) const { return pimpl_->log(from, to, 0); } +std::optional<ConversationCommit> +ConversationRepository::getCommit(const std::string& commitId) const +{ + auto commits = logN(commitId, 1); + if (commits.empty()) + return std::nullopt; + return std::move(commits[0]); +} + bool ConversationRepository::merge(const std::string& merge_id) { @@ -1157,8 +1170,6 @@ ConversationRepository::changedFiles(const std::string_view& diffStats) std::string ConversationRepository::join() { - if (!pimpl_) - return {}; // Check that not already member std::string repoPath = git_repository_workdir(pimpl_->repository_.get()); auto account = pimpl_->account_.lock(); @@ -1207,4 +1218,85 @@ ConversationRepository::join() return pimpl_->commit(Json::writeString(wbuilder, json)); } +std::string +ConversationRepository::leave() +{ + // TODO simplify + auto account = pimpl_->account_.lock(); + if (!account) + return {}; + auto details = account->getAccountDetails(); + auto deviceId = details[DRing::Account::ConfProperties::RING_DEVICE_ID]; + auto uri = details[DRing::Account::ConfProperties::USERNAME]; + auto name = details[DRing::Account::ConfProperties::DISPLAYNAME]; + if (name.empty()) + name = account + ->getVolatileAccountDetails()[DRing::Account::VolatileProperties::REGISTERED_NAME]; + if (name.empty()) + name = deviceId; + + // Remove related files + std::string repoPath = git_repository_workdir(pimpl_->repository_.get()); + + std::string adminFile = repoPath + "admins" + DIR_SEPARATOR_STR + uri + ".crt"; + std::string memberFile = repoPath + "members" + DIR_SEPARATOR_STR + uri + ".crt"; + std::string crlsPath = repoPath + "CRLs"; + + if (fileutils::isFile(adminFile)) { + fileutils::removeAll(adminFile, true); + } + + if (fileutils::isFile(memberFile)) { + fileutils::removeAll(memberFile, true); + } + + // /CRLs + for (const auto& crl : account->identity().second->getRevocationLists()) { + if (!crl) + continue; + auto v = crl->getNumber(); + std::stringstream ss; + ss << std::hex; + for (const auto& b : v) + ss << (unsigned) b; + std::string crlPath = crlsPath + DIR_SEPARATOR_STR + deviceId + DIR_SEPARATOR_STR + ss.str() + + ".crl"; + + if (fileutils::isFile(crlPath)) { + fileutils::removeAll(crlPath, true); + } + } + + // Devices + for (const auto& d : account->getKnownDevices()) { + std::string deviceFile = repoPath + "devices" + DIR_SEPARATOR_STR + d.first + ".crt"; + if (fileutils::isFile(deviceFile)) { + fileutils::removeAll(deviceFile, true); + } + } + + if (!git_add_all(pimpl_->repository_.get())) { + return {}; + } + + Json::Value json; + json["action"] = "remove"; + json["uri"] = uri; + json["type"] = "member"; + Json::StreamWriterBuilder wbuilder; + wbuilder["commentStyle"] = "None"; + wbuilder["indentation"] = ""; + return pimpl_->commit(Json::writeString(wbuilder, json)); +} + +void +ConversationRepository::erase() +{ + // First, we need to add the member file to the repository if not present + std::string repoPath = git_repository_workdir(pimpl_->repository_.get()); + + JAMI_DBG() << "Erasing " << repoPath; + fileutils::removeAll(repoPath, true); +} + } // namespace jami diff --git a/src/jamidht/conversationrepository.h b/src/jamidht/conversationrepository.h index c8aa376c0efd9691dd4d11422c2bef9154032e01..a19a46d1ef7c843b346cd938005c0b28a5e749a7 100644 --- a/src/jamidht/conversationrepository.h +++ b/src/jamidht/conversationrepository.h @@ -17,6 +17,7 @@ */ #pragma once +#include <optional> #include <git2.h> #include <memory> #include <opendht/default_types.h> @@ -141,8 +142,10 @@ public: * @param n Max commits number to get (default: 0) * @return a list of commits */ - std::vector<ConversationCommit> logN(const std::string& last = "", unsigned n = 0); - std::vector<ConversationCommit> log(const std::string& from = "", const std::string& to = ""); + std::vector<ConversationCommit> logN(const std::string& last = "", unsigned n = 0) const; + std::vector<ConversationCommit> log(const std::string& from = "", + const std::string& to = "") const; + std::optional<ConversationCommit> getCommit(const std::string& commitId) const; /** * Merge another branch into the main branch @@ -173,6 +176,17 @@ public: */ std::string join(); + /** + * Erase self from repository + * @return commit Id + */ + std::string leave(); + + /** + * Erase repository + */ + void erase(); + private: ConversationRepository() = delete; class Impl; diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 5df2da730d472d97d121f8aad396924bf40b2198..b1cc648406de9e6bd68d15daa962cc511600ae0d 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -138,6 +138,7 @@ struct ConvInfo std::string id {}; time_t created {0}; time_t removed {0}; + time_t erased {0}; ConvInfo() = default; ConvInfo(const Json::Value& json) @@ -145,6 +146,7 @@ struct ConvInfo id = json["id"].asString(); created = json["created"].asLargestUInt(); removed = json["removed"].asLargestUInt(); + erased = json["erased"].asLargestUInt(); } Json::Value toJson() const @@ -155,10 +157,13 @@ struct ConvInfo if (removed) { json["removed"] = Json::Int64(removed); } + if (erased) { + json["erased"] = Json::Int64(erased); + } return json; } - MSGPACK_DEFINE_MAP(id, created, removed) + MSGPACK_DEFINE_MAP(id, created, removed, erased) }; // ConversationRequest @@ -416,7 +421,10 @@ JamiAccount::shutdownConnections() } for (auto& [_id, gs] : gservers) gs->stop(); - connectionManager_.reset(); + { + std::lock_guard<std::mutex> lk(connManagerMtx_); + connectionManager_.reset(); + } dhtPeerConnector_.reset(); std::lock_guard<std::mutex> lk(sipConnsMtx_); sipConns_.clear(); @@ -2004,7 +2012,7 @@ JamiAccount::doRegister() + "conversations"); for (const auto& repository : conversationsRepositories) { try { - auto conv = std::make_unique<Conversation>(weak(), repository); + auto conv = std::make_shared<Conversation>(weak(), repository); std::lock_guard<std::mutex> lk(conversationsMtx_); conversations_.emplace(repository, std::move(conv)); } catch (const std::logic_error& e) { @@ -2113,6 +2121,7 @@ JamiAccount::trackBuddyPresence(const std::string& buddy_id, bool track) } } + std::lock_guard<std::mutex> lk(connManagerMtx_); for (const auto& device : devices) { if (connectionManager_) connectionManager_->closeConnectionsWith(device); @@ -2392,7 +2401,10 @@ JamiAccount::doRegister_() pendingSync_.emplace(deviceId); } - connectionManager().connectDevice(crt->getId(), + std::lock_guard<std::mutex> lk(connManagerMtx_); + if (!connectionManager_) + return; + connectionManager_->connectDevice(crt->getId(), "sync://" + deviceId, [this](std::shared_ptr<ChannelSocket> socket, const DeviceId& deviceId) { @@ -2407,6 +2419,7 @@ JamiAccount::doRegister_() }); // Init connection manager + std::unique_lock<std::mutex> lkCM(connManagerMtx_); if (!connectionManager_) connectionManager_ = std::make_unique<ConnectionManager>(*this); connectionManager_->onDhtConnected(DeviceId(accountManager_->getInfo()->deviceId)); @@ -2542,6 +2555,12 @@ JamiAccount::doRegister_() deviceId.to_c_str(), channel->channel()); auto gs = std::make_unique<GitServer>(accountId, conversationId, channel); + gs->setOnFetched([w = weak(), conversationId](const std::string&) { + auto shared = w.lock(); + if (!shared) + return; + shared->removeRepository(conversationId, true); + }); const dht::Value::Id serverId = ValueIdDist()(rand); { std::lock_guard<std::mutex> lk(gitServersMtx_); @@ -2560,6 +2579,7 @@ JamiAccount::doRegister_() } } }); + lkCM.unlock(); // Listen for incoming calls callKey_ = dht::InfoHash::get("callto:" + accountManager_->getInfo()->deviceId); @@ -3447,8 +3467,11 @@ JamiAccount::sendTextMessage(const std::string& to, std::unique_lock<std::mutex> lk(acc->sipConnsMtx_); acc->sipConns_.erase(std::make_pair(c->to, c->deviceId)); } - if (acc->connectionManager_) - acc->connectionManager_->closeConnectionsWith(c->deviceId); + { + std::lock_guard<std::mutex> lk(acc->connManagerMtx_); + if (acc->connectionManager_) + acc->connectionManager_->closeConnectionsWith(c->deviceId); + } // This MUST be done after closing the connection to avoid race condition // with messageEngine_ acc->messageEngine_.onMessageSent(c->to, c->id, false); @@ -3552,8 +3575,10 @@ JamiAccount::sendTextMessage(const std::string& to, std::unique_lock<std::mutex> l(confirm->lock); auto lt = confirm->listenTokens.find(h); if (lt != confirm->listenTokens.end()) { - dht_->cancelListen(h, std::move(lt->second)); + std::shared_future<size_t> tok = std::move(lt->second); confirm->listenTokens.erase(lt); + l.unlock(); + dht_->cancelListen(h, tok); } if (confirm->listenTokens.empty() and not confirm->replied) { l.unlock(); @@ -3926,19 +3951,31 @@ JamiAccount::handlePendingConversations() // Clone and store conversation try { auto conversationId = it->first; - auto conversation = std::make_unique<Conversation>(weak(), + auto conversation = std::make_shared<Conversation>(weak(), it->second.deviceId, conversationId); if (conversation) { + auto commitId = conversation->join(); ConvInfo info; info.id = conversationId; info.created = std::time(nullptr); convInfos_.emplace_back(info); - saveConvInfos(); { std::lock_guard<std::mutex> lk(conversationsMtx_); conversations_.emplace(conversationId, std::move(conversation)); } + if (!commitId.empty()) { + runOnMainThread([w = weak(), conversationId, commitId]() { + if (auto shared = w.lock()) { + std::lock_guard<std::mutex> lk(shared->conversationsMtx_); + auto it = shared->conversations_.find(conversationId); + // Do not sync as it's synched by convInfos + if (it != shared->conversations_.end()) + shared->sendMessageNotification(*it->second, commitId, false); + } + }); + } + saveConvInfos(); // Inform user that the conversation is ready emitSignal<DRing::ConversationSignal::ConversationReady>(accountID_, conversationId); @@ -3970,6 +4007,55 @@ JamiAccount::declineConversationRequest(const std::string& conversationId) bool JamiAccount::removeConversation(const std::string& conversationId) { + std::unique_lock<std::mutex> lk(conversationsMtx_); + auto it = conversations_.find(conversationId); + if (it == conversations_.end()) { + JAMI_ERR("Conversation %s doesn't exist", conversationId.c_str()); + return false; + } + auto members = it->second->getMembers(); + auto hasMembers = !(members.size() == 1 + && username_.find(members[0]["uri"]) != std::string::npos); + // Update convInfos + for (auto& info : convInfos_) { + if (info.id == conversationId) { + info.removed = std::time(nullptr); + saveConvInfos(); + if (hasMembers) { + // Sync now, because it can take some time to really removes the datas + runOnMainThread([w = weak()]() { + // Invite connected devices for the same user + auto shared = w.lock(); + if (!shared or !shared->accountManager_) + return; + + // Send to connected devices + shared->syncWithConnected(); + }); + } + break; + } + } + auto commitId = it->second->leave(); + emitSignal<DRing::ConversationSignal::ConversationRemoved>(accountID_, conversationId); + if (hasMembers) { + JAMI_DBG() << "Wait that someone sync that user left conversation " << conversationId; + // Commit that we left + if (!commitId.empty()) { + // Do not sync as it's synched by convInfos + sendMessageNotification(*it->second, commitId, false); + } else { + JAMI_ERR("Failed to send message to conversation %s", conversationId.c_str()); + } + // In this case, we wait that another peer sync the conversation + // to definitely remove it from the device. This is to inform the + // peer that we left the conversation and never want to receives + // any messages + return true; + } + lk.unlock(); + // Else we are the last member, so we can remove + removeRepository(conversationId, true); return true; } @@ -3979,7 +4065,9 @@ JamiAccount::getConversations() std::vector<std::string> result; std::lock_guard<std::mutex> lk(conversationsMtx_); result.reserve(conversations_.size()); - for (const auto& [key, _] : conversations_) { + for (const auto& [key, conv] : conversations_) { + if (conv->isRemoving()) + continue; result.emplace_back(key); } return result; @@ -4019,14 +4107,22 @@ JamiAccount::addConversationMember(const std::string& conversationId, JAMI_WARN("Couldn't add %s to %s", contactUri.c_str(), conversationId.c_str()); return false; } - auto messages = it->second->loadMessages(commitId, 1); - if (messages.empty()) - return false; // should not happen - emitSignal<DRing::ConversationSignal::MessageReceived>(getAccountID(), - conversationId, - messages.front()); - if (sendRequest) - sendTextMessage(contactUri, it->second->generateInvitation()); + it->second->loadMessages([w=weak(), conversationId, sendRequest, contactUri, commitId](auto&& messages) { + auto shared = w.lock(); + if (not shared or messages.empty()) + return; // should not happen + std::lock_guard<std::mutex> lk(shared->conversationsMtx_); + // Add a new member in the conversation + auto it = shared->conversations_.find(conversationId); + if (it == shared->conversations_.end()) { + return; + } + auto message = messages.front(); + emitSignal<DRing::ConversationSignal::MessageReceived>(shared->getAccountID(), conversationId, message); + if (sendRequest) + shared->sendTextMessage(contactUri, it->second->generateInvitation()); + shared->sendMessageNotification(*it->second, commitId, true); + }, commitId, 1); return true; } @@ -4061,28 +4157,15 @@ JamiAccount::sendMessage(const std::string& conversationId, auto conversation = conversations_.find(conversationId); if (conversation != conversations_.end() && conversation->second) { auto commitId = conversation->second->sendMessage(message, type, parent); - // TODO make async/non blocking - auto messages = conversation->second->loadMessages(commitId, 1); - if (!messages.empty()) { - emitSignal<DRing::ConversationSignal::MessageReceived>(getAccountID(), - conversationId, - messages.front()); - } if (!commitId.empty()) { - Json::Value message; - message["id"] = conversationId; - message["commit"] = commitId; - message["deviceId"] = std::string(currentDeviceId()); - Json::StreamWriterBuilder builder; - const auto text = Json::writeString(builder, message); - for (const auto& members : conversation->second->getMembers()) { - auto uri = members.at("uri"); - if (username_.find(uri) != std::string::npos) - continue; - // Announce to all members that a new message is sent - if (announce) - sendTextMessage(uri, {{"application/im-gitmessage-id", text}}); - } + conversation->second->loadMessages([w=weak(), conversationId] (auto&& messages) { + auto shared = w.lock(); + if (shared && !messages.empty()) + emitSignal<DRing::ConversationSignal::MessageReceived>(shared->getAccountID(), + conversationId, + messages.front()); + }, commitId, 1); + sendMessageNotification(*conversation->second, commitId, true); } else { JAMI_ERR("Failed to send message to conversation %s", conversationId.c_str()); } @@ -4097,19 +4180,18 @@ JamiAccount::loadConversationMessages(const std::string& conversationId, if (!isConversation(conversationId)) return 0; const uint32_t id = LoadIdDist()(rand); - // loadMessages will perform a git log that can take quite some time, so to avoid any lock, run - // it the threadpool - dht::ThreadPool::io().run([this, conversationId, fromMessage, n, id] { - std::lock_guard<std::mutex> lk(conversationsMtx_); - auto conversation = conversations_.find(conversationId); - if (conversation != conversations_.end() && conversation->second) { - auto messages = conversation->second->loadMessages(fromMessage, n); - emitSignal<DRing::ConversationSignal::ConversationLoaded>(id, - accountID_, - conversationId, - messages); - } - }); + std::lock_guard<std::mutex> lk(conversationsMtx_); + auto conversation = conversations_.find(conversationId); + if (conversation != conversations_.end() && conversation->second) { + conversation->second->loadMessages([w=weak(), conversationId, id](auto&& messages) { + if (auto shared = w.lock()) { + emitSignal<DRing::ConversationSignal::ConversationLoaded>(id, + shared->getAccountID(), + conversationId, + messages); + } + }, fromMessage, n); + } return id; } @@ -4119,44 +4201,46 @@ JamiAccount::onNewGitCommit(const std::string& peer, const std::string& conversationId, const std::string& commitId) { + for (auto& info : convInfos_) + if (info.id == conversationId) + if (info.removed) // ignore new commits for removed conversation + return; JAMI_DBG("[Account %s] on new commit notification from %s, for %s, commit %s", getAccountID().c_str(), peer.c_str(), conversationId.c_str(), commitId.c_str()); - fetchNewCommits(peer, deviceId, conversationId); + fetchNewCommits(peer, deviceId, conversationId, commitId); } void JamiAccount::fetchNewCommits(const std::string& peer, const std::string& deviceId, - const std::string& conversationId) + const std::string& conversationId, + const std::string& commitId) { + std::unique_lock<std::mutex> lk(conversationsMtx_); auto conversation = conversations_.find(conversationId); if (conversation != conversations_.end() && conversation->second) { - if (!conversation->second->isMember(peer)) { - JAMI_WARN("%s is not a member of %s", peer.c_str(), conversationId.c_str()); + // Retrieve current last message + auto lastMessageId = conversation->second->lastCommitId(); + if (lastMessageId.empty()) { + JAMI_ERR("[Account %s] No message detected. This is a bug", getAccountID().c_str()); return; } - // Retrieve current last message - std::string lastMessageId = ""; - auto lastMessage = conversation->second->loadMessages("", 1); - if (lastMessage.empty()) - JAMI_ERR("No message detected. This is a bug"); - else - lastMessageId = lastMessage.front().at("id"); - - auto announceMessages = [w = weak(), conversationId, lastMessageId]() { + auto announceMessages = [w = weak(), conversationId](const std::vector<std::map<std::string, std::string>>& messages) { auto shared = w.lock(); if (!shared) return; + std::unique_lock<std::mutex> lk(shared->conversationsMtx_); auto conversation = shared->conversations_.find(conversationId); if (conversation != shared->conversations_.end() && conversation->second) { // Do a diff between last message, and current new message when merged - auto messages = conversation->second->loadMessages("", lastMessageId); + lk.unlock(); for (const auto& message : messages) { - JAMI_DBG("New message received for conversation %s with id %s", + JAMI_DBG("[Account %s] New message received for conversation %s with id %s", + shared->getAccountID().c_str(), conversationId.c_str(), message.at("id").c_str()); emitSignal<DRing::ConversationSignal::MessageReceived>(shared->getAccountID(), @@ -4164,57 +4248,105 @@ JamiAccount::fetchNewCommits(const std::string& peer, message); } } else { - JAMI_WARN("Unknown conversation %s", conversationId.c_str()); + JAMI_WARN("[Account %s] Unknown conversation %s", + shared->getAccountID().c_str(), + conversationId.c_str()); } }; if (gitSocket(deviceId, conversationId)) { - // If the git socket exists, we can fetch from it - if (!conversation->second->fetchFrom(deviceId)) { - JAMI_WARN("Could not fetch new commit from %s for %s", - deviceId.c_str(), - conversationId.c_str()); - removeGitSocket(deviceId, conversationId); - } - auto merged = conversation->second->mergeHistory(deviceId); - if (merged) - announceMessages(); + conversation->second + ->pull(deviceId, + [deviceId, + conversationId, + w = weak(), + announceMessages = std::move(announceMessages)](bool ok, auto messages) { + auto shared = w.lock(); + if (!shared) + return; + if (!ok) { + JAMI_WARN("[Account %s] Could not fetch new commit from %s for %s", + shared->getAccountID().c_str(), + deviceId.c_str(), + conversationId.c_str()); + shared->removeGitSocket(deviceId, conversationId); + } + if (!messages.empty()) + announceMessages(messages); + }, commitId); } else { + lk.unlock(); // Else we need to add a new gitSocket { std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); pendingConversationsFetch_[conversationId] = PendingConversationFetch {}; } - connectionManager().connectDevice( + std::lock_guard<std::mutex> lkCM(connManagerMtx_); + if (!connectionManager_) + return; + connectionManager_->connectDevice( DeviceId(deviceId), "git://" + deviceId + "/" + conversationId, [this, - conversation, conversationId, + commitId, announceMessages = std::move( announceMessages)](std::shared_ptr<ChannelSocket> socket, const DeviceId& deviceId) { - if (socket) { - addGitSocket(deviceId.toString(), conversationId, socket); - if (!conversation->second->fetchFrom(deviceId.toString())) - JAMI_WARN("Could not fetch new commit from %s for %s", - deviceId.to_c_str(), - conversationId.c_str()); - auto merged = conversation->second->mergeHistory(deviceId.toString()); - if (merged) - announceMessages(); - } else { - JAMI_ERR("Couldn't open a new git channel with %s for conversation %s", - deviceId.to_c_str(), - conversationId.c_str()); - } - { - std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); - pendingConversationsFetch_.erase(conversationId); - } + dht::ThreadPool::io().run([w = weak(), + conversationId, + socket = std::move(socket), + deviceId, + commitId, + announceMessages = std::move(announceMessages)] { + auto shared = w.lock(); + if (!shared) + return; + std::unique_lock<std::mutex> lk(shared->conversationsMtx_); + auto conversation = shared->conversations_.find(conversationId); + if (!conversation->second) + return; + if (socket) { + shared->addGitSocket(deviceId.toString(), conversationId, socket); + + conversation->second + ->pull(deviceId.toString(), + [deviceId, + conversationId, + w, + announceMessages = std::move( + announceMessages)](bool ok, auto messages) { + auto shared = w.lock(); + if (!shared) + return; + if (!ok) { + JAMI_WARN("[Account %s] Could not fetch new commit " + "from %s for %s", + shared->getAccountID().c_str(), + deviceId.to_c_str(), + conversationId.c_str()); + shared->removeGitSocket(deviceId.toString(), + conversationId); + } + if (!messages.empty()) + announceMessages(messages); + }, commitId); + } else { + JAMI_ERR("[Account %s] Couldn't open a new git channel with %s for " + "conversation %s", + shared->getAccountID().c_str(), + deviceId.to_c_str(), + conversationId.c_str()); + } + { + std::lock_guard<std::mutex> lk(shared->pendingConversationsFetchMtx_); + shared->pendingConversationsFetch_.erase(conversationId); + } + }); }); } } else { + lk.unlock(); { // Check if the conversation is still a request std::lock_guard<std::mutex> lk(conversationsRequestsMtx_); @@ -4227,7 +4359,9 @@ JamiAccount::fetchNewCommits(const std::string& peer, if (pendingConversationsFetch_.find(conversationId) != pendingConversationsFetch_.end()) return; } - JAMI_WARN("Could not find conversation %s", conversationId.c_str()); + JAMI_WARN("[Account %s] Could not find conversation %s", + getAccountID().c_str(), + conversationId.c_str()); } } @@ -4377,6 +4511,7 @@ JamiAccount::requestSIPConnection(const std::string& peerId, const DeviceId& dev JAMI_INFO("[Account %s] Ask %s for a new SIP channel", getAccountID().c_str(), deviceId.to_c_str()); + std::lock_guard<std::mutex> lkCM(connManagerMtx_); if (!connectionManager_) return; connectionManager_->connectDevice(deviceId, @@ -4687,6 +4822,7 @@ JamiAccount::cacheSyncConnection(std::shared_ptr<ChannelSocket>&& socket, std::lock_guard<std::mutex> lk(conversationsRequestsMtx_); conversationsRequests_.erase(convId); } + auto itConv = conversations_.find(convId); if (not removed) { if (!isConversation(convId)) { { @@ -4696,8 +4832,10 @@ JamiAccount::cacheSyncConnection(std::shared_ptr<ChannelSocket>&& socket, return len; pendingConversationsFetch_[convId] = PendingConversationFetch {}; } - - connectionManager().connectDevice( + std::lock_guard<std::mutex> lkCM(connManagerMtx_); + if (!connectionManager_) + return len; + connectionManager_->connectDevice( DeviceId(deviceId), std::string("git://").append(deviceId).append("/").append(convId), [this, convId](std::shared_ptr<ChannelSocket> socket, @@ -4731,9 +4869,22 @@ JamiAccount::cacheSyncConnection(std::shared_ptr<ChannelSocket>&& socket, convId.c_str()); } } else { + { + std::lock_guard<std::mutex> lk(conversationsMtx_); + auto itConv = conversations_.find(convId); + if (itConv != conversations_.end() && !itConv->second->isRemoving()) { + emitSignal<DRing::ConversationSignal::ConversationRemoved>(accountID_, convId); + itConv->second->setRemovingFlag(); + } + } for (auto& info : convInfos_) { if (info.id == convId) { info.removed = std::time(nullptr); + if (jsonConv.isMember("erased")) { + info.erased = std::time(nullptr); + removeRepository(convId, false); + } + break; } } } @@ -4851,8 +5002,14 @@ JamiAccount::loadConvInfos() return; } - for (auto& info : convInfo) + for (auto& info : convInfo) { convInfos_.emplace_back(info); + std::lock_guard<std::mutex> lk(conversationsMtx_); + auto itConv = conversations_.find(info.id); + if (itConv != conversations_.end() && info.removed) { + itConv->second->setRemovingFlag(); + } + } } void @@ -4886,4 +5043,54 @@ JamiAccount::saveConvRequests() msgpack::pack(file, conversationsRequests_); } +void +JamiAccount::removeRepository(const std::string& conversationId, bool sync) +{ + std::unique_lock<std::mutex> lk(conversationsMtx_); + auto it = conversations_.find(conversationId); + if (it != conversations_.end() && it->second && it->second->isRemoving()) { + JAMI_DBG() << "Remove conversation: " << conversationId; + it->second->erase(); + conversations_.erase(it); + lk.unlock(); + // Update convInfos + if (!sync) + return; + for (auto& info : convInfos_) { + if (info.id == conversationId) { + info.erased = std::time(nullptr); + saveConvInfos(); + runOnMainThread([w = weak()]() { + // Send to connected devices + if (auto shared = w.lock()) + shared->syncWithConnected(); + }); + break; + } + } + } +} + +void +JamiAccount::sendMessageNotification(const Conversation& conversation, + const std::string& commitId, + bool sync) +{ + Json::Value message; + message["id"] = conversation.id(); + message["commit"] = commitId; + // TODO avoid lookup + message["deviceId"] = std::string(currentDeviceId()); + Json::StreamWriterBuilder builder; + const auto text = Json::writeString(builder, message); + for (const auto& members : conversation.getMembers()) { + auto uri = members.at("uri"); + // Do not send to ourself, it's synced via convInfos + if (!sync && username_.find(uri) != std::string::npos) + continue; + // Announce to all members that a new message is sent + sendTextMessage(uri, {{"application/im-gitmessage-id", text}}); + } +} + } // namespace jami diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index a232911b04120bd1cdd80e018a99ef9685df3a11..6fcc1bfb2d57cfbae01551020f1c3ee4c1e5127d 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -510,8 +510,8 @@ public: std::string startConversation(); void acceptConversationRequest(const std::string& conversationId); void declineConversationRequest(const std::string& conversationId); - bool removeConversation(const std::string& conversationId); std::vector<std::string> getConversations(); + bool removeConversation(const std::string& conversationId); std::vector<std::map<std::string, std::string>> getConversationRequests(); // Member management @@ -537,9 +537,17 @@ public: const std::string& deviceId, const std::string& conversationId, const std::string& commitId) override; + /** + * Pull remote device (do not do it if commitId is already in the current repo) + * @param peer Contact URI + * @param deviceId Contact's device + * @param conversationId + * @param commitId (optional) + */ void fetchNewCommits(const std::string& peer, const std::string& deviceId, - const std::string& conversationId); + const std::string& conversationId, + const std::string& commitId = ""); // Invites void onConversationRequest(const std::string& from, const Json::Value&) override; @@ -740,7 +748,7 @@ private: /** Conversations */ mutable std::mutex conversationsMtx_ {}; - std::map<std::string, std::unique_ptr<Conversation>> conversations_; + std::map<std::string, std::shared_ptr<Conversation>> conversations_; bool isConversation(const std::string& convId) const { std::lock_guard<std::mutex> lk(conversationsMtx_); @@ -804,6 +812,7 @@ private: pjsip_transport* via_tp_ {nullptr}; std::unique_ptr<DhtPeerConnector> dhtPeerConnector_; + mutable std::mutex connManagerMtx_ {}; std::unique_ptr<ConnectionManager> connectionManager_; GitSocketList gitSocketList_ {}; @@ -934,6 +943,23 @@ private: void syncInfos(const std::shared_ptr<ChannelSocket>& socket); void syncWithConnected(); std::atomic_bool needsConvSync_ {true}; + + /** + * Remove a repository and all files + * @param convId + * @param sync If we send an update to other account's devices + */ + void removeRepository(const std::string& convId, bool sync); + + /** + * Send a message notification to all members + * @param conversation + * @param commit + * @param sync If we send an update to other account's devices + */ + void sendMessageNotification(const Conversation& conversation, + const std::string& commitId, + bool sync); }; static inline std::ostream& diff --git a/test/unitTest/conversation/conversation.cpp b/test/unitTest/conversation/conversation.cpp index b6259fac94b6c6aad86bfac0ea2ce8798042e72f..3aff208e79ca4d2f04428325f62247f5e56addc5 100644 --- a/test/unitTest/conversation/conversation.cpp +++ b/test/unitTest/conversation/conversation.cpp @@ -57,6 +57,10 @@ public: private: void testCreateConversation(); void testGetConversation(); + void testGetConversationsAfterRm(); + void testRemoveInvalidConversation(); + void testRemoveConversationNoMember(); + void testRemoveConversationWithMember(); void testAddMember(); void testAddOfflineMemberThenConnects(); void testGetMembers(); @@ -71,6 +75,10 @@ private: CPPUNIT_TEST_SUITE(ConversationTest); CPPUNIT_TEST(testCreateConversation); CPPUNIT_TEST(testGetConversation); + CPPUNIT_TEST(testGetConversationsAfterRm); + CPPUNIT_TEST(testRemoveInvalidConversation); + CPPUNIT_TEST(testRemoveConversationNoMember); + CPPUNIT_TEST(testRemoveConversationWithMember); CPPUNIT_TEST(testAddMember); CPPUNIT_TEST(testAddOfflineMemberThenConnects); CPPUNIT_TEST(testGetMembers); @@ -224,6 +232,186 @@ ConversationTest::testGetConversation() CPPUNIT_ASSERT(conversations.front() == convId); } +void +ConversationTest::testGetConversationsAfterRm() +{ + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + auto aliceDeviceId = aliceAccount->currentDeviceId(); + auto uri = aliceAccount->getUsername(); + + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + bool conversationReady = false; + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationReady>( + [&](const std::string& accountId, const std::string& /* conversationId */) { + if (accountId == aliceId) { + conversationReady = true; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + + // Start conversation + auto convId = aliceAccount->startConversation(); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { return conversationReady; })); + + auto conversations = aliceAccount->getConversations(); + CPPUNIT_ASSERT(conversations.size() == 1); + CPPUNIT_ASSERT(aliceAccount->removeConversation(convId)); + conversations = aliceAccount->getConversations(); + CPPUNIT_ASSERT(conversations.size() == 0); +} + +void +ConversationTest::testRemoveInvalidConversation() +{ + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + auto aliceDeviceId = aliceAccount->currentDeviceId(); + auto uri = aliceAccount->getUsername(); + + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + bool conversationReady = false; + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationReady>( + [&](const std::string& accountId, const std::string& /* conversationId */) { + if (accountId == aliceId) { + conversationReady = true; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + + // Start conversation + auto convId = aliceAccount->startConversation(); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { return conversationReady; })); + + auto conversations = aliceAccount->getConversations(); + CPPUNIT_ASSERT(conversations.size() == 1); + CPPUNIT_ASSERT(!aliceAccount->removeConversation("foo")); + conversations = aliceAccount->getConversations(); + CPPUNIT_ASSERT(conversations.size() == 1); +} + +void +ConversationTest::testRemoveConversationNoMember() +{ + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + auto aliceDeviceId = aliceAccount->currentDeviceId(); + auto uri = aliceAccount->getUsername(); + + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + bool conversationReady = false; + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationReady>( + [&](const std::string& accountId, const std::string& /* conversationId */) { + if (accountId == aliceId) { + conversationReady = true; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + + // Start conversation + auto convId = aliceAccount->startConversation(); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { return conversationReady; })); + + // Assert that repository exists + auto repoPath = fileutils::get_data_dir() + DIR_SEPARATOR_STR + aliceAccount->getAccountID() + + DIR_SEPARATOR_STR + "conversations" + DIR_SEPARATOR_STR + convId; + CPPUNIT_ASSERT(fileutils::isDirectory(repoPath)); + + auto conversations = aliceAccount->getConversations(); + CPPUNIT_ASSERT(conversations.size() == 1); + // Removing the conversation will erase all related files + CPPUNIT_ASSERT(aliceAccount->removeConversation(convId)); + conversations = aliceAccount->getConversations(); + CPPUNIT_ASSERT(conversations.size() == 0); + CPPUNIT_ASSERT(!fileutils::isDirectory(repoPath)); +} + +void +ConversationTest::testRemoveConversationWithMember() +{ + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId); + auto bobUri = bobAccount->getUsername(); + auto convId = aliceAccount->startConversation(); + + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + bool conversationReady = false, requestReceived = false, memberMessageGenerated = false, + bobSeeAliceRemoved = false; + confHandlers.insert( + DRing::exportable_callback<DRing::ConversationSignal::ConversationRequestReceived>( + [&](const std::string& /*accountId*/, + const std::string& /* conversationId */, + std::map<std::string, std::string> /*metadatas*/) { + requestReceived = true; + cv.notify_one(); + })); + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationReady>( + [&](const std::string& accountId, const std::string& /* conversationId */) { + if (accountId == bobId) { + conversationReady = true; + cv.notify_one(); + } + })); + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::MessageReceived>( + [&](const std::string& accountId, + const std::string& conversationId, + std::map<std::string, std::string> message) { + if (accountId == aliceId && conversationId == convId && message["type"] == "member") { + memberMessageGenerated = true; + cv.notify_one(); + } else if (accountId == bobId && conversationId == convId + && message["type"] == "member") { + bobSeeAliceRemoved = true; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + + CPPUNIT_ASSERT(aliceAccount->addConversationMember(convId, bobUri)); + CPPUNIT_ASSERT(memberMessageGenerated); + + // Assert that repository exists + auto repoPath = fileutils::get_data_dir() + DIR_SEPARATOR_STR + aliceAccount->getAccountID() + + DIR_SEPARATOR_STR + "conversations" + DIR_SEPARATOR_STR + convId; + CPPUNIT_ASSERT(fileutils::isDirectory(repoPath)); + // Check created files + auto bobInvitedFile = repoPath + DIR_SEPARATOR_STR + "invited" + DIR_SEPARATOR_STR + bobUri + + ".crt"; + CPPUNIT_ASSERT(fileutils::isFile(bobInvitedFile)); + + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { return requestReceived; })); + memberMessageGenerated = false; + bobAccount->acceptConversationRequest(convId); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { return conversationReady; })); + auto clonedPath = fileutils::get_data_dir() + DIR_SEPARATOR_STR + bobAccount->getAccountID() + + DIR_SEPARATOR_STR + "conversations" + DIR_SEPARATOR_STR + convId; + CPPUNIT_ASSERT(fileutils::isDirectory(clonedPath)); + bobInvitedFile = clonedPath + DIR_SEPARATOR_STR + "invited" + DIR_SEPARATOR_STR + bobUri + + ".crt"; + CPPUNIT_ASSERT(!fileutils::isFile(bobInvitedFile)); + // Remove conversation from alice once member confirmed + CPPUNIT_ASSERT( + cv.wait_for(lk, std::chrono::seconds(30), [&]() { return memberMessageGenerated; })); + + bobSeeAliceRemoved = false; + aliceAccount->removeConversation(convId); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&]() { return bobSeeAliceRemoved; })); + std::this_thread::sleep_for(std::chrono::seconds(3)); + CPPUNIT_ASSERT(!fileutils::isDirectory(repoPath)); +} + void ConversationTest::testAddMember() { diff --git a/test/unitTest/syncHistory/syncHistory.cpp b/test/unitTest/syncHistory/syncHistory.cpp index 1b2298fd513392ee8a418eceadc52be22dfc48b5..8fec5e149ccb9782ebf6317160c068a3396f68a5 100644 --- a/test/unitTest/syncHistory/syncHistory.cpp +++ b/test/unitTest/syncHistory/syncHistory.cpp @@ -60,12 +60,14 @@ private: void testCreateConversationWithOnlineDevice(); void testCreateConversationWithMessagesThenAddDevice(); void testReceivesInviteThenAddDevice(); + void testRemoveConversationOnAllDevices(); CPPUNIT_TEST_SUITE(SyncHistoryTest); CPPUNIT_TEST(testCreateConversationThenSync); CPPUNIT_TEST(testCreateConversationWithOnlineDevice); CPPUNIT_TEST(testCreateConversationWithMessagesThenAddDevice); CPPUNIT_TEST(testReceivesInviteThenAddDevice); + CPPUNIT_TEST(testRemoveConversationOnAllDevices); CPPUNIT_TEST_SUITE_END(); }; @@ -200,12 +202,7 @@ SyncHistoryTest::testCreateConversationThenSync() } })); DRing::registerSignalHandlers(confHandlers); - cv.wait_for(lk, std::chrono::seconds(30)); - DRing::unregisterSignalHandlers(); - confHandlers.clear(); - - // Check if conversation is ready - CPPUNIT_ASSERT(conversationReady); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return conversationReady; })); } void @@ -257,12 +254,8 @@ SyncHistoryTest::testCreateConversationWithOnlineDevice() } })); DRing::registerSignalHandlers(confHandlers); - cv.wait_for(lk, std::chrono::seconds(30)); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return conversationReady; })); DRing::unregisterSignalHandlers(); - confHandlers.clear(); - - // Check if conversation is ready - CPPUNIT_ASSERT(conversationReady); } void @@ -271,9 +264,9 @@ SyncHistoryTest::testCreateConversationWithMessagesThenAddDevice() auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); // Start conversation auto convId = aliceAccount->startConversation(); - aliceAccount->sendMessage(convId, "Message 1"); - aliceAccount->sendMessage(convId, "Message 2"); - aliceAccount->sendMessage(convId, "Message 3"); + aliceAccount->sendMessage(convId, std::string("Message 1")); + aliceAccount->sendMessage(convId, std::string("Message 2")); + aliceAccount->sendMessage(convId, std::string("Message 3")); // Now create alice2 auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz"; @@ -311,7 +304,6 @@ SyncHistoryTest::testCreateConversationWithMessagesThenAddDevice() } })); DRing::registerSignalHandlers(confHandlers); - DRing::unregisterSignalHandlers(); confHandlers.clear(); // Check if conversation is ready @@ -376,10 +368,9 @@ SyncHistoryTest::testReceivesInviteThenAddDevice() })); DRing::registerSignalHandlers(confHandlers); - cv.wait_for(lk, std::chrono::seconds(30)); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return requestReceived; })); DRing::unregisterSignalHandlers(); confHandlers.clear(); - CPPUNIT_ASSERT(requestReceived); // Now create alice2 std::map<std::string, std::string> details = DRing::getAccountTemplate("RING"); @@ -405,10 +396,71 @@ SyncHistoryTest::testReceivesInviteThenAddDevice() })); DRing::registerSignalHandlers(confHandlers); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return requestReceived; })); + DRing::unregisterSignalHandlers(); +} + +void +SyncHistoryTest::testRemoveConversationOnAllDevices() +{ + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + + // Now create alice2 + auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz"; + std::remove(aliceArchive.c_str()); + aliceAccount->exportArchive(aliceArchive); + std::map<std::string, std::string> details = DRing::getAccountTemplate("RING"); + details[ConfProperties::TYPE] = "RING"; + details[ConfProperties::DISPLAYNAME] = "ALICE2"; + details[ConfProperties::ALIAS] = "ALICE2"; + details[ConfProperties::UPNP_ENABLED] = "true"; + details[ConfProperties::ARCHIVE_PASSWORD] = ""; + details[ConfProperties::ARCHIVE_PIN] = ""; + details[ConfProperties::ARCHIVE_PATH] = aliceArchive; + alice2Id = Manager::instance().addAccount(details); + + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + confHandlers.insert( + DRing::exportable_callback<DRing::ConfigurationSignal::VolatileDetailsChanged>( + [&](const std::string&, const std::map<std::string, std::string>&) { + auto alice2Account = Manager::instance().getAccount<JamiAccount>(alice2Id); + auto details = alice2Account->getVolatileAccountDetails(); + auto daemonStatus = details[DRing::Account::ConfProperties::Registration::STATUS]; + if (daemonStatus == "REGISTERED") + cv.notify_one(); + })); + DRing::registerSignalHandlers(confHandlers); + cv.wait_for(lk, std::chrono::seconds(30)); DRing::unregisterSignalHandlers(); confHandlers.clear(); - CPPUNIT_ASSERT(requestReceived); + + // Start conversation now + auto convId = aliceAccount->startConversation(); + auto conversationReady = false, conversationRemoved = false; + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationReady>( + [&](const std::string& accountId, const std::string& conversationId) { + if (accountId == alice2Id && conversationId == convId) { + conversationReady = true; + cv.notify_one(); + } + })); + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationRemoved>( + [&](const std::string& accountId, const std::string& conversationId) { + if (accountId == alice2Id && conversationId == convId) { + conversationRemoved = true; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return conversationReady; })); + aliceAccount->removeConversation(convId); + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(30), [&] { return conversationRemoved; })); + + DRing::unregisterSignalHandlers(); } } // namespace test diff --git a/tools/dringctrl/controller.py b/tools/dringctrl/controller.py index 027d24e2ccbf33fe38006d05ca14069e6db68934..427732b29259c82b286373ef3b608c656bba6f54 100644 --- a/tools/dringctrl/controller.py +++ b/tools/dringctrl/controller.py @@ -734,6 +734,9 @@ class DRingCtrl(Thread): def sendMessage(self, account, conversationId, message, parent=''): return self.configurationmanager.sendMessage(account, conversationId, message, parent) + def removeConversation(self, account, conversationId): + return self.configurationmanager.removeConversation(account, conversationId) + def run(self): """Processing method for this thread""" diff --git a/tools/dringctrl/swarm.py b/tools/dringctrl/swarm.py index 6e1a3af1adcc229fbbbb2a26a091848e5aa42df9..e3bc5c7004e772b7b727abf99355c10a53dd08e7 100644 --- a/tools/dringctrl/swarm.py +++ b/tools/dringctrl/swarm.py @@ -58,6 +58,7 @@ if __name__ == "__main__": 6. Decline request 7. Load messages 8. Send message +9. Remove conversation """) opt = int(input("> ")) if opt == 0: @@ -89,6 +90,9 @@ if __name__ == "__main__": conversationId = input('Conversation: ') message = input('Message: ') ctrl.sendMessage(args.account, conversationId, message) + elif opt == 9: + conversationId = input('Conversation: ') + ctrl.removeConversation(args.account, conversationId) else: print('Not implemented yet') ctrlThread.join()