From 3b0def0923a7f999e79885783e8d2693abd97fd1 Mon Sep 17 00:00:00 2001 From: Fadi SHEHADEH <fadi.shehadeh@savoirfairelinux.com> Date: Mon, 27 Mar 2023 09:45:27 -0400 Subject: [PATCH] DRT: swarm components -created buckets for sockets -created routing table that can have one or more buckets -created a swarm protocol -added swarm channel handler -added manager which includes all the components above -updated the makefile Change-Id: I7973b2bb5e950a9dddd8dedb788f01e0c26f5f9f --- CMakeLists.txt | 2 + src/CMakeLists.txt | 1 + src/jamidht/CMakeLists.txt | 5 +- src/jamidht/Makefile.am | 11 +- src/jamidht/swarm/CMakeLists.txt | 15 + src/jamidht/swarm/routing_table.cpp | 581 ++++++++++++++++++++ src/jamidht/swarm/routing_table.h | 559 +++++++++++++++++++ src/jamidht/swarm/swarm_channel_handler.cpp | 65 +++ src/jamidht/swarm/swarm_channel_handler.h | 73 +++ src/jamidht/swarm/swarm_manager.cpp | 292 ++++++++++ src/jamidht/swarm/swarm_manager.h | 152 +++++ src/jamidht/swarm/swarm_protocol.cpp | 26 + src/jamidht/swarm/swarm_protocol.h | 68 +++ 13 files changed, 1848 insertions(+), 2 deletions(-) create mode 100644 src/jamidht/swarm/CMakeLists.txt create mode 100644 src/jamidht/swarm/routing_table.cpp create mode 100644 src/jamidht/swarm/routing_table.h create mode 100644 src/jamidht/swarm/swarm_channel_handler.cpp create mode 100644 src/jamidht/swarm/swarm_channel_handler.h create mode 100644 src/jamidht/swarm/swarm_manager.cpp create mode 100644 src/jamidht/swarm/swarm_manager.h create mode 100644 src/jamidht/swarm/swarm_protocol.cpp create mode 100644 src/jamidht/swarm/swarm_protocol.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 5b2a235e12..75d0eade02 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -164,6 +164,7 @@ source_group("Source Files\\im" FILES ${Source_Files__im}) source_group("Source Files\\jamidht" FILES ${Source_Files__jamidht}) source_group("Source Files\\jamidht\\eth\\libdevcore" FILES ${Source_Files__jamidht__eth__libdevcore}) source_group("Source Files\\jamidht\\eth\\libdevcrypto" FILES ${Source_Files__jamidht__eth__libdevcrypto}) +source_group("Source Files\\jamidht\\swarm" FILES ${Source_Files__jamidht__swarm}) source_group("Source Files\\media" FILES ${Source_Files__media}) source_group("Source Files\\media\\audio" FILES ${Source_Files__media__audio}) source_group("Source Files\\media\\audio\\audio-processing" FILES ${Source_Files__media__audio__audio_processing}) @@ -191,6 +192,7 @@ list (APPEND ALL_FILES ${Source_Files__jamidht} ${Source_Files__jamidht__eth__libdevcore} ${Source_Files__jamidht__eth__libdevcrypto} + ${Source_Files__jamidht__swarm} ${Source_Files__media} ${Source_Files__media__audio} ${Source_Files__media__audio__sound} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3929aacdb8..8c4a7e19d3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -86,6 +86,7 @@ set (Source_Files__im ${Source_Files__im} PARENT_SCOPE) set (Source_Files__jamidht ${Source_Files__jamidht} PARENT_SCOPE) set (Source_Files__jamidht__eth__libdevcore ${Source_Files__jamidht__eth__libdevcore} PARENT_SCOPE) set (Source_Files__jamidht__eth__libdevcrypto ${Source_Files__jamidht__eth__libdevcrypto} PARENT_SCOPE) +set (Source_Files__jamidht__swarm ${Source_Files__jamidht__swarm} PARENT_SCOPE) set (Source_Files__media ${Source_Files__media} PARENT_SCOPE) set (Source_Files__media__audio ${Source_Files__media__audio} PARENT_SCOPE) set (Source_Files__media__audio__sound ${Source_Files__media__audio__sound} PARENT_SCOPE) diff --git a/src/jamidht/CMakeLists.txt b/src/jamidht/CMakeLists.txt index e8bebfc7d7..078ca361ce 100644 --- a/src/jamidht/CMakeLists.txt +++ b/src/jamidht/CMakeLists.txt @@ -48,8 +48,11 @@ list (APPEND Source_Files__jamidht set (Source_Files__jamidht ${Source_Files__jamidht} PARENT_SCOPE) +add_subdirectory("swarm") add_subdirectory("eth/libdevcore") add_subdirectory("eth/libdevcrypto") +set (Source_Files__jamidht__swarm ${Source_Files__jamidht__swarm} PARENT_SCOPE) set (Source_Files__jamidht__eth__libdevcore ${Source_Files__jamidht__eth__libdevcore} PARENT_SCOPE) -set (Source_Files__jamidht__eth__libdevcrypto ${Source_Files__jamidht__eth__libdevcrypto} PARENT_SCOPE) \ No newline at end of file +set (Source_Files__jamidht__eth__libdevcrypto ${Source_Files__jamidht__eth__libdevcrypto} PARENT_SCOPE) +set (Source_Files__jamidht__swarm ${Source_Files__jamidht__swarm} PARENT_SCOPE) \ No newline at end of file diff --git a/src/jamidht/Makefile.am b/src/jamidht/Makefile.am index 632362026c..cf7bbc8ea9 100644 --- a/src/jamidht/Makefile.am +++ b/src/jamidht/Makefile.am @@ -39,7 +39,16 @@ libjamiacc_la_SOURCES = \ ./jamidht/sync_module.h \ ./jamidht/sync_module.cpp \ ./jamidht/transfer_channel_handler.h \ - ./jamidht/transfer_channel_handler.cpp + ./jamidht/transfer_channel_handler.cpp \ + ./jamidht/swarm/routing_table.h \ + ./jamidht/swarm/routing_table.cpp \ + ./jamidht/swarm/swarm_protocol.h \ + ./jamidht/swarm/swarm_protocol.cpp \ + ./jamidht/swarm/swarm_channel_handler.h \ + ./jamidht/swarm/swarm_channel_handler.cpp \ + ./jamidht/swarm/swarm_manager.h \ + ./jamidht/swarm/swarm_manager.cpp + if RINGNS libjamiacc_la_SOURCES += \ diff --git a/src/jamidht/swarm/CMakeLists.txt b/src/jamidht/swarm/CMakeLists.txt new file mode 100644 index 0000000000..426702bc9e --- /dev/null +++ b/src/jamidht/swarm/CMakeLists.txt @@ -0,0 +1,15 @@ +################################################################################ +# Source groups - swarm +################################################################################ +list (APPEND Source_Files__jamidht__swarm + "${CMAKE_CURRENT_SOURCE_DIR}/routing_table.h" + "${CMAKE_CURRENT_SOURCE_DIR}/routing_table.cpp" + "${CMAKE_CURRENT_SOURCE_DIR}/swarm_channel_handler.h" + "${CMAKE_CURRENT_SOURCE_DIR}/swarm_channel_handler.cpp" + "${CMAKE_CURRENT_SOURCE_DIR}/swarm_manager.h" + "${CMAKE_CURRENT_SOURCE_DIR}/swarm_manager.cpp" + "${CMAKE_CURRENT_SOURCE_DIR}/swarm_protocol.h" + "${CMAKE_CURRENT_SOURCE_DIR}/swarm_protocol.cpp" +) + +set (Source_Files__jamidht__swarm ${Source_Files__jamidht__swarm} PARENT_SCOPE) \ No newline at end of file diff --git a/src/jamidht/swarm/routing_table.cpp b/src/jamidht/swarm/routing_table.cpp new file mode 100644 index 0000000000..c3dc468511 --- /dev/null +++ b/src/jamidht/swarm/routing_table.cpp @@ -0,0 +1,581 @@ +/* + * Copyright (C) 2023 Savoir-faire Linux Inc. + * + * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "routing_table.h" +#include "connectivity/multiplexed_socket.h" +#include "opendht/infohash.h" + +#include <math.h> +#include <stdio.h> +#include <iostream> +#include <iterator> +#include <stdlib.h> +#include <time.h> + +constexpr const std::chrono::minutes FIND_PERIOD {10}; +using namespace std::placeholders; + +namespace jami { + +using namespace dht; + +Bucket::Bucket(const NodeId& id) + : lowerLimit_(id) +{} + +bool +Bucket::addNode(const std::shared_ptr<ChannelSocketInterface>& socket) +{ + return addNode(std::move(NodeInfo(socket))); +} + +bool +Bucket::addNode(NodeInfo&& info) +{ + auto nodeId = info.socket->deviceId(); + if (nodes.try_emplace(nodeId, std::move(info)).second) { + connecting_nodes.erase(nodeId); + known_nodes.erase(nodeId); + mobile_nodes.erase(nodeId); + return true; + } + return false; +} + +bool +Bucket::removeNode(const NodeId& nodeId) +{ + auto node = nodes.find(nodeId); + auto isMobile = node->second.isMobile_; + if (node == nodes.end()) + return false; + nodes.erase(nodeId); + if (isMobile) { + addMobileNode(nodeId); + } else { + addKnownNode(nodeId); + } + + return true; +} + +std::set<NodeId> +Bucket::getNodeIds() const +{ + std::set<NodeId> nodesId; + for (auto const& key : nodes) + nodesId.insert(key.first); + return nodesId; +} + +bool +Bucket::hasNode(const NodeId& nodeId) const +{ + return nodes.find(nodeId) != nodes.end(); +} + +bool +Bucket::addKnownNode(const NodeId& nodeId) +{ + if (!hasNode(nodeId)) { + if (known_nodes.emplace(nodeId).second) { + return true; + } + } + return false; +} + +NodeId +Bucket::getKnownNode(unsigned index) const +{ + if (index > known_nodes.size()) { + throw std::out_of_range("End of table for get known Node Id " + std::to_string(index)); + } + auto it = known_nodes.begin(); + std::advance(it, index); + + return *it; +} + +bool +Bucket::addMobileNode(const NodeId& nodeId) +{ + if (!hasNode(nodeId)) { + if (mobile_nodes.emplace(nodeId).second) { + known_nodes.erase(nodeId); + return true; + } + } + return false; +} + +bool +Bucket::addConnectingNode(const NodeId& nodeId) +{ + if (!hasNode(nodeId)) { + if (connecting_nodes.emplace(nodeId).second) { + known_nodes.erase(nodeId); + mobile_nodes.erase(nodeId); + return true; + } + } + return false; +} + +std::set<NodeId> +Bucket::getKnownNodesRandom(unsigned numberNodes, std::mt19937_64& rd) const +{ + std::set<NodeId> nodesToReturn; + + if (getKnownNodesSize() <= numberNodes) + return getKnownNodes(); + + std::uniform_int_distribution<unsigned> distrib(0, getKnownNodesSize() - 1); + + while (nodesToReturn.size() < numberNodes) { + nodesToReturn.emplace(getKnownNode(distrib(rd))); + } + + return nodesToReturn; +} + +asio::steady_timer& +Bucket::getNodeTimer(const std::shared_ptr<ChannelSocketInterface>& socket) +{ + auto node = nodes.find(socket->deviceId()); + if (node == nodes.end()) { + throw std::range_error("Can't find timer " + socket->deviceId().toString()); + } + return node->second.refresh_timer; +} + +bool +Bucket::shutdownNode(const NodeId& nodeId) +{ + auto node = nodes.find(nodeId); + + if (node != nodes.end()) { + auto socket = node->second.socket; + auto node = socket->deviceId(); + socket->shutdown(); + removeNode(node); + return true; + } + return false; +} + +void +Bucket::shutdownAllNodes() +{ + while (not nodes.empty()) { + auto it = nodes.begin(); + auto socket = it->second.socket; + auto nodeId = socket->deviceId(); + socket->shutdown(); + removeNode(nodeId); + } +} + +void +Bucket::printBucket(unsigned number) const +{ + JAMI_ERROR("BUCKET Number: {:d}", number); + + unsigned nodeNum = 1; + for (auto it = nodes.begin(); it != nodes.end(); ++it) { + JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), it->first.toString()); + nodeNum++; + } + JAMI_ERROR("Mobile Nodes"); + nodeNum = 0; + for (auto it = mobile_nodes.begin(); it != mobile_nodes.end(); ++it) { + JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), (*it).toString()); + nodeNum++; + } + + JAMI_ERROR("Known Nodes"); + nodeNum = 0; + for (auto it = known_nodes.begin(); it != known_nodes.end(); ++it) { + JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), (*it).toString()); + nodeNum++; + } + JAMI_ERROR("Connecting_nodes"); + nodeNum = 0; + for (auto it = connecting_nodes.begin(); it != connecting_nodes.end(); ++it) { + JAMI_DEBUG("Node {:s} Id: {:s}", std::to_string(nodeNum), (*it).toString()); + nodeNum++; + } +}; + +void +Bucket::changeMobility(const NodeId& nodeId, bool isMobile) +{ + auto itn = nodes.find(nodeId); + if (itn != nodes.end()) { + itn->second.isMobile_ = isMobile; + } +} + +// For tests + +std::set<std::shared_ptr<ChannelSocketInterface>> +Bucket::getNodeSockets() const +{ + std::set<std::shared_ptr<ChannelSocketInterface>> sockets; + for (auto const& info : nodes) + sockets.insert(info.second.socket); + return sockets; +} + +// #################################################################################################### + +RoutingTable::RoutingTable() +{ + buckets.emplace_back(NodeId::zero()); +} + +bool +RoutingTable::addNode(std::shared_ptr<ChannelSocketInterface> socket) +{ + auto bucket = findBucket(socket->deviceId()); + return addNode(socket, bucket); +} + +bool +RoutingTable::addNode(std::shared_ptr<ChannelSocketInterface> channel, + std::list<Bucket>::iterator& bucket) +{ + NodeId nodeId = channel->deviceId(); + + if (bucket->hasNode(nodeId) || id_ == nodeId) { + return false; + } + + while (bucket->isFull()) { + if (contains(bucket, id_)) { + split(bucket); + bucket = findBucket(nodeId); + + } else { + return bucket->addNode(std::move(channel)); + } + } + return bucket->addNode(std::move(channel)); +} + +bool +RoutingTable::removeNode(const NodeId& nodeId) +{ + return findBucket(nodeId)->removeNode(nodeId); +} + +bool +RoutingTable::hasNode(const NodeId& nodeId) +{ + return findBucket(nodeId)->hasNode(nodeId); +} + +bool +RoutingTable::addKnownNode(const NodeId& nodeId) +{ + if (id_ == nodeId) + return false; + + auto bucket = findBucket(nodeId); + + if (bucket == buckets.end()) + return 0; + + bucket->addKnownNode(nodeId); + return 1; +} + +bool +RoutingTable::addMobileNode(const NodeId& nodeId) +{ + if (id_ == nodeId) + return false; + + auto bucket = findBucket(nodeId); + + if (bucket == buckets.end()) + return 0; + + bucket->addMobileNode(nodeId); + return 1; +} + +void +RoutingTable::removeMobileNode(const NodeId& nodeId) +{ + return findBucket(nodeId)->removeMobileNode(nodeId); +} + +bool +RoutingTable::hasMobileNode(const NodeId& nodeId) +{ + return findBucket(nodeId)->hasMobileNode(nodeId); +}; + +bool +RoutingTable::addConnectingNode(const NodeId& nodeId) +{ + if (id_ == nodeId) + return false; + + auto bucket = findBucket(nodeId); + + if (bucket == buckets.end()) + return 0; + + bucket->addConnectingNode(nodeId); + return 1; +} + +void +RoutingTable::removeConnectingNode(const NodeId& nodeId) +{ + findBucket(nodeId)->removeConnectingNode(nodeId); +} + +std::list<Bucket>::iterator +RoutingTable::findBucket(const NodeId& nodeId) +{ + if (buckets.empty()) + throw std::runtime_error("No bucket"); + + auto b = buckets.begin(); + + while (true) { + auto next = std::next(b); + if (next == buckets.end()) + return b; + if (std::memcmp(nodeId.data(), next->getLowerLimit().data(), nodeId.size()) < 0) + return b; + b = next; + } +} + +std::vector<NodeId> +RoutingTable::closestNodes(const NodeId& nodeId, unsigned count) +{ + std::vector<NodeId> closestNodes; + auto bucket = findBucket(nodeId); + auto sortedBucketInsert = [&](const std::list<Bucket>::iterator& b) { + auto nodes = b->getNodeIds(); + for (auto n : nodes) { + if (n != nodeId) { + auto here = std::find_if(closestNodes.begin(), + closestNodes.end(), + [&nodeId, &n](NodeId& NodeId) { + return nodeId.xorCmp(n, NodeId) < 0; + }); + + closestNodes.insert(here, n); + } + } + }; + + auto itn = bucket; + auto itp = (bucket == buckets.begin()) ? buckets.end() : std::prev(bucket); + while (itn != buckets.end() || itp != buckets.end()) { + if (itn != buckets.end()) { + sortedBucketInsert(itn); + itn = std::next(itn); + } + if (itp != buckets.end()) { + sortedBucketInsert(itp); + itp = (itp == buckets.begin()) ? buckets.end() : std::prev(itp); + } + } + + if (closestNodes.size() > count) { + closestNodes.resize(count); + } + + return closestNodes; +} + +void +RoutingTable::printRoutingTable() const +{ + int counter = 1; + for (auto it = buckets.begin(); it != buckets.end(); ++it) { + it->printBucket(counter); + counter++; + } + JAMI_DEBUG("_____________________________________________________________________________"); +} + +void +RoutingTable::shutdownNode(const NodeId& nodeId) +{ + findBucket(nodeId)->shutdownNode(nodeId); +} + +std::vector<NodeId> +RoutingTable::getNodes() const +{ + std::lock_guard<std::mutex> lock(mutex_); + std::vector<NodeId> ret; + for (const auto& b : buckets) { + const auto& nodes = b.getNodeIds(); + ret.insert(ret.end(), nodes.begin(), nodes.end()); + } + return ret; +} + +std::vector<NodeId> +RoutingTable::getKnownNodes() const +{ + std::vector<NodeId> ret; + for (const auto& b : buckets) { + const auto& nodes = b.getKnownNodes(); + ret.insert(ret.end(), nodes.begin(), nodes.end()); + } + return ret; +} + +std::vector<NodeId> +RoutingTable::getMobileNodes() const +{ + std::vector<NodeId> ret; + for (const auto& b : buckets) { + const auto& nodes = b.getMobileNodes(); + ret.insert(ret.end(), nodes.begin(), nodes.end()); + } + return ret; +} + +std::vector<NodeId> +RoutingTable::getConnectingNodes() const +{ + std::vector<NodeId> ret; + for (const auto& b : buckets) { + const auto& nodes = b.getConnectingNodes(); + ret.insert(ret.end(), nodes.begin(), nodes.end()); + } + return ret; +} + +std::vector<NodeId> +RoutingTable::getBucketMobileNodes() const +{ + std::vector<NodeId> ret; + auto bucket = findBucket(id_); + const auto& nodes = bucket->getMobileNodes(); + ret.insert(ret.end(), nodes.begin(), nodes.end()); + + return ret; +} + +bool +RoutingTable::contains(const std::list<Bucket>::iterator& bucket, const NodeId& nodeId) const +{ + return NodeId::cmp(bucket->getLowerLimit(), nodeId) <= 0 + && (std::next(bucket) == buckets.end() + || NodeId::cmp(nodeId, std::next(bucket)->getLowerLimit()) < 0); +} + +NodeId +RoutingTable::middle(std::list<Bucket>::iterator& it) const +{ + unsigned bit = depth(it); + if (bit >= 8 * HASH_LEN) + throw std::out_of_range("End of table"); + + NodeId id = it->getLowerLimit(); + id.setBit(bit, true); + return id; +} + +unsigned +RoutingTable::depth(std::list<Bucket>::iterator& bucket) const +{ + int bit1 = bucket->getLowerLimit().lowbit(); + int bit2 = std::next(bucket) != buckets.end() ? std::next(bucket)->getLowerLimit().lowbit() + : -1; + return std::max(bit1, bit2) + 1; +} + +bool +RoutingTable::split(std::list<Bucket>::iterator& bucket) +{ + NodeId id = middle(bucket); + auto newBucketIt = buckets.emplace(std::next(bucket), id); + // Re-assign nodes + auto& nodeSwap = bucket->getNodes(); + + for (auto it = nodeSwap.begin(); it != nodeSwap.end();) { + auto& node = *it; + + auto nodeId = it->first; + + if (!contains(bucket, nodeId)) { + newBucketIt->addNode(std::move(node.second)); + it = nodeSwap.erase(it); + } else { + ++it; + } + } + + auto connectingSwap = bucket->getConnectingNodes(); + for (auto it = connectingSwap.begin(); it != connectingSwap.end();) { + auto nodeId = *it; + + if (!contains(bucket, nodeId)) { + newBucketIt->addConnectingNode(nodeId); + it = connectingSwap.erase(it); + bucket->removeConnectingNode(nodeId); + } else { + ++it; + } + } + + auto knownSwap = bucket->getKnownNodes(); + for (auto it = knownSwap.begin(); it != knownSwap.end();) { + auto nodeId = *it; + + if (!contains(bucket, nodeId)) { + newBucketIt->addKnownNode(nodeId); + it = knownSwap.erase(it); + bucket->removeKnownNode(nodeId); + } else { + ++it; + } + } + + auto mobileSwap = bucket->getMobileNodes(); + for (auto it = mobileSwap.begin(); it != mobileSwap.end();) { + auto nodeId = *it; + + if (!contains(bucket, nodeId)) { + newBucketIt->addMobileNode(nodeId); + it = mobileSwap.erase(it); + bucket->removeMobileNode(nodeId); + } else { + ++it; + } + } + + return true; +} + +} // namespace jami \ No newline at end of file diff --git a/src/jamidht/swarm/routing_table.h b/src/jamidht/swarm/routing_table.h new file mode 100644 index 0000000000..c20d5a9cb5 --- /dev/null +++ b/src/jamidht/swarm/routing_table.h @@ -0,0 +1,559 @@ +/* + * Copyright (C) 2023 Savoir-faire Linux Inc. + * + * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "manager.h" + +#include <opendht/infohash.h> +#include <vector> +#include <memory> +#include <list> +#include <set> +#include <algorithm> +#include <asio.hpp> +#include <asio/detail/deadline_timer_service.hpp> + +using NodeId = dht::PkId; + +namespace jami { + +class ChannelSocketInterface; +class io_context; + +static constexpr const std::chrono::minutes FIND_PERIOD {10}; + +struct NodeInfo +{ + bool isMobile_ {false}; + std::shared_ptr<ChannelSocketInterface> socket {}; + asio::steady_timer refresh_timer {*Manager::instance().ioContext(), FIND_PERIOD}; + NodeInfo() = delete; + NodeInfo(NodeInfo&&) = default; + NodeInfo(std::shared_ptr<ChannelSocketInterface> socket_) + : socket(socket_) + {} + NodeInfo(bool mobile, std::shared_ptr<ChannelSocketInterface> socket_) + : isMobile_(mobile) + , socket(socket_) + {} +}; + +class Bucket + +{ +public: + static constexpr int BUCKET_MAX_SIZE = 4; + + Bucket() = delete; + Bucket(const Bucket&) = delete; + Bucket(const NodeId&); + + /** + * Add Node socket to bucket + * @param socket + * @return true if node was added, false if not + */ + bool addNode(const std::shared_ptr<ChannelSocketInterface>& socket); + + /** + * Add NodeInfo to bucket + * @param nodeInfo + * @return true if node was added, false if not + */ + bool addNode(NodeInfo&& info); + + /** + * Remove NodeId socket from bucket and insert it in known_nodes or + * mobile_nodes depending on its type + * @param nodeId + * @return true if node was removed, false if not + */ + bool removeNode(const NodeId& nodeId); + + /** + * Get connected nodes from bucket + * @return map of NodeId and NodeInfo + */ + std::map<NodeId, NodeInfo>& getNodes() { return nodes; } + + /** + * Get NodeIds from bucket + * @return set of NodeIds + */ + std::set<NodeId> getNodeIds() const; + + /** + * Test if socket exists in nodes + * @param nodeId + * @return true if node exists, false if not + */ + bool hasNode(const NodeId& nodeId) const; + + /** + * Add NodeId to known_nodes if it doesn't exist in nodes + * @param nodeId + * @return true if known node was added, false if not + */ + bool addKnownNode(const NodeId& nodeId); + + /** + * Remove NodeId from known_nodes + * @param nodeId + */ + void removeKnownNode(const NodeId& nodeId) { known_nodes.erase(nodeId); } + + /** + * Get NodeIds from known_nodes + * @return set of known NodeIds + */ + const std::set<NodeId>& getKnownNodes() const { return known_nodes; } + + /** + * Returns NodeId from known_nodes at index + * @param index + * @return NodeId + */ + NodeId getKnownNode(unsigned index) const; + + /** + * Test if NodeId exist in known_nodes + * @param nodeId + * @return true if known node exists, false if not + */ + bool hasKnownNode(const NodeId& nodeId) const + { + return known_nodes.find(nodeId) != known_nodes.end(); + } + + /** + * Add NodeId to mobile_nodes if it doesn't exist in nodes + * @param nodeId + * @return true if mobile node was added, false if not + */ + bool addMobileNode(const NodeId& nodeId); + + /** + * Remove NodeId from mobile_nodes + * @param nodeId + */ + void removeMobileNode(const NodeId& nodeId) { mobile_nodes.erase(nodeId); } + + /** + * Test if NodeId exist in mobile_nodes + * @param nodeId + * @return true if mobile node exists, false if not + */ + bool hasMobileNode(const NodeId& nodeId) + { + return mobile_nodes.find(nodeId) != mobile_nodes.end(); + } + + /** + * Get NodeIds from mobile_nodes + * @return set of mobile NodeIds + */ + const std::set<NodeId>& getMobileNodes() const { return mobile_nodes; } + + /** + * Add NodeId to connecting_nodes if it doesn't exist in nodes + * @param nodeId + * @param nodeInfo + * @return true if connecting node was added, false if not + */ + bool addConnectingNode(const NodeId& nodeId); + + /** + * Remove NodeId from connecting_nodes + * @param nodeId + */ + void removeConnectingNode(const NodeId& nodeId) { connecting_nodes.erase(nodeId); } + + /** Get NodeIds of connecting_nodes + * @return set of connecting NodeIds + */ + const std::set<NodeId>& getConnectingNodes() const { return connecting_nodes; }; + + /** + * Test if NodeId exist in connecting_nodes + * @param nodeId + * @return true if connecting node exists, false if not + */ + bool hasConnectingNode(const NodeId& nodeId) const + { + return connecting_nodes.find(nodeId) != connecting_nodes.end(); + } + + /** + * Indicate if bucket is full + * @return true if bucket is full, false if not + */ + bool isFull() const { return nodes.size() == BUCKET_MAX_SIZE; }; + + /** + * Returns random numberNodes NodeId from known_nodes + * @param numberNodes + * @param rd + * @return set of numberNodes random known NodeIds + */ + std::set<NodeId> getKnownNodesRandom(unsigned numberNodes, std::mt19937_64& rd) const; + + /** + * Returns random NodeId from known_nodes + * @param rd + * @return random known NodeId + */ + NodeId randomId(std::mt19937_64& rd) const + { + auto node = getKnownNodesRandom(1, rd); + return *node.begin(); + } + + /** + * Returns socket's timer + * @param socket + * @return timer + */ + asio::steady_timer& getNodeTimer(const std::shared_ptr<ChannelSocketInterface>& socket); + + /** + * Shutdowns socket and removes it from nodes. + * The corresponding node is moved to known_nodes or mobile_nodes + * @param socket + * @return true if node was shutdown, false if not found + */ + bool shutdownNode(const NodeId& nodeId); + + /** + * Shutdowns all sockets in nodes through shutdownNode + */ + void shutdownAllNodes(); + + /** + * Prints bucket and bucket's number + */ + void printBucket(unsigned number) const; + + /** + * Change mobility of specific node, mobile or not + */ + void changeMobility(const NodeId& nodeId, bool isMobile); + + /** + * Returns number of nodes in bucket + * @return size of nodes + */ + unsigned getNodesSize() const { return nodes.size(); } + + /** + * Returns number of knwon_nodes in bucket + * @return size of knwon_nodes + */ + unsigned getKnownNodesSize() const { return known_nodes.size(); } + + /** + * Returns number of mobile_nodes in bucket + * @return size of mobile_nodes + */ + unsigned getConnectingNodesSize() const { return connecting_nodes.size(); } + + /** + * Returns bucket lower limit + * @return NodeId lower limit + */ + NodeId getLowerLimit() const { return lowerLimit_; }; + + /** + * Set bucket's lower limit + * @param nodeId + */ + void setLowerLimit(const NodeId& nodeId) { lowerLimit_ = nodeId; } + + // For tests + + /** + * Get sockets from bucket + * @return set of sockets + */ + std::set<std::shared_ptr<ChannelSocketInterface>> getNodeSockets() const; + +private: + NodeId lowerLimit_; + std::map<NodeId, NodeInfo> nodes; + std::set<NodeId> known_nodes; + std::set<NodeId> connecting_nodes; + std::set<NodeId> mobile_nodes; + mutable std::mutex mutex; +}; + +// #################################################################################################### + +class RoutingTable +{ +public: + RoutingTable(); + + /** + * Add socket to bucket + * @param socket + * @return true if socket was added, false if not + */ + bool addNode(std::shared_ptr<ChannelSocketInterface> socket); + + /** + * Add socket to specific bucket + * @param channel + * @param bucket + * @return true if socket was added to bucket, false if not + */ + bool addNode(std::shared_ptr<ChannelSocketInterface> channel, + std::list<Bucket>::iterator& bucket); + + /** + * Removes node from routing table + * Adds it to known_nodes or mobile_nodes depending on mobility + * @param socket + * @return true if node was removed, false if not + */ + bool removeNode(const NodeId& nodeId); + + /** + * Check if connected node exsits in routing table + * @param nodeId + * @return true if node exists, false if not + */ + bool hasNode(const NodeId& nodeId); + + /** + * Add known node to routing table + * @param nodeId + * @return true if known node was added, false if not + */ + bool addKnownNode(const NodeId& nodeId); + + /** + * Checks if known node exists in routing table + * @param nodeId + * @return true if known node exists, false if not + */ + bool hasKnownNode(const NodeId& nodeId) const + { + auto bucket = findBucket(nodeId); + return bucket->hasKnownNode(nodeId); + } + + /** + * Add mobile node to routing table + * @param nodeId + * @return true if mobile node was added, false if not + */ + bool addMobileNode(const NodeId& nodeId); + + /** + * Remove mobile node to routing table + * @param nodeId + * @return true if mobile node was removed, false if not + */ + void removeMobileNode(const NodeId& nodeId); + + /** + * Check if mobile node exists in routing table + * @param nodeId + * @return true if mobile node exists, false if not + */ + bool hasMobileNode(const NodeId& nodeId); + + /** + * Add connecting node to routing table + * @param nodeId + * @return true if connecting node was added, false if not + */ + bool addConnectingNode(const NodeId& nodeId); + + /** + * Remove connecting connecting node to routing table + * @param nodeId + * @return true if connecting node was removed, false if not + */ + void removeConnectingNode(const NodeId& nodeId); + + /** + * Check if Connecting node exists in routing table + * @param nodeId + * @return true if connecting node exists, false if not + */ + bool hasConnectingNode(const NodeId& nodeId) const + { + auto bucket = findBucket(nodeId); + return bucket->hasConnectingNode(nodeId); + } + + /** + * Returns bucket iterator containing nodeId + * @param nodeId + * @return bucket iterator + */ + std::list<Bucket>::iterator findBucket(const NodeId& nodeId); + + /** + * Returns bucket iterator containing nodeId + * @param nodeId + * @return bucket iterator + */ + inline const std::list<Bucket>::const_iterator findBucket(const NodeId& nodeId) const + { + return std::list<Bucket>::const_iterator( + const_cast<RoutingTable*>(this)->findBucket(nodeId)); + } + + /** + * Returns the count closest nodes to a specific nodeId + * @param nodeId + * @param count + * @return vector of nodeIds + */ + std::vector<NodeId> closestNodes(const NodeId& nodeId, unsigned count); + + /** + * Returns number of buckets in routing table + * @return size of buckets + */ + unsigned getRoutingTableSize() const { return buckets.size(); } + + /** + * Returns number of total nodes in routing table + * @return size of nodes + */ + unsigned getRoutingTableNodeCount() const + { + size_t count = 0; + for (const auto& b : buckets) + count += b.getNodesSize(); + return count; + } + + /** + * Prints routing table + */ + void printRoutingTable() const; + + /** + * Shutdowns a node + * @param nodeId + */ + void shutdownNode(const NodeId& nodeId); + + /** + * Shutdowns all nodes in routing table and add them to known_nodes or mobile_nodes + */ + void shutdownAllNodes() + { + for (auto& bucket : buckets) + bucket.shutdownAllNodes(); + } + + /** + * Sets id for routing table + * @param node + */ + void setId(const NodeId& node) { id_ = node; } + + /** + * Returns id for routing table + * @return Nodeid + */ + NodeId getId() const { return id_; } + + /** + * Returns buckets in routing table + * @return list buckets + */ + std::list<Bucket>& getBuckets() { return buckets; } + + /** + * Returns all routing table's connected nodes + * @return vector of nodeIds + */ + std::vector<NodeId> getNodes() const; + + /** + * Returns all routing table's known nodes + *@return vector of nodeIds + */ + std::vector<NodeId> getKnownNodes() const; + + /** + * Returns all routing table's mobile nodes + * @return vector of nodeIds + */ + std::vector<NodeId> getMobileNodes() const; + + /** + * Returns all routing table's connecting nodes + * @return vector of nodeIds + */ + std::vector<NodeId> getConnectingNodes() const; + + /** + * Returns mobile nodes corresponding to the swarm's id + * @return vector of nodeIds + */ + std::vector<NodeId> getBucketMobileNodes() const; + + /** + * Test if connected nodeId is in specific bucket + * @param it + * @param nodeId + * @return true if nodeId is in bucket, false if not + */ + bool contains(const std::list<Bucket>::iterator& it, const NodeId& nodeId) const; + +private: + RoutingTable(const RoutingTable&) = delete; + RoutingTable& operator=(const RoutingTable&) = delete; + + /** + * Returns middle of routing table + * @param it + * @return NodeId + */ + NodeId middle(std::list<Bucket>::iterator& it) const; + + /** + * Returns depth of routing table + * @param bucket + * @return depth + */ + unsigned depth(std::list<Bucket>::iterator& bucket) const; + + /** + * Splits bucket + * @param bucket + * @return true if bucket was split, false if not + */ + bool split(std::list<Bucket>::iterator& bucket); + + NodeId id_; + + std::list<Bucket> buckets; + + mutable std::mutex mutex_; +}; +}; // namespace jami diff --git a/src/jamidht/swarm/swarm_channel_handler.cpp b/src/jamidht/swarm/swarm_channel_handler.cpp new file mode 100644 index 0000000000..b33cc94ca3 --- /dev/null +++ b/src/jamidht/swarm/swarm_channel_handler.cpp @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2023 Savoir-faire Linux Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "swarm_channel_handler.h" + +namespace jami { + +SwarmChannelHandler::SwarmChannelHandler(const std::shared_ptr<JamiAccount>& acc, + ConnectionManager& cm) + : ChannelHandlerInterface() + , account_(acc) + , connectionManager_(cm) +{} + +SwarmChannelHandler::~SwarmChannelHandler() {} + +void +SwarmChannelHandler::connect(const DeviceId& deviceId, + const std::string& conversationId, + ConnectCb&& cb) +{ + connectionManager_.connectDevice(deviceId, fmt::format("swarm://{}", conversationId), cb); +} + +bool +SwarmChannelHandler::onRequest(const std::shared_ptr<dht::crypto::Certificate>& cert, + const std::string& name) +{ + auto acc = account_.lock(); + if (!cert || !cert->issuer || !acc) + return false; + + auto sep = name.find_last_of('/'); + auto conversationId = name.substr(sep + 1); + if (auto acc = account_.lock()) + if (auto convModule = acc->convModule()) { + auto res = !convModule->isBannedDevice(conversationId, + cert->issuer->getLongId().toString()); + return res; + } + return false; +} + +void +SwarmChannelHandler::onReady(const std::shared_ptr<dht::crypto::Certificate>&, + const std::string& uri, + std::shared_ptr<ChannelSocket> socket) +{ +} +} // namespace jami \ No newline at end of file diff --git a/src/jamidht/swarm/swarm_channel_handler.h b/src/jamidht/swarm/swarm_channel_handler.h new file mode 100644 index 0000000000..9f82f6d1bc --- /dev/null +++ b/src/jamidht/swarm/swarm_channel_handler.h @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2023 Savoir-faire Linux Inc. + * + * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#pragma once + +#include "jamidht/channel_handler.h" +#include "connectivity/connectionmanager.h" +#include "jamidht/conversation_channel_handler.h" + +#include "logger.h" + +namespace jami { + +using NodeId = dht::h256; + +class JamiAccount; + +/** + * Manages Conversation's channels + */ +class SwarmChannelHandler : public ChannelHandlerInterface +{ +public: + SwarmChannelHandler(const std::shared_ptr<JamiAccount>& acc, ConnectionManager& cm); + ~SwarmChannelHandler(); + + /** + * Ask for a new git channel + * @param nodeId The node to connect + * @param name The name of the channel + * @param cb The callback to call when connected (can be immediate if already connected) + */ + void connect(const NodeId& nodeId, const std::string& name, ConnectCb&& cb) override; + + /** + * Determine if we accept or not the git request + * @param nodeId node who asked + * @param name name asked + * @return if the channel is for a valid conversation and node not banned + */ + bool onRequest(const std::shared_ptr<dht::crypto::Certificate>& peer, + const std::string& name) override; + + /** + * TODO, this needs to extract gitservers from JamiAccount + */ + void onReady(const std::shared_ptr<dht::crypto::Certificate>& peer, + const std::string& name, + std::shared_ptr<ChannelSocket> channel) override; + +private: + std::weak_ptr<JamiAccount> account_; + ConnectionManager& connectionManager_; +}; + +} // namespace jami \ No newline at end of file diff --git a/src/jamidht/swarm/swarm_manager.cpp b/src/jamidht/swarm/swarm_manager.cpp new file mode 100644 index 0000000000..03f9f69cd5 --- /dev/null +++ b/src/jamidht/swarm/swarm_manager.cpp @@ -0,0 +1,292 @@ +/* + * Copyright (C) 2022 Savoir-faire Linux Inc. + * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include <iostream> + +#include "swarm_manager.h" +#include "connectivity/multiplexed_socket.h" + +constexpr const std::chrono::minutes FIND_PERIOD {10}; + +namespace jami { + +using namespace swarm_protocol; + +SwarmManager::SwarmManager(const NodeId& id) + : id_(id) + , rd(dht::crypto::getSeededRandomEngine<std::mt19937_64>()) +{ + routing_table.setId(id); +} + +SwarmManager::~SwarmManager() +{ + shutdown(); +} + +void +SwarmManager::removeNode(const NodeId& nodeId) +{ + { + std::lock_guard<std::mutex> lock(mutex); + removeNodeInternal(nodeId); + } + maintainBuckets(); +} + +void +SwarmManager::removeNodeInternal(const NodeId& nodeId) +{ + routing_table.removeNode(nodeId); +} + +void +SwarmManager::setKnownNodes(const std::vector<NodeId>& known_nodes) +{ + isShutdown_ = false; + { + std::lock_guard<std::mutex> lock(mutex); + for (const auto& NodeId : known_nodes) + addKnownNodes(std::move(NodeId)); + } + maintainBuckets(); +} + +void +SwarmManager::setMobileNodes(const std::vector<NodeId>& mobile_nodes) +{ + { + std::lock_guard<std::mutex> lock(mutex); + for (const auto& nodeId : mobile_nodes) + addMobileNodes(nodeId); + } + maintainBuckets(); +} + +void +SwarmManager::addKnownNodes(const NodeId& nodeId) +{ + if (id_ != nodeId) { + routing_table.addKnownNode(nodeId); + } +} + +void +SwarmManager::addMobileNodes(const NodeId& nodeId) +{ + if (id_ != nodeId) { + routing_table.addMobileNode(nodeId); + } +} + +void +SwarmManager::sendRequest(const std::shared_ptr<ChannelSocketInterface>& socket, + NodeId& nodeId, + Query q, + int numberNodes) +{ + msgpack::sbuffer buffer; + msgpack::packer<msgpack::sbuffer> pk(&buffer); + std::error_code ec; + Message msg; + msg.request = Request {q, numberNodes, nodeId}; + pk.pack(msg); + + socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec); + if (ec) { + JAMI_ERR("%s", ec.message().c_str()); + return; + } +} + +void +SwarmManager::sendAnswer(const std::shared_ptr<ChannelSocketInterface>& socket, const Message& msg_) +{ + std::lock_guard<std::mutex> lock(mutex); + Response toResponse; + Message msg; + std::vector<NodeId> nodesToSend; + + if (msg_.request->q == Query::FIND) { + nodesToSend = routing_table.closestNodes(msg_.request->nodeId, msg_.request->num); + toResponse.q = Query::FOUND; + + toResponse.nodes = nodesToSend; + msg.response = toResponse; + + msgpack::sbuffer buffer; + msgpack::packer<msgpack::sbuffer> pk(&buffer); + + pk.pack(msg); + + std::error_code ec; + socket->write(reinterpret_cast<const unsigned char*>(buffer.data()), buffer.size(), ec); + if (ec) { + JAMI_ERR("%s", ec.message().c_str()); + return; + } + } +} + +void +SwarmManager::receiveMessage(const std::shared_ptr<ChannelSocketInterface>& socket) +{ + struct DecodingContext + { + msgpack::unpacker pac {[](msgpack::type::object_type, std::size_t, void*) { return true; }, + nullptr, + 512}; + }; + + socket->setOnRecv([w = weak(), + wsocket = std::weak_ptr<ChannelSocketInterface>(socket), + ctx = std::make_shared<DecodingContext>()](const uint8_t* buf, size_t len) { + ctx->pac.reserve_buffer(len); + std::copy_n(buf, len, ctx->pac.buffer()); + ctx->pac.buffer_consumed(len); + + msgpack::object_handle oh; + while (ctx->pac.next(oh)) { + auto shared = w.lock(); + auto socket = wsocket.lock(); + if (!shared || !socket) { + JAMI_ERROR("Swarm Manager was destroyed :/"); + return (size_t) 0; + } + + try { + Message msg; + oh.get().convert(msg); + + if (msg.request) { + shared->sendAnswer(socket, msg); + } + if (msg.response) { + shared->setKnownNodes(msg.response->nodes); + } + + } catch (const std::exception& e) { + JAMI_WARN("Error DRT recv: %s", e.what()); + // return len; + } + } + + return len; + }); + + socket->onShutdown([w = weak(), wsocket = std::weak_ptr<ChannelSocketInterface>(socket)] { + auto shared = w.lock(); + auto socket = wsocket.lock(); + if (shared and socket) { + shared->removeNode(socket->deviceId()); + shared->maintainBuckets(); + } + }); +} + +void +SwarmManager::maintainBuckets() +{ + std::set<NodeId> nodes; + std::unique_lock<std::mutex> lock(mutex); + auto& buckets = routing_table.getBuckets(); + for (auto it = buckets.begin(); it != buckets.end(); ++it) { + auto& bucket = *it; + bool myBucket = routing_table.contains(it, id_); + auto connecting_nodes = myBucket ? bucket.getConnectingNodesSize() + : bucket.getConnectingNodesSize() + bucket.getNodesSize(); + if (connecting_nodes < Bucket::BUCKET_MAX_SIZE) { + auto nodesToTry = bucket.getKnownNodesRandom(Bucket::BUCKET_MAX_SIZE - connecting_nodes, + rd); + for (auto& node : nodesToTry) + routing_table.addConnectingNode(node); + + nodes.insert(nodesToTry.begin(), nodesToTry.end()); + } + } + lock.unlock(); + for (auto& node : nodes) + tryConnect(node); +} + +void +SwarmManager::tryConnect(const NodeId& nodeId) +{ + if (needSocketCb_) + needSocketCb_(nodeId.toString(), + [this, nodeId](const std::shared_ptr<ChannelSocketInterface>& socket) { + std::unique_lock<std::mutex> lock(mutex); + if (socket) { + if (routing_table.addNode(socket)) { + std::error_code ec; + resetNodeExpiry(ec, socket, id_); + lock.unlock(); + receiveMessage(socket); + } + } else { + routing_table.addKnownNode(nodeId); + maintainBuckets(); + } + return true; + }); +} + +void +SwarmManager::addChannel(const std::shared_ptr<ChannelSocketInterface>& channel) +{ + { + std::lock_guard<std::mutex> lock(mutex); + auto bucket = routing_table.findBucket(channel->deviceId()); + routing_table.addNode(channel, bucket); + } + receiveMessage(channel); +} + +void +SwarmManager::resetNodeExpiry(const asio::error_code& ec, + const std::shared_ptr<ChannelSocketInterface>& socket, + NodeId node) +{ + NodeId idToFind; + std::list<Bucket>::iterator bucket; + + if (ec == asio::error::operation_aborted) + return; + + if (!node) { + bucket = routing_table.findBucket(socket->deviceId()); + idToFind = bucket->randomId(rd); + } else { + bucket = routing_table.findBucket(node); + idToFind = node; + } + + sendRequest(socket, idToFind, Query::FIND, Bucket::BUCKET_MAX_SIZE); + + if (!node) { + auto& nodeTimer = bucket->getNodeTimer(socket); + nodeTimer.expires_after(FIND_PERIOD); + nodeTimer.async_wait(std::bind(&jami::SwarmManager::resetNodeExpiry, + this, + std::placeholders::_1, + socket, + NodeId {})); + } +} + +} // namespace jami diff --git a/src/jamidht/swarm/swarm_manager.h b/src/jamidht/swarm/swarm_manager.h new file mode 100644 index 0000000000..f20eda1d9b --- /dev/null +++ b/src/jamidht/swarm/swarm_manager.h @@ -0,0 +1,152 @@ +/* + * Copyright (C) 2023 Savoir-faire Linux Inc. + * + * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once + +#include "routing_table.h" +#include "swarm_protocol.h" + +#include <iostream> +#include <memory> + +namespace jami { + +using namespace swarm_protocol; + +class SwarmManager : public std::enable_shared_from_this<SwarmManager> +{ + using ChannelCb = std::function<bool(const std::shared_ptr<ChannelSocketInterface>&)>; + using NeedSocketCb = std::function<void(const std::string&, ChannelCb&&)>; + +public: + SwarmManager(const NodeId&); + ~SwarmManager(); + + NeedSocketCb needSocketCb_; + + std::weak_ptr<SwarmManager> weak() + { + return std::static_pointer_cast<SwarmManager>(shared_from_this()); + } + + const NodeId& getId() const { return id_; } + + /** + * Add list of nodes to the known nodes list + * @param vector<NodeId>& known_nodes + */ + void setKnownNodes(const std::vector<NodeId>& known_nodes); + + /** + * Add list of nodes to the mobile nodes list + * @param vector<NodeId>& mobile_nodes + */ + void setMobileNodes(const std::vector<NodeId>& mobile_nodes); + + /** + * Add channel to routing table + * @param shared_ptr<ChannelSocketInterface>& channel + */ + void addChannel(const std::shared_ptr<ChannelSocketInterface>& channel); + + void removeNode(const NodeId& nodeId); + + /** For testing */ + RoutingTable& getRoutingTable() { return routing_table; }; + std::list<Bucket>& getBuckets() { return routing_table.getBuckets(); }; + + void shutdown() { routing_table.shutdownAllNodes(); }; + + void display() + { + JAMI_DEBUG("SwarmManager {:s} has {:d} nodes in table", + getId().to_c_str(), + routing_table.getRoutingTableNodeCount()); + } + +private: + /** + * Add node to the known_Nodes list + * @param NodeId nodeId + */ + void addKnownNodes(const NodeId& nodeId); + + /** + * Add node to the mobile_Nodes list + * @param NodeId nodeId + */ + void addMobileNodes(const NodeId& nodeId); + + /** + * Send nodes request to fill known_nodes list + * @param shared_ptr<ChannelSocketInterface>& socket + * @param NodeId& nodeId + * @param Query q + * @param int numberNodes + */ + void sendRequest(const std::shared_ptr<ChannelSocketInterface>& socket, + NodeId& nodeId, + Query q, + int numberNodes = Bucket::BUCKET_MAX_SIZE); + + /** + * Send answer to request + * @param std::shared_ptr<ChannelSocketInterface>& socket + * @param Message msg + */ + void sendAnswer(const std::shared_ptr<ChannelSocketInterface>& socket, const Message& msg_); + + /** + * Interpret received message + * @param std::shared_ptr<ChannelSocketInterface>& socket + */ + void receiveMessage(const std::shared_ptr<ChannelSocketInterface>& socket); + + /** + * Maintain/Update buckets + */ + void maintainBuckets(); + + /** + * Add list of nodes to the known nodes list + * @param asio::error_code& ec + * @param shared_ptr<ChannelSocketInterface>& socket + * @param NodeId node + */ + void resetNodeExpiry(const asio::error_code& ec, + const std::shared_ptr<ChannelSocketInterface>& socket, + NodeId node = {}); + + /** + * Try to establich connexion with specific node + * @param NodeId nodeId + */ + void tryConnect(const NodeId& nodeId); + + void removeNodeInternal(const NodeId& nodeId); + + const NodeId id_; + mutable std::mt19937_64 rd; + mutable std::mutex mutex; + RoutingTable routing_table; + + std::atomic_bool isShutdown_ {false}; +}; + +} // namespace jami diff --git a/src/jamidht/swarm/swarm_protocol.cpp b/src/jamidht/swarm/swarm_protocol.cpp new file mode 100644 index 0000000000..d91e69a1ac --- /dev/null +++ b/src/jamidht/swarm/swarm_protocol.cpp @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2023 Savoir-faire Linux Inc. + * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "swarm_protocol.h" + +#include <iostream> + +namespace jami { +namespace swarm_protocol {}; // namespace swarm_protocol + +} // namespace jami diff --git a/src/jamidht/swarm/swarm_protocol.h b/src/jamidht/swarm/swarm_protocol.h new file mode 100644 index 0000000000..33fb798962 --- /dev/null +++ b/src/jamidht/swarm/swarm_protocol.h @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2023 Savoir-faire Linux Inc. + * Author: Fadi Shehadeh <fadi.shehadeh@savoirfairelinux.com> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ +#pragma once + +#include <string> +#include <string_view> +#include <msgpack.hpp> + +#include <opendht/infohash.h> + +using namespace std::literals; +using NodeId = dht::h256; + +namespace jami { + +namespace swarm_protocol { + +static constexpr int version = 1; + +enum class Query : uint8_t { FIND = 1, FOUND = 2 }; + +using NodeId = dht::PkId; + +struct Request +{ + Query q; // Type of query + int num; // Number of nodes + NodeId nodeId; + MSGPACK_DEFINE_MAP(q, num, nodeId); +}; + +struct Response +{ + Query q; + std::vector<NodeId> nodes; + std::vector<NodeId> mobile_nodes; + + MSGPACK_DEFINE_MAP(q, nodes, mobile_nodes); +}; + +struct Message +{ + int v = version; + bool is_mobile {false}; + std::optional<Request> request; + std::optional<Response> response; + MSGPACK_DEFINE_MAP(v, is_mobile, request, response); +}; + +}; // namespace swarm_protocol + +} // namespace jami +MSGPACK_ADD_ENUM(jami::swarm_protocol::Query); -- GitLab