From f2288e0cc31c7eb0f2e99e5f6a8b11c3302d644a Mon Sep 17 00:00:00 2001 From: Fadi SHEHADEH <fadi.shehadeh@savoirfairelinux.com> Date: Mon, 27 Mar 2023 09:45:27 -0400 Subject: [PATCH] DRT: implement Distributed Routing Table This patch adds the minimal code for the DRT. This component should avoid the fact that every peer in a swarm must connects to every other peer. The idea is to get a similar algorithm we use in OpenDHT, meaning to store known devices in buckets and try to dynamically connect to some of the peers, creating a routing table, so every peer in a conversation should connect to a subset of peers. The big difference is that we must includes mobile devices (for OpenDHT push/baterry optimizations can be taken into accounts). For this, we consider a special type of node in the DRT that is considered as reachable but not connected and stored in the mobile nodes. A mobile announces itself as a mobile after bootstraping via a message in the swarm channel. In this case, if they disconnect the node is moved to a special state (mobile node) instead (known nodes). A user can know to which devices they can send messages via Routing Table's mobile nodes and connected nodes. Docs: TODO add link GitLab: #297 Change-Id: Ic061c58ca487698afee712cb2eef5e57f9c208fb --- src/gittransport.cpp | 18 +- src/jamidht/account_manager.h | 1 - src/jamidht/conversation.cpp | 133 +- src/jamidht/conversation.h | 51 +- src/jamidht/conversation_channel_handler.cpp | 16 +- src/jamidht/conversation_module.cpp | 428 +++++- src/jamidht/conversation_module.h | 54 +- src/jamidht/jamiaccount.cpp | 42 +- src/jamidht/jamiaccount.h | 51 +- src/jamidht/swarm/swarm_channel_handler.cpp | 15 + src/jamidht/swarm/swarm_channel_handler.h | 4 + src/jamidht/swarm/swarm_manager.cpp | 123 +- src/jamidht/swarm/swarm_manager.h | 34 +- src/manager.cpp | 13 +- src/manager.h | 10 +- test/unitTest/Makefile.am | 6 + test/unitTest/swarm/nodes.h | 93 ++ test/unitTest/swarm/routing_table.cpp | 1318 ++++++++++++++++++ test/unitTest/swarm/swarm_spread.cpp | 479 +++++++ 19 files changed, 2650 insertions(+), 239 deletions(-) create mode 100644 test/unitTest/swarm/nodes.h create mode 100644 test/unitTest/swarm/routing_table.cpp create mode 100644 test/unitTest/swarm/swarm_spread.cpp diff --git a/src/gittransport.cpp b/src/gittransport.cpp index 7d4e2fc60d..3a11c85093 100644 --- a/src/gittransport.cpp +++ b/src/gittransport.cpp @@ -51,7 +51,7 @@ generateRequest(git_buf* request, const std::string& cmd, const std::string_view + HOST_TAG.size() + deviceId.size() /* device */ + nullSeparator.size() /* \0 */; - std::stringstream streamed; + std::ostringstream streamed; streamed << std::setw(4) << std::setfill('0') << std::hex << (total & 0x0FFFF) << cmd; streamed << " " << conversationId; streamed << nullSeparator << HOST_TAG << deviceId << nullSeparator; @@ -180,18 +180,16 @@ P2PSubTransportAction(git_smart_subtransport_stream** out, auto conversationId = gitUrl.substr(delim + 1, gitUrl.size()); if (action == GIT_SERVICE_UPLOADPACK_LS) { - auto gitSocket = jami::Manager::instance().gitSocket(std::string(accountId), - std::string(deviceId), - std::string(conversationId)); - if (gitSocket == std::nullopt) { - JAMI_ERR("Can't find related socket for %s, %s, %s", - std::string(accountId).c_str(), - std::string(deviceId).c_str(), - std::string(conversationId).c_str()); + auto gitSocket = jami::Manager::instance().gitSocket(accountId, deviceId, conversationId); + if (!gitSocket) { + JAMI_ERROR("Can't find related socket for {:s}, {:s}, {:s}", + accountId, + deviceId, + conversationId); return -1; } auto stream = std::make_unique<P2PStream>(); - stream->socket = *gitSocket; + stream->socket = gitSocket; stream->base.read = P2PStreamRead; stream->base.write = P2PStreamWrite; stream->base.free = P2PStreamFree; diff --git a/src/jamidht/account_manager.h b/src/jamidht/account_manager.h index a56b963295..dffea2d560 100644 --- a/src/jamidht/account_manager.h +++ b/src/jamidht/account_manager.h @@ -21,7 +21,6 @@ #include "config.h" #endif -#include "jamidht/conversation.h" #include "contact_list.h" #include "logger.h" #if HAVE_RINGNS diff --git a/src/jamidht/conversation.cpp b/src/jamidht/conversation.cpp index fdf796cc4e..f62fc51f41 100644 --- a/src/jamidht/conversation.cpp +++ b/src/jamidht/conversation.cpp @@ -28,7 +28,8 @@ #include <json/json.h> #include <string_view> #include <opendht/thread_pool.h> - +#include <tuple> +#include "swarm/swarm_manager.h" #ifdef ENABLE_PLUGIN #include "manager.h" #include "plugin/jamipluginmanager.h" @@ -124,9 +125,9 @@ public: Impl(const std::weak_ptr<JamiAccount>& account, ConversationMode mode, const std::string& otherMember = "") - : account_(account) + : repository_(ConversationRepository::createConversation(account, mode, otherMember)) + , account_(account) { - repository_ = ConversationRepository::createConversation(account, mode, otherMember); if (!repository_) { throw std::logic_error("Couldn't create repository"); } @@ -170,6 +171,7 @@ public: void init() { if (auto shared = account_.lock()) { + swarmManager_ = std::make_shared<SwarmManager>(NodeId(shared->currentDeviceId())); accountId_ = shared->getAccountID(); transferManager_ = std::make_shared<TransferManager>(shared->getAccountID(), repository_->id()); @@ -205,9 +207,6 @@ public: bool isAdmin() const; std::string repoPath() const; - // Protect against parallel commits in the repo - // As the index can add files to the commit we want. - std::mutex writeMtx_ {}; void announce(const std::string& commitId) const { std::vector<std::string> vec; @@ -418,10 +417,7 @@ public: } if (announceMember) { - std::vector<std::string> members; - for (const auto& m : repository_->members()) - members.emplace_back(m.uri); - shared->saveMembers(convId, members); + shared->saveMembers(convId, repository_->memberUris("", {})); } } } @@ -524,27 +520,53 @@ public: msgpack::pack(file, hostedCalls_); } - void voteUnban(const std::string& contactUri, const std::string& type, const OnDoneCb& cb); + void voteUnban(const std::string& contactUri, const std::string_view type, const OnDoneCb& cb); - std::string bannedType(std::string_view uri) const + std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited, + bool includeLeft, bool includeBanned) const; + + std::string_view bannedType(const std::string& uri) const { auto bannedMember = fmt::format("{}/banned/members/{}.crt", repoPath(), uri); if (fileutils::isFile(bannedMember)) - return "members"; + return "members"sv; auto bannedAdmin = fmt::format("{}/banned/admins/{}.crt", repoPath(), uri); if (fileutils::isFile(bannedAdmin)) - return "admins"; + return "admins"sv; auto bannedInvited = fmt::format("{}/banned/invited/{}", repoPath(), uri); if (fileutils::isFile(bannedInvited)) - return "invited"; + return "invited"sv; auto bannedDevice = fmt::format("{}/banned/devices/{}.crt", repoPath(), uri); if (fileutils::isFile(bannedDevice)) - return "devices"; + return "devices"sv; return {}; } + std::shared_ptr<ChannelSocket> gitSocket(const DeviceId& deviceId) const + { + auto deviceSockets = gitSocketList_.find(deviceId); + return (deviceSockets != gitSocketList_.end()) ? deviceSockets->second : nullptr; + } + + void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) + { + gitSocketList_[deviceId] = socket; + } + void removeGitSocket(const DeviceId& deviceId) + { + auto deviceSockets = gitSocketList_.find(deviceId); + if (deviceSockets != gitSocketList_.end()) + gitSocketList_.erase(deviceSockets); + } + std::vector<std::map<std::string, std::string>> getMembers(bool includeInvited, - bool includeLeft, bool includeBanned) const; + bool includeLeft) const; + + std::shared_ptr<SwarmManager> swarmManager_; + std::set<std::string> checkedMembers_; // Store members we tried + std::function<void()> bootstrapCb_; + + std::mutex writeMtx_ {}; std::unique_ptr<ConversationRepository> repository_; std::weak_ptr<JamiAccount> account_; std::atomic_bool isRemoving_ {false}; @@ -580,6 +602,8 @@ public: std::string activeCallsPath_ {}; mutable std::mutex activeCallsMtx_ {}; mutable std::vector<std::map<std::string, std::string>> activeCalls_ {}; + + GitSocketList gitSocketList_ {}; }; bool @@ -750,7 +774,7 @@ Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb) [w = weak(), contactUri = std::move(contactUri), cb = std::move(cb)] { if (auto sthis = w.lock()) { auto members = sthis->pimpl_->repository_->members(); - std::string type = sthis->pimpl_->bannedType(contactUri); + auto type = sthis->pimpl_->bannedType(contactUri); if (type.empty()) { cb(false, {}); return; @@ -778,9 +802,37 @@ Conversation::addMember(const std::string& contactUri, const OnDoneCb& cb) }); } +std::shared_ptr<ChannelSocket> +Conversation::gitSocket(const DeviceId& deviceId) const +{ + return pimpl_->gitSocket(deviceId); +} +/*bool +Conversation::hasGitSocket(const DeviceId& deviceId) const +{ + return pimpl_->hasGitSocket(deviceId); +}*/ +void +Conversation::addGitSocket(const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) +{ + pimpl_->addGitSocket(deviceId, socket); +} + +void +Conversation::removeGitSocket(const DeviceId& deviceId) +{ + pimpl_->removeGitSocket(deviceId); +} + +void +Conversation::removeGitSockets() +{ + pimpl_->gitSocketList_.clear(); +} + void Conversation::Impl::voteUnban(const std::string& contactUri, - const std::string& type, + const std::string_view type, const OnDoneCb& cb) { // Check if admin @@ -896,6 +948,19 @@ Conversation::memberUris(std::string_view filter, const std::set<MemberRole>& fi return pimpl_->repository_->memberUris(filter, filteredRoles); } +std::vector<NodeId> +Conversation::peersToSyncWith() const +{ + const auto& routingTable = pimpl_->swarmManager_->getRoutingTable(); + const auto& nodes = routingTable.getNodes(); + const auto& mobiles = routingTable.getMobileNodes(); + std::vector<NodeId> s; + s.reserve(nodes.size() + mobiles.size()); + s.insert(s.end(), nodes.begin(), nodes.end()); + s.insert(s.end(), mobiles.begin(), mobiles.end()); + return s; +} + std::string Conversation::join() { @@ -1616,6 +1681,22 @@ Conversation::displayed() const return {}; } +void +Conversation::bootstrap(std::function<void()> onBootstraped) +{ + if (!pimpl_ || !pimpl_->repository_ || !pimpl_->swarmManager_) + return; + pimpl_->bootstrapCb_ = std::move(onBootstraped); + std::vector<DeviceId> devices; + for (const auto& m : pimpl_->repository_->devices()) + devices.insert(devices.end(), m.second.begin(), m.second.end()); + // Add known devices + JAMI_DEBUG("[SwarmManager {}] Bootstrap with {} devices", + fmt::ptr(pimpl_->swarmManager_.get()), + devices.size()); + pimpl_->swarmManager_->setKnownNodes(devices); +} + std::vector<std::string> Conversation::refreshActiveCalls() { @@ -1629,6 +1710,20 @@ Conversation::onLastDisplayedUpdated( pimpl_->lastDisplayedUpdatedCb_ = std::move(lastDisplayedUpdatedCb); } +void +Conversation::onNeedSocket(NeedSocketCb needSocket) +{ + pimpl_->swarmManager_->needSocketCb_ = [needSocket = std::move(needSocket), + this](const std::string& deviceId, ChannelCb&& cb) { + return needSocket(id(), deviceId, std::move(cb), "application/im-gitmessage-id"); + }; +} +void +Conversation::addSwarmChannel(std::shared_ptr<ChannelSocket> channel) +{ + pimpl_->swarmManager_->addChannel(std::move(channel)); +} + uint32_t Conversation::countInteractions(const std::string& toId, const std::string& fromId, diff --git a/src/jamidht/conversation.h b/src/jamidht/conversation.h index ee267d7c3b..10dd08b451 100644 --- a/src/jamidht/conversation.h +++ b/src/jamidht/conversation.h @@ -20,8 +20,8 @@ #pragma once #include "jamidht/conversationrepository.h" -#include "jami/datatransfer_interface.h" #include "conversationrepository.h" +#include "swarm/swarm_protocol.h" #include <json/json.h> #include <msgpack.hpp> @@ -117,6 +117,11 @@ using OnLoadMessages using OnCommitCb = std::function<void(const std::string&)>; using OnDoneCb = std::function<void(bool, const std::string&)>; using OnMultiDoneCb = std::function<void(const std::vector<std::string>&)>; +using DeviceId = dht::PkId; +using GitSocketList = std::map<DeviceId, std::shared_ptr<ChannelSocket>>; +using ChannelCb = std::function<bool(const std::shared_ptr<ChannelSocket>&)>; +using NeedSocketCb + = std::function<void(const std::string&, const std::string&, ChannelCb&&, const std::string&)>; class Conversation : public std::enable_shared_from_this<Conversation> { @@ -130,6 +135,12 @@ public: const std::string& conversationId); ~Conversation(); + /** + * Bootstrap swarm manager to other peers + * @param onBootstrapped Callback called when connection is successfully established + */ + void bootstrap(std::function<void()> onBootstraped); + /** * Refresh active calls. * @note: If the host crash during a call, when initializing, we need to update @@ -146,6 +157,21 @@ public: void onLastDisplayedUpdated( std::function<void(const std::string&, const std::string&)>&& lastDisplayedUpdatedCb); + /** + * Set the callback that will be called whenever a new socket will be needed + * @param cb + */ + void onNeedSocket(NeedSocketCb cb); + /** + * Add swarm connection to the DRT + * @param channel Related channel + */ + void addSwarmChannel(std::shared_ptr<ChannelSocket> channel); + + /** + * Get conversation's id + * @return conversation Id + */ std::string id() const; // Member management @@ -183,6 +209,24 @@ public: MemberRole::LEFT, MemberRole::BANNED}) const; + /** + * Get peers to sync with. This is mostly managed by the DRT + * @return some mobile nodes and all connected nodes + */ + std::vector<NodeId> peersToSyncWith() const; + /** + * Check if we're at least connected to one node + * @return if the DRT is connected + */ + bool isBoostraped() const; + /** + * Retrieve the uri from a deviceId + * @note used by swarm manager (peersToSyncWith) + * @param deviceId + * @return corresponding issuer + */ + std::string uriFromDevice(const std::string& deviceId) const; + /** * Join a conversation * @return commit id to send @@ -442,6 +486,11 @@ public: */ std::vector<std::map<std::string, std::string>> currentCalls() const; + std::shared_ptr<ChannelSocket> gitSocket(const DeviceId& deviceId) const; + void addGitSocket(const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket); + void removeGitSocket(const DeviceId& deviceId); + void removeGitSockets(); + private: std::shared_ptr<Conversation> shared() { diff --git a/src/jamidht/conversation_channel_handler.cpp b/src/jamidht/conversation_channel_handler.cpp index 20f9bd00bf..ca0405cb4a 100644 --- a/src/jamidht/conversation_channel_handler.cpp +++ b/src/jamidht/conversation_channel_handler.cpp @@ -38,26 +38,26 @@ ConversationChannelHandler::connect(const DeviceId& deviceId, { connectionManager_.connectDevice(deviceId, "git://" + deviceId.toString() + "/" + channelName, - [cb = std::move(cb)](std::shared_ptr<ChannelSocket> socket, - const DeviceId& dev) { - if (cb) - cb(socket, dev); - }); + std::move(cb)); } bool -ConversationChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>&, +ConversationChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert, const std::string& name) { + auto acc = account_.lock(); + if (!cert || !cert->issuer || !acc) + return false; // Pre-check before acceptance. Sometimes, another device can start a conversation // which is still not synced. So, here we decline channel's request in this case // to avoid the other device to want to sync with us if we are not ready. auto sep = name.find_last_of('/'); auto conversationId = name.substr(sep + 1); - auto remoteDevice = name.substr(6, sep - 6); + if (auto acc = account_.lock()) if (auto convModule = acc->convModule()) { - auto res = !convModule->isBannedDevice(conversationId, remoteDevice); + auto res = !convModule->isBannedDevice(conversationId, + cert->issuer->getLongId().toString()); return res; } return false; diff --git a/src/jamidht/conversation_module.cpp b/src/jamidht/conversation_module.cpp index 5cc3e77a8d..d1d8736aca 100644 --- a/src/jamidht/conversation_module.cpp +++ b/src/jamidht/conversation_module.cpp @@ -20,6 +20,7 @@ #include "conversation_module.h" +#include <algorithm> #include <fstream> #include <opendht/thread_pool.h> @@ -47,6 +48,7 @@ struct PendingConversationFetch std::map<std::string, std::string> preferences {}; std::map<std::string, std::string> lastDisplayed {}; std::set<std::string> connectingTo {}; + std::shared_ptr<ChannelSocket> socket {}; }; class ConversationModule::Impl : public std::enable_shared_from_this<Impl> @@ -56,6 +58,7 @@ public: NeedsSyncingCb&& needsSyncingCb, SengMsgCb&& sendMsgCb, NeedSocketCb&& onNeedSocket, + NeedSocketCb&& onNeedSwarmSocket, UpdateConvReq&& updateConvReqCb); // Retrieving recent commits @@ -119,13 +122,16 @@ public: * @param conversation * @param commit * @param sync If we send an update to other account's devices + * @param deviceId If we need to filter a specific device */ void sendMessageNotification(const std::string& conversationId, - const std::string& commitId, - bool sync); - void sendMessageNotification(const Conversation& conversation, - const std::string& commitId, - bool sync); + bool sync, + const std::string& commitId = "", + const std::string& deviceId = ""); + void sendMessageNotification(Conversation& conversation, + bool sync, + const std::string& commitId = "", + const std::string& deviceId = ""); /** * @return if a convId is a valid conversation (repository cloned & usable) @@ -179,6 +185,7 @@ public: NeedsSyncingCb needsSyncingCb_; SengMsgCb sendMsgCb_; NeedSocketCb onNeedSocket_; + NeedSocketCb onNeedSwarmSocket_; UpdateConvReq updateConvReqCb_; std::string accountId_ {}; @@ -191,9 +198,9 @@ public: // Conversations mutable std::mutex conversationsMtx_ {}; - std::map<std::string, std::shared_ptr<Conversation>> conversations_; + std::map<std::string, std::shared_ptr<Conversation>, std::less<>> conversations_; std::mutex pendingConversationsFetchMtx_ {}; - std::map<std::string, PendingConversationFetch> pendingConversationsFetch_; + std::map<std::string, PendingConversationFetch, std::less<>> pendingConversationsFetch_; bool startFetch(const std::string& convId, const std::string& deviceId) { @@ -245,6 +252,8 @@ public: const std::string& newBody, const std::string& editedId); + void boostrapCb(const std::string& convId); + // The following informations are stored on the disk mutable std::mutex convInfosMtx_; // Note, should be locked after conversationsMtx_ if needed std::map<std::string, ConvInfo> convInfos_; @@ -283,6 +292,13 @@ public: // Receiving new commits std::shared_ptr<RepeatedTask> conversationsEventHandler {}; + // When sending a new message, we need to send the notification to some peers of the + // conversation However, the conversation may be not bootstraped, so the list will be empty. + // notSyncedNotification_ will store the notifiaction to announce until we have peers to sync + // with. + std::mutex notSyncedNotificationMtx_; + std::map<std::string, std::string> notSyncedNotification_; + std::weak_ptr<Impl> weak() { return std::static_pointer_cast<Impl>(shared_from_this()); } // Replay conversations (after erasing/re-adding) @@ -290,17 +306,23 @@ public: std::map<std::string, std::vector<std::map<std::string, std::string>>> replay_; std::map<std::string, uint64_t> refreshMessage; std::atomic_int syncCnt {0}; + +#ifdef LIBJAMI_TESTABLE + std::function<void(std::string, Conversation::BootstrapStatus)> bootstrapCbTest_; +#endif }; ConversationModule::Impl::Impl(std::weak_ptr<JamiAccount>&& account, NeedsSyncingCb&& needsSyncingCb, SengMsgCb&& sendMsgCb, NeedSocketCb&& onNeedSocket, + NeedSocketCb&& onNeedSwarmSocket, UpdateConvReq&& updateConvReqCb) : account_(account) , needsSyncingCb_(needsSyncingCb) , sendMsgCb_(sendMsgCb) , onNeedSocket_(onNeedSocket) + , onNeedSwarmSocket_(onNeedSwarmSocket) , updateConvReqCb_(updateConvReqCb) { if (auto shared = account.lock()) { @@ -349,8 +371,8 @@ ConversationModule::Impl::cloneConversation(const std::string& deviceId, if (channel) { pending.ready = true; pending.deviceId = channel->deviceId().toString(); + pending.socket = channel; lk.unlock(); - acc->addGitSocket(channel->deviceId(), convId, channel); checkConversationsEvents(); return true; } else { @@ -397,8 +419,13 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, deviceId.c_str()); std::unique_lock<std::mutex> lk(conversationsMtx_); + auto conversation = conversations_.find(conversationId); if (conversation != conversations_.end() && conversation->second) { + // Check if we already have the commit + if (not commitId.empty() && conversation->second->getCommit(commitId) != std::nullopt) { + return; + } if (!conversation->second->isMember(peer, true)) { JAMI_WARN("[Account %s] %s is not a member of %s", accountId_.c_str(), @@ -427,6 +454,7 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, conversationId.c_str()); return; } + syncCnt.fetch_add(1); onNeedSocket_( conversationId, @@ -445,7 +473,8 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, syncCnt.fetch_sub(1); return false; } - acc->addGitSocket(channel->deviceId(), conversationId, channel); + conversation->second->addGitSocket(channel->deviceId(), channel); + conversation->second->sync( peer, deviceId, @@ -470,6 +499,10 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); pendingConversationsFetch_.erase(conversationId); } + // Notify peers that a new commit is there (DRT) + if (not commitId.empty()) { + sendMessageNotification(conversationId, false, commitId, deviceId); + } if (syncCnt.fetch_sub(1) == 1) { if (auto account = account_.lock()) emitSignal<libjami::ConversationSignal::ConversationSyncFinished>( @@ -504,6 +537,7 @@ ConversationModule::Impl::fetchNewCommits(const std::string& peer, accountId_.c_str(), conversationId.c_str()); sendMsgCb_(peer, + {}, std::map<std::string, std::string> {{"application/invite", conversationId}}, 0); } @@ -533,6 +567,9 @@ void ConversationModule::Impl::handlePendingConversation(const std::string& conversationId, const std::string& deviceId) { + auto acc = account_.lock(); + if (!acc) + return; auto erasePending = [&] { std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); auto oldFetch = pendingConversationsFetch_.find(conversationId); @@ -541,7 +578,13 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat pendingConversationsFetch_.erase(conversationId); }; try { - auto conversation = std::make_shared<Conversation>(account_, deviceId, conversationId); + auto conversation = std::make_shared<Conversation>(acc, deviceId, conversationId); + { + std::lock_guard<std::mutex> lk(pendingConversationsFetchMtx_); + auto oldFetch = pendingConversationsFetch_.find(conversationId); + if (oldFetch != pendingConversationsFetch_.end() && oldFetch->second.socket) + conversation->addGitSocket(DeviceId(deviceId), std::move(oldFetch->second.socket)); + } conversation->onLastDisplayedUpdated( [&](auto convId, auto lastId) { onLastDisplayedUpdated(convId, lastId); }); if (!conversation->isMember(username_, true)) { @@ -550,6 +593,12 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat erasePending(); return; } + conversation->onNeedSocket(onNeedSwarmSocket_); +#ifdef LIBJAMI_TESTABLE + conversation->onBootstrapStatus(bootstrapCbTest_); +#endif // LIBJAMI_TESTABLE + conversation->bootstrap( + std::bind(&ConversationModule::Impl::boostrapCb, this, conversation->id())); auto removeRepo = false; { std::lock_guard<std::mutex> lk(conversationsMtx_); @@ -579,7 +628,7 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat } } if (!commitId.empty()) - sendMessageNotification(conversationId, commitId, false); + sendMessageNotification(conversationId, false, commitId); std::map<std::string, std::string> preferences; std::map<std::string, std::string> lastDisplayed; { @@ -615,8 +664,8 @@ ConversationModule::Impl::handlePendingConversation(const std::string& conversat auto shared = w.lock(); if (shared and not commits.empty()) shared->sendMessageNotification(conversationId, - *commits.rbegin(), - true); + true, + *commits.rbegin()); }); // Download members profile on first sync auto isOneOne = conversation->mode() == ConversationMode::ONE_TO_ONE; @@ -681,7 +730,8 @@ ConversationModule::Impl::getOneToOneConversation(const std::string& uri) const } std::vector<std::map<std::string, std::string>> -ConversationModule::Impl::getConversationMembers(const std::string& conversationId, bool includeBanned) const +ConversationModule::Impl::getConversationMembers(const std::string& conversationId, + bool includeBanned) const { std::unique_lock<std::mutex> lk(conversationsMtx_); auto conversation = conversations_.find(conversationId); @@ -777,7 +827,7 @@ ConversationModule::Impl::removeConversation(const std::string& conversationId) // Commit that we left if (!commitId.empty()) { // Do not sync as it's synched by convInfos - sendMessageNotification(*it->second, commitId, false); + sendMessageNotification(*it->second, false, commitId); } else { JAMI_ERR("Failed to send message to conversation %s", conversationId.c_str()); } @@ -800,34 +850,100 @@ ConversationModule::Impl::removeConversation(const std::string& conversationId) void ConversationModule::Impl::sendMessageNotification(const std::string& conversationId, + bool sync, const std::string& commitId, - bool sync) + const std::string& deviceId) { std::lock_guard<std::mutex> lk(conversationsMtx_); auto it = conversations_.find(conversationId); if (it != conversations_.end() && it->second) { - sendMessageNotification(*it->second, commitId, sync); + sendMessageNotification(*it->second, sync, commitId, deviceId); } } void -ConversationModule::Impl::sendMessageNotification(const Conversation& conversation, +ConversationModule::Impl::sendMessageNotification(Conversation& conversation, + bool sync, const std::string& commitId, - bool sync) + const std::string& deviceId) { + auto acc = account_.lock(); + if (!acc) + return; Json::Value message; + auto commit = commitId == "" ? conversation.lastCommitId() : commitId; message["id"] = conversation.id(); - message["commit"] = commitId; + message["commit"] = commit; message["deviceId"] = deviceId_; Json::StreamWriterBuilder builder; const auto text = Json::writeString(builder, message); - for (const auto& member : conversation.memberUris(sync ? "" : username_)) { - // Announce to all members that a new message is sent + + // Send message notification will announce the new commit in 3 steps. + + // First, because our account can have several devices, announce to other devices + if (sync) { + // Announce to our devices + refreshMessage[username_] = sendMsgCb_(username_, + {}, + std::map<std::string, std::string> { + {"application/im-gitmessage-id", text}}, + refreshMessage[username_]); + } + + // Then, we announce to 2 random members in the conversation that aren't in the DRT + // This allow new devices without the ability to sync to their other devices to sync with us. + // Or they can also use an old backup. + std::vector<std::string> nonConnectedMembers; + std::vector<NodeId> devices; + { + std::lock_guard<std::mutex> lk(notSyncedNotificationMtx_); + devices = conversation.peersToSyncWith(); + auto members = conversation.memberUris(username_, {}); + std::vector<std::string> connectedMembers; + for (const auto& device : devices) { + auto cert = tls::CertificateStore::instance().getCertificate(device.toString()); + if (cert && cert->issuer) + connectedMembers.emplace_back(cert->issuer->getId().toString()); + } + std::sort(std::begin(members), std::end(members)); + std::sort(std::begin(connectedMembers), std::end(connectedMembers)); + std::set_difference(members.begin(), + members.end(), + connectedMembers.begin(), + connectedMembers.end(), + std::inserter(nonConnectedMembers, nonConnectedMembers.begin())); + std::shuffle(nonConnectedMembers.begin(), nonConnectedMembers.end(), acc->rand); + if (nonConnectedMembers.size() > 2) + nonConnectedMembers.resize(2); + if (!conversation.isBoostraped()) { + JAMI_DEBUG("[Conversation {}] Not yet bootstraped, save notification", + conversation.id()); + // Because we can get some git channels but not bootstraped, we should keep this + // to refresh when bootstraped. + notSyncedNotification_[conversation.id()] = commit; + } + } + + for (const auto& member : nonConnectedMembers) { refreshMessage[member] = sendMsgCb_(member, + {}, std::map<std::string, std::string> { {"application/im-gitmessage-id", text}}, refreshMessage[member]); } + + // Finally we send to devices that the DRT choose. + for (const auto& device : devices) { + auto deviceIdStr = device.toString(); + auto memberUri = conversation.uriFromDevice(deviceIdStr); + if (memberUri.empty() || deviceIdStr == deviceId) + continue; + refreshMessage[deviceIdStr] = sendMsgCb_(memberUri, + device, + std::map<std::string, std::string> { + {"application/im-gitmessage-id", text}}, + refreshMessage[deviceIdStr]); + } } void @@ -872,7 +988,7 @@ ConversationModule::Impl::sendMessage(const std::string& conversationId, if (!announce) return; if (ok) - sendMessageNotification(conversationId, commitId, true); + sendMessageNotification(conversationId, true, commitId); else JAMI_ERR("Failed to send message to conversation %s", conversationId.c_str()); }); @@ -909,6 +1025,25 @@ ConversationModule::Impl::editMessage(const std::string& conversationId, sendMessage(conversationId, std::move(json)); } +void +ConversationModule::Impl::boostrapCb(const std::string& convId) +{ + std::string commitId; + { + std::lock_guard<std::mutex> lk(notSyncedNotificationMtx_); + auto it = notSyncedNotification_.find(convId); + if (it != notSyncedNotification_.end()) { + commitId = it->second; + notSyncedNotification_.erase(it); + } + } + JAMI_DEBUG("[Conversation {}] Resend last message notification", convId); + dht::ThreadPool::io().run([w = weak(), convId, commitId] { + if (auto sthis = w.lock()) + sthis->sendMessageNotification(convId, true, commitId); + }); +} + //////////////////////////////////////////////////////////////// void @@ -951,16 +1086,31 @@ ConversationModule::ConversationModule(std::weak_ptr<JamiAccount>&& account, NeedsSyncingCb&& needsSyncingCb, SengMsgCb&& sendMsgCb, NeedSocketCb&& onNeedSocket, + NeedSocketCb&& onNeedSwarmSocket, UpdateConvReq&& updateConvReqCb) : pimpl_ {std::make_unique<Impl>(std::move(account), std::move(needsSyncingCb), std::move(sendMsgCb), std::move(onNeedSocket), + std::move(onNeedSwarmSocket), std::move(updateConvReqCb))} { loadConversations(); } +#ifdef LIBJAMI_TESTABLE +void +ConversationModule::onBootstrapStatus( + const std::function<void(std::string, Conversation::BootstrapStatus)>& cb) +{ + pimpl_->bootstrapCbTest_ = cb; + std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); + for (auto& [_, conv] : pimpl_->conversations_) + if (conv) + conv->onBootstrapStatus(pimpl_->bootstrapCbTest_); +} +#endif + void ConversationModule::loadConversations() { @@ -978,7 +1128,10 @@ ConversationModule::loadConversations() std::set<std::string> toRm; for (const auto& repository : conversationsRepositories) { try { - auto conv = std::make_shared<Conversation>(pimpl_->account_, repository); + auto conv = std::make_shared<Conversation>(acc, repository); + conv->onLastDisplayedUpdated( + [&](auto convId, auto lastId) { pimpl_->onLastDisplayedUpdated(convId, lastId); }); + conv->onNeedSocket(pimpl_->onNeedSwarmSocket_); auto members = conv->memberUris(uri, {}); // NOTE: The following if is here to protect against any incorrect state // that can be introduced @@ -1020,7 +1173,7 @@ ConversationModule::loadConversations() // Note: here, this means that some calls were actives while the // daemon finished (can be a crash). // Notify other in the conversation that the call is finished - pimpl_->sendMessageNotification(*conv, *commits.rbegin(), true); + pimpl_->sendMessageNotification(*conv, true, *commits.rbegin()); } pimpl_->conversations_.emplace(repository, std::move(conv)); } catch (const std::logic_error& e) { @@ -1074,14 +1227,14 @@ ConversationModule::loadConversations() std::vector<std::string> invalidPendingRequests; { std::lock_guard<std::mutex> lk(pimpl_->conversationsRequestsMtx_); - for (const auto& request: acc->getTrustRequests()) { + for (const auto& request : acc->getTrustRequests()) { auto itConvId = request.find(libjami::Account::TrustRequest::CONVERSATIONID); auto itConvFrom = request.find(libjami::Account::TrustRequest::FROM); - if (itConvId != request.end() && itConvFrom != request.end()) - { + if (itConvId != request.end() && itConvFrom != request.end()) { // Check if requests exists or is declined. auto itReq = pimpl_->conversationsRequests_.find(itConvId->second); - auto declined = itReq == pimpl_->conversationsRequests_.end() || itReq->second.declined; + auto declined = itReq == pimpl_->conversationsRequests_.end() + || itReq->second.declined; if (declined) { JAMI_WARNING("Invalid trust request found: {:s}", itConvId->second); invalidPendingRequests.emplace_back(itConvFrom->second); @@ -1089,16 +1242,17 @@ ConversationModule::loadConversations() } } } - dht::ThreadPool::io().run([w=pimpl_->weak(), invalidPendingRequests = std::move(invalidPendingRequests)] () { - // Will lock account manager - auto shared = w.lock(); - if (!shared) - return; - if (auto acc = shared->account_.lock()) { - for (const auto& invalidPendingRequest : invalidPendingRequests) - acc->discardTrustRequest(invalidPendingRequest); - } - }); + dht::ThreadPool::io().run( + [w = pimpl_->weak(), invalidPendingRequests = std::move(invalidPendingRequests)]() { + // Will lock account manager + auto shared = w.lock(); + if (!shared) + return; + if (auto acc = shared->account_.lock()) { + for (const auto& invalidPendingRequest : invalidPendingRequests) + acc->discardTrustRequest(invalidPendingRequest); + } + }); //////////////////////////////////////////////////////////////// for (const auto& conv : toRm) { @@ -1108,6 +1262,31 @@ ConversationModule::loadConversations() JAMI_INFO("[Account %s] Conversations loaded!", pimpl_->accountId_.c_str()); } +void +ConversationModule::bootstrap() +{ + std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); + for (auto& [_, conv] : pimpl_->conversations_) { + if (conv) { +#ifdef LIBJAMI_TESTABLE + conv->onBootstrapStatus(pimpl_->bootstrapCbTest_); +#endif // LIBJAMI_TESTABLE + conv->bootstrap( + std::bind(&ConversationModule::Impl::boostrapCb, pimpl_.get(), conv->id())); + } + } +} +void +ConversationModule::monitor() +{ + std::unique_lock<std::mutex> lk(pimpl_->conversationsMtx_); + for (auto& [_, conv] : pimpl_->conversations_) { + if (conv) { + conv->monitor(); + } + } +} + void ConversationModule::clearPendingFetch() { @@ -1179,8 +1358,8 @@ ConversationModule::onTrustRequest(const std::string& uri, } if (pimpl_->isAcceptedConversation(conversationId)) { JAMI_INFO("[Account %s] Received a request for a conversation " - "already handled. Ignore", - pimpl_->accountId_.c_str()); + "already handled. Ignore", + pimpl_->accountId_.c_str()); return; } if (pimpl_->getRequest(conversationId) != std::nullopt) { @@ -1268,7 +1447,7 @@ ConversationModule::onNeedConversationRequest(const std::string& from, auto invite = itConv->second->generateInvitation(); lk.unlock(); JAMI_DBG("%s is asking a new invite for %s", from.c_str(), conversationId.c_str()); - pimpl_->sendMsgCb_(from, std::move(invite), 0); + pimpl_->sendMsgCb_(from, {}, std::move(invite), 0); } } @@ -1306,12 +1485,21 @@ ConversationModule::declineConversationRequest(const std::string& conversationId std::string ConversationModule::startConversation(ConversationMode mode, const std::string& otherMember) { + auto acc = pimpl_->account_.lock(); + if (!acc) + return {}; // Create the conversation object std::shared_ptr<Conversation> conversation; try { - conversation = std::make_shared<Conversation>(pimpl_->account_, mode, otherMember); + conversation = std::make_shared<Conversation>(acc, mode, otherMember); conversation->onLastDisplayedUpdated( [&](auto convId, auto lastId) { pimpl_->onLastDisplayedUpdated(convId, lastId); }); + conversation->onNeedSocket(pimpl_->onNeedSwarmSocket_); +#ifdef LIBJAMI_TESTABLE + conversation->onBootstrapStatus(pimpl_->bootstrapCbTest_); +#endif // LIBJAMI_TESTABLE + conversation->bootstrap( + std::bind(&ConversationModule::Impl::boostrapCb, pimpl_.get(), conversation->id())); } catch (const std::exception& e) { JAMI_ERR("[Account %s] Error while generating a conversation %s", pimpl_->accountId_.c_str(), @@ -1357,13 +1545,14 @@ ConversationModule::cloneConversationFrom(const std::string& conversationId, auto deviceId = pk->getLongId().toString(); if (!sthis or deviceId == sthis->deviceId_) return; - if (!sthis->startFetch(conversationId, deviceId)) { JAMI_WARN("[Account %s] Already fetching %s", sthis->accountId_.c_str(), conversationId.c_str()); return; } + + // We need a onNeedSocket_ with old logic. sthis->onNeedSocket_( conversationId, pk->getLongId().toString(), @@ -1377,11 +1566,8 @@ ConversationModule::cloneConversationFrom(const std::string& conversationId, if (channel) { pending.ready = true; pending.deviceId = channel->deviceId().toString(); + pending.socket = channel; lk.unlock(); - // Save the git socket - acc->addGitSocket(channel->deviceId(), - conversationId, - channel); sthis->checkConversationsEvents(); return true; } else { @@ -1611,8 +1797,14 @@ ConversationModule::syncConversations(const std::string& peer, const std::string for (const auto& [key, ci] : pimpl_->convInfos_) { auto it = pimpl_->conversations_.find(key); if (it != pimpl_->conversations_.end() && it->second) { - if (!it->second->isRemoving() && it->second->isMember(peer, false)) + if (!it->second->isRemoving() && it->second->isMember(peer, false)) { toFetch.emplace(key); + if (!it->second->hasSwarmChannel(deviceId)) { + if (auto acc = pimpl_->account_.lock()) { + // TODO connect to Swarm + } + } + } } else if (!ci.removed && std::find(ci.members.begin(), ci.members.end(), peer) != ci.members.end()) { @@ -1692,18 +1884,18 @@ ConversationModule::onSyncData(const SyncMsg& msg, if (req.declined != 0) { // Request declined JAMI_LOG("[Account {:s}] Declined request detected for conversation {:s} (device {:s})", - pimpl_->accountId_, - convId, - deviceId); + pimpl_->accountId_, + convId, + deviceId); emitSignal<libjami::ConversationSignal::ConversationRequestDeclined>(pimpl_->accountId_, convId); continue; } JAMI_LOG("[Account {:s}] New request detected for conversation {:s} (device {:s})", - pimpl_->accountId_, - convId, - deviceId); + pimpl_->accountId_, + convId, + deviceId); emitSignal<libjami::ConversationSignal::ConversationRequestReceived>(pimpl_->accountId_, convId, @@ -1795,19 +1987,20 @@ ConversationModule::onNewCommit(const std::string& peer, // it means that the contact was removed but not banned. So we can generate // a new trust request JAMI_WARNING("[Account {:s}] Could not find conversation {:s}, ask for an invite", - pimpl_->accountId_, - conversationId); + pimpl_->accountId_, + conversationId); pimpl_->sendMsgCb_(peer, + {}, std::map<std::string, std::string> { {"application/invite", conversationId}}, 0); return; } JAMI_DEBUG("[Account {:s}] on new commit notification from {:s}, for {:s}, commit {:s}", - pimpl_->accountId_, - peer, - conversationId, - commitId); + pimpl_->accountId_, + peer, + conversationId, + commitId); lk.unlock(); pimpl_->fetchNewCommits(peer, deviceId, conversationId, commitId); } @@ -1826,14 +2019,12 @@ ConversationModule::addConversationMember(const std::string& conversationId, } if (it->second->isMember(contactUri, true)) { - JAMI_DEBUG("{:s} is already a member of {:s}, resend invite", - contactUri, - conversationId); + JAMI_DEBUG("{:s} is already a member of {:s}, resend invite", contactUri, conversationId); // Note: This should not be necessary, but if for whatever reason the other side didn't join // we should not forbid new invites auto invite = it->second->generateInvitation(); lk.unlock(); - pimpl_->sendMsgCb_(contactUri, std::move(invite), 0); + pimpl_->sendMsgCb_(contactUri, {}, std::move(invite), 0); return; } @@ -1846,12 +2037,12 @@ ConversationModule::addConversationMember(const std::string& conversationId, auto it = pimpl_->conversations_.find(conversationId); if (it != pimpl_->conversations_.end() && it->second) { pimpl_->sendMessageNotification(*it->second, - commitId, - true); // For the other members + true, + commitId); // For the other members if (sendRequest) { auto invite = it->second->generateInvitation(); lk.unlock(); - pimpl_->sendMsgCb_(contactUri, std::move(invite), 0); + pimpl_->sendMsgCb_(contactUri, {}, std::move(invite), 0); } } } @@ -1871,15 +2062,16 @@ ConversationModule::removeConversationMember(const std::string& conversationId, [this, conversationId](bool ok, const std::string& commitId) { if (ok) { pimpl_->sendMessageNotification(conversationId, - commitId, - true); + true, + commitId); } }); } } std::vector<std::map<std::string, std::string>> -ConversationModule::getConversationMembers(const std::string& conversationId, bool includeBanned) const +ConversationModule::getConversationMembers(const std::string& conversationId, + bool includeBanned) const { return pimpl_->getConversationMembers(conversationId, includeBanned); } @@ -1933,7 +2125,7 @@ ConversationModule::updateConversationInfos(const std::string& conversationId, it->second->updateInfos(infos, [this, conversationId, sync](bool ok, const std::string& commitId) { if (ok && sync) { - pimpl_->sendMessageNotification(conversationId, commitId, true); + pimpl_->sendMessageNotification(conversationId, true, commitId); } else if (sync) JAMI_WARNING("Couldn't update infos on {:s}", conversationId); }); @@ -1984,7 +2176,8 @@ ConversationModule::setConversationPreferences(const std::string& conversationId } std::map<std::string, std::string> -ConversationModule::getConversationPreferences(const std::string& conversationId, bool includeCreated) const +ConversationModule::getConversationPreferences(const std::string& conversationId, + bool includeCreated) const { std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); auto it = pimpl_->conversations_.find(conversationId); @@ -2022,6 +2215,7 @@ ConversationModule::conversationVCard(const std::string& conversationId) const return it->second->vCard(); } + bool ConversationModule::isBannedDevice(const std::string& convId, const std::string& deviceId) const { @@ -2319,11 +2513,10 @@ ConversationModule::hostConference(const std::string& conversationId, value["confId"] = confId; value["type"] = "application/call-history+json"; conv->hostConference(std::move(value), - [w = pimpl_->weak(), - conversationId](bool ok, const std::string& commitId) { + [w = pimpl_->weak(), conversationId](bool ok, const std::string& commitId) { if (ok) { if (auto shared = w.lock()) - shared->sendMessageNotification(conversationId, commitId, true); + shared->sendMessageNotification(conversationId, true, commitId); } else { JAMI_ERR("Failed to send message to conversation %s", conversationId.c_str()); @@ -2356,7 +2549,7 @@ ConversationModule::hostConference(const std::string& conversationId, std::move(value), [w, conversationId](bool ok, const std::string& commitId) { if (ok) { if (auto shared = w.lock()) { - shared->sendMessageNotification(conversationId, commitId, true); + shared->sendMessageNotification(conversationId, true, commitId); } } else { JAMI_ERR("Failed to send message to conversation %s", @@ -2436,4 +2629,85 @@ ConversationModule::setConversationMembers(const std::string& convId, } } +std::shared_ptr<Conversation> +ConversationModule::getConversation(const std::string& convId) +{ + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); + auto convIt = pimpl_->conversations_.find(convId); + if (convIt != pimpl_->conversations_.end()) + return convIt->second; + return nullptr; +} + +std::shared_ptr<ChannelSocket> +ConversationModule::gitSocket(std::string_view deviceId, std::string_view convId) const +{ + { + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); + auto convIt = pimpl_->conversations_.find(convId); + if (convIt != pimpl_->conversations_.end()) + return convIt->second->gitSocket(DeviceId(deviceId)); + } + std::lock_guard<std::mutex> lk(pimpl_->pendingConversationsFetchMtx_); + auto it = pimpl_->pendingConversationsFetch_.find(convId); + if (it != pimpl_->pendingConversationsFetch_.end()) + return it->second.socket; + return nullptr; +} + +void +ConversationModule::addGitSocket(std::string_view deviceId, + std::string_view convId, + const std::shared_ptr<ChannelSocket>& channel) +{ + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); + auto convIt = pimpl_->conversations_.find(convId); + if (convIt != pimpl_->conversations_.end()) + convIt->second->addGitSocket(DeviceId(deviceId), channel); + else + JAMI_WARNING("addGitSocket: can't find conversation {:s}", convId); +} + +void +ConversationModule::removeGitSocket(std::string_view deviceId, std::string_view convId) +{ + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); + auto convIt = pimpl_->conversations_.find(convId); + if (convIt != pimpl_->conversations_.end()) + return convIt->second->removeGitSocket(DeviceId(deviceId)); +} +void +ConversationModule::shutdownConnections() +{ + { + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); + for (auto& [k, conversation] : pimpl_->conversations_) + conversation->removeGitSockets(); + } + { + std::lock_guard<std::mutex> lk(pimpl_->pendingConversationsFetchMtx_); + for (auto& [k, pending] : pimpl_->pendingConversationsFetch_) + pending.socket = {}; + } +} +void +ConversationModule::addSwarmChannel(const std::string& conversationId, + std::shared_ptr<ChannelSocket> channel) +{ + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); + auto convIt = pimpl_->conversations_.find(conversationId); + if (convIt != pimpl_->conversations_.end()) + convIt->second->addSwarmChannel(std::move(channel)); +} + +void +ConversationModule::connectivityChanged() +{ + { + std::lock_guard<std::mutex> lk(pimpl_->conversationsMtx_); + for (auto& [k, conversation] : pimpl_->conversations_) { + conversation->connectivityChanged(); + } + } +} } // namespace jami diff --git a/src/jamidht/conversation_module.h b/src/jamidht/conversation_module.h index fdf135899f..e02fb46d00 100644 --- a/src/jamidht/conversation_module.h +++ b/src/jamidht/conversation_module.h @@ -36,9 +36,9 @@ class SIPCall; struct SyncMsg { - jami::DeviceSync ds; - std::map<std::string, jami::ConvInfo> c; - std::map<std::string, jami::ConversationRequest> cr; + DeviceSync ds; + std::map<std::string, ConvInfo> c; + std::map<std::string, ConversationRequest> cr; // p is conversation's preferences. It's not stored in c, as // we can update the preferences without touching any confInfo. std::map<std::string, std::map<std::string, std::string>> p; @@ -50,8 +50,8 @@ struct SyncMsg using ChannelCb = std::function<bool(const std::shared_ptr<ChannelSocket>&)>; using NeedSocketCb = std::function<void(const std::string&, const std::string&, ChannelCb&&, const std::string&)>; -using SengMsgCb - = std::function<uint64_t(const std::string&, std::map<std::string, std::string>, uint64_t)>; +using SengMsgCb = std::function< + uint64_t(const std::string&, const DeviceId&, std::map<std::string, std::string>, uint64_t)>; using NeedsSyncingCb = std::function<void(std::shared_ptr<SyncMsg>&&)>; using UpdateConvReq = std::function<void(const std::string&, const std::string&, bool)>; @@ -62,6 +62,7 @@ public: NeedsSyncingCb&& needsSyncingCb, SengMsgCb&& sendMsgCb, NeedSocketCb&& onNeedSocket, + NeedSocketCb&& onNeedSwarmSocket, UpdateConvReq&& updateConvReqCb); ~ConversationModule() = default; @@ -70,6 +71,17 @@ public: */ void loadConversations(); +#ifdef LIBJAMI_TESTABLE + void onBootstrapStatus(const std::function<void(std::string, Conversation::BootstrapStatus)>& cb); +#endif + + void monitor(); + + /** + * Bootstrap swarm managers to other peers + */ + void bootstrap(); + /** * Clear not removed fetch */ @@ -435,6 +447,38 @@ public: void addConvInfo(const ConvInfo& info); void setConversationMembers(const std::string& convId, const std::vector<std::string>& members); + /** + * Get a conversation + * @param convId + */ + std::shared_ptr<Conversation> getConversation(const std::string& convId); + /** + * Return current git socket used for a conversation + * @param deviceId Related device + * @param conversationId Related conversation + * @return the related socket + */ + std::shared_ptr<ChannelSocket> gitSocket(std::string_view deviceId, + std::string_view convId) const; + void removeGitSocket(std::string_view deviceId, std::string_view convId); + void addGitSocket(std::string_view deviceId, + std::string_view convId, + const std::shared_ptr<ChannelSocket>& channel); + /** + * Clear all connection (swarm channels) + */ + void shutdownConnections(); + /** + * Add a swarm connection + * @param conversationId + * @param socket + */ + void addSwarmChannel(const std::string& conversationId, std::shared_ptr<ChannelSocket> socket); + /** + * Triggers a bucket maintainance for DRTs + */ + void connectivityChanged(); + private: class Impl; std::shared_ptr<Impl> pimpl_; diff --git a/src/jamidht/jamiaccount.cpp b/src/jamidht/jamiaccount.cpp index 9e8fa8781a..58446e3e74 100644 --- a/src/jamidht/jamiaccount.cpp +++ b/src/jamidht/jamiaccount.cpp @@ -34,6 +34,7 @@ #include "conversation_channel_handler.h" #include "sync_channel_handler.h" #include "transfer_channel_handler.h" +#include "swarm/swarm_channel_handler.h" #include "sip/sdp.h" #include "sip/sipvoiplink.h" @@ -67,7 +68,6 @@ #include "string_utils.h" #include "archiver.h" #include "data_transfer.h" -#include "conversation.h" #include "connectivity/security/certstore.h" #include "libdevcrypto/Common.h" @@ -318,7 +318,9 @@ JamiAccount::shutdownConnections() channelHandlers_.clear(); connectionManager_.reset(); } - gitSocketList_.clear(); + if (convModule_) + convModule_->shutdownConnections(); + std::lock_guard<std::mutex> lk(sipConnsMtx_); sipConns_.clear(); } @@ -2031,8 +2033,8 @@ JamiAccount::doRegister_() return; } - auto sock = gitSocket(deviceId, conversationId); - if (sock != std::nullopt && sock->lock() == channel) { + auto sock = convModule()->gitSocket(deviceId.toString(), conversationId); + if (sock == channel) { // The onConnectionReady is already used as client (for retrieving messages) // So it's not the server socket return; @@ -2151,15 +2153,12 @@ JamiAccount::convModule() auto shared = w.lock(); if (!shared) return; - auto gs = shared->gitSocket(DeviceId(deviceId), convId); - if (gs != std::nullopt) { - if (auto socket = gs->lock()) { - if (!cb(socket)) - socket->shutdown(); - else - cb({}); - return; - } + if (auto socket = shared->convModule()->gitSocket(deviceId, convId)) { + if (!cb(socket)) + socket->shutdown(); + else + cb({}); + return; } std::lock_guard<std::mutex> lkCM(shared->connManagerMtx_); if (!shared->connectionManager_) { @@ -2173,7 +2172,8 @@ JamiAccount::convModule() const DeviceId&) { if (socket) { socket->onShutdown([shared, deviceId = socket->deviceId(), convId] { - shared->removeGitSocket(deviceId, convId); + shared->convModule()->removeGitSocket(deviceId.toString(), + convId); }); if (!cb(socket)) socket->shutdown(); @@ -2185,6 +2185,18 @@ JamiAccount::convModule() type); }); }, + [this](const auto& convId, const auto& deviceId, auto&& cb, const auto& connectionType) { + std::lock_guard<std::mutex> lkCM(connManagerMtx_); + if (!connectionManager_) { + Manager::instance().ioContext()->post([cb = std::move(cb)] { cb({}); }); + return; + } + connectionManager_->connectDevice(DeviceId(deviceId), + fmt::format("swarm://{}", convId), + [cb = std::move( + cb)](std::shared_ptr<ChannelSocket> socket, + const DeviceId&) { cb(socket); }); + }, [this](auto&& convId, auto&& contactUri, bool accept) { // NOTE: do not reschedule as the conversation's requests // should be synched with trust requests @@ -4141,6 +4153,8 @@ JamiAccount::initConnectionManager() { if (!connectionManager_) { connectionManager_ = std::make_unique<ConnectionManager>(*this); + channelHandlers_[Uri::Scheme::SWARM] + = std::make_unique<SwarmChannelHandler>(shared(), *connectionManager_.get()); channelHandlers_[Uri::Scheme::GIT] = std::make_unique<ConversationChannelHandler>(shared(), *connectionManager_.get()); channelHandlers_[Uri::Scheme::SYNC] diff --git a/src/jamidht/jamiaccount.h b/src/jamidht/jamiaccount.h index 93f580c190..a2b62d3656 100644 --- a/src/jamidht/jamiaccount.h +++ b/src/jamidht/jamiaccount.h @@ -91,10 +91,6 @@ class SyncModule; struct TextMessageCtx; using SipConnectionKey = std::pair<std::string /* accountId */, DeviceId>; -using GitSocketList = std::map<DeviceId, /* device Id */ - std::map<std::string, /* conversation */ - std::shared_ptr<ChannelSocket> /* related socket */ - >>; /** * @brief Ring Account is build on top of SIPAccountBase and uses DHT to handle call connectivity. @@ -452,43 +448,6 @@ public: * ConnectionManager needs the account to exists */ void shutdownConnections(); - std::optional<std::weak_ptr<ChannelSocket>> gitSocket(const DeviceId& deviceId, - const std::string& conversationId) const - { - auto deviceSockets = gitSocketList_.find(deviceId); - if (deviceSockets == gitSocketList_.end()) { - return std::nullopt; - } - auto socketIt = deviceSockets->second.find(conversationId); - if (socketIt == deviceSockets->second.end()) { - return std::nullopt; - } - return socketIt->second; - } - bool hasGitSocket(const DeviceId& deviceId, const std::string& conversationId) const - { - return gitSocket(deviceId, conversationId) != std::nullopt; - } - - void addGitSocket(const DeviceId& deviceId, - const std::string& conversationId, - const std::shared_ptr<ChannelSocket>& socket) - { - auto& deviceSockets = gitSocketList_[deviceId]; - deviceSockets[conversationId] = socket; - } - - void removeGitSocket(const DeviceId& deviceId, const std::string& conversationId) - { - auto deviceSockets = gitSocketList_.find(deviceId); - if (deviceSockets == gitSocketList_.end()) { - return; - } - deviceSockets->second.erase(conversationId); - if (deviceSockets->second.empty()) { - gitSocketList_.erase(deviceSockets); - } - } std::string_view currentDeviceId() const; @@ -612,6 +571,15 @@ public: */ void handleIncomingConversationCall(const std::string& callId, const std::string& destination); + /** + * The DRT component is composed on some special nodes, that are usually present but not connected. + * This kind of node corresponds to devices with push notifications & proxy and are + * stored in the mobile nodes + */ + bool isMobile() const { + return config().proxyEnabled and not config().deviceKey.empty(); + } + private: NON_COPYABLE(JamiAccount); @@ -786,7 +754,6 @@ private: mutable std::mutex connManagerMtx_ {}; std::unique_ptr<ConnectionManager> connectionManager_; - GitSocketList gitSocketList_ {}; std::mutex discoveryMapMtx_; std::shared_ptr<dht::PeerDiscovery> peerDiscovery_; diff --git a/src/jamidht/swarm/swarm_channel_handler.cpp b/src/jamidht/swarm/swarm_channel_handler.cpp index b33cc94ca3..625795899a 100644 --- a/src/jamidht/swarm/swarm_channel_handler.cpp +++ b/src/jamidht/swarm/swarm_channel_handler.cpp @@ -34,6 +34,10 @@ SwarmChannelHandler::connect(const DeviceId& deviceId, const std::string& conversationId, ConnectCb&& cb) { +#ifdef LIBJAMI_TESTABLE + if (disableSwarmManager) + return; +#endif connectionManager_.connectDevice(deviceId, fmt::format("swarm://{}", conversationId), cb); } @@ -41,6 +45,10 @@ bool SwarmChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert, const std::string& name) { +#ifdef LIBJAMI_TESTABLE + if (disableSwarmManager) + return false; +#endif auto acc = account_.lock(); if (!cert || !cert->issuer || !acc) return false; @@ -61,5 +69,12 @@ SwarmChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>&, const std::string& uri, std::shared_ptr<ChannelSocket> socket) { + auto sep = uri.find_last_of('/'); + auto conversationId = uri.substr(sep + 1); + if (auto acc = account_.lock()) { + if (auto convModule = acc->convModule()) { + convModule->addSwarmChannel(conversationId, socket); + } + } } } // namespace jami \ No newline at end of file diff --git a/src/jamidht/swarm/swarm_channel_handler.h b/src/jamidht/swarm/swarm_channel_handler.h index 9f82f6d1bc..ef88cf4940 100644 --- a/src/jamidht/swarm/swarm_channel_handler.h +++ b/src/jamidht/swarm/swarm_channel_handler.h @@ -41,6 +41,10 @@ public: SwarmChannelHandler(const std::shared_ptr<JamiAccount>& acc, ConnectionManager& cm); ~SwarmChannelHandler(); +#ifdef LIBJAMI_TESTABLE + std::atomic_bool disableSwarmManager {false}; +#endif + /** * Ask for a new git channel * @param nodeId The node to connect diff --git a/src/jamidht/swarm/swarm_manager.cpp b/src/jamidht/swarm/swarm_manager.cpp index 03f9f69cd5..c6d52a5954 100644 --- a/src/jamidht/swarm/swarm_manager.cpp +++ b/src/jamidht/swarm/swarm_manager.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2022 Savoir-faire Linux Inc. + * Copyright (C) 2023 Savoir-faire Linux Inc. * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com> * * This program is free software; you can redistribute it and/or modify @@ -20,6 +20,7 @@ #include "swarm_manager.h" #include "connectivity/multiplexed_socket.h" +#include <opendht/thread_pool.h> constexpr const std::chrono::minutes FIND_PERIOD {10}; @@ -103,13 +104,18 @@ SwarmManager::sendRequest(const std::shared_ptr<ChannelSocketInterface>& socket, msgpack::sbuffer buffer; msgpack::packer<msgpack::sbuffer> pk(&buffer); std::error_code ec; + + Request toRequest {q, numberNodes, nodeId}; Message msg; - msg.request = Request {q, numberNodes, nodeId}; + msg.is_mobile = isMobile_; + msg.request = std::move(toRequest); + pk.pack(msg); socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec); + if (ec) { - JAMI_ERR("%s", ec.message().c_str()); + JAMI_ERROR("{}", ec.message()); return; } } @@ -118,26 +124,25 @@ void SwarmManager::sendAnswer(const std::shared_ptr<ChannelSocketInterface>& socket, const Message& msg_) { std::lock_guard<std::mutex> lock(mutex); - Response toResponse; - Message msg; - std::vector<NodeId> nodesToSend; if (msg_.request->q == Query::FIND) { - nodesToSend = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num); - toResponse.q = Query::FOUND; + auto nodes = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num); + auto bucket = routing_table.findBucket(msg_.request->nodeId); + const auto& m_nodes = bucket->getMobileNodes(); + Response toResponse {Query::FOUND, nodes, {m_nodes.begin(), m_nodes.end()}}; - toResponse.nodes = nodesToSend; - msg.response = toResponse; + Message msg; + msg.is_mobile = isMobile_; + msg.response = std::move(toResponse); - msgpack::sbuffer buffer; + msgpack::sbuffer buffer((size_t) 1024); msgpack::packer<msgpack::sbuffer> pk(&buffer); - pk.pack(msg); std::error_code ec; socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec); if (ec) { - JAMI_ERR("%s", ec.message().c_str()); + JAMI_ERROR("{}", ec.message()); return; } } @@ -164,37 +169,36 @@ SwarmManager::receiveMessage(const std::shared_ptr<ChannelSocketInterface>& sock while (ctx->pac.next(oh)) { auto shared = w.lock(); auto socket = wsocket.lock(); - if (!shared || !socket) { - JAMI_ERROR("Swarm Manager was destroyed :/"); - return (size_t) 0; - } + if (!shared || !socket) + return 0lu; try { Message msg; oh.get().convert(msg); - if (msg.request) { + if (msg.is_mobile) + shared->changeMobility(socket->deviceId(), msg.is_mobile); + + if (msg.request) shared->sendAnswer(socket, msg); - } - if (msg.response) { + else if (msg.response) { shared->setKnownNodes(msg.response->nodes); + shared->setMobileNodes(msg.response->mobile_nodes); } } catch (const std::exception& e) { - JAMI_WARN("Error DRT recv: %s", e.what()); - // return len; + JAMI_WARNING("Error DRT recv: {}", e.what()); + return len; } } return len; }); - socket->onShutdown([w = weak(), wsocket = std::weak_ptr<ChannelSocketInterface>(socket)] { + socket->onShutdown([w = weak(), deviceId = socket->deviceId()] { auto shared = w.lock(); - auto socket = wsocket.lock(); - if (shared and socket) { - shared->removeNode(socket->deviceId()); - shared->maintainBuckets(); + if (shared && !shared->isShutdown_) { + shared->removeNode(deviceId); } }); } @@ -223,38 +227,60 @@ SwarmManager::maintainBuckets() for (auto& node : nodes) tryConnect(node); } +bool +SwarmManager::hasChannel(const NodeId& deviceId) +{ + return routing_table.hasNode(deviceId); +} void SwarmManager::tryConnect(const NodeId& nodeId) { if (needSocketCb_) needSocketCb_(nodeId.toString(), - [this, nodeId](const std::shared_ptr<ChannelSocketInterface>& socket) { - std::unique_lock<std::mutex> lock(mutex); + [w = weak(), nodeId](const std::shared_ptr<ChannelSocketInterface>& socket) { + auto shared = w.lock(); + if (!shared) + return true; if (socket) { - if (routing_table.addNode(socket)) { - std::error_code ec; - resetNodeExpiry(ec, socket, id_); - lock.unlock(); - receiveMessage(socket); - } - } else { - routing_table.addKnownNode(nodeId); - maintainBuckets(); + shared->addChannel(socket); + return true; + } + std::unique_lock<std::mutex> lk(shared->mutex); + auto bucket = shared->routing_table.findBucket(nodeId); + bucket->removeConnectingNode(nodeId); + bucket->addKnownNode(nodeId); + bucket = shared->routing_table.findBucket(shared->getId()); + if (bucket->getConnectingNodesSize() == 0 + && bucket->getNodeIds().size() == 0 && shared->onConnectionChanged_) { + lk.unlock(); + JAMI_WARNING("[SwarmManager {:p}] Bootstrap: all connections failed", + fmt::ptr(shared.get())); + shared->onConnectionChanged_(false); } return true; }); } void -SwarmManager::addChannel(const std::shared_ptr<ChannelSocketInterface>& channel) +SwarmManager::addChannel(std::shared_ptr<ChannelSocketInterface> channel) { + auto emit = false; { std::lock_guard<std::mutex> lock(mutex); + emit = routing_table.findBucket(getId())->getNodeIds().size() == 0; auto bucket = routing_table.findBucket(channel->deviceId()); - routing_table.addNode(channel, bucket); + if (routing_table.addNode(channel, bucket)) { + std::error_code ec; + resetNodeExpiry(ec, channel, id_); + } } receiveMessage(channel); + if (emit && onConnectionChanged_) { + // If it's the first channel we add, we're now connected! + JAMI_DEBUG("[SwarmManager {}] Bootstrap: Connected!", fmt::ptr(this)); + onConnectionChanged_(true); + } } void @@ -282,11 +308,26 @@ SwarmManager::resetNodeExpiry(const asio::error_code& ec, auto& nodeTimer = bucket->getNodeTimer(socket); nodeTimer.expires_after(FIND_PERIOD); nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry, - this, + shared_from_this(), std::placeholders::_1, socket, NodeId {})); } } +void +SwarmManager::changeMobility(const NodeId& nodeId, bool isMobile) +{ + std::lock_guard<std::mutex> lock(mutex); + auto bucket = routing_table.findBucket(nodeId); + bucket->changeMobility(nodeId, isMobile); +} + +void +SwarmManager::shutdown() +{ + isShutdown_ = true; + std::lock_guard<std::mutex> lock(mutex); + routing_table.shutdownAllNodes(); +} } // namespace jami diff --git a/src/jamidht/swarm/swarm_manager.h b/src/jamidht/swarm/swarm_manager.h index f20eda1d9b..df1d247226 100644 --- a/src/jamidht/swarm/swarm_manager.h +++ b/src/jamidht/swarm/swarm_manager.h @@ -33,6 +33,7 @@ class SwarmManager : public std::enable_shared_from_this<SwarmManager> { using ChannelCb = std::function<bool(const std::shared_ptr<ChannelSocketInterface>&)>; using NeedSocketCb = std::function<void(const std::string&, ChannelCb&&)>; + using OnConnectionChanged = std::function<void(bool ok)>; public: SwarmManager(const NodeId&); @@ -63,26 +64,41 @@ public: * Add channel to routing table * @param shared_ptr<ChannelSocketInterface>& channel */ - void addChannel(const std::shared_ptr<ChannelSocketInterface>& channel); + void addChannel(std::shared_ptr<ChannelSocketInterface> channel); void removeNode(const NodeId& nodeId); + void changeMobility(const NodeId& nodeId, bool isMobile); /** For testing */ RoutingTable& getRoutingTable() { return routing_table; }; std::list<Bucket>& getBuckets() { return routing_table.getBuckets(); }; - void shutdown() { routing_table.shutdownAllNodes(); }; + void shutdown(); void display() { - JAMI_DEBUG("SwarmManager {:s} has {:d} nodes in table", + JAMI_DEBUG("SwarmManager {:s} has {:d} nodes in table [P = {}]", getId().to_c_str(), - routing_table.getRoutingTableNodeCount()); + routing_table.getRoutingTableNodeCount(), + isMobile_); } + void onConnectionChanged(OnConnectionChanged cb) { onConnectionChanged_ = std::move(cb); } + + void setMobility(bool isMobile) { isMobile_ = isMobile; } + + bool isMobile() const { return isMobile_; } + + /** + * Maintain/Update buckets + */ + void maintainBuckets(); + + bool hasChannel(const NodeId& deviceId); + private: /** - * Add node to the known_Nodes list + * Add node to the known_nodes list * @param NodeId nodeId */ void addKnownNodes(const NodeId& nodeId); @@ -118,11 +134,6 @@ private: */ void receiveMessage(const std::shared_ptr<ChannelSocketInterface>& socket); - /** - * Maintain/Update buckets - */ - void maintainBuckets(); - /** * Add list of nodes to the known nodes list * @param asio::error_code& ec @@ -142,11 +153,14 @@ private: void removeNodeInternal(const NodeId& nodeId); const NodeId id_; + bool isMobile_ {false}; mutable std::mt19937_64 rd; mutable std::mutex mutex; RoutingTable routing_table; std::atomic_bool isShutdown_ {false}; + + OnConnectionChanged onConnectionChanged_ {}; }; } // namespace jami diff --git a/src/manager.cpp b/src/manager.cpp index 2ca950b4ac..cf9d3efaf2 100644 --- a/src/manager.cpp +++ b/src/manager.cpp @@ -3149,14 +3149,15 @@ Manager::getJamiPluginManager() const } #endif -std::optional<std::weak_ptr<ChannelSocket>> -Manager::gitSocket(const std::string& accountId, - const std::string& deviceId, - const std::string& conversationId) +std::shared_ptr<ChannelSocket> +Manager::gitSocket(std::string_view accountId, + std::string_view deviceId, + std::string_view conversationId) { if (const auto acc = getAccount<JamiAccount>(accountId)) - return acc->gitSocket(DeviceId(deviceId), conversationId); - return std::nullopt; + if (auto convModule = acc->convModule()) + return convModule->gitSocket(deviceId, conversationId); + return nullptr; } std::map<std::string, std::string> diff --git a/src/manager.h b/src/manager.h index 998e179114..e75ac960ba 100644 --- a/src/manager.h +++ b/src/manager.h @@ -710,9 +710,9 @@ public: * @return std::shared_ptr<Account> Shared pointer on an Account instance or nullptr if not found */ template<class T = Account> - std::shared_ptr<T> getAccount(const std::string& accountID) const + inline std::shared_ptr<T> getAccount(std::string_view accountId) const { - return accountFactory.getAccount<T>(accountID); + return accountFactory.getAccount<T>(accountId); } /** @@ -852,9 +852,9 @@ public: * @param conversationId Related conversation * @return std::optional<std::weak_ptr<ChannelSocket>> the related socket */ - std::optional<std::weak_ptr<ChannelSocket>> gitSocket(const std::string& accountId, - const std::string& deviceId, - const std::string& conversationId); + std::shared_ptr<ChannelSocket> gitSocket(const std::string_view accountId, + const std::string_view deviceId, + const std::string_view conversationId); void setDefaultModerator(const std::string& accountID, const std::string& peerURI, bool state); std::vector<std::string> getDefaultModerators(const std::string& accountID); diff --git a/test/unitTest/Makefile.am b/test/unitTest/Makefile.am index abfa3bc7b5..03719f78ae 100644 --- a/test/unitTest/Makefile.am +++ b/test/unitTest/Makefile.am @@ -237,4 +237,10 @@ ut_sip_srtp_SOURCES = sip_account/sip_srtp.cpp check_PROGRAMS += ut_plugins ut_plugins_SOURCES = plugins/plugins.cpp common.cpp +# +# Routing Table +# +check_PROGRAMS += ut_routing_table +ut_routing_table_SOURCES = swarm/routing_table.cpp + TESTS = $(check_PROGRAMS) diff --git a/test/unitTest/swarm/nodes.h b/test/unitTest/swarm/nodes.h new file mode 100644 index 0000000000..8dcf7d105b --- /dev/null +++ b/test/unitTest/swarm/nodes.h @@ -0,0 +1,93 @@ +#pragma once + +#include "opendht/infohash.h" +#include "connectivity/multiplexed_socket.h" + +using NodeId = dht::PkId; + +std::vector<NodeId> nodeTestIds1({ + + NodeId("053927d831827a9f7e606d4c9c9fe833922c0d35b3960dd2250085f46c0e4f41"), + NodeId("41a05179e4b3e42c3409b10280bb448d5bbd5ef64784b997d2d1663457bb6ba8"), + NodeId("105ba3496ecaa41ad360b45afcefab63ce1c458527ac128d41c791d40e8c62b8"), + NodeId("1b8cc1705ede1abfdf3de98cf94b62d1482ef87ff8206aff483379ad2ff8a3a5"), + NodeId("1bd92a8aab91e63267fd91c6ff4d88896bca4b69e422b11894881cd849fa1467"), + NodeId("28f4c7e34eb4310b2e1ea3b139ee6993e6b021770ee98895a54cdd1e372bd78e"), + NodeId("2dd1dd976c7dc234ca737c85e4ea48ad09423067a77405254424c4cdd845720d"), + NodeId("30e177a56bd1a7969e1973ad8b210a556f6a2b15debc972661a8f555d52edbe2"), + NodeId("312226d8fa653704758a681c8c21ec81cec914d0b8aa19e1142d3cf900e3f3b4"), + NodeId("33f280d8208f42ac34321e6e6871aecd100c2bfd4f1848482e7a7ed8ae895414") + +}); + +std::vector<NodeId> nodeTestIds2({ + + NodeId("053927d831827a9f7e606d4c9c9fe833922c0d35b3960dd2250085f46c0e4f41"), // 0 + NodeId("41a05179e4b3e42c3409b10280bb448d5bbd5ef64784b997d2d1663457bb6ba8"), // 1 + NodeId("77a9fba2c5a65812d9290c567897131b20a723e0ca2f65ef5c6b421585e4da2b"), // 2 + NodeId("6110cda4bc6f5465acab2e779fad607bf4edcf6114c7333968a2d197aa7a0a63"), // 3 + NodeId("6aed0ef602f5c676e7afd337e3984a211c385f38230fa894cfa91e1cbd592b5c"), // 4 + NodeId("e633ca67cc8801ec141b2f7eb55b78886e9266ed60c7e4bc12c232ab60d7317e"), // 5 + NodeId("84b59e42e8156d18794d263baae06344871b9f97d5006e1f99e8545337c37c37"), // 6 + NodeId("a3b1b35be59ed62926569479d72718ccca553710f2a22490d1155d834d972525"), // 7 + NodeId("4f76e769061f343b2caf9eea35632d28cde8d7a67e5e0f59857733cabc538997"), // 8 + NodeId("fda3516c620bf55511ed0184fc3e32fc346ea0f3a2c6bc19257bd98e19734307") // 9 + +}); + +std::vector<NodeId> nodeTestIds3({ + + NodeId("41a05179e4b3e42c3409b10280bb448d5bbd5ef64784b997d2d1663457bb6ba8"), + NodeId("053927d831827a9f7e606d4c9c9fe833922c0d35b3960dd2250085f46c0e4f41"), + NodeId("271c3365b92f249597be69e4fde318cb13abd1059fb3ad88da52d7690083ffb0"), + NodeId("6aed0ef602f5f676e7afd337e3984a211c385f38230fa894cfa91e1cbd592b5c"), + NodeId("821bc564703ba2fc147f12f1ec30a2bd39bd8ad9fe241da3b47d391cfcc87519"), + NodeId("84b59e42e815dd18794d263baae06344871b9f97d5006e1f99e8545337c37c37"), + NodeId("897b3ff45a8e1c1fa7168b2fac0f32b6cfa3bf430685b36b6f2d646a9125164e"), + NodeId("a3b1b35be59ed62926569479d72718ccca553710f2a22490d1155d834d972525"), + NodeId("f04d8116705f692677cb7cb519c078341fe8aaae3f792904a7be3a8ae0bfa1ea"), + NodeId("f3a0b932602befe4c00b8bf7d2101f60a712bb55b0f62a023766250044b883a8") + +}); + +std::vector<std::shared_ptr<jami::ChannelSocketTest>> nodeTestChannels1( + {std::make_shared<jami::ChannelSocketTest>(nodeTestIds1.at(0), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds1.at(1), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds1.at(2), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds1.at(3), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds1.at(4), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds1.at(5), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds1.at(6), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds1.at(7), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds1.at(8), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds1.at(9), "test1", 0) + + }); + +std::vector<std::shared_ptr<jami::ChannelSocketTest>> nodeTestChannels2( + {std::make_shared<jami::ChannelSocketTest>(nodeTestIds2.at(0), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds2.at(1), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds2.at(2), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds2.at(3), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds2.at(4), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds2.at(5), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds2.at(6), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds2.at(7), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds2.at(8), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds2.at(9), "test1", 0) + + }); + +std::vector<std::shared_ptr<jami::ChannelSocketTest>> nodeTestChannels3( + {std::make_shared<jami::ChannelSocketTest>(nodeTestIds3.at(0), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds3.at(1), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds3.at(2), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds3.at(3), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds3.at(4), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds3.at(5), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds3.at(6), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds3.at(7), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds3.at(8), "test1", 0), + std::make_shared<jami::ChannelSocketTest>(nodeTestIds3.at(9), "test1", 0) + + }); diff --git a/test/unitTest/swarm/routing_table.cpp b/test/unitTest/swarm/routing_table.cpp new file mode 100644 index 0000000000..f031fe7f1a --- /dev/null +++ b/test/unitTest/swarm/routing_table.cpp @@ -0,0 +1,1318 @@ +/* + * Copyright (C) 2023 Savoir-faire Linux Inc. + * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ +#include <cppunit/TestAssert.h> +#include <cppunit/TestFixture.h> +#include <cppunit/extensions/HelperMacros.h> + +#include "../../test_runner.h" +#include "jami.h" +#include "../common.h" +#include "jamidht/swarm/swarm_manager.h" +#include "connectivity/multiplexed_socket.h" +#include <algorithm> + +#include "connectivity/peer_connection.h" +#include "nodes.h" + +#include <opendht/thread_pool.h> + +using namespace std::string_literals; +using namespace std::chrono_literals; +using namespace dht; +using NodeId = dht::PkId; + +namespace jami { +namespace test { + +constexpr size_t nNodes = 10; +constexpr size_t mNodes = 5; +constexpr size_t kNodes = 10; + +constexpr size_t BOOTSTRAP_SIZE = 2; +constexpr int time = 2; + +struct Counter +{ + Counter(unsigned t) + : target(t) + {} + const unsigned target; + unsigned added {0}; + std::mutex mutex; + std::condition_variable cv; + + void count() + { + std::lock_guard<std::mutex> lock(mutex); + ++added; + if (added == target) + cv.notify_one(); + } + bool wait(std::chrono::steady_clock::duration timeout) + { + std::unique_lock<std::mutex> lock(mutex); + return cv.wait_for(lock, timeout, [&] { return added == target; }); + } + void wait() + { + std::unique_lock<std::mutex> lock(mutex); + return cv.wait(lock, [&] { return added == target; }); + } +}; + +class RoutingTableTest : public CppUnit::TestFixture +{ +public: + ~RoutingTableTest() { libjami::fini(); } + static std::string name() { return "RoutingTable"; } + + void setUp(); + void tearDown(); + +private: + // ################# METHODS AND VARIABLES GENERATING DATA #################// + + std::mt19937_64 rd {dht::crypto::getSeededRandomEngine<std::mt19937_64>()}; + std::mutex channelSocketsMtx_; + std::vector<NodeId> randomNodeIds; + std::map<NodeId, std::map<NodeId, std::shared_ptr<jami::ChannelSocketTest>>> channelSockets_; + std::map<NodeId, std::shared_ptr<jami::SwarmManager>> swarmManagers; + std::map<NodeId, std::set<NodeId>> nodesToConnect; + std::set<NodeId> messageNode; + + void generaterandomNodeIds(); + void generateSwarmManagers(); + std::shared_ptr<jami::SwarmManager> getManager(const NodeId& id) + { + auto it = swarmManagers.find(id); + return it == swarmManagers.end() ? nullptr : it->second; + } + void setKnownNodesToManager(const std::shared_ptr<SwarmManager>& sm); + void needSocketCallBack(const std::shared_ptr<SwarmManager>& sm); + + // ################# METHODS AND VARIABLES TO TEST DATA #################// + + std::map<std::shared_ptr<jami::SwarmManager>, std::vector<NodeId>> knownNodesSwarmManager; + std::map<NodeId, std::shared_ptr<jami::SwarmManager>> swarmManagersTest_; + std::vector<NodeId> discoveredNodes; + + void crossNodes(NodeId nodeId); + void distribution(); + + // ################# UNIT TEST METHODES #################// + + void testBucketMainFunctions(); + void testRoutingTableMainFunctions(); + void testBucketKnownNodes(); + void testSwarmManagerConnectingNodes_1b(); + void testClosestNodes_1b(); + void testClosestNodes_multipleb(); + void testSendKnownNodes_1b(); + void testSendKnownNodes_multipleb(); + void testMobileNodeFunctions(); + void testMobileNodeAnnouncement(); + void testMobileNodeSplit(); + void testSendMobileNodes(); + void testBucketSplit_1n(); + void testSwarmManagersSmallBootstrapList(); + void testRoutingTableForConnectingNode(); + void testRoutingTableForShuttingNode(); + void testRoutingTableForMassShuttingsNodes(); + void testSwarmManagersWMobileModes(); + + CPPUNIT_TEST_SUITE(RoutingTableTest); + CPPUNIT_TEST(testBucketMainFunctions); + CPPUNIT_TEST(testRoutingTableMainFunctions); + CPPUNIT_TEST(testClosestNodes_multipleb); + CPPUNIT_TEST(testBucketSplit_1n); + CPPUNIT_TEST(testBucketKnownNodes); + CPPUNIT_TEST(testSendKnownNodes_1b); + CPPUNIT_TEST(testSendKnownNodes_multipleb); + CPPUNIT_TEST(testClosestNodes_1b); + CPPUNIT_TEST(testSwarmManagersSmallBootstrapList); + CPPUNIT_TEST(testSwarmManagerConnectingNodes_1b); + CPPUNIT_TEST(testRoutingTableForConnectingNode); + CPPUNIT_TEST(testMobileNodeFunctions); + CPPUNIT_TEST(testMobileNodeAnnouncement); + CPPUNIT_TEST(testMobileNodeSplit); + CPPUNIT_TEST(testSendMobileNodes); + CPPUNIT_TEST(testSwarmManagersWMobileModes); + CPPUNIT_TEST(testRoutingTableForMassShuttingsNodes); + CPPUNIT_TEST(testRoutingTableForShuttingNode); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(RoutingTableTest, RoutingTableTest::name()); + +void +RoutingTableTest::setUp() +{ + libjami::init( + libjami::InitFlag(libjami::LIBJAMI_FLAG_DEBUG | libjami::LIBJAMI_FLAG_CONSOLE_LOG)); + if (not Manager::instance().initialized) { + CPPUNIT_ASSERT(libjami::start("jami-sample.yml")); + } + + generaterandomNodeIds(); + generateSwarmManagers(); +} + +void +RoutingTableTest::tearDown() +{ + discoveredNodes.clear(); + swarmManagersTest_.clear(); +} + +void +RoutingTableTest::generaterandomNodeIds() +{ + auto total = nNodes + mNodes; + randomNodeIds.reserve(total); + for (size_t i = 0; i < total; i++) { + NodeId node = Hash<32>::getRandom(); + randomNodeIds.emplace_back(node); + } +} + +void +RoutingTableTest::generateSwarmManagers() +{ + auto total = nNodes + mNodes; + for (size_t i = 0; i < total; i++) { + const NodeId& node = randomNodeIds.at(i); + auto sm = std::make_shared<SwarmManager>(node); + i >= nNodes ? sm->setMobility(true) : sm->setMobility(false); + swarmManagers[node] = sm; + } +} + +void +RoutingTableTest::setKnownNodesToManager(const std::shared_ptr<SwarmManager>& sm) +{ + std::uniform_int_distribution<> distrib(1, kNodes - 1); + + int numberKnownNodesToAdd = distrib(rd); + + std::uniform_int_distribution<> distribBis(0, kNodes - 1); + int indexNodeIdToAdd; + std::vector<NodeId> kNodesToAdd; + knownNodesSwarmManager.insert({sm, {}}); + + int counter = 0; + + while (counter < numberKnownNodesToAdd) { + indexNodeIdToAdd = distribBis(rd); + + NodeId node = randomNodeIds.at(indexNodeIdToAdd); + auto it = find(kNodesToAdd.begin(), kNodesToAdd.end(), node); + if (sm->getId() != node && it == kNodesToAdd.end()) { + kNodesToAdd.push_back(node); + knownNodesSwarmManager.at(sm).push_back(node); + counter++; + } + } + + sm->setKnownNodes(kNodesToAdd); +} + +void +RoutingTableTest::needSocketCallBack(const std::shared_ptr<SwarmManager>& sm) +{ + sm->needSocketCb_ = [this, wsm = std::weak_ptr<SwarmManager>(sm)](const std::string& nodeId, + auto&& onSocket) { + Manager::instance().ioContext()->post([this, wsm, nodeId, onSocket = std::move(onSocket)] { + auto sm = wsm.lock(); + + if (!sm) + return; + + NodeId node = DeviceId(nodeId); + std::lock_guard<std::mutex> lk(channelSocketsMtx_); + if (auto smRemote = getManager(node)) { + auto myId = sm->getId(); + auto& cstRemote = channelSockets_[node][myId]; + auto& cstMe = channelSockets_[myId][node]; + if (!cstRemote) { + cstRemote = std::make_shared<ChannelSocketTest>(myId, "test1", 0); + } + if (!cstMe) { + cstMe = std::make_shared<ChannelSocketTest>(node, "test1", 0); + } + ChannelSocketTest::link(cstMe, cstRemote); + + onSocket(cstMe); + smRemote->addChannel(cstRemote); + } + }); + }; +} + +void +RoutingTableTest::distribution() +{ + std::vector<unsigned> dist(8); + for (const auto& sm : swarmManagers) { + auto val = sm.second->getRoutingTable().getRoutingTableNodeCount(); + if (dist.size() <= val) + dist.resize(val + 1); + dist[val]++; + } + for (size_t i = 0; i < dist.size(); i++) { + std::cout << "Swarm Managers with " << i << " nodes: " << dist[i] << std::endl; + } +} + +void +RoutingTableTest::testBucketMainFunctions() +{ + NodeId node0 = nodeTestIds1.at(0); + NodeId node1 = nodeTestIds1.at(1); + NodeId node2 = nodeTestIds1.at(2); + NodeId node3 = nodeTestIds1.at(3); + + auto sNode1 = nodeTestChannels1.at(1); + auto sNode2 = nodeTestChannels1.at(2); + auto sNode3 = nodeTestChannels1.at(3); + + NodeInfo InfoNode1(true, sNode2); + + std::set<std::shared_ptr<ChannelSocketInterface>> socketsCheck {sNode1, sNode2}; + std::set<NodeId> nodesCheck {node1, node2}; + + Bucket bucket(node0); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Lower limit error", node0, bucket.getLowerLimit()); + + bucket.addNode(sNode1); + bucket.addNode(std::move(InfoNode1)); + + bucket.printBucket(0); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have node", true, bucket.hasNode(sNode1->deviceId())); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have node", true, bucket.hasNode(sNode2->deviceId())); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have nodes", + true, + socketsCheck == bucket.getNodeSockets()); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have nodes", true, nodesCheck == bucket.getNodeIds()); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have nodes", true, bucket.isFull()); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have known node", + false, + bucket.hasKnownNode(node1)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have known node", + false, + bucket.hasKnownNode(node2)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have connecting node", + false, + bucket.hasConnectingNode(node1)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have connecting node", + false, + bucket.hasConnectingNode(node2)); + + CPPUNIT_ASSERT_THROW_MESSAGE("Supposed to be out of range", + bucket.getKnownNode(5), + std::out_of_range); + + bucket.removeNode(sNode1->deviceId()); + bucket.shutdownNode(sNode2->deviceId()); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have node", false, bucket.hasNode(node1)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have node", false, bucket.hasNode(node2)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have known node", true, bucket.hasKnownNode(node1)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have known node", false, bucket.hasKnownNode(node2)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have known node", false, bucket.hasMobileNode(node1)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have known node", true, bucket.hasMobileNode(node2)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have connecting node", + false, + bucket.hasConnectingNode(node1)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have connecting node", + false, + bucket.hasConnectingNode(node2)); + + auto nodeTest = bucket.randomId(rd); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("One of the two nodes", + true, + nodeTest == node1 || nodeTest == node2); + + bucket.addNode(sNode1); + bucket.addNode(sNode2); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to be 2", 2u, bucket.getNodesSize()); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to return zero, node already added", + false, + bucket.addNode(sNode2)); + + bucket.removeNode(node1); + bucket.removeNode(node2); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have node", false, bucket.hasNode(node1)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have node", false, bucket.hasNode(node2)); + + bucket.addKnownNode(node3); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have known node", true, bucket.hasKnownNode(node3)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have connecting node", + false, + bucket.hasConnectingNode(node3)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to be 3", 3u, bucket.getKnownNodesSize()); + bucket.removeKnownNode(node3); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have known node", + false, + bucket.hasKnownNode(node3)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have connecting node", + false, + bucket.hasConnectingNode(node3)); + + bucket.addConnectingNode(node1); + bucket.addConnectingNode(node2); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have connecting node", + true, + bucket.hasConnectingNode(node1)); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have nodes", + true, + nodesCheck == bucket.getConnectingNodes()); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to be 2", 2u, bucket.getConnectingNodesSize()); + + bucket.removeConnectingNode(node2); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not upposed to have connecting node", + false, + bucket.hasConnectingNode(node2)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to be 1", 1u, bucket.getConnectingNodesSize()); +} + +void +RoutingTableTest::testBucketKnownNodes() +{ + std::cout << "\ntestBucketKnownNodes" << std::endl; + Bucket bucket(randomNodeIds.at(0)); + + for (size_t i = 0; i < randomNodeIds.size(); i++) { + bucket.addKnownNode(randomNodeIds.at(i)); + } + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have the known node", + true, + bucket.hasKnownNode(randomNodeIds.at(randomNodeIds.size() - 1))); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Error with bucket size", + true, + bucket.getKnownNodesSize() == randomNodeIds.size()); +} + +void +RoutingTableTest::testRoutingTableMainFunctions() +{ + std::cout << "\ntestRoutingTableMainFunctions" << std::endl; + + RoutingTable rt; + NodeId node1 = nodeTestIds1.at(0); + NodeId node2 = nodeTestIds1.at(1); + NodeId node3 = nodeTestIds1.at(2); + + rt.setId(node1); + + rt.addKnownNode(node1); + rt.addKnownNode(node2); + rt.addKnownNode(node3); + + CPPUNIT_ASSERT(!rt.hasKnownNode(node1)); + CPPUNIT_ASSERT(rt.hasKnownNode(node2)); + + auto knownNodes = rt.getKnownNodes(); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have 2 nodes", true, knownNodes.size() == 2); + + auto bucket1 = rt.findBucket(node1); + auto bucket2 = rt.findBucket(node2); + auto bucket3 = rt.findBucket(node3); + + rt.addNode(nodeTestChannels1.at(0), bucket1); + rt.addNode(nodeTestChannels1.at(1), bucket2); + rt.addNode(nodeTestChannels1.at(2), bucket3); + + CPPUNIT_ASSERT(!rt.hasNode(node1)); + CPPUNIT_ASSERT(rt.hasNode(node2)); + CPPUNIT_ASSERT(rt.hasNode(node3)); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to exist 0", false, rt.removeNode(node1)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to exist 1", true, rt.removeNode(node2)); + + rt.removeNode(node1); + rt.removeNode(node2); + rt.removeNode(node3); + + rt.addConnectingNode(node1); + rt.addConnectingNode(node2); + rt.addConnectingNode(node3); + + std::vector<NodeId> nodesCheck({node2, node3}); + const auto& nodes = rt.getConnectingNodes(); + + std::vector<NodeId> connectingNode; + connectingNode.insert(connectingNode.end(), nodes.begin(), nodes.end()); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to exist 3", false, rt.hasNode(node3)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to exist 1", false, rt.hasConnectingNode(node1)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to exist 3", true, rt.hasConnectingNode(node3)); + + std::vector<NodeId> diff; + std::set_difference(connectingNode.begin(), + connectingNode.end(), + nodes.begin(), + nodes.end(), + std::inserter(diff, diff.begin())); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to be equal", true, diff.size() == 0); + + rt.shutdownNode(node2); + rt.shutdownNode(node3); + rt.printRoutingTable(); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to exist", true, rt.hasConnectingNode(node2)); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to exist", true, rt.hasConnectingNode(node3)); +} + +void +RoutingTableTest::testSwarmManagerConnectingNodes_1b() +{ + std::cout << "\ntestSwarmManagerConnectingNodes_1b" << std::endl; + + SwarmManager sm1(nodeTestIds1.at(0)); + auto& rt1 = sm1.getRoutingTable(); + + std::vector<NodeId> toTest( + {NodeId("053927d831827a9f7e606d4c9c9fe833922c0d35b3960dd2250085f46c0e4f41"), + NodeId("41a05179e4b3e42c3409b10280bb448d5bbd5ef64784b997d2d1663457bb6ba8")}); + + sm1.setKnownNodes(toTest); + + CPPUNIT_ASSERT(!rt1.hasConnectingNode(nodeTestIds1.at(0))); + CPPUNIT_ASSERT(rt1.hasConnectingNode(nodeTestIds1.at(1))); + CPPUNIT_ASSERT(!rt1.hasKnownNode(nodeTestIds1.at(0))); + CPPUNIT_ASSERT(!rt1.hasKnownNode(nodeTestIds1.at(1))); +} + +void +RoutingTableTest::testClosestNodes_1b() +{ + std::cout << "\ntestClosestNodes_1b" << std::endl; + + SwarmManager sm1(nodeTestIds1.at(0)); + SwarmManager sm2(nodeTestIds2.at(0)); + + auto& rt1 = sm1.getRoutingTable(); + auto& rt2 = sm2.getRoutingTable(); + + auto bucket1 = rt1.findBucket(nodeTestIds1.at(0)); + auto bucket2 = rt2.findBucket(nodeTestIds2.at(0)); + + for (size_t i = 0; i < nodeTestIds2.size(); i++) { + bucket1->addNode(nodeTestChannels1.at(i)); + bucket2->addNode(nodeTestChannels2.at(i)); + } + + std::vector<NodeId> + closestNodes1 {NodeId("41a05179e4b3e42c3409b10280bb448d5bbd5ef64784b997d2d1663457bb6ba8"), + NodeId("28f4c7e34eb4310b2e1ea3b139ee6993e6b021770ee98895a54cdd1e372bd78e"), + NodeId("2dd1dd976c7dc234ca737c85e4ea48ad09423067a77405254424c4cdd845720d"), + NodeId("33f280d8208f42ac34321e6e6871aecd100c2bfd4f1848482e7a7ed8ae895414") + + }; + + std::vector<NodeId> + closestNodes2 {NodeId("053927d831827a9f7e606d4c9c9fe833922c0d35b3960dd2250085f46c0e4f41"), + NodeId("4f76e769061f343b2caf9eea35632d28cde8d7a67e5e0f59857733cabc538997"), + NodeId("41a05179e4b3e42c3409b10280bb448d5bbd5ef64784b997d2d1663457bb6ba8"), + NodeId("77a9fba2c5a65812d9290c567897131b20a723e0ca2f65ef5c6b421585e4da2b") + + }; + + auto closestNodes1_ = rt1.closestNodes(nodeTestIds2.at(4), 4); + auto closestNodes2_ = rt2.closestNodes(nodeTestIds1.at(4), 4); + auto sameIdTest = rt2.closestNodes(nodeTestIds2.at(0), 1); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("ERROR", true, closestNodes1 == closestNodes1_); + CPPUNIT_ASSERT_EQUAL_MESSAGE("ERROR", true, closestNodes2 == closestNodes2_); + CPPUNIT_ASSERT_EQUAL_MESSAGE("ERROR", false, nodeTestIds1.at(0) == sameIdTest.at(0)); +} + +void +RoutingTableTest::testClosestNodes_multipleb() +{ + std::cout << "\ntestClosestNodes_multipleb" << std::endl; + + auto sm1 = std::make_shared<SwarmManager>(nodeTestIds1.at(2)); + auto sm2 = std::make_shared<SwarmManager>(nodeTestIds1.at(6)); + + for (size_t i = 0; i < nodeTestChannels1.size(); i++) { + sm1->addChannel(nodeTestChannels1.at(i)); + sm2->addChannel(nodeTestChannels1.at(i)); + } + + std::vector<NodeId> + closestNodes1 {NodeId("2dd1dd976c7dc234ca737c85e4ea48ad09423067a77405254424c4cdd845720d"), + NodeId("30e177a56bd1a7969e1973ad8b210a556f6a2b15debc972661a8f555d52edbe2"), + NodeId("312226d8fa653704758a681c8c21ec81cec914d0b8aa19e1142d3cf900e3f3b4")}; + + std::vector<NodeId> + closestNodes2 {NodeId("30e177a56bd1a7969e1973ad8b210a556f6a2b15debc972661a8f555d52edbe2"), + NodeId("312226d8fa653704758a681c8c21ec81cec914d0b8aa19e1142d3cf900e3f3b4"), + NodeId("33f280d8208f42ac34321e6e6871aecd100c2bfd4f1848482e7a7ed8ae895414")}; + + auto closestNodes1_ = sm1->getRoutingTable().closestNodes(nodeTestIds1.at(5), 3); + auto closestNodes2_ = sm2->getRoutingTable().closestNodes(nodeTestIds1.at(5), 3); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("ERROR", true, closestNodes1 == closestNodes1_); + CPPUNIT_ASSERT_EQUAL_MESSAGE("ERROR", true, closestNodes2 == closestNodes2_); +} + +void +RoutingTableTest::testBucketSplit_1n() +{ + std::cout << "\ntestBucketSplit_1n" << std::endl; + + SwarmManager sm1(nodeTestIds2.at(0)); + SwarmManager sm2(nodeTestIds2.at(nodeTestIds2.size() - 1)); + SwarmManager sm3(nodeTestIds2.at(nodeTestIds2.size() / 2)); + + auto& rt1 = sm1.getRoutingTable(); + auto& rt2 = sm2.getRoutingTable(); + auto& rt3 = sm3.getRoutingTable(); + + auto& b1 = rt1.getBuckets(); + auto& b2 = rt2.getBuckets(); + auto& b3 = rt3.getBuckets(); + + for (size_t i = 0; i < nodeTestIds2.size(); i++) { + auto bucket1 = rt1.findBucket(nodeTestIds2.at(i)); + auto bucket2 = rt2.findBucket(nodeTestIds2.at(i)); + auto bucket3 = rt3.findBucket(nodeTestIds2.at(i)); + + rt1.addNode(nodeTestChannels2.at(i), bucket1); + rt2.addNode(nodeTestChannels2.at(i), bucket2); + rt3.addNode(nodeTestChannels2.at(i), bucket3); + } + + // SM1 + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have node ntc2 0", + false, + rt1.hasNode(nodeTestChannels2.at(0)->deviceId())); + + int sm1BucketCounter = 1; + for (const auto& buckIt : b1) { + switch (sm1BucketCounter) { + case 1: + CPPUNIT_ASSERT_EQUAL_MESSAGE("Size error", 0u, buckIt.getNodesSize()); + break; + + case 2: { + std::set<NodeId> nodesCheck {nodeTestIds2.at(1), + nodeTestIds2.at(2), + nodeTestIds2.at(3), + nodeTestIds2.at(4), + nodeTestIds2.at(8)}; + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have known nodes", + true, + nodesCheck == buckIt.getNodeIds()); + } + + break; + + case 3: { + std::set<NodeId> nodesCheck {nodeTestIds2.at(5), + nodeTestIds2.at(6), + nodeTestIds2.at(7), + nodeTestIds2.at(9)}; + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have known nodes", + true, + nodesCheck == buckIt.getNodeIds()); + } + + break; + } + + sm1BucketCounter++; + } + + CPPUNIT_ASSERT_EQUAL_MESSAGE("ERROR", 3, sm1BucketCounter - 1); + + // SM2 + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have node ntc2 9", + false, + rt2.hasNode(nodeTestChannels2.at(9)->deviceId())); + + int sm2BucketCounter = 1; + for (const auto& buckIt : b2) { + switch (sm2BucketCounter) { + case 1: { + std::set<NodeId> nodesCheck {nodeTestIds2.at(0), + nodeTestIds2.at(1), + nodeTestIds2.at(2), + nodeTestIds2.at(3), + nodeTestIds2.at(4), + nodeTestIds2.at(8)}; + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have known nodes", + true, + nodesCheck == buckIt.getNodeIds()); + } + + break; + + case 2: { + std::set<NodeId> nodesCheck {nodeTestIds2.at(6), nodeTestIds2.at(7)}; + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have known nodes", + true, + nodesCheck == buckIt.getNodeIds()); + } + + break; + + case 3: + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have node ntc2 5", + true, + buckIt.hasNode(nodeTestChannels2.at(5)->deviceId())); + break; + } + + sm2BucketCounter++; + } + + CPPUNIT_ASSERT_EQUAL_MESSAGE("ERROR", 3, sm2BucketCounter - 1); + + // SM3 + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have node ntc2 5", + false, + rt3.hasNode(nodeTestChannels2.at(5)->deviceId())); + + int sm3BucketCounter = 1; + for (const auto& buckIt : b3) { + switch (sm3BucketCounter) { + case 1: { + std::set<NodeId> nodesCheck {nodeTestIds2.at(0), + nodeTestIds2.at(1), + nodeTestIds2.at(2), + nodeTestIds2.at(3), + nodeTestIds2.at(4), + nodeTestIds2.at(8)}; + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have known nodes", + true, + nodesCheck == buckIt.getNodeIds()); + } + + break; + + case 2: { + std::set<NodeId> nodesCheck {nodeTestIds2.at(6), nodeTestIds2.at(7)}; + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have known nodes", + true, + nodesCheck == buckIt.getNodeIds()); + } break; + + case 3: + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have node ntc2 9", + true, + buckIt.hasNode(nodeTestChannels2.at(9)->deviceId())); + break; + } + + sm3BucketCounter++; + } + CPPUNIT_ASSERT_EQUAL_MESSAGE("ERROR", 3, sm3BucketCounter - 1); +} + +void +RoutingTableTest::testSendKnownNodes_1b() +{ + std::cout << "\ntestSendKnownNodes" << std::endl; + + auto sm1 = std::make_shared<SwarmManager>(nodeTestIds2.at(0)); + auto sm2 = std::make_shared<SwarmManager>(nodeTestIds3.at(0)); + + swarmManagers.insert({sm1->getId(), sm1}); + swarmManagers.insert({sm2->getId(), sm2}); + + auto& rt1 = sm1->getRoutingTable(); + auto& rt2 = sm2->getRoutingTable(); + + auto bucket1 = rt1.findBucket(nodeTestIds2.at(0)); + auto bucket2 = rt2.findBucket(nodeTestIds3.at(0)); + + for (size_t i = 0; i < nodeTestChannels3.size(); i++) { + auto node = nodeTestChannels3.at(i)->deviceId(); + if (node != sm1->getId() && node != sm2->getId()) { + bucket2->addNode(nodeTestChannels3.at(i)); + } + } + + std::vector<NodeId> node2Co = { + NodeId("41a05179e4b3e42c3409b10280bb448d5bbd5ef64784b997d2d1663457bb6ba8")}; + needSocketCallBack(sm1); + + sm1->setKnownNodes(node2Co); + + auto start = std::chrono::steady_clock::now(); + bool cn1 {false}, cn2 {false}; + + auto isGood = [&] { + return (cn1 and cn2); + }; + do { + std::this_thread::sleep_for(1s); + cn1 = bucket1->hasConnectingNode(nodeTestIds3.at(2)); + cn2 = bucket1->hasConnectingNode(nodeTestIds3.at(3)); + + if (isGood()) + break; + } while (std::chrono::steady_clock::now() - start < 10s); + + CPPUNIT_ASSERT(isGood()); +} + +void +RoutingTableTest::testSendKnownNodes_multipleb() +{ + std::cout << "\ntestSendKnownNodes_multipleb" << std::endl; + + auto sm1 = std::make_shared<SwarmManager>(nodeTestIds2.at(8)); + auto sm2 = std::make_shared<SwarmManager>(nodeTestIds3.at(0)); + + swarmManagers.insert({sm1->getId(), sm1}); + swarmManagers.insert({sm2->getId(), sm2}); + + auto& rt1 = sm1->getRoutingTable(); + auto& rt2 = sm2->getRoutingTable(); + + for (size_t i = 0; i < nodeTestIds2.size(); i++) { + if (i != 1 && i != 0) { + auto bucket1 = rt1.findBucket(nodeTestIds2.at(i)); + rt1.addNode(nodeTestChannels2.at(i), bucket1); + } + + auto bucket2 = rt2.findBucket(nodeTestIds3.at(i)); + rt2.addNode(nodeTestChannels3.at(i), bucket2); + } + + std::vector<NodeId> node2Co = { + NodeId("41a05179e4b3e42c3409b10280bb448d5bbd5ef64784b997d2d1663457bb6ba8")}; + needSocketCallBack(sm1); + + sm1->setKnownNodes(node2Co); + + auto bucket1 = rt1.findBucket(nodeTestIds3.at(1)); + auto bucket2 = rt1.findBucket(nodeTestIds3.at(3)); + + auto start = std::chrono::steady_clock::now(); + bool cn1 {false}, cn2 {false}; + auto isGood = [&] { + return (cn1 or cn2); + }; + do { + std::this_thread::sleep_for(1s); + cn1 = bucket1->hasConnectingNode(nodeTestIds3.at(1)); + cn2 = bucket2->hasConnectingNode(nodeTestIds3.at(3)); + + } while (not isGood() and std::chrono::steady_clock::now() - start < 10s); + + CPPUNIT_ASSERT(isGood()); +} + +void +RoutingTableTest::testMobileNodeFunctions() +{ + std::cout << "\ntestMobileNodeFunctions" << std::endl; + + RoutingTable rt; + NodeId node1 = nodeTestIds1.at(0); + NodeId node2 = nodeTestIds1.at(1); + NodeId node3 = nodeTestIds1.at(2); + + rt.setId(node1); + rt.addMobileNode(node1); + rt.addMobileNode(node2); + rt.addMobileNode(node3); + + CPPUNIT_ASSERT(!rt.hasMobileNode(node1)); + CPPUNIT_ASSERT(rt.hasMobileNode(node2)); + CPPUNIT_ASSERT(rt.hasMobileNode(node3)); + + auto mobileNodes = rt.getMobileNodes(); + CPPUNIT_ASSERT(mobileNodes.size() == 2); + + rt.removeMobileNode(node2); + rt.removeMobileNode(node3); + + CPPUNIT_ASSERT(!rt.hasMobileNode(node2)); + CPPUNIT_ASSERT(!rt.hasMobileNode(node3)); +} + +void +RoutingTableTest::testMobileNodeAnnouncement() +{ + std::cout << "\ntestMobileNodeAnnouncement" << std::endl; + + auto sm1 = std::make_shared<SwarmManager>(nodeTestIds1.at(0)); + auto sm2 = std::make_shared<SwarmManager>(nodeTestIds2.at(1)); + + swarmManagers.insert({sm1->getId(), sm1}); + swarmManagers.insert({sm2->getId(), sm2}); + sm2->setMobility(true); + + std::vector<NodeId> node2Co = { + NodeId("41a05179e4b3e42c3409b10280bb448d5bbd5ef64784b997d2d1663457bb6ba8")}; + + needSocketCallBack(sm1); + + sm1->setKnownNodes(node2Co); + sleep(1); + auto& rt1 = sm1->getRoutingTable(); + + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "Supposed to have", + true, + rt1.hasNode(NodeId("41a05179e4b3e42c3409b10280bb448d5bbd5ef64784b997d2d1663457bb6ba8"))); + + sm2->shutdown(); + + auto mb1 = rt1.getMobileNodes(); + + std::vector<NodeId> node2Test = { + NodeId("41a05179e4b3e42c3409b10280bb448d5bbd5ef64784b997d2d1663457bb6ba8")}; + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to be identical", true, node2Test == mb1); +} + +void +RoutingTableTest::testMobileNodeSplit() +{ + std::cout << "\ntestMobileNodeSplit" << std::endl; + + SwarmManager sm1(nodeTestIds1.at(0)); + + auto& rt1 = sm1.getRoutingTable(); + + for (size_t i = 0; i < nodeTestIds1.size(); i++) { + rt1.addNode(nodeTestChannels1.at(i)); + } + + sm1.setMobileNodes(nodeTestIds2); + + auto& buckets = rt1.getBuckets(); + + rt1.printRoutingTable(); + + unsigned counter = 1; + + for (auto& buckIt : buckets) { + switch (counter) { + case 1: + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have", + false, + buckIt.hasMobileNode(nodeTestIds2.at(0))); + break; + + case 4: { + std::set<NodeId> nodesCheck {nodeTestIds2.at(2), + nodeTestIds2.at(3), + nodeTestIds2.at(4), + nodeTestIds2.at(8)}; + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have known nodes", + true, + nodesCheck == buckIt.getMobileNodes()); + } + + break; + + case 5: { + std::set<NodeId> nodesCheck {nodeTestIds2.at(5), + nodeTestIds2.at(6), + nodeTestIds2.at(7), + nodeTestIds2.at(9)}; + CPPUNIT_ASSERT_EQUAL_MESSAGE("Not supposed to have known nodes", + true, + nodesCheck == buckIt.getMobileNodes()); + } + + break; + } + + counter++; + } +} + +void +RoutingTableTest::testSendMobileNodes() +{ + std::cout << "\ntestSendMobileNodes" << std::endl; + + auto sm1 = std::make_shared<SwarmManager>(nodeTestIds2.at(8)); + auto sm2 = std::make_shared<SwarmManager>(nodeTestIds3.at(0)); + + std::cout << sm1->getId() << std::endl; + + swarmManagers.insert({sm1->getId(), sm1}); + swarmManagers.insert({sm2->getId(), sm2}); + + auto& rt1 = sm1->getRoutingTable(); + auto& rt2 = sm2->getRoutingTable(); + + for (size_t i = 0; i < nodeTestIds2.size(); i++) { + if (i != 1 && i != 0) { + auto bucket1 = rt1.findBucket(nodeTestIds2.at(i)); + rt1.addNode(nodeTestChannels2.at(i), bucket1); + } + + auto bucket2 = rt2.findBucket(nodeTestIds3.at(i)); + rt2.addNode(nodeTestChannels3.at(i), bucket2); + } + + std::vector<NodeId> mobileNodes + = {NodeId("4000000000000000000000000000000000000000000000000000000000000000"), + NodeId("8000000000000000000000000000000000000000000000000000000000000000")}; + sm2->setMobileNodes(mobileNodes); + + std::vector<NodeId> node2Co = { + NodeId("41a05179e4b3e42c3409b10280bb448d5bbd5ef64784b997d2d1663457bb6ba8")}; + needSocketCallBack(sm1); + + sm1->setKnownNodes(node2Co); + + sleep(4); + + auto bucket1 = rt1.findBucket(sm1->getId()); + auto bucket2 = rt2.findBucket(sm2->getId()); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have", + true, + bucket1->hasMobileNode(mobileNodes.at(0))); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have", + false, + bucket1->hasMobileNode(mobileNodes.at(1))); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have", false, rt1.hasMobileNode(mobileNodes.at(1))); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have", + true, + bucket2->hasMobileNode(mobileNodes.at(0))); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to have", true, rt2.hasMobileNode(mobileNodes.at(1))); +} + +void +RoutingTableTest::crossNodes(NodeId nodeId) +{ + std::list<NodeId> pendingNodes {nodeId}; + discoveredNodes.clear(); + + for (const auto& curNode : pendingNodes) { + if (std::find(discoveredNodes.begin(), discoveredNodes.end(), curNode) + == discoveredNodes.end()) { + if (discoveredNodes.emplace_back(curNode)) { + if (auto sm = getManager(curNode)) + for (auto const& node : sm->getRoutingTable().getNodes()) { + pendingNodes.emplace_back(node); + } + } + } + } +} + +void +RoutingTableTest::testSwarmManagersSmallBootstrapList() +{ + std::cout << "\ntestSwarmManagersSmallBootstrapList" << std::endl; + + for (const auto& sm : swarmManagers) { + needSocketCallBack(sm.second); + } + + Counter counter(swarmManagers.size()); + for (const auto& sm : swarmManagers) { + dht::ThreadPool::computation().run([&] { + std::vector<NodeId> randIds(BOOTSTRAP_SIZE); + std::uniform_int_distribution<size_t> distribution(0, randomNodeIds.size() - 1); + std::generate(randIds.begin(), randIds.end(), [&] { + return randomNodeIds[distribution(rd)]; + }); + sm.second->setKnownNodes(randIds); + counter.count(); + }); + } + + counter.wait(); + + std::cout << "Waiting " << time * 2 << "s..." << std::endl; + sleep(time * 2); + + crossNodes(swarmManagers.begin()->first); + distribution(); + + CPPUNIT_ASSERT_EQUAL(swarmManagers.size(), discoveredNodes.size()); +} + +void +RoutingTableTest::testRoutingTableForConnectingNode() +{ + std::cout << "\ntestRoutingTableForConnectingNode" << std::endl; + + for (const auto& sm : swarmManagers) { + needSocketCallBack(sm.second); + } + + Counter counter(swarmManagers.size()); + for (const auto& sm : swarmManagers) { + dht::ThreadPool::computation().run([&] { + std::vector<NodeId> randIds(BOOTSTRAP_SIZE); + std::uniform_int_distribution<size_t> distribution(0, randomNodeIds.size() - 1); + std::generate(randIds.begin(), randIds.end(), [&] { + return randomNodeIds[distribution(rd)]; + }); + sm.second->setKnownNodes(randIds); + counter.count(); + }); + } + counter.wait(); + + auto sm1 = std::make_shared<SwarmManager>(nodeTestIds3.at(0)); + auto sm2 = std::make_shared<SwarmManager>(nodeTestIds3.at(1)); + + swarmManagers.insert({sm1->getId(), sm1}); + swarmManagers.insert({sm2->getId(), sm2}); + + needSocketCallBack(sm1); + needSocketCallBack(sm2); + + std::vector<NodeId> knownNodesSm1({randomNodeIds.at(2), randomNodeIds.at(3)}); + std::vector<NodeId> knownNodesSm2({randomNodeIds.at(4), randomNodeIds.at(5)}); + + sm1->setKnownNodes(knownNodesSm1); + sm2->setKnownNodes(knownNodesSm2); + + sleep(10); + + crossNodes(swarmManagers.begin()->first); + CPPUNIT_ASSERT_EQUAL(swarmManagers.size(), discoveredNodes.size()); +} + +void +RoutingTableTest::testRoutingTableForShuttingNode() +{ + std::cout << "\ntestRoutingTableForShuttingNode" << std::endl; + + for (const auto& sm : swarmManagers) { + needSocketCallBack(sm.second); + } + + Counter counter(swarmManagers.size()); + for (const auto& sm : swarmManagers) { + dht::ThreadPool::computation().run([&] { + std::vector<NodeId> randIds(BOOTSTRAP_SIZE); + std::uniform_int_distribution<size_t> distribution(0, randomNodeIds.size() - 1); + std::generate(randIds.begin(), randIds.end(), [&] { + return randomNodeIds[distribution(rd)]; + }); + sm.second->setKnownNodes(randIds); + counter.count(); + }); + } + + counter.wait(); + + auto sm1 = std::make_shared<SwarmManager>(nodeTestIds3.at(0)); + auto sm1Id = sm1->getId(); + + swarmManagers.emplace(sm1->getId(), sm1); + needSocketCallBack(sm1); + + std::vector<NodeId> knownNodesSm1({randomNodeIds.at(2), randomNodeIds.at(3)}); + sm1->setKnownNodes(knownNodesSm1); + + sleep(10); + + crossNodes(swarmManagers.begin()->first); + CPPUNIT_ASSERT_EQUAL(swarmManagers.size(), discoveredNodes.size()); + + for (const auto& sm : swarmManagers) { + if (sm.first != nodeTestIds3.at(0)) { + swarmManagersTest_.emplace(sm); + } + } + + auto it1 = swarmManagers.find(sm1Id); + swarmManagers.erase(it1); + + auto it2 = channelSockets_.find(sm1Id); + channelSockets_.erase(it2); + + sm1 = {}; + sleep(5); + for (const auto& sm : swarmManagersTest_) { + auto& a = sm.second->getRoutingTable(); + CPPUNIT_ASSERT(!a.hasNode(sm1Id)); + } +} + +void +RoutingTableTest::testRoutingTableForMassShuttingsNodes() +{ + std::cout << "\ntestRoutingTableForMassShuttingsNodes" << std::endl; + std::vector<NodeId> swarmToCompare; + + for (const auto& sm : swarmManagers) { + needSocketCallBack(sm.second); + swarmManagersTest_.emplace(sm); + swarmToCompare.emplace_back(sm.first); + } + + Counter counter(swarmManagers.size()); + for (const auto& sm : swarmManagers) { + dht::ThreadPool::computation().run([&] { + std::vector<NodeId> randIds(BOOTSTRAP_SIZE); + std::uniform_int_distribution<size_t> distribution(0, randomNodeIds.size() - 1); + std::generate(randIds.begin(), randIds.end(), [&] { + return randomNodeIds[distribution(rd)]; + }); + sm.second->setKnownNodes(randIds); + + counter.count(); + }); + } + counter.wait(); + + std::cout << "Waiting " << time * 2 << "s... " << std::endl; + sleep(time * 2); + + crossNodes(swarmManagers.begin()->first); + + CPPUNIT_ASSERT_EQUAL(swarmManagers.size(), discoveredNodes.size()); + + // ADDING NEW NODES TO NETWORK + for (size_t i = 0; i < nodeTestIds1.size(); i++) { + auto sm = std::make_shared<SwarmManager>(nodeTestIds1.at(i)); + auto smId = sm->getId(); + swarmManagers.emplace(smId, sm); + needSocketCallBack(sm); + std::vector<NodeId> knownNodesSm({randomNodeIds.at(2), randomNodeIds.at(3)}); + sm->setKnownNodes(knownNodesSm); + } + + sleep(time * 3); + crossNodes(swarmManagers.begin()->first); + + CPPUNIT_ASSERT_EQUAL(swarmManagers.size(), discoveredNodes.size()); + + // SHUTTING DOWN ADDED NODES + std::lock_guard<std::mutex> lk(channelSocketsMtx_); + for (auto& nodes : nodeTestIds1) { + auto it = swarmManagers.find(nodes); + if (it != swarmManagers.end()) { + it->second->shutdown(); + channelSockets_.erase(it->second->getId()); + swarmManagers.erase(it); + } + } + + sleep(time * 2); + + crossNodes(swarmManagers.begin()->first); + + CPPUNIT_ASSERT_EQUAL(swarmManagers.size(), discoveredNodes.size()); + + for (const auto& sm : swarmManagersTest_) { + for (size_t i = 0; i < nodeTestIds1.size(); i++) { + auto& a = sm.second->getRoutingTable(); + if (!a.hasNode(nodeTestIds1.at(i))) { + CPPUNIT_ASSERT(true); + } else { + CPPUNIT_ASSERT(false); + } + } + } +} + +void +RoutingTableTest::testSwarmManagersWMobileModes() +{ + std::cout << "\testSwarmManagersWMobileModes" << std::endl; + + for (const auto& sm : swarmManagers) { + needSocketCallBack(sm.second); + } + + Counter counter(swarmManagers.size()); + for (const auto& sm : swarmManagers) { + dht::ThreadPool::computation().run([&] { + std::vector<NodeId> randIds(BOOTSTRAP_SIZE); + std::uniform_int_distribution<size_t> distribution(0, randomNodeIds.size() - 1); + std::generate(randIds.begin(), randIds.end(), [&] { + return randomNodeIds[distribution(rd)]; + }); + sm.second->setKnownNodes(randIds); + counter.count(); + }); + } + + counter.wait(); + + std::cout << "Waiting " << time << "s..." << std::endl; + sleep(time); + + distribution(); + + crossNodes(swarmManagers.begin()->first); + sleep(2); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to be equal", + swarmManagers.size(), + discoveredNodes.size()); + + // Shutting down Mobile Nodes + { + std::lock_guard<std::mutex> lk(channelSocketsMtx_); + for (auto it = swarmManagers.begin(); it != swarmManagers.end();) { + if (it->second->isMobile()) { + it->second->shutdown(); + it = swarmManagers.erase(it); + channelSockets_.erase(it->second->getId()); + } else { + ++it; + } + } + } + + sleep(5); + + { + if (!swarmManagers.empty()) { + crossNodes(swarmManagers.begin()->first); + distribution(); + } + } + + sleep(4); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Supposed to be equal", + swarmManagers.size(), + discoveredNodes.size()); +} + +}; // namespace test +} // namespace jami +RING_TEST_RUNNER(jami::test::RoutingTableTest::name()) diff --git a/test/unitTest/swarm/swarm_spread.cpp b/test/unitTest/swarm/swarm_spread.cpp new file mode 100644 index 0000000000..51c0fc65d8 --- /dev/null +++ b/test/unitTest/swarm/swarm_spread.cpp @@ -0,0 +1,479 @@ +/* + * Copyright (C) 2023 Savoir-faire Linux Inc. + * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include <cppunit/TestAssert.h> +#include <cppunit/TestFixture.h> +#include <cppunit/extensions/HelperMacros.h> + +#include <algorithm> +#include <msgpack.hpp> +#include <opendht/thread_pool.h> +#include <opendht/utils.h> + +#include <iostream> +#include <fstream> +#include <string> + +#include "../../test_runner.h" +#include "jami.h" +#include "../common.h" +#include "jamidht/swarm/swarm_manager.h" +#include "connectivity/multiplexed_socket.h" +#include "connectivity/peer_connection.h" +#include "nodes.h" + +using namespace std::string_literals; +using namespace std::chrono_literals; +using namespace dht; +using NodeId = dht::PkId; + +namespace jami { +namespace test { + +constexpr size_t nNodes = 5000; + +constexpr size_t BOOTSTRAP_SIZE = 2; +auto time = 300s; + +int TOTAL_HOPS = 0; +int moyenne = 0; +int max = 0; +int min = 10000; + +struct Message +{ + int identifier_; // message identifier + int hops_ = 0; // number of hops + MSGPACK_DEFINE_MAP(identifier_, hops_); +}; + +struct Counter +{ + Counter(unsigned t) + : target(t) + {} + const unsigned target; + unsigned added {0}; + std::mutex mutex; + std::condition_variable cv; + + void count() + { + std::lock_guard<std::mutex> lock(mutex); + ++added; + if (added == target) + cv.notify_one(); + } + bool wait(std::chrono::steady_clock::duration timeout) + { + std::unique_lock<std::mutex> lock(mutex); + return cv.wait_for(lock, timeout, [&] { return added == target; }); + } + void wait() + { + std::unique_lock<std::mutex> lock(mutex); + return cv.wait(lock, [&] { return added == target; }); + } +}; + +class SwarmMessageSpread : public CppUnit::TestFixture +{ +public: + ~SwarmMessageSpread() { libjami::fini(); } + static std::string name() { return "SwarmMessageSpread"; } + + void setUp(); + void tearDown(); + +private: + std::mt19937_64 rd {dht::crypto::getSeededRandomEngine<std::mt19937_64>()}; + std::mutex channelSocketsMtx_; + std::condition_variable channelSocketsCv_; + + std::map<NodeId, std::shared_ptr<jami::SwarmManager>> swarmManagers; + std::map<NodeId, + std::map<NodeId, + std::pair<std::shared_ptr<jami::ChannelSocketTest>, + std::shared_ptr<jami::ChannelSocketTest>>>> + channelSockets_; + std::vector<NodeId> randomNodeIds; + std::vector<std::shared_ptr<jami::SwarmManager>> swarmManagersShuffled; + std::set<NodeId> discoveredNodes; + std::map<NodeId, int> numberTimesReceived; + std::map<NodeId, int> requestsReceived; + std::map<NodeId, int> answersSent; + + int iterations = 0; + + void generateSwarmManagers(); + void needSocketCallBack(const std::shared_ptr<SwarmManager>& sm); + void sendMessage(const std::shared_ptr<ChannelSocketInterface>& socket, Message msg); + void receiveMessage(const NodeId nodeId, const std::shared_ptr<ChannelSocketInterface>& socket); + void relayMessageToRoutingTable(const NodeId nodeId, const NodeId sourceId, const Message msg); + void updateHops(int hops); + void crossNodes(NodeId nodeId); + void displayBucketDistribution(const NodeId& id); + void distribution(); + std::shared_ptr<jami::SwarmManager> getManager(const NodeId& id); + + void testWriteMessage(); + + CPPUNIT_TEST_SUITE(SwarmMessageSpread); + CPPUNIT_TEST(testWriteMessage); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(SwarmMessageSpread, SwarmMessageSpread::name()); + +void +SwarmMessageSpread::setUp() +{ + libjami::init( + libjami::InitFlag(libjami::LIBJAMI_FLAG_DEBUG | libjami::LIBJAMI_FLAG_CONSOLE_LOG)); + if (not Manager::instance().initialized) { + CPPUNIT_ASSERT(libjami::start("jami-sample.yml")); + } + + generateSwarmManagers(); +} + +void +SwarmMessageSpread::tearDown() +{} + +void +SwarmMessageSpread::generateSwarmManagers() +{ + for (size_t i = 0; i < nNodes; i++) { + const NodeId node = Hash<32>::getRandom(); + auto sm = std::make_shared<SwarmManager>(node); + swarmManagers[node] = sm; + randomNodeIds.emplace_back(node); + swarmManagersShuffled.emplace_back(sm); + } +} + +std::shared_ptr<jami::SwarmManager> +SwarmMessageSpread::getManager(const NodeId& id) +{ + auto it = swarmManagers.find(id); + return it == swarmManagers.end() ? nullptr : it->second; +} + +void +SwarmMessageSpread::crossNodes(NodeId nodeId) +{ + std::list<NodeId> pendingNodes {nodeId}; + discoveredNodes.clear(); + + for (const auto& curNode : pendingNodes) { + if (discoveredNodes.emplace(curNode).second) { + if (auto sm = getManager(curNode)) + for (const auto& node : sm->getRoutingTable().getNodes()) { + pendingNodes.emplace_back(node); + } + } + } +} + +void +SwarmMessageSpread::sendMessage(const std::shared_ptr<ChannelSocketInterface>& socket, Message msg) +{ + auto buffer = std::make_shared<msgpack::sbuffer>(32); + msgpack::packer<msgpack::sbuffer> pk(buffer.get()); + pk.pack(msg); + + dht::ThreadPool::io().run([socket, buffer = std::move(buffer)] { + std::error_code ec; + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + socket->write(reinterpret_cast<const unsigned char*>(buffer->data()), buffer->size(), ec); + }); +} + +void +SwarmMessageSpread::receiveMessage(const NodeId nodeId, + const std::shared_ptr<ChannelSocketInterface>& socket) +{ + struct DecodingContext + { + msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; }, + nullptr, + 32}; + }; + + socket->setOnRecv([this, + wsocket = std::weak_ptr<ChannelSocketInterface>(socket), + ctx = std::make_shared<DecodingContext>(), + nodeId](const uint8_t* buf, size_t len) { + auto socket = wsocket.lock(); + if (!socket) + return 0lu; + + ctx->pac.reserve_buffer(len); + std::copy_n(buf, len, ctx->pac.buffer()); + ctx->pac.buffer_consumed(len); + + msgpack::object_handle oh; + while (ctx->pac.next(oh)) { + try { + Message msg; + oh.get().convert(msg); + + if (msg.identifier_ == 1) { + std::lock_guard<std::mutex> lk(channelSocketsMtx_); + auto var = numberTimesReceived.find(nodeId); + iterations = iterations + 1; + + if (var != numberTimesReceived.end()) { + var->second += 1; + } else { + Message msgToSend; + msgToSend.identifier_ = 1; + msgToSend.hops_ = msg.hops_ + 1; + numberTimesReceived[nodeId] = 1; + updateHops(msgToSend.hops_); + relayMessageToRoutingTable(nodeId, socket->deviceId(), msgToSend); + } + channelSocketsCv_.notify_all(); + } + + } catch (const std::exception& e) { + JAMI_WARNING("Error DRT recv: {}", e.what()); + return 0lu; + } + } + + return 0lu; + }); +}; + +void +SwarmMessageSpread::updateHops(int hops) +{ + if (hops > TOTAL_HOPS) { + TOTAL_HOPS = hops; + } +} + +void +SwarmMessageSpread::needSocketCallBack(const std::shared_ptr<SwarmManager>& sm) +{ + sm->needSocketCb_ = [this, wsm = std::weak_ptr<SwarmManager>(sm)](const std::string& nodeId, + auto&& onSocket) { + dht::ThreadPool::io().run( + [this, wsm = std::move(wsm), nodeId, onSocket = std::move(onSocket)] { + auto sm = wsm.lock(); + if (!sm) + return; + + NodeId node = DeviceId(nodeId); + if (auto smRemote = getManager(node)) { + auto myId = sm->getId(); + std::unique_lock<std::mutex> lk(channelSocketsMtx_); + auto& cstRemote = channelSockets_[node][myId]; + auto& cstMe = channelSockets_[myId][node]; + if (cstMe.second && cstMe.first) + return; + if (!cstMe.second) { + cstMe.second = std::make_shared<ChannelSocketTest>(node, "test1", 1); + cstRemote.second = std::make_shared<ChannelSocketTest>(myId, "test1", 1); + } + if (!cstMe.first) { + cstRemote.first = std::make_shared<ChannelSocketTest>(myId, "swarm1", 0); + cstMe.first = std::make_shared<ChannelSocketTest>(node, "swarm1", 0); + } + lk.unlock(); + ChannelSocketTest::link(cstMe.second, cstRemote.second); + receiveMessage(myId, cstMe.second); + receiveMessage(node, cstRemote.second); + std::this_thread::sleep_for(std::chrono::seconds(5)); + ChannelSocketTest::link(cstMe.first, cstRemote.first); + smRemote->addChannel(cstRemote.first); + onSocket(cstMe.first); + } + }); + }; +} + +void +SwarmMessageSpread::relayMessageToRoutingTable(const NodeId nodeId, + const NodeId sourceId, + const Message msg) +{ + auto swarmManager = getManager(nodeId); + const auto& routingtable = swarmManager->getRoutingTable().getNodes(); + for (auto& node : routingtable) { + if (node != sourceId) { + auto channelToSend = channelSockets_[nodeId][node].second; + sendMessage(channelToSend, msg); + } + } +} + +void +SwarmMessageSpread::distribution() +{ + std::string const fileName("distrib_nodes_" + std::to_string(nNodes) + ".txt"); + std::ofstream myStream(fileName.c_str()); + + std::vector<unsigned> dist(10); + int mean = 0; + for (const auto& sm : swarmManagers) { + auto val = sm.second->getRoutingTable().getRoutingTableNodeCount(); + if (dist.size() <= val) + dist.resize(val + 1); + + dist[val]++; + } + + for (size_t i = 0; i < dist.size(); i++) { + // std::cout << "Swarm Managers with " << i << " nodes: " << dist[i] << std::endl; + if (myStream) { + myStream << i << "," << dist[i] << std::endl; + } + mean += i * dist[i]; + } + std::cout << "Le noeud avec le plus de noeuds dans sa routing table: " << dist.size() + << std::endl; + std::cout << "Moyenne de nombre de noeuds par Swarm: " << mean / (float) swarmManagers.size() + << std::endl; +} + +void +SwarmMessageSpread::displayBucketDistribution(const NodeId& id) +{ + std::string const fileName("distrib_rt_" + std::to_string(nNodes) + "_" + id.toString() + + ".txt"); + std::ofstream myStream(fileName.c_str()); + + const auto& routingtable = swarmManagers[id]->getRoutingTable().getBuckets(); + + std::cout << "Bucket distribution for node " << id << std::endl; + + for (auto it = routingtable.begin(); it != routingtable.end(); ++it) { + auto lowerLimit = it->getLowerLimit().toString(); + + std::string hex_prefix = lowerLimit.substr(0, 4); // extraire les deux premiers caractères + std::cout << "Bucket " << hex_prefix << " has " << it->getNodesSize() << " nodes" + << std::endl; + + if (myStream) { + myStream << hex_prefix << "," << it->getNodesSize() << std::endl; + } + } +} + +void +SwarmMessageSpread::testWriteMessage() +{ + std::cout << "\ntestWriteMessage()" << std::endl; + for (const auto& sm : swarmManagersShuffled) { + needSocketCallBack(sm); + } + + Counter counter(swarmManagers.size()); + for (const auto& sm : swarmManagers) { + dht::ThreadPool::computation().run([&] { + std::vector<NodeId> randIds(BOOTSTRAP_SIZE); + std::uniform_int_distribution<size_t> distribution(0, randomNodeIds.size() - 1); + std::generate(randIds.begin(), randIds.end(), [&] { + auto dev = randomNodeIds[distribution(rd)]; + return dev; + }); + sm.second->setKnownNodes(randIds); + counter.count(); + }); + } + counter.wait(); + + std::this_thread::sleep_for(time); + + auto& firstNode = *channelSockets_.begin(); + + crossNodes(swarmManagers.begin()->first); + CPPUNIT_ASSERT_EQUAL(swarmManagers.size(), discoveredNodes.size()); + + std::cout << "Sending First Message to " << firstNode.second.size() << std::endl; + auto start = std::chrono::steady_clock::now(); + + numberTimesReceived[firstNode.first] = 1; + + for (const auto& channel : firstNode.second) { + if (channel.second.second) { + sendMessage(channel.second.second, {1, 0}); + } + } + + std::unique_lock<std::mutex> lk(channelSocketsMtx_); + bool ok = channelSocketsCv_.wait_for(lk, 1200s, [&] { + std::cout << "\r" + << "Size of Received " << numberTimesReceived.size(); + return numberTimesReceived.size() == swarmManagers.size(); + }); + auto now = std::chrono::steady_clock::now(); + + std::cout << "#########################################################################" + << std::endl; + std::cout << "Time for everyone to receive the message " << dht::print_duration(now - start) + << std::endl; + std::cout << " IS OK " << ok << std::endl; + + // ############################################################################## + + for (const auto& count : numberTimesReceived) { + moyenne = moyenne + count.second; + + if (count.second > max) { + max = count.second; + } + + if (count.second < min) { + min = count.second; + } + } + + auto it = channelSockets_.begin(); + + displayBucketDistribution((*it).first); + std::advance(it, swarmManagers.size() / 2); + displayBucketDistribution((*it).first); + std::advance(it, swarmManagers.size() / 2 - 1); + displayBucketDistribution((*it).first); + + std::cout << "MOYENNE DE RECEPTION PAR NOEUD [ " << moyenne / (float) numberTimesReceived.size() + << " ] " << std::endl; + std::cout << "MAX DE RECEPTION PAR NOEUD [ " << max << " ] " << std::endl; + std::cout << "MIN DE RECEPTION PAR NOEUD [ " << min << " ] " << std::endl; + + std::cout << "NOMBRE DE SAUTS DIRECTS [ " << TOTAL_HOPS << " ] " << std::endl; + std::cout << "NOMBRE D'ITERATIONS [ " << iterations << " ] " << std::endl; + + distribution(); + std::cout << "#########################################################################" + << std::endl; + + std::cout << "Number of times received " << numberTimesReceived.size() << std::endl; + std::cout << "Number of swarm managers " << swarmManagers.size() << std::endl; + + CPPUNIT_ASSERT(true); +} + +}; // namespace test +} // namespace jami +RING_TEST_RUNNER(jami::test::SwarmMessageSpread::name()) -- GitLab