Skip to content
Snippets Groups Projects
Select Git revision
  • df27e71a32ec578a12cb6813bea201c2fce1e097
  • master default
  • windows_ci_static
  • c_link
  • cpack
  • windows_ci
  • cert_pk_id
  • proxy_push_result
  • cnode_put_id
  • update-windows-build
  • proxy
  • resubscribe_on_token_change
  • actions
  • client_mode
  • llhttp
  • search_node_add
  • crypto_aes_gcm_argon2
  • ios_notifications
  • log_fmt
  • v2asio
  • fix-msvc
  • v3.4.0
  • v3.3.1
  • v3.3.1rc1
  • v3.3.1rc2
  • v3.3.0
  • v3.2.0
  • v3.1.11
  • v3.1.10
  • v3.1.9
  • v3.1.8.2
  • v3.1.8.1
  • v3.1.8
  • v3.1.7
  • v3.1.6
  • v3.1.5
  • v3.1.4
  • v3.1.3
  • v3.1.2
  • v3.1
  • v3.0.1
41 results

dht_proxy_client.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    benchmark.py 7.22 KiB
    #!/usr/bin/env python3
    # Copyright (C) 2015 Savoir-Faire Linux Inc.
    # Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
    
    import sys, subprocess, argparse, time, random, string, threading, signal
    from pyroute2.netns.process.proxy import NSPopen
    import numpy as np
    import matplotlib.pyplot as plt
    from dhtnetwork import DhtNetwork
    
    sys.path.append('..')
    from opendht import *
    
    class WorkBench():
        """docstring for WorkBench"""
        def __init__(self, ifname='ethdht', virtual_locs=8, node_num=32, remote_bootstrap=None, loss=0, delay=0, disable_ipv4=False,
                disable_ipv6=False):
            self.ifname       = ifname
            self.virtual_locs = virtual_locs
            self.node_num     = node_num
            self.clusters     = min(virtual_locs, node_num)
            self.node_per_loc = int(self.node_num / self.clusters)
            self.loss         = loss
            self.delay        = delay
            self.disable_ipv4 = disable_ipv4
            self.disable_ipv6 = disable_ipv6
    
            self.remote_bootstrap = remote_bootstrap
            self.local_bootstrap  = None
            self.procs            = [None for _ in range(self.clusters)]
    
        def get_bootstrap(self):
            if not self.local_bootstrap:
                self.local_bootstrap = DhtNetwork(iface='br'+self.ifname,
                        first_bootstrap=False if self.remote_bootstrap else True,
                        bootstrap=[(self.remote_bootstrap, "5000")] if self.remote_bootstrap else [])
            return self.local_bootstrap
    
        def create_virtual_net(self):
            if self.virtual_locs > 1:
                cmd = ["python3", "virtual_network_builder.py", "-i", self.ifname, "-n", str(self.clusters), '-l', str(self.loss), '-d', str(self.delay)]
                if not self.disable_ipv4:
                    cmd.append('-4')
                if not self.disable_ipv6:
                    cmd.append('-6')
                print(cmd)
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
                output, err = p.communicate()
                print(output.decode())
    
        def destroy_virtual_net(self):
            print('Shuting down the virtual IP network.')
            subprocess.call(["python3", "virtual_network_builder.py", "-i", self.ifname, "-n", str(self.clusters), "-r"])
    
        def start_cluster(self, i):
            if self.local_bootstrap:
                cmd = ["python3", "dhtnetwork.py", "-n", str(self.node_per_loc), '-I', self.ifname+str(i)+'.1']
                if self.remote_bootstrap:
                    cmd.extend(['-b', self.remote_bootstrap, '-bp', "5000"])
                else:
                    if not self.disable_ipv4 and self.local_bootstrap.ip4:
                        cmd.extend(['-b', self.local_bootstrap.ip4])
                    if not self.disable_ipv6 and self.local_bootstrap.ip6:
                        cmd.extend(['-b6', self.local_bootstrap.ip6])
                self.procs[i] = NSPopen('node'+str(i), cmd)
            else:
                raise Exception('First create bootstrap.')
    
        def stop_cluster(self, i):
            if self.procs[i]:
                try:
                    self.procs[i].send_signal(signal.SIGINT);
                    self.procs[i].wait()
                    self.procs[i].release()
                except Exception as e:
                    print(e)
                self.procs[i] = None
    
        def replace_cluster(self):
            n = random.randrange(0, self.clusters)
            self.stop_cluster(n)
            self.start_cluster(n)
    
    
    
    def getsTimesTest():
        """TODO: Docstring for
    
        """
    
        plt.ion()
    
        fig, axes = plt.subplots(2, 1)
        fig.tight_layout()
    
        lax = axes[0]
        hax = axes[1]
    
        lines = None#ax.plot([])
        #plt.ylabel('time (s)')
        hax.set_ylim(0, 2)
    
        # let the network stabilise
        plt.pause(60)
    
        #start = time.time()
        times = []
        done = 0
    
        lock = threading.Condition()
    
        def getcb(v):
            print("found", v)
            return True
    
        def donecb(ok, nodes):
            nonlocal lock, done, times
            t = time.time()-start
            with lock:
                if not ok:
                    print("failed !")
                times.append(t)
                done -= 1
                lock.notify()
    
        def update_plot():
            nonlocal lines
            while lines:
                l = lines.pop()
                l.remove()
                del l
            lines = plt.plot(times, color='blue')
            plt.draw()
    
        def run_get():
            nonlocal done
            done += 1
            start = time.time()
            bootstrap.front().get(InfoHash.getRandom(), getcb, lambda ok, nodes: donecb(ok, nodes, start))
    
        plt.pause(5)
    
        plt.show()
        update_plot()
    
        times = []
        for n in range(10):
            wb.replace_cluster()
            plt.pause(2)
            print("Getting 50 random hashes succesively.")
            for i in range(50):
                with lock:
                    done += 1
                    start = time.time()
                    bootstrap.front().get(InfoHash.getRandom(), getcb, donecb)
                    while done > 0:
                        lock.wait()
                        update_plot()
                update_plot()
            print("Took", np.sum(times), "mean", np.mean(times), "std", np.std(times), "min", np.min(times), "max", np.max(times))
    
        print('GET calls timings benchmark test : DONE. '  \
                'Close Matplotlib window for terminating the program.')
        plt.ioff()
        plt.show()
    
    if __name__ == '__main__':
    
        parser = argparse.ArgumentParser(description='Run, test and benchmark a DHT network on a local virtual network with simulated packet loss and latency.')
        parser.add_argument('-i', '--ifname', help='interface name', default='ethdht')
        parser.add_argument('-n', '--node-num', help='number of dht nodes to run', type=int, default=32)
        parser.add_argument('-v', '--virtual-locs', help='number of virtual locations (node clusters)', type=int, default=8)
        parser.add_argument('-l', '--loss', help='simulated cluster packet loss (percent)', type=int, default=0)
        parser.add_argument('-d', '--delay', help='simulated cluster latency (ms)', type=int, default=0)
        parser.add_argument('-b', '--bootstrap', help='Bootstrap node to use (if any)', default=None)
        parser.add_argument('-no4', '--disable-ipv4', help='Enable IPv4', action="store_true")
        parser.add_argument('-no6', '--disable-ipv6', help='Enable IPv6', action="store_true")
        parser.add_argument('--gets', action='store_true', help='Launches get calls timings benchmark test.', default=0)
    
        args = parser.parse_args()
    
        if args.gets < 1:
            print('No test specified... Quitting.', file=sys.stderr)
            sys.exit(1)
    
        wb = WorkBench(args.ifname, args.virtual_locs, args.node_num, loss=args.loss,
                delay=args.delay, disable_ipv4=args.disable_ipv4,
                disable_ipv6=args.disable_ipv6)
        wb.create_virtual_net()
    
        bootstrap = wb.get_bootstrap()
        bootstrap.resize(1)
        print("Launching", wb.node_num, "nodes (", wb.clusters, "clusters of", wb.node_per_loc, "nodes)")
    
        try:
            for i in range(wb.clusters):
                wb.start_cluster(i)
    
            if args.gets:
                getsTimesTest()
    
        except Exception as e:
            print(e)
        finally:
            for p in wb.procs:
                if p:
                    p.send_signal(signal.SIGINT);
            bootstrap.resize(0)
            wb.destroy_virtual_net()
            for p in wb.procs:
                if p:
                    try:
                        p.wait()
                        p.release()
                    except Exception as e:
                        print(e)