Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
benchmark.py 7.93 KiB
#!/usr/bin/env python3
# Copyright (C) 2015-2016 Savoir-faire Linux Inc.
# Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com>
#            Simon Désaulniers <sim.desaulniers@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; If not, see <http://www.gnu.org/licenses/>.

import os
import sys
import subprocess
import signal
import argparse
import time

from dht.network import DhtNetwork
from dht.network import DhtNetworkSubProcess
from dht.tests import PerformanceTest, PersistenceTest
from dht import virtual_network_builder
from dht import network as dhtnetwork

from opendht import *


class WorkBench():
    """
    This contains the initialisation information, such as ipv4/ipv6, number of
    nodes and cluster to create, etc. This class is also used to initialise and
    finish the network.
    """
    def __init__(self, ifname='ethdht', virtual_locs=8, node_num=32, remote_bootstrap=None, loss=0, delay=0, disable_ipv4=False,
            disable_ipv6=False):
        self.ifname       = ifname
        self.virtual_locs = virtual_locs
        self.node_num     = node_num
        self.clusters     = min(virtual_locs, node_num)
        self.node_per_loc = int(self.node_num / self.clusters)
        self.loss         = loss
        self.delay        = delay
        self.disable_ipv4 = disable_ipv4
        self.disable_ipv6 = disable_ipv6

        self.remote_bootstrap = remote_bootstrap
        self.local_bootstrap  = None
        self.bs_port          = "5000"
        self.procs            = [None for _ in range(self.clusters)]

    def get_bootstrap(self):
        if not self.local_bootstrap:
            self.local_bootstrap = DhtNetwork(iface='br'+self.ifname,
                    first_bootstrap=False if self.remote_bootstrap else True,
                    bootstrap=[(self.remote_bootstrap, self.bs_port)] if self.remote_bootstrap else [])
        return self.local_bootstrap

    def create_virtual_net(self):
        if self.virtual_locs > 1:
            cmd = ["python3", os.path.abspath(virtual_network_builder.__file__), "-i", self.ifname, "-n", str(self.clusters), '-l', str(self.loss), '-d', str(self.delay)]
            if not self.disable_ipv4:
                cmd.append('-4')
            if not self.disable_ipv6:
                cmd.append('-6')
            print(cmd)
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
            output, err = p.communicate()
            print(output.decode())

    def destroy_virtual_net(self):
        print('Shuting down the virtual IP network.')
        subprocess.call(["python3", os.path.abspath(virtual_network_builder.__file__), "-i", self.ifname, "-n", str(self.clusters), "-r"])

    def start_cluster(self, i):
        if self.local_bootstrap:
            cmd = ["python3", os.path.abspath(dhtnetwork.__file__), "-n", str(self.node_per_loc), '-I', self.ifname+str(i)+'.1']
            if self.remote_bootstrap:
                cmd.extend(['-b', self.remote_bootstrap, '-bp', "5000"])
            else:
                if not self.disable_ipv4 and self.local_bootstrap.ip4:
                    cmd.extend(['-b', self.local_bootstrap.ip4])
                if not self.disable_ipv6 and self.local_bootstrap.ip6:
                    cmd.extend(['-b6', self.local_bootstrap.ip6])
            self.procs[i] = DhtNetworkSubProcess('node'+str(i), cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
            while DhtNetworkSubProcess.NOTIFY_TOKEN not in self.procs[i].getline():
                # waiting for process to spawn
                time.sleep(0.5)
        else:
            raise Exception('First create bootstrap.')

    def stop_cluster(self, i):
        """
        Stops a cluster sub process. All nodes are put down without graceful
        shutdown.
        """
        if self.procs[i]:
            try:
                self.procs[i].quit()
            except Exception as e:
                print(e)
            self.procs[i] = None

    def replace_cluster(self):
        """
        Same as stop_cluster(), but creates a new cluster right after.
        """
        n = random.randrange(0, self.clusters)
        self.stop_cluster(n)
        self.start_cluster(n)

    def resize_clusters(self, n):
        """
        Resizes the list of clusters to be of length ``n``.
        """
        procs_count = len(self.procs)
        if procs_count < n:
            for i in range(n-procs_count):
                self.procs.append(None)
                self.start_cluster(procs_count+i)
        else:
            for i in range(procs_count-n):
                self.stop_cluster(procs_count-i-1)


if __name__ == '__main__':

    parser = argparse.ArgumentParser(description='Run, test and benchmark a '\
            'DHT network on a local virtual network with simulated packet '\
            'loss and latency.')
    ifConfArgs = parser.add_argument_group('Virtual interface configuration')
    ifConfArgs.add_argument('-i', '--ifname', default='ethdht', help='interface name')
    ifConfArgs.add_argument('-n', '--node-num', type=int, default=32, help='number of dht nodes to run')
    ifConfArgs.add_argument('-v', '--virtual-locs', type=int, default=8,
            help='number of virtual locations (node clusters)')
    ifConfArgs.add_argument('-l', '--loss', type=int, default=0, help='simulated cluster packet loss (percent)')
    ifConfArgs.add_argument('-d', '--delay', type=int, default=0, help='simulated cluster latency (ms)')
    ifConfArgs.add_argument('-b', '--bootstrap', default=None, help='Bootstrap node to use (if any)')
    ifConfArgs.add_argument('-no4', '--disable-ipv4', action="store_true", help='Enable IPv4')
    ifConfArgs.add_argument('-no6', '--disable-ipv6', action="store_true", help='Enable IPv6')

    testArgs = parser.add_argument_group('Test arguments')
    testArgs.add_argument('-t', '--test', type=str, default=None, required=True, help='Specifies the test.')
    testArgs.add_argument('-o', '--opt', type=str, default=[], nargs='+',
            help='Options passed to tests routines.')

    featureArgs = parser.add_mutually_exclusive_group(required=True)
    featureArgs.add_argument('--performance', action='store_true', default=0,
            help='Launches performance benchmark test. Available args for "-t" are: gets.')
    featureArgs.add_argument('--data-persistence', action='store_true', default=0,
            help='Launches data persistence benchmark test. '\
                    'Available args for "-t" are: delete, replace, mult_time. '\
                    'Available args for "-o" are : dump_str_log')


    args = parser.parse_args()

    wb = WorkBench(args.ifname, args.virtual_locs, args.node_num, loss=args.loss,
            delay=args.delay, disable_ipv4=args.disable_ipv4,
            disable_ipv6=args.disable_ipv6)
    wb.create_virtual_net()

    bootstrap = wb.get_bootstrap()
    bootstrap.resize(1)
    print("Launching", wb.node_num, "nodes (", wb.clusters, "clusters of", wb.node_per_loc, "nodes)")

    try:
        for i in range(wb.clusters):
            wb.start_cluster(i)

        if args.performance:
            PerformanceTest(args.test, wb, *args.opt).run()
        elif args.data_persistence:
            PersistenceTest(args.test, wb, *args.opt).run()

    except Exception as e:
        print(e)
    finally:
        for p in wb.procs:
            if p:
                p.quit()
        bootstrap.resize(0)
        sys.stdout.write('Shutting down the virtual IP network... ')
        sys.stdout.flush()
        wb.destroy_virtual_net()
        print('Done.')