diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index e8ad1b94465c73b15ae1e1c70df11dc8e9747a8a..0f6c193ac31b4de8fc96784224a450124c6209c2 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -19,6 +19,7 @@ */ #include "conversation.h" +#include "account_const.h" #include "fileutils.h" #include "jamiaccount.h" #include "client/ring_signal.h" @@ -162,9 +163,11 @@ public: + shared->getAccountID() + DIR_SEPARATOR_STR + "conversation_data" + DIR_SEPARATOR_STR + repository_->id(); fetchedPath_ = conversationDataPath_ + DIR_SEPARATOR_STR + "fetched"; + sendingPath_ = conversationDataPath_ + DIR_SEPARATOR_STR + "sending"; lastDisplayedPath_ = conversationDataPath_ + DIR_SEPARATOR_STR + ConversationMapKeys::LAST_DISPLAYED; loadFetched(); + loadSending(); loadLastDisplayed(); } } @@ -302,6 +305,25 @@ public: msgpack::pack(file, fetchedDevices_); } + void loadSending() + { + try { + // read file + auto file = fileutils::loadFile(sendingPath_); + // load values + msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size()); + std::lock_guard<std::mutex> lk {writeMtx_}; + oh.get().convert(sending_); + } catch (const std::exception& e) { + return; + } + } + void saveSending() + { + std::ofstream file(sendingPath_, std::ios::trunc | std::ios::binary); + msgpack::pack(file, sending_); + } + void loadLastDisplayed() const { try { @@ -361,7 +383,9 @@ public: std::string fetchedPath_ {}; std::mutex fetchedDevicesMtx_ {}; std::set<std::string> fetchedDevices_ {}; - // Manage last message displayed + // Manage last message displayed and status + std::string sendingPath_ {}; + std::vector<std::string> sending_ {}; std::string lastDisplayedPath_ {}; mutable std::mutex lastDisplayedMtx_ {}; // for lastDisplayed_ mutable std::map<std::string, std::string> lastDisplayed_ {}; @@ -714,11 +738,19 @@ Conversation::sendMessage(Json::Value&& value, const std::string& replyTo, OnDon wbuilder["indentation"] = ""; auto commit = sthis->pimpl_->repository_->commitMessage( Json::writeString(wbuilder, value)); + sthis->pimpl_->sending_.emplace_back(commit); + sthis->pimpl_->saveSending(); sthis->clearFetched(); lk.unlock(); + sthis->pimpl_->announce(commit); + emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>( + shared->getAccountID(), + sthis->id(), + shared->getUsername(), + commit, + static_cast<int>(DRing::Account::MessageStates::SENDING)); if (cb) cb(!commit.empty(), commit); - sthis->pimpl_->announce(commit); } }); } @@ -1183,11 +1215,38 @@ Conversation::needsFetch(const std::string& deviceId) const } void -Conversation::hasFetched(const std::string& deviceId) +Conversation::hasFetched(const std::string& deviceId, const std::string& commitId) { - std::lock_guard<std::mutex> lk(pimpl_->fetchedDevicesMtx_); - pimpl_->fetchedDevices_.emplace(deviceId); - pimpl_->saveFetched(); + dht::ThreadPool::io().run([w = weak(), deviceId, commitId]() { + auto sthis = w.lock(); + if (!sthis) + return; + { + std::lock_guard<std::mutex> lk(sthis->pimpl_->fetchedDevicesMtx_); + sthis->pimpl_->fetchedDevices_.emplace(deviceId); + sthis->pimpl_->saveFetched(); + } + // Update sent status + std::lock_guard<std::mutex> lk(sthis->pimpl_->writeMtx_); + auto itCommit = std::find(sthis->pimpl_->sending_.begin(), + sthis->pimpl_->sending_.end(), + commitId); + if (itCommit != sthis->pimpl_->sending_.end()) { + auto acc = sthis->pimpl_->account_.lock(); + // Clear fetched commits and mark it as announced + auto end = std::next(itCommit); + for (auto it = sthis->pimpl_->sending_.begin(); it != end; ++it) { + emitSignal<DRing::ConfigurationSignal::AccountMessageStatusChanged>( + acc->getAccountID(), + sthis->id(), + acc->getUsername(), + *it, + static_cast<int>(DRing::Account::MessageStates::SENT)); + } + sthis->pimpl_->sending_.erase(sthis->pimpl_->sending_.begin(), end); + sthis->pimpl_->saveSending(); + } + }); } bool diff --git a/src/jamidht/conversation.h b/src/jamidht/conversation.h index e9a914e2f8d14c49e535bec01d4a21854004449d..01d4e14889d5c622620d132dd989fbbafe2dea72 100644 --- a/src/jamidht/conversation.h +++ b/src/jamidht/conversation.h @@ -344,8 +344,9 @@ public: * Store informations about who fetch or not. This simplify sync (sync when a device without the * last fetch is detected) * @param deviceId + * @param commitId */ - void hasFetched(const std::string& deviceId); + void hasFetched(const std::string& deviceId, const std::string& commitId); /** * Store last read commit (returned in getMembers) diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp index 4451b60fd0c23b63f72d2fa223dfba14d47637c4..f06459686d47b822a4735c94c634623bbd0fb8a0 100644 --- a/src/jamidht/conversation_module.cpp +++ b/src/jamidht/conversation_module.cpp @@ -1489,7 +1489,9 @@ ConversationModule::needsSyncingWith(const std::string& memberUri, const std::st } void -ConversationModule::setFetched(const std::string& conversationId, const std::string& deviceId) +ConversationModule::setFetched(const std::string& conversationId, + const std::string& deviceId, + const std::string& commitId) { auto remove = false; { @@ -1497,7 +1499,7 @@ ConversationModule::setFetched(const std::string& conversationId, const std::str auto it = pimpl_->conversations_.find(conversationId); if (it != pimpl_->conversations_.end() && it->second) { remove = it->second->isRemoving(); - it->second->hasFetched(deviceId); + it->second->hasFetched(deviceId, commitId); } } if (remove) diff --git a/src/jamidht/conversation_module.h b/src/jamidht/conversation_module.h index 4895641dfddd3d40ae864d2a4cc65a73d4b22f53..0875367b270578a33ed78bc3809237575d37a1c2 100644 --- a/src/jamidht/conversation_module.h +++ b/src/jamidht/conversation_module.h @@ -242,8 +242,11 @@ public: * the information) * @param conversationId Related conv * @param deviceId Device who synced + * @param commit HEAD synced */ - void setFetched(const std::string& conversationId, const std::string& deviceId); + void setFetched(const std::string& conversationId, + const std::string& deviceId, + const std::string& commit); /** * Launch fetch on new commit diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 9d0a5ee683242e7b202de0db4cf4aa348ac69e7e..76df7d1293ba1827823024e025c043097fcb3a73 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -2242,10 +2242,13 @@ JamiAccount::doRegister_() deviceId.to_c_str(), channel->channel()); auto gs = std::make_unique<GitServer>(accountId, conversationId, channel); - gs->setOnFetched([w = weak(), conversationId, deviceId](const std::string&) { - if (auto shared = w.lock()) - shared->convModule()->setFetched(conversationId, deviceId.toString()); - }); + gs->setOnFetched( + [w = weak(), conversationId, deviceId](const std::string& commit) { + if (auto shared = w.lock()) + shared->convModule()->setFetched(conversationId, + deviceId.toString(), + commit); + }); const dht::Value::Id serverId = ValueIdDist()(rand); { std::lock_guard<std::mutex> lk(gitServersMtx_); diff --git a/test/unitTest/conversation/conversation.cpp b/test/unitTest/conversation/conversation.cpp index 6bb07fe232021e6a8e2293e2fb2ed646f99da062..2752eea4251518b659e81cf819174fe174d2c3fb 100644 --- a/test/unitTest/conversation/conversation.cpp +++ b/test/unitTest/conversation/conversation.cpp @@ -84,6 +84,7 @@ private: void testSendMessageToMultipleParticipants(); void testPingPongMessages(); void testIsComposing(); + void testMessageStatus(); void testSetMessageDisplayed(); void testSetMessageDisplayedTwice(); void testSetMessageDisplayedPreference(); @@ -125,6 +126,7 @@ private: CPPUNIT_TEST(testSendMessageToMultipleParticipants); CPPUNIT_TEST(testPingPongMessages); CPPUNIT_TEST(testIsComposing); + CPPUNIT_TEST(testMessageStatus); CPPUNIT_TEST(testSetMessageDisplayed); CPPUNIT_TEST(testSetMessageDisplayedTwice); CPPUNIT_TEST(testSetMessageDisplayedPreference); @@ -900,6 +902,83 @@ ConversationTest::testIsComposing() CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return !aliceComposing; })); } +void +ConversationTest::testMessageStatus() +{ + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + auto bobAccount = Manager::instance().getAccount<JamiAccount>(bobId); + auto aliceUri = aliceAccount->getUsername(); + auto bobUri = bobAccount->getUsername(); + auto convId = DRing::startConversation(aliceId); + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + bool conversationReady = false, requestReceived = false, memberMessageGenerated = false; + confHandlers.insert( + DRing::exportable_callback<DRing::ConversationSignal::ConversationRequestReceived>( + [&](const std::string& /*accountId*/, + const std::string& /* conversationId */, + std::map<std::string, std::string> /*metadatas*/) { + requestReceived = true; + cv.notify_one(); + })); + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationReady>( + [&](const std::string& accountId, const std::string& conversationId) { + if (accountId == bobId && conversationId == convId) { + conversationReady = true; + cv.notify_one(); + } + })); + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::MessageReceived>( + [&](const std::string& accountId, + const std::string& conversationId, + std::map<std::string, std::string> message) { + if (accountId == aliceId && conversationId == convId) { + if (message["type"] == "member") + memberMessageGenerated = true; + cv.notify_one(); + } + })); + bool sending = false, sent = false; + confHandlers.insert( + DRing::exportable_callback<DRing::ConfigurationSignal::AccountMessageStatusChanged>( + [&](const std::string& accountId, + const std::string& conversationId, + const std::string& peer, + const std::string& msgId, + int status) { + if (accountId == aliceId && convId == conversationId) { + if (status == 2) + sending = true; + if (status == 3) + sent = true; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + DRing::addConversationMember(aliceId, convId, bobUri); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return memberMessageGenerated; })); + // Assert that repository exists + auto repoPath = fileutils::get_data_dir() + DIR_SEPARATOR_STR + aliceAccount->getAccountID() + + DIR_SEPARATOR_STR + "conversations" + DIR_SEPARATOR_STR + convId; + CPPUNIT_ASSERT(fileutils::isDirectory(repoPath)); + // Check created files + auto bobInvited = repoPath + DIR_SEPARATOR_STR + "invited" + DIR_SEPARATOR_STR + bobUri; + CPPUNIT_ASSERT(fileutils::isFile(bobInvited)); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return requestReceived; })); + memberMessageGenerated = false; + DRing::acceptConversationRequest(bobId, convId); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return memberMessageGenerated; })); + + sending = false; + sent = false; + DRing::sendMessage(aliceId, convId, "hi"s, ""); + CPPUNIT_ASSERT(cv.wait_for(lk, 30s, [&]() { return sending && sent; })); + + DRing::unregisterSignalHandlers(); +} + void ConversationTest::testSetMessageDisplayed() {