diff --git a/src/jamidht/account_manager.cpp b/src/jamidht/account_manager.cpp index 48da2417f34f0c06b48b4a372f4333637948dadf..166fa19c6824eeb21cdbb4d98e72830859a39569 100644 --- a/src/jamidht/account_manager.cpp +++ b/src/jamidht/account_manager.cpp @@ -190,7 +190,7 @@ AccountManager::useIdentity(const dht::crypto::Identity& identity, } void -AccountManager::startSync() +AccountManager::startSync(const OnNewDeviceCb& cb) { // Put device announcement if (info_->announce) { @@ -199,10 +199,13 @@ AccountManager::startSync() dht_->put(h, info_->announce, dht::DoneCallback {}, {}, true); for (const auto& crl : info_->identity.second->issuer->getRevocationLists()) dht_->put(h, crl, dht::DoneCallback {}, {}, true); - dht_->listen<DeviceAnnouncement>(h, [this](DeviceAnnouncement&& dev) { - findCertificate(dev.dev, [this](const std::shared_ptr<dht::crypto::Certificate>& crt) { - foundAccountDevice(crt); - }); + dht_->listen<DeviceAnnouncement>(h, [this, cb = std::move(cb)](DeviceAnnouncement&& dev) { + findCertificate(dev.dev, + [this, cb](const std::shared_ptr<dht::crypto::Certificate>& crt) { + if (cb) + cb(crt); + foundAccountDevice(crt); + }); return true; }); dht_->listen<dht::crypto::RevocationList>(h, [this](dht::crypto::RevocationList&& crl) { diff --git a/src/jamidht/account_manager.h b/src/jamidht/account_manager.h index bb4f160c5107c0b953e7f72c764101fbf49a1a3f..4a9cdd593ffd630b3e359a7370b62b32b8c2708f 100644 --- a/src/jamidht/account_manager.h +++ b/src/jamidht/account_manager.h @@ -73,6 +73,7 @@ public: using OnChangeCallback = ContactList::OnChangeCallback; using clock = std::chrono::system_clock; using time_point = clock::time_point; + using OnNewDeviceCb = std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>; AccountManager(const std::string& path, OnAsync&& onAsync, const std::string& nameServer) : path_(path) @@ -133,7 +134,7 @@ public: void setDht(const std::shared_ptr<dht::DhtRunner>& dht) { dht_ = dht; } - virtual void startSync(); + virtual void startSync(const OnNewDeviceCb& cb); const AccountInfo* getInfo() const { return info_.get(); } diff --git a/src/jamidht/archive_account_manager.cpp b/src/jamidht/archive_account_manager.cpp index 3f4a49b04072d319e2ae77225d6aca3e3d0b6504..561fef03d273d1f6996ceb35436558d58b567649 100644 --- a/src/jamidht/archive_account_manager.cpp +++ b/src/jamidht/archive_account_manager.cpp @@ -449,9 +449,9 @@ ArchiveAccountManager::syncDevices() } void -ArchiveAccountManager::startSync() +ArchiveAccountManager::startSync(const OnNewDeviceCb& cb) { - AccountManager::startSync(); + AccountManager::startSync(std::move(cb)); dht_->listen<DeviceSync>(dht::InfoHash::get("inbox:" + info_->deviceId), [this](DeviceSync&& sync) { // Received device sync data. diff --git a/src/jamidht/archive_account_manager.h b/src/jamidht/archive_account_manager.h index fa7a8cdc58aac920c0c1924aafb5a024194f8a40..e10c357fa56ddbbe43c418396d60d0d92c3fbc2d 100644 --- a/src/jamidht/archive_account_manager.h +++ b/src/jamidht/archive_account_manager.h @@ -49,7 +49,7 @@ public: AuthFailureCallback onFailure, OnChangeCallback onChange) override; - void startSync() override; + void startSync(const OnNewDeviceCb&) override; bool changePassword(const std::string& password_old, const std::string& password_new) override; diff --git a/src/jamidht/connectionmanager.cpp b/src/jamidht/connectionmanager.cpp index 654cec8f447aad30c05333e83db0da9971bef879..6d6484d62a1fedef2a58504ab907f6a6aab645e3 100644 --- a/src/jamidht/connectionmanager.cpp +++ b/src/jamidht/connectionmanager.cpp @@ -381,6 +381,10 @@ ConnectionManager::Impl::connectDevice(const DeviceId& deviceId, cb(nullptr, deviceId); return; } + if (deviceId.toString() == account.currentDeviceId()) { + cb(nullptr, deviceId); + return; + } account.findCertificate( deviceId, [w = weak(), deviceId, name, cb = std::move(cb)]( diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index c5a88b358260f101cf517d3e1cfe60fa0fd61c44..00860304959e14148df70165216a2ba29c88c82c 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -268,6 +268,10 @@ Conversation::fetchFrom(const std::string& uri) bool Conversation::mergeHistory(const std::string& uri) { + if (not pimpl_ or not pimpl_->repository_) { + JAMI_WARN("Invalid repo. Abort merge"); + return false; + } auto remoteHead = pimpl_->repository_->remoteHead(uri); if (remoteHead.empty()) { JAMI_WARN("Could not get HEAD of %s", uri.c_str()); diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index a1b681a223e9674a63cccefb3447b100015c20e1..7bfd6dfb1df869271e74aefcc533182a9786dbb9 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -134,6 +134,34 @@ struct VCardMessageCtx std::string path; }; +struct ConvInfo +{ + std::string id {}; + time_t created {0}; + time_t removed {0}; + + ConvInfo() = default; + ConvInfo(const Json::Value& json) + { + id = json["id"].asString(); + created = json["created"].asLargestUInt(); + removed = json["removed"].asLargestUInt(); + } + + Json::Value toJson() const + { + Json::Value json; + json["id"] = id; + json["created"] = Json::Int64(created); + if (removed) { + json["removed"] = Json::Int64(removed); + } + return json; + } + + MSGPACK_DEFINE_MAP(id, created, removed) +}; + namespace Migration { enum class State { // Contains all the Migration states @@ -327,14 +355,6 @@ JamiAccount::JamiAccount(const std::string& accountID, bool /* presenceEnabled * } setActiveCodecs({}); - - JAMI_INFO("Start loading conversations…"); - auto conversationsRepositories = fileutils::readDirectory(idPath_ + DIR_SEPARATOR_STR - + "conversations"); - for (const auto& repository : conversationsRepositories) { - conversations_.emplace(repository, std::make_unique<Conversation>(weak(), repository)); - } - JAMI_INFO("Conversations loaded!"); } JamiAccount::~JamiAccount() @@ -352,13 +372,13 @@ JamiAccount::~JamiAccount() void JamiAccount::shutdownConnections() { + decltype(gitServers_) gservers; { std::lock_guard<std::mutex> lk(gitServersMtx_); - for (auto& [_id, gs] : gitServers_) { - gs->stop(); - } - gitServers_.clear(); + gservers = std::move(gitServers_); } + for (auto& [_id, gs] : gservers) + gs->stop(); connectionManager_.reset(); dhtPeerConnector_.reset(); std::lock_guard<std::mutex> lk(sipConnsMtx_); @@ -1947,14 +1967,16 @@ JamiAccount::doRegister() + "conversations"); for (const auto& repository : conversationsRepositories) { try { - conversations_[repository] = std::move( - std::make_unique<Conversation>(weak(), repository)); + auto conv = std::make_unique<Conversation>(weak(), repository); + std::lock_guard<std::mutex> lk(conversationsMtx_); + conversations_.emplace(repository, std::move(conv)); } catch (const std::logic_error& e) { JAMI_WARN("[Account %s] Conversations not loaded : %s", getAccountID().c_str(), e.what()); } } + loadConvInfos(); JAMI_INFO("[Account %s] Conversations loaded!", getAccountID().c_str()); // invalid state transitions: @@ -2315,7 +2337,36 @@ JamiAccount::doRegister_() dht_->bootstrap(bootstrap); accountManager_->setDht(dht_); - accountManager_->startSync(); + accountManager_->startSync([this](const std::shared_ptr<dht::crypto::Certificate>& crt) { + if (!crt) + return; + auto deviceId = crt->getId().toString(); + if (accountManager_->getInfo()->deviceId == deviceId) + return; + + { + // Avoid to create multiple sync channels with a device + std::lock_guard<std::mutex> lk(syncConnectionsMtx_); + auto syncConn = syncConnections_.find(deviceId); + if ((syncConn != syncConnections_.end() and not syncConn->second.empty()) + or pendingSync_.find(deviceId) != pendingSync_.end()) + return; // Already syncing + pendingSync_.emplace(deviceId); + } + + connectionManager().connectDevice(crt->getId(), + "sync://" + deviceId, + [this](std::shared_ptr<ChannelSocket> socket, + const DeviceId& deviceId) { + if (socket) + syncWith(deviceId.toString(), socket); + { + std::lock_guard<std::mutex> lk( + syncConnectionsMtx_); + pendingSync_.erase(deviceId.toString()); + } + }); + }); // Init connection manager if (!connectionManager_) @@ -2342,7 +2393,8 @@ JamiAccount::doRegister_() auto result = fut.get(); return result; }); - connectionManager_->onChannelRequest([this](const DeviceId&, const std::string& name) { + connectionManager_->onChannelRequest([this](const DeviceId& deviceId, + const std::string& name) { auto isFile = name.substr(0, 7) == "file://"; auto isVCard = name.substr(0, 8) == "vcard://"; if (name.find("git://") == 0) { @@ -2350,6 +2402,24 @@ JamiAccount::doRegister_() return true; } else if (name == "sip") { return true; + } else if (name.find("sync://") == 0) { + // Check if sync request is from same account + std::promise<bool> accept; + std::future<bool> fut = accept.get_future(); + accountManager_ + ->findCertificate(deviceId, + [this, &accept]( + const std::shared_ptr<dht::crypto::Certificate>& cert) { + if (not cert or not cert->issuer) { + accept.set_value(false); + return; + } + accept.set_value(cert->issuer->getId().toString() + == accountManager_->getInfo()->accountId); + }); + fut.wait(); + auto result = fut.get(); + return result; } else if (isFile or isVCard) { auto tid_str = isFile ? name.substr(7) : name.substr(8); uint64_t tid; @@ -2375,6 +2445,8 @@ JamiAccount::doRegister_() auto isVCard = name.substr(0, 8) == "vcard://"; if (name == "sip") { cacheSIPConnection(std::move(channel), peerId, deviceId); + } else if (name.find("sync://") == 0) { + cacheSyncConnection(std::move(channel), peerId, deviceId); } else if (isFile or isVCard) { auto tid_str = isFile ? name.substr(7) : name.substr(8); std::unique_lock<std::mutex> lk(transfersMtx_); @@ -2399,17 +2471,23 @@ JamiAccount::doRegister_() std::move(channel), std::move(cb)); } else if (name.find("git://") == 0) { - auto conversationId = name.substr(name.find_last_of("/") + 1); - { - std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); - if (pendingConversationsFetch_.find(conversationId) - != pendingConversationsFetch_.end()) { - // Currently cloning, so we can't offer a server. - return; - } + JAMI_WARN("[Account %s] New channel asked from %s with name %s", + getAccountID().c_str(), + deviceId.to_c_str(), + name.c_str()); + auto sep = name.find_last_of('/'); + auto conversationId = name.substr(sep + 1); + auto remoteDevice = name.substr(6, sep - 6); + auto currentDevice = currentDeviceId(); + if (remoteDevice != currentDevice || currentDevice == deviceId.to_c_str()) { + // Check if wanted remote it's our side (git://removeDevice/conversationId) + return; } - if (conversations_.find(conversationId) == conversations_.end()) { - JAMI_WARN("Git server requested, but for a non existing conversation (%s)", + + if (!isConversation(conversationId)) { + JAMI_WARN("[Account %s] Git server requested, but for a non existing " + "conversation (%s)", + getAccountID().c_str(), conversationId.c_str()); return; } @@ -2419,6 +2497,12 @@ JamiAccount::doRegister_() return; } auto accountId = this->accountID_; + JAMI_WARN("[Account %s] Git server requested for conversation %s, device %s, " + "channel %u", + getAccountID().c_str(), + conversationId.c_str(), + deviceId.to_c_str(), + channel->channel()); auto gs = std::make_unique<GitServer>(accountId, conversationId, channel); const dht::Value::Id serverId = ValueIdDist()(rand); { @@ -2426,11 +2510,14 @@ JamiAccount::doRegister_() gitServers_[serverId] = std::move(gs); } channel->onShutdown([w = weak(), serverId]() { - auto shared = w.lock(); - if (!shared) - return; - std::lock_guard<std::mutex> lk(shared->gitServersMtx_); - shared->gitServers_.erase(serverId); + // Run on main thread to avoid to be in mxSock's eventLoop + runOnMainThread([serverId, w]() { + auto shared = w.lock(); + if (!shared) + return; + std::lock_guard<std::mutex> lk(shared->gitServersMtx_); + shared->gitServers_.erase(serverId); + }); }); } } @@ -2505,8 +2592,15 @@ JamiAccount::doRegister_() std::map<std::string, std::string> metadatas = req.metadatas; { std::lock_guard<std::mutex> lk(conversationsRequestsMtx_); + auto it = conversationsRequests_.find(convId); + if (it != conversationsRequests_.end()) { + JAMI_INFO("Received a request for a conversation already existing. Ignore"); + return true; + } conversationsRequests_[convId] = std::move(req); } + // TODO: store request to be persistent when restarting + emitSignal<DRing::ConversationSignal::ConversationRequestReceived>(accountID_, convId, metadatas); @@ -3265,7 +3359,7 @@ JamiAccount::sendTextMessage(const std::string& to, auto confirm = std::make_shared<PendingConfirmation>(); - std::set<DeviceId> devices; + std::shared_ptr<std::set<DeviceId>> devices = std::make_shared<std::set<DeviceId>>(); std::unique_lock<std::mutex> lk(sipConnsMtx_); sip_utils::register_thread(); for (auto it = sipConns_.begin(); it != sipConns_.end();) { @@ -3305,7 +3399,8 @@ JamiAccount::sendTextMessage(const std::string& to, std::unique_lock<std::mutex> lk(acc->sipConnsMtx_); acc->sipConns_.erase(std::make_pair(c->to, c->deviceId)); } - acc->connectionManager_->closeConnectionsWith(c->deviceId); + if (acc->connectionManager_) + acc->connectionManager_->closeConnectionsWith(c->deviceId); // This MUST be done after closing the connection to avoid race condition // with messageEngine_ acc->messageEngine_.onMessageSent(c->to, c->id, false); @@ -3330,7 +3425,7 @@ JamiAccount::sendTextMessage(const std::string& to, continue; } - devices.emplace(key.second); + devices->emplace(key.second); ++it; } lk.unlock(); @@ -3338,14 +3433,13 @@ JamiAccount::sendTextMessage(const std::string& to, // Find listening devices for this account accountManager_->forEachDevice( toH, - [this, confirm, to, token, payloads, now, devices {std::move(devices)}]( - const dht::InfoHash& dev) { + [this, confirm, to, token, payloads, now, devices](const dht::InfoHash& dev) { // Test if already sent - if (devices.find(dev) != devices.end()) { + if (devices->find(dev) != devices->end()) { return; } - // TODO do not use getAccountDetails(), accountInfo if (dev.toString() == currentDeviceId()) { + devices->emplace(dev); return; } @@ -3423,8 +3517,18 @@ JamiAccount::sendTextMessage(const std::string& to, JAMI_DBG() << "[Account " << getAccountID() << "] [message " << token << "] Sending message for device " << dev.toString(); }, - [this, to, token](bool ok) { - if (not ok) { + [this, to, token, devices, confirm](bool ok) { + if (devices->size() == 1 && devices->begin()->toString() == currentDeviceId()) { + // Current user only have devices, so no message are sent + { + std::lock_guard<std::mutex> l(confirm->lock); + for (auto& t : confirm->listenTokens) + dht_->cancelListen(t.first, std::move(t.second)); + confirm->listenTokens.clear(); + confirm->replied = true; + } + messageEngine_.onMessageSent(to, token, true); + } else if (not ok) { messageEngine_.onMessageSent(to, token, false); } }); @@ -3670,11 +3774,28 @@ JamiAccount::startConversation() // Create the conversation object auto conversation = std::make_unique<Conversation>(weak()); auto convId = conversation->id(); - conversations_[convId] = std::move(conversation); + { + std::lock_guard<std::mutex> lk(conversationsMtx_); + conversations_[convId] = std::move(conversation); + } + + // Update convInfo + ConvInfo info; + info.id = convId; + info.created = std::time(nullptr); + convInfos_.emplace_back(info); + saveConvInfos(); + + runOnMainThread([w = weak()]() { + // Invite connected devices for the same user + auto shared = w.lock(); + if (!shared or !shared->accountManager_) + return; + + // Send to connected devices + shared->syncWithConnected(); + }); - // TODO - // And send an invite to others devices to sync the conversation between device - // Via getMembers emitSignal<DRing::ConversationSignal::ConversationReady>(accountID_, convId); return convId; } @@ -3700,15 +3821,15 @@ JamiAccount::acceptConversationRequest(const std::string& conversationId) // TODO check why some members are 000000 continue; } - // Avoid to connect to self for now - if (username_.find(member) != std::string::npos) - continue; // TODO cf sync between devices forEachDevice(memberHash, [this, request = request->second](const dht::InfoHash& dev) { + if (dev == dht()->getId()) + return; + std::string channelName = "git://" + dev.toString() + "/" + request.conversationId; connectionManager().connectDevice( dev, - "git://" + dev.toString() + "/" + request.conversationId, - [this, dev, request](std::shared_ptr<ChannelSocket> socket, const DeviceId&) { + channelName, + [this, request](std::shared_ptr<ChannelSocket> socket, const DeviceId& dev) { if (socket) { std::unique_lock<std::mutex> lk(pendingConversationsFetchMtx_); auto& pending = pendingConversationsFetch_[request.conversationId]; @@ -3764,7 +3885,15 @@ JamiAccount::handlePendingConversations() it->second.deviceId, conversationId); if (conversation) { - conversations_.emplace(conversationId, std::move(conversation)); + ConvInfo info; + info.id = conversationId; + info.created = std::time(nullptr); + convInfos_.emplace_back(info); + saveConvInfos(); + { + std::lock_guard<std::mutex> lk(conversationsMtx_); + conversations_.emplace(conversationId, std::move(conversation)); + } // Inform user that the conversation is ready emitSignal<DRing::ConversationSignal::ConversationReady>(accountID_, conversationId); @@ -3794,6 +3923,7 @@ std::vector<std::string> JamiAccount::getConversations() { std::vector<std::string> result; + std::lock_guard<std::mutex> lk(conversationsMtx_); result.reserve(conversations_.size()); for (const auto& [key, _] : conversations_) { result.emplace_back(key); @@ -3812,6 +3942,7 @@ JamiAccount::getConversationRequests() bool JamiAccount::addConversationMember(const std::string& conversationId, const std::string& contactUri) { + std::lock_guard<std::mutex> lk(conversationsMtx_); // Add a new member in the conversation if (conversations_[conversationId]->addMember(contactUri).empty()) { JAMI_WARN("Couldn't add %s to %s", contactUri.c_str(), conversationId.c_str()); @@ -3839,6 +3970,7 @@ bool JamiAccount::removeConversationMember(const std::string& conversationId, const std::string& contactUri) { + std::lock_guard<std::mutex> lk(conversationsMtx_); conversations_[conversationId]->removeMember(contactUri); return true; } @@ -3846,6 +3978,7 @@ JamiAccount::removeConversationMember(const std::string& conversationId, std::vector<std::map<std::string, std::string>> JamiAccount::getConversationMembers(const std::string& conversationId) { + std::lock_guard<std::mutex> lk(conversationsMtx_); auto conversation = conversations_.find(conversationId); if (conversation != conversations_.end() && conversation->second) return conversation->second->getMembers(); @@ -3859,6 +3992,7 @@ JamiAccount::sendMessage(const std::string& conversationId, const std::string& parent, const std::string& type) { + std::lock_guard<std::mutex> lk(conversationsMtx_); auto conversation = conversations_.find(conversationId); if (conversation != conversations_.end() && conversation->second) { auto commitId = conversation->second->sendMessage(message, type, parent); @@ -3894,12 +4028,13 @@ JamiAccount::loadConversationMessages(const std::string& conversationId, const std::string& fromMessage, size_t n) { - if (conversations_.find(conversationId) == conversations_.end()) + if (!isConversation(conversationId)) return 0; const uint32_t id = LoadIdDist()(rand); // loadMessages will perform a git log that can take quite some time, so to avoid any lock, run // it the threadpool dht::ThreadPool::io().run([this, conversationId, fromMessage, n, id] { + std::lock_guard<std::mutex> lk(conversationsMtx_); auto conversation = conversations_.find(conversationId); if (conversation != conversations_.end() && conversation->second) { auto messages = conversation->second->loadMessages(fromMessage, n); @@ -3918,7 +4053,8 @@ JamiAccount::onNewGitCommit(const std::string& peer, const std::string& conversationId, const std::string& commitId) { - JAMI_DBG("on new commit notification from %s, for %s, commit %s", + JAMI_DBG("[Account %s] on new commit notification from %s, for %s, commit %s", + getAccountID().c_str(), peer.c_str(), conversationId.c_str(), commitId.c_str()); @@ -3979,23 +4115,23 @@ JamiAccount::onNewGitCommit(const std::string& peer, DeviceId(deviceId), "git://" + deviceId + "/" + conversationId, [this, - deviceId, conversation, conversationId, announceMessages = std::move( - announceMessages)](std::shared_ptr<ChannelSocket> socket, const DeviceId&) { + announceMessages)](std::shared_ptr<ChannelSocket> socket, + const DeviceId& deviceId) { if (socket) { - addGitSocket(deviceId, conversationId, socket); - if (!conversation->second->fetchFrom(deviceId)) + addGitSocket(deviceId.toString(), conversationId, socket); + if (!conversation->second->fetchFrom(deviceId.toString())) JAMI_WARN("Could not fetch new commit from %s for %s", - deviceId.c_str(), + deviceId.to_c_str(), conversationId.c_str()); - auto merged = conversation->second->mergeHistory(deviceId); + auto merged = conversation->second->mergeHistory(deviceId.toString()); if (merged) announceMessages(); } else { JAMI_ERR("Couldn't open a new git channel with %s for conversation %s", - deviceId.c_str(), + deviceId.to_c_str(), conversationId.c_str()); } { @@ -4387,4 +4523,188 @@ JamiAccount::currentDeviceId() const return accountManager_->getInfo()->deviceId; } +void +JamiAccount::cacheSyncConnection(std::shared_ptr<ChannelSocket>&& socket, + const std::string& peerId, + const DeviceId& device) +{ + auto deviceId = device.toString(); + std::unique_lock<std::mutex> lk(syncConnectionsMtx_); + syncConnections_[deviceId].emplace_back(socket); + + socket->onShutdown([w = weak(), peerId, deviceId, socket]() { + auto shared = w.lock(); + if (!shared) + return; + std::lock_guard<std::mutex> lk(shared->syncConnectionsMtx_); + auto& connections = shared->syncConnections_[deviceId]; + auto conn = connections.begin(); + while (conn != connections.end()) { + if (*conn == socket) + conn = connections.erase(conn); + else + conn++; + } + }); + + socket->setOnRecv([this, deviceId](const uint8_t* buf, size_t len) { + if (!buf) + return len; + + std::string err; + Json::Value value; + Json::CharReaderBuilder rbuilder; + Json::CharReaderBuilder::strictMode(&rbuilder.settings_); + auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader()); + auto bufCstr = reinterpret_cast<const char*>(buf); + if (!reader->parse(bufCstr, bufCstr + len, &value, &err)) { + JAMI_ERR() << "Archive JSON parsing error: " << err; + return len; + } + + if (value.isMember("conversations")) { + for (const auto& jsonConv : value["conversations"]) { + auto convId = jsonConv["id"].asString(); + auto removed = jsonConv.isMember("removed"); + if (not removed) { + if (!isConversation(convId)) { + { + std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); + auto it = pendingConversationsFetch_.find(convId); + if (it != pendingConversationsFetch_.end()) // Already pending + return len; + pendingConversationsFetch_[convId] = PendingConversationFetch {}; + } + + connectionManager().connectDevice( + DeviceId(deviceId), + std::string("git://").append(deviceId).append("/").append(convId), + [this, convId](std::shared_ptr<ChannelSocket> socket, + const DeviceId& deviceId) { + if (socket) { + std::unique_lock<std::mutex> lk(pendingConversationsFetchMtx_); + auto& pending = pendingConversationsFetch_[convId]; + if (!pending.ready) { + pending.ready = true; + pending.deviceId = deviceId.toString(); + lk.unlock(); + // Save the git socket + addGitSocket(deviceId.toString(), convId, socket); + checkConversationsEvents(); + // TODO when do we remove the gitSocket? + } else { + lk.unlock(); + socket->shutdown(); + } + } + }); + + JAMI_INFO( + "[Account %s] New conversation detected: %s. Ask device %s to clone it", + getAccountID().c_str(), + convId.c_str(), + deviceId.c_str()); + } else { + JAMI_INFO("[Account %s] Already have conversation %s", + getAccountID().c_str(), + convId.c_str()); + } + } else { + for (auto& info : convInfos_) { + if (info.id == convId) { + info.removed = std::time(nullptr); + } + } + } + } + saveConvInfos(); + } + return len; + }); +} + +void +JamiAccount::syncWith(const std::string& deviceId, const std::shared_ptr<ChannelSocket>& socket) +{ + if (!socket) + return; + { + std::lock_guard<std::mutex> lk(syncConnectionsMtx_); + socket->onShutdown([w = weak(), socket, deviceId]() { + // When sock is shutdown update syncConnections_ to be able to resync asap + auto shared = w.lock(); + if (!shared) + return; + std::lock_guard<std::mutex> lk(shared->syncConnectionsMtx_); + auto& connections = shared->syncConnections_[deviceId]; + auto conn = connections.begin(); + while (conn != connections.end()) { + if (*conn == socket) + conn = connections.erase(conn); + else + conn++; + } + if (connections.empty()) { + shared->syncConnections_.erase(deviceId); + } + }); + syncConnections_[deviceId].emplace_back(socket); + } + syncInfos(socket); +} + +void +JamiAccount::syncInfos(const std::shared_ptr<ChannelSocket>& socket) +{ + // Sync conversations + if (not socket or convInfos_.empty()) + return; + Json::Value syncValue; + for (const auto& info : convInfos_) { + syncValue["conversations"].append(info.toJson()); + } + + Json::StreamWriterBuilder builder; + const auto sync = Json::writeString(builder, syncValue); + + std::error_code ec; + socket->write(reinterpret_cast<const unsigned char*>(sync.c_str()), sync.size(), ec); +} + +void +JamiAccount::syncWithConnected() +{ + std::lock_guard<std::mutex> lk(syncConnectionsMtx_); + for (auto& [_deviceId, sockets] : syncConnections_) { + if (not sockets.empty()) + syncInfos(sockets[0]); + } +} + +void +JamiAccount::loadConvInfos() +{ + decltype(convInfos_) convInfo; + try { + // read file + auto file = fileutils::loadFile("convInfo", idPath_); + // load values + msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size()); + oh.get().convert(convInfo); + } catch (const std::exception& e) { + JAMI_WARN("[convInfo] error loading convInfo: %s", e.what()); + return; + } + + for (auto& info : convInfo) + convInfos_.emplace_back(info); +} + +void +JamiAccount::saveConvInfos() const +{ + std::ofstream file(idPath_ + DIR_SEPARATOR_STR "convInfo", std::ios::trunc | std::ios::binary); + msgpack::pack(file, convInfos_); +} + } // namespace jami diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 3aa22f78a5a85b7de71a950dd8d264b95f56e22d..607926a043f8c629c242439604bf98d35ef650dd 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -83,6 +83,7 @@ struct AccountInfo; class SipTransport; class ChanneledOutgoingTransfer; class Conversation; +struct ConvInfo; /** * A ConversationRequest is a request which corresponds to a trust request, but for conversations @@ -516,8 +517,8 @@ public: const std::string& parent = "", const std::string& type = "text/plain"); uint32_t loadConversationMessages(const std::string& conversationId, - const std::string& fromMessage = "", - size_t n = 0); + const std::string& fromMessage = "", + size_t n = 0); // Received a new commit notification void onNewGitCommit(const std::string& peer, @@ -665,6 +666,9 @@ private: */ void generateDhParams(); + void loadConvInfos(); + void saveConvInfos() const; + template<class... Args> std::shared_ptr<IceTransport> createIceTransport(const Args&... args); void newOutgoingCallHelper(const std::shared_ptr<SIPCall>& call, std::string_view toUri); @@ -714,7 +718,14 @@ private: std::map<dht::InfoHash, BuddyInfo> trackedBuddies_; /** Conversations */ + mutable std::mutex conversationsMtx_ {}; std::map<std::string, std::unique_ptr<Conversation>> conversations_; + bool isConversation(const std::string& convId) const + { + std::lock_guard<std::mutex> lk(conversationsMtx_); + return conversations_.find(convId) != conversations_.end(); + } + std::vector<ConvInfo> convInfos_; mutable std::mutex dhtValuesMtx_; bool dhtPublicInCalls_ {true}; @@ -820,6 +831,12 @@ private: */ void callConnectionClosed(const DeviceId& deviceId, bool eraseDummy); + // Sync connections + std::mutex syncConnectionsMtx_; + std::set<std::string> pendingSync_ {}; + std::map<std::string /* deviceId */, std::vector<std::shared_ptr<ChannelSocket>>> + syncConnections_; + /** * Ask a device to open a channeled SIP socket * @param peerId The contact who owns the device @@ -836,6 +853,15 @@ private: void cacheSIPConnection(std::shared_ptr<ChannelSocket>&& socket, const std::string& peerId, const DeviceId& deviceId); + /** + * Store a new Sync connection + * @param socket The new sync channel + * @param peerId The contact who owns the device + * @param deviceId Device linked to that transport + */ + void cacheSyncConnection(std::shared_ptr<ChannelSocket>&& socket, + const std::string& peerId, + const DeviceId& deviceId); // File transfers std::mutex transfersMtx_ {}; @@ -882,6 +908,10 @@ private: std::shared_ptr<RepeatedTask> conversationsEventHandler {}; void checkConversationsEvents(); bool handlePendingConversations(); + + void syncWith(const std::string& deviceId, const std::shared_ptr<ChannelSocket>& socket); + void syncInfos(const std::shared_ptr<ChannelSocket>& socket); + void syncWithConnected(); }; static inline std::ostream& diff --git a/test/unitTest/Makefile.am b/test/unitTest/Makefile.am index fbbc6a856bde5ae7542e9216736af4bb7e2d7675..8164852fe5e458042d2f2a1ed5bd0990160fe5b1 100644 --- a/test/unitTest/Makefile.am +++ b/test/unitTest/Makefile.am @@ -151,4 +151,10 @@ ut_conversation_SOURCES = conversation/conversation.cpp check_PROGRAMS += ut_media_control ut_media_control_SOURCES = media_control/media_control.cpp +# +# syncHistory +# +check_PROGRAMS += ut_syncHistory +ut_syncHistory_SOURCES = syncHistory/syncHistory.cpp + TESTS = $(check_PROGRAMS) diff --git a/test/unitTest/syncHistory/syncHistory.cpp b/test/unitTest/syncHistory/syncHistory.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1d5c8251f9b427aafa65346e851abf0067a02120 --- /dev/null +++ b/test/unitTest/syncHistory/syncHistory.cpp @@ -0,0 +1,380 @@ +/* + * Copyright (C) 2017-2019 Savoir-faire Linux Inc. + * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include <cppunit/TestAssert.h> +#include <cppunit/TestFixture.h> +#include <cppunit/extensions/HelperMacros.h> + +#include <condition_variable> +#include <filesystem> + +#include "manager.h" +#include "jamidht/connectionmanager.h" +#include "jamidht/multiplexed_socket.h" +#include "jamidht/jamiaccount.h" +#include "../../test_runner.h" +#include "dring.h" +#include "account_const.h" + +using namespace DRing::Account; + +namespace jami { +namespace test { + +class SyncHistoryTest : public CppUnit::TestFixture +{ +public: + SyncHistoryTest() + { + // Init daemon + DRing::init(DRing::InitFlag(DRing::DRING_FLAG_DEBUG | DRing::DRING_FLAG_CONSOLE_LOG)); + if (not Manager::instance().initialized) + CPPUNIT_ASSERT(DRing::start("dring-sample.yml")); + } + ~SyncHistoryTest() { DRing::fini(); } + static std::string name() { return "SyncHistory"; } + void setUp(); + void tearDown(); + + std::string aliceId; + std::string alice2Id; + +private: + void testCreateConversationThenSync(); + void testCreateConversationWithOnlineDevice(); + void testCreateConversationWithMessagesThenAddDevice(); + + CPPUNIT_TEST_SUITE(SyncHistoryTest); + CPPUNIT_TEST(testCreateConversationThenSync); + CPPUNIT_TEST(testCreateConversationWithOnlineDevice); + CPPUNIT_TEST(testCreateConversationWithMessagesThenAddDevice); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(SyncHistoryTest, SyncHistoryTest::name()); + +void +SyncHistoryTest::setUp() +{ + std::map<std::string, std::string> details = DRing::getAccountTemplate("RING"); + details[ConfProperties::TYPE] = "RING"; + details[ConfProperties::DISPLAYNAME] = "ALICE"; + details[ConfProperties::ALIAS] = "ALICE"; + details[ConfProperties::UPNP_ENABLED] = "true"; + details[ConfProperties::ARCHIVE_PASSWORD] = ""; + details[ConfProperties::ARCHIVE_PIN] = ""; + details[ConfProperties::ARCHIVE_PATH] = ""; + aliceId = Manager::instance().addAccount(details); + + JAMI_INFO("Initialize account..."); + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + confHandlers.insert( + DRing::exportable_callback<DRing::ConfigurationSignal::VolatileDetailsChanged>( + [&](const std::string&, const std::map<std::string, std::string>&) { + bool ready = false; + auto details = aliceAccount->getVolatileAccountDetails(); + auto daemonStatus = details[DRing::Account::ConfProperties::Registration::STATUS]; + ready = (daemonStatus == "REGISTERED"); + if (ready) + cv.notify_one(); + })); + DRing::registerSignalHandlers(confHandlers); + cv.wait_for(lk, std::chrono::seconds(30)); + DRing::unregisterSignalHandlers(); +} + +void +SyncHistoryTest::tearDown() +{ + JAMI_INFO("Remove created accounts..."); + + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + auto currentAccSize = Manager::instance().getAccountList().size(); + confHandlers.insert( + DRing::exportable_callback<DRing::ConfigurationSignal::AccountsChanged>([&]() { + if (Manager::instance().getAccountList().size() <= currentAccSize - 1) { + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + + Manager::instance().removeAccount(aliceId, true); + // Because cppunit is not linked with dbus, just poll if removed + cv.wait_for(lk, std::chrono::seconds(30)); + + DRing::unregisterSignalHandlers(); +} + +void +SyncHistoryTest::testCreateConversationThenSync() +{ + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + // Start conversation + auto convId = aliceAccount->startConversation(); + + // Now create alice2 + auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz"; + std::remove(aliceArchive.c_str()); + aliceAccount->exportArchive(aliceArchive); + std::map<std::string, std::string> details = DRing::getAccountTemplate("RING"); + details[ConfProperties::TYPE] = "RING"; + details[ConfProperties::DISPLAYNAME] = "ALICE2"; + details[ConfProperties::ALIAS] = "ALICE2"; + details[ConfProperties::UPNP_ENABLED] = "true"; + details[ConfProperties::ARCHIVE_PASSWORD] = ""; + details[ConfProperties::ARCHIVE_PIN] = ""; + details[ConfProperties::ARCHIVE_PATH] = aliceArchive; + alice2Id = Manager::instance().addAccount(details); + + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + confHandlers.insert( + DRing::exportable_callback<DRing::ConfigurationSignal::VolatileDetailsChanged>( + [&](const std::string&, const std::map<std::string, std::string>&) { + auto alice2Account = Manager::instance().getAccount<JamiAccount>(alice2Id); + auto details = alice2Account->getVolatileAccountDetails(); + auto daemonStatus = details[DRing::Account::ConfProperties::Registration::STATUS]; + if (daemonStatus == "REGISTERED") + cv.notify_one(); + })); + DRing::registerSignalHandlers(confHandlers); + + cv.wait_for(lk, std::chrono::seconds(30)); + DRing::unregisterSignalHandlers(); + confHandlers.clear(); + + auto conversationReady = false; + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationReady>( + [&](const std::string& accountId, const std::string& conversationId) { + if (accountId == alice2Id && conversationId == convId) { + conversationReady = true; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + cv.wait_for(lk, std::chrono::seconds(30)); + DRing::unregisterSignalHandlers(); + confHandlers.clear(); + + // Remove alice 2 and alice.gz + std::remove(aliceArchive.c_str()); + + auto currentAccSize = Manager::instance().getAccountList().size(); + confHandlers.insert( + DRing::exportable_callback<DRing::ConfigurationSignal::AccountsChanged>([&]() { + if (Manager::instance().getAccountList().size() <= currentAccSize - 1) { + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + + Manager::instance().removeAccount(alice2Id, true); + // Because cppunit is not linked with dbus, just poll if removed + cv.wait_for(lk, std::chrono::seconds(30)); + + DRing::unregisterSignalHandlers(); + confHandlers.clear(); + + // Check if conversation is ready + CPPUNIT_ASSERT(conversationReady); +} + +void +SyncHistoryTest::testCreateConversationWithOnlineDevice() +{ + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + + // Now create alice2 + auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz"; + std::remove(aliceArchive.c_str()); + aliceAccount->exportArchive(aliceArchive); + std::map<std::string, std::string> details = DRing::getAccountTemplate("RING"); + details[ConfProperties::TYPE] = "RING"; + details[ConfProperties::DISPLAYNAME] = "ALICE2"; + details[ConfProperties::ALIAS] = "ALICE2"; + details[ConfProperties::UPNP_ENABLED] = "true"; + details[ConfProperties::ARCHIVE_PASSWORD] = ""; + details[ConfProperties::ARCHIVE_PIN] = ""; + details[ConfProperties::ARCHIVE_PATH] = aliceArchive; + alice2Id = Manager::instance().addAccount(details); + + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + confHandlers.insert( + DRing::exportable_callback<DRing::ConfigurationSignal::VolatileDetailsChanged>( + [&](const std::string&, const std::map<std::string, std::string>&) { + auto alice2Account = Manager::instance().getAccount<JamiAccount>(alice2Id); + auto details = alice2Account->getVolatileAccountDetails(); + auto daemonStatus = details[DRing::Account::ConfProperties::Registration::STATUS]; + if (daemonStatus == "REGISTERED") + cv.notify_one(); + })); + DRing::registerSignalHandlers(confHandlers); + + cv.wait_for(lk, std::chrono::seconds(30)); + DRing::unregisterSignalHandlers(); + confHandlers.clear(); + + // Start conversation now + auto convId = aliceAccount->startConversation(); + auto conversationReady = false; + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationReady>( + [&](const std::string& accountId, const std::string& conversationId) { + if (accountId == alice2Id && conversationId == convId) { + conversationReady = true; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + cv.wait_for(lk, std::chrono::seconds(30)); + DRing::unregisterSignalHandlers(); + confHandlers.clear(); + + // Remove alice 2 and alice.gz + std::remove(aliceArchive.c_str()); + + auto currentAccSize = Manager::instance().getAccountList().size(); + confHandlers.insert( + DRing::exportable_callback<DRing::ConfigurationSignal::AccountsChanged>([&]() { + if (Manager::instance().getAccountList().size() <= currentAccSize - 1) { + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + + Manager::instance().removeAccount(alice2Id, true); + // Because cppunit is not linked with dbus, just poll if removed + cv.wait_for(lk, std::chrono::seconds(30)); + + DRing::unregisterSignalHandlers(); + confHandlers.clear(); + + // Check if conversation is ready + CPPUNIT_ASSERT(conversationReady); +} + +void +SyncHistoryTest::testCreateConversationWithMessagesThenAddDevice() +{ + auto aliceAccount = Manager::instance().getAccount<JamiAccount>(aliceId); + // Start conversation + auto convId = aliceAccount->startConversation(); + aliceAccount->sendMessage(convId, "Message 1"); + aliceAccount->sendMessage(convId, "Message 2"); + aliceAccount->sendMessage(convId, "Message 3"); + + // Now create alice2 + auto aliceArchive = std::filesystem::current_path().string() + "/alice.gz"; + std::remove(aliceArchive.c_str()); + aliceAccount->exportArchive(aliceArchive); + std::map<std::string, std::string> details = DRing::getAccountTemplate("RING"); + details[ConfProperties::TYPE] = "RING"; + details[ConfProperties::DISPLAYNAME] = "ALICE2"; + details[ConfProperties::ALIAS] = "ALICE2"; + details[ConfProperties::UPNP_ENABLED] = "true"; + details[ConfProperties::ARCHIVE_PASSWORD] = ""; + details[ConfProperties::ARCHIVE_PIN] = ""; + details[ConfProperties::ARCHIVE_PATH] = aliceArchive; + alice2Id = Manager::instance().addAccount(details); + + std::mutex mtx; + std::unique_lock<std::mutex> lk {mtx}; + std::condition_variable cv; + std::map<std::string, std::shared_ptr<DRing::CallbackWrapperBase>> confHandlers; + auto conversationReady = false; + confHandlers.insert( + DRing::exportable_callback<DRing::ConfigurationSignal::VolatileDetailsChanged>( + [&](const std::string&, const std::map<std::string, std::string>&) { + auto alice2Account = Manager::instance().getAccount<JamiAccount>(alice2Id); + auto details = alice2Account->getVolatileAccountDetails(); + auto daemonStatus = details[DRing::Account::ConfProperties::Registration::STATUS]; + if (daemonStatus == "REGISTERED") + cv.notify_one(); + })); + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationReady>( + [&](const std::string& accountId, const std::string& conversationId) { + if (accountId == alice2Id && conversationId == convId) { + conversationReady = true; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + DRing::unregisterSignalHandlers(); + confHandlers.clear(); + + // Check if conversation is ready + CPPUNIT_ASSERT(cv.wait_for(lk, std::chrono::seconds(60), [&]() { return conversationReady; })); + auto alice2Account = Manager::instance().getAccount<JamiAccount>(alice2Id); + std::vector<std::map<std::string, std::string>> messages; + confHandlers.insert(DRing::exportable_callback<DRing::ConversationSignal::ConversationLoaded>( + [&](uint32_t, + const std::string& accountId, + const std::string& conversationId, + std::vector<std::map<std::string, std::string>> msg) { + if (accountId == alice2Id && conversationId == convId) { + messages = msg; + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + alice2Account->loadConversationMessages(convId); + cv.wait_for(lk, std::chrono::seconds(30)); + DRing::unregisterSignalHandlers(); + confHandlers.clear(); + + // Check messages + CPPUNIT_ASSERT(messages.size() == 4 /* 3 + initial */); + CPPUNIT_ASSERT(messages[0]["body"] == "Message 3"); + CPPUNIT_ASSERT(messages[1]["body"] == "Message 2"); + CPPUNIT_ASSERT(messages[2]["body"] == "Message 1"); + + // Remove alice 2 and alice.gz + std::remove(aliceArchive.c_str()); + + auto currentAccSize = Manager::instance().getAccountList().size(); + confHandlers.insert( + DRing::exportable_callback<DRing::ConfigurationSignal::AccountsChanged>([&]() { + if (Manager::instance().getAccountList().size() <= currentAccSize - 1) { + cv.notify_one(); + } + })); + DRing::registerSignalHandlers(confHandlers); + + Manager::instance().removeAccount(alice2Id, true); + // Because cppunit is not linked with dbus, just poll if removed + cv.wait_for(lk, std::chrono::seconds(30)); + + DRing::unregisterSignalHandlers(); + confHandlers.clear(); +} + +} // namespace test +} // namespace jami + +RING_TEST_RUNNER(jami::test::SyncHistoryTest::name())