From 2289243bfc291b5b33b45e808c7308012bba34e0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20D=C3=A9saulniers?= <rostydela@gmail.com>
Date: Mon, 25 Jan 2016 16:33:42 -0500
Subject: [PATCH] benchmark: fix tests + new test: totallyNormalTest

The "Totally normal test" is meant to create a realistic environment, i.e.
with normal asynchronous packet activity. This is useful in order to
differentiate normal and data persistence traffic.
---
 python/tools/benchmark.py   |  23 ++-
 python/tools/dht/network.py |  39 +++-
 python/tools/dht/tests.py   | 356 ++++++++++++++++++++++++++++--------
 3 files changed, 328 insertions(+), 90 deletions(-)

diff --git a/python/tools/benchmark.py b/python/tools/benchmark.py
index 1840d662..0d20a93a 100755
--- a/python/tools/benchmark.py
+++ b/python/tools/benchmark.py
@@ -23,6 +23,7 @@ import subprocess
 import signal
 import argparse
 import time
+import random
 
 from dht.network import DhtNetwork
 from dht.network import DhtNetworkSubProcess
@@ -150,17 +151,22 @@ if __name__ == '__main__':
     testArgs.add_argument('-t', '--test', type=str, default=None, required=True, help='Specifies the test.')
     testArgs.add_argument('-o', '--opt', type=str, default=[], nargs='+',
             help='Options passed to tests routines.')
+    testArgs.add_argument('-m', type=int, default=None, help='Generic size option passed to tests.')
+    testArgs.add_argument('-e', type=int, default=None, help='Generic size option passed to tests.')
 
     featureArgs = parser.add_mutually_exclusive_group(required=True)
-    featureArgs.add_argument('--performance', action='store_true', default=0,
+    featureArgs.add_argument('--performance', action='store_true', default=False,
             help='Launches performance benchmark test. Available args for "-t" are: gets.')
     featureArgs.add_argument('--data-persistence', action='store_true', default=0,
             help='Launches data persistence benchmark test. '\
-                    'Available args for "-t" are: delete, replace, mult_time. '\
-                    'Available args for "-o" are : dump_str_log')
+                 'Available args for "-t" are: delete, replace, mult_time. '\
+                 'Available args for "-o" are : dump_str_log, keep_alive, trigger, traffic_plot, op_plot. '\
+                 'Use "-m" to specify the number of producers on the DHT.'\
+                 'Use "-e" to specify the number of values to put on the DHT.')
 
 
     args = parser.parse_args()
+    test_opt = { o : True for o in args.opt }
 
     wb = WorkBench(args.ifname, args.virtual_locs, args.node_num, loss=args.loss,
             delay=args.delay, disable_ipv4=args.disable_ipv4,
@@ -175,10 +181,17 @@ if __name__ == '__main__':
         for i in range(wb.clusters):
             wb.start_cluster(i)
 
+        # recover -e and -m values.
+        if args.e:
+            test_opt.update({ 'num_values' : args.e })
+        if args.m:
+            test_opt.update({ 'num_producers' : args.m })
+
+        # run the test
         if args.performance:
-            PerformanceTest(args.test, wb, *args.opt).run()
+            PerformanceTest(args.test, wb, test_opt).run()
         elif args.data_persistence:
-            PersistenceTest(args.test, wb, *args.opt).run()
+            PersistenceTest(args.test, wb, test_opt).run()
 
     except Exception as e:
         print(e)
diff --git a/python/tools/dht/network.py b/python/tools/dht/network.py
index b0b68baf..7a84c1a5 100755
--- a/python/tools/dht/network.py
+++ b/python/tools/dht/network.py
@@ -37,7 +37,8 @@ from opendht import *
 # useful functions
 b_space_join = lambda *l: b' '.join(map(bytes, l))
 
-# TODO: find where token "notifyend" gets printed...
+# TODO: find where token "notifyend" gets printed... Or switch to MSGPACK for
+# serialisation of packets between both processes.
 class DhtNetworkSubProcess(NSPopen):
     """
     Handles communication with DhtNetwork sub process.
@@ -47,6 +48,8 @@ class DhtNetworkSubProcess(NSPopen):
     therefor, waits for the sub process to spawn.
     """
     # requests
+    NODE_PUT_REQ              = b"np"
+    NEW_NODE_REQ              = b"nn"
     REMOVE_NODE_REQ           = b"rn"
     SHUTDOWN_NODE_REQ         = b"sdn"
     SHUTDOWN_REPLACE_NODE_REQ = b'sdrn'
@@ -173,7 +176,7 @@ class DhtNetworkSubProcess(NSPopen):
         process.
 
         @param answer_cb: Callback to call when an answer is given after notify.
-                          The function takes a list of lines as argument.
+                          The function takes a list of strings as argument.
         @type  answer_cb:  function
         """
         notified = False
@@ -228,6 +231,18 @@ class DhtNetworkSubProcess(NSPopen):
         self._sendRequest(DhtNetworkSubProcess.MESSAGE_STATS, answer_cb=cb)
         return stats
 
+    def sendNodePutRequest(self, _hash, value):
+        """
+        Sends a put operation request.
+
+        @param _hash: the hash of the value.
+        @type  _hash: bytes.
+        @param value: the value.
+        @type  value: bytes.
+        """
+        self._sendRequest(b_space_join(DhtNetworkSubProcess.NODE_PUT_REQ, _hash,
+            value))
+
     def sendNodesRequest(self, request, ids=b''):
         """
         Send request to a list of nodes or the whole cluster.
@@ -469,6 +484,8 @@ if __name__ == '__main__':
                 except queue.Empty:
                     pass
                 else:
+                    NODE_PUT_REQ              = DhtNetworkSubProcess.NODE_PUT_REQ.decode()
+                    NEW_NODE_REQ              = DhtNetworkSubProcess.NEW_NODE_REQ.decode()
                     REMOVE_NODE_REQ           = DhtNetworkSubProcess.REMOVE_NODE_REQ.decode()
                     SHUTDOWN_NODE_REQ         = DhtNetworkSubProcess.SHUTDOWN_NODE_REQ.decode()
                     SHUTDOWN_REPLACE_NODE_REQ = DhtNetworkSubProcess.SHUTDOWN_REPLACE_NODE_REQ.decode()
@@ -479,18 +496,30 @@ if __name__ == '__main__':
                     if req in [SHUTDOWN_NODE_REQ,
                                SHUTDOWN_REPLACE_NODE_REQ,
                                REMOVE_NODE_REQ]:
-                        DhtNetwork.log('got node deletion request.')
-                        for n in req_args:
+                        def delete_request(req, n):
+                            global msg_stats
                             if req == SHUTDOWN_NODE_REQ:
                                 net.end_node(id=n, shutdown=True, last_msg_stats=msg_stats)
                             elif req == SHUTDOWN_REPLACE_NODE_REQ:
                                 net.replace_node(id=n, shutdown=True, last_msg_stats=msg_stats)
                             elif req == REMOVE_NODE_REQ:
                                 net.end_node(id=n, last_msg_stats=msg_stats)
+
+                        if len(req) > 0:
+                            for n in req_args:
+                                delete_request(req, n)
+                        else:
+                            delete_request(req, net.get().getNodeId())
                     elif req == SHUTDOWN_CLUSTER_REQ:
                         for n in net.nodes:
-                            n.end_node(shutdown=True, last_msg_stats=msg_stats)
+                            net.end_node(id=n[2], shutdown=True, last_msg_stats=msg_stats)
                         quit = True
+                    elif req == NEW_NODE_REQ:
+                        net.launch_node()
+                    elif req == NODE_PUT_REQ:
+                        _hash, v = req_args[:2]
+                        net.get().put(InfoHash(_hash), Value(v))
+
                     elif req == DUMP_STORAGE_REQ:
                         for n in [m[1] for m in net.nodes if m[1].getNodeId() in req_args]:
                             net.log(n.getStorageLog())
diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py
index 7fa31cb7..894c7c0e 100644
--- a/python/tools/dht/tests.py
+++ b/python/tools/dht/tests.py
@@ -3,25 +3,33 @@
 # Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com>
 #            Simon Désaulniers <sim.desaulniers@gmail.com>
 
+import os
 import threading
 import random
 import string
 import time
+import subprocess
+import re
 
 import numpy as np
 import matplotlib.pyplot as plt
+from matplotlib.ticker import FuncFormatter
 
 from opendht import *
 from dht.network import DhtNetwork, DhtNetworkSubProcess
 
-######################
-#  Common functions  #
-######################
+############
+#  Common  #
+############
+
+# matplotlib display format for bits (b, Kb, Mb)
+bit_format = None
+Kbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-1) + 'Kb')
+Mbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-2) + 'Mb')
 
 def random_hash():
     return InfoHash(''.join(random.SystemRandom().choice(string.hexdigits) for _ in range(40)).encode())
 
-
 def reset_before_test(featureTestMethod):
     """
     This is a decorator for all test methods needing reset().
@@ -37,6 +45,84 @@ def reset_before_test(featureTestMethod):
         return featureTestMethod(*args, **kwargs)
     return call
 
+def display_plot(yvals, xvals=None, yformatter=None, display_time=3, **kwargs):
+    """
+    Displays a plot of data in interactive mode. This method is made to be
+    called successively for plot refreshing.
+
+    @param yvals:  Ordinate values (float).
+    @type  yvals:  list
+    @param xvals:  Abscissa values (float).
+    @type  xvals:  list
+    @param yformatter:  The matplotlib FuncFormatter to use for y values.
+    @type  yformatter:  matplotlib.ticker.FuncFormatter
+    @param displaytime:  The time matplotlib can take to refresht the plot.
+    @type  displaytime:  int
+    """
+    plt.ion()
+    plt.clf()
+    plt.show()
+    if yformatter:
+        plt.axes().yaxis.set_major_formatter(Kbit_format)
+    if xvals:
+        plt.plot(xvals, yvals, **kwargs)
+    else:
+        plt.plot(yvals, **kwargs)
+    plt.pause(display_time)
+
+def iftop_traffic_data(ifname, interval=2, rate_type='send_receive'):
+    """
+    Generator (yields data) function collecting traffic data from iftop
+    subprocess.
+
+    @param ifname: Interface to listen to.
+    @type  ifname: string
+    @param interval: Interval of time between to data collections. Possible
+                     values are 2, 10 or 40.
+    @type  interval: int
+    @param rates: (default: send_receive) Wether to pick "send", "receive"
+                  or "send and receive" rates. Possible values : "send",
+                  "receive" and "send_receive".
+    @type  rates: string
+    @param _format: Format in which to display data on the y axis.
+                    Possible values: Mb, Kb or b.
+    @type  _format: string
+    """
+    # iftop stdout string format
+    SEND_RATE_STR               = "Total send rate"
+    RECEIVE_RATE_STR            = "Total receive rate"
+    SEND_RECEIVE_RATE_STR       = "Total send and receive rate"
+    RATE_STR = {
+            "send"         : SEND_RATE_STR,
+            "receive"      : RECEIVE_RATE_STR,
+            "send_receive" : SEND_RECEIVE_RATE_STR
+    }
+    TWO_SECONDS_RATE_COL    = 0
+    TEN_SECONDS_RATE_COL    = 1
+    FOURTY_SECONDS_RATE_COL = 2
+    COLS = {
+            2  : TWO_SECONDS_RATE_COL,
+            10 : TEN_SECONDS_RATE_COL,
+            40 : FOURTY_SECONDS_RATE_COL
+    }
+    FLOAT_REGEX = "[0-9]+[.]*[0-9]*"
+    BIT_REGEX = "[KM]*b"
+
+    iftop = subprocess.Popen(["iftop", "-i", ifname, "-t"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
+    while True:
+        line = iftop.stdout.readline().decode()
+        if RATE_STR[rate_type] in line:
+            rate, unit = re.findall("("+FLOAT_REGEX+")("+BIT_REGEX+")", line)[COLS[interval]]
+            rate = float(rate)
+            if unit == "Kb":
+                rate *= 1024
+            elif unit == "Mb":
+                rate *= 1024**2
+            yield rate
+
+###########
+#  Tests  #
+###########
 
 class FeatureTest(object):
     done = 0
@@ -96,7 +182,8 @@ class DhtFeatureTest(FeatureTest):
 
     @staticmethod
     def getcb(value):
-        DhtNetwork.log('[GET]: %s' % value)
+        vstr = value.__str__()[:100]
+        DhtNetwork.log('[GET]: %s' % vstr + ("..." if len(vstr) > 100 else ""))
         DhtFeatureTest.foreignValues.append(value)
         return True
 
@@ -123,10 +210,11 @@ class DhtFeatureTest(FeatureTest):
     def _dhtPut(self, producer, _hash, *values):
         for val in values:
             with FeatureTest.lock:
-                DhtNetwork.log('[PUT]: %s' % val)
+                vstr = val.__str__()[:100]
+                DhtNetwork.log('[PUT]: %s' % vstr + ("..." if len(vstr) > 100 else ""))
                 FeatureTest.done += 1
-                producer.put(_hash, val, DhtFeatureTest.putDoneCb)
                 while FeatureTest.done > 0:
+                    producer.put(_hash, val, DhtFeatureTest.putDoneCb)
                     FeatureTest.lock.wait()
 
     def _dhtGet(self, consumer, _hash):
@@ -134,17 +222,24 @@ class DhtFeatureTest(FeatureTest):
         DhtFeatureTest.foreignNodes = []
         with FeatureTest.lock:
             FeatureTest.done += 1
-            consumer.get(_hash, DhtFeatureTest.getcb, DhtFeatureTest.getDoneCb)
             while FeatureTest.done > 0:
+                consumer.get(_hash, DhtFeatureTest.getcb, DhtFeatureTest.getDoneCb)
                 FeatureTest.lock.wait()
 
+    def _gottaGetThemAllPokeNodes(self, consumer, hashes, nodes=None):
+        for h in hashes:
+            self._dhtGet(consumer, h)
+            if nodes is not None:
+                for n in DhtFeatureTest.foreignNodes:
+                    nodes.add(n)
+
 
 class PersistenceTest(DhtFeatureTest):
     """
     This tests persistence of data on the network.
     """
 
-    def __init__(self, test, workbench, *opts):
+    def __init__(self, test, workbench, opts):
         """
         @param test: is one of the following:
                      - 'mult_time': test persistence of data based on internal
@@ -157,13 +252,53 @@ class PersistenceTest(DhtFeatureTest):
 
         OPTIONS
 
-        - dump_str_log: enables storage log at test ending.
+        - dump_str_log:  Enables storage log at test ending.
+        - keep_alive:    Keeps the test running indefinately. This may be useful
+                         to manually analyse the network traffic during a longer
+                         period.
+        - num_producers: Number of producers of data during a DHT test.
+        - num_values:    Number of values to initialize the DHT with.
         """
 
         # opts
         super(PersistenceTest, self).__init__(test, workbench)
-        self._dump_storage = True if 'dump_str_log' in opts else False
-        self._plot = True if 'plot' in opts else False
+        self._traffic_plot  = True if 'traffic_plot' in opts else False
+        self._dump_storage  = True if 'dump_str_log' in opts else False
+        self._op_plot       = True if 'op_plot' in opts else False
+        self._keep_alive    = True if 'keep_alive' in opts else False
+        self._num_producers = opts['num_producers'] if 'num_producers' in opts else None
+        self._num_values    = opts['num_values'] if 'num_values' in opts else None
+
+    def _trigger_dp(self, trigger_nodes, _hash, count=1):
+        """
+        Triggers the data persistence over time. In order to this, `count` nodes
+        are created with an id around the hash of a value.
+
+        @param trigger_nodes: List of created nodes. The nodes created in this
+                              function are append to this list.
+        @type  trigger_nodes: list
+        @param _hash: Is the id of the value around which creating nodes.
+        @type  _hash: InfoHash
+        @param count: The number of nodes to create with id around the id of
+                      value.
+        @type  count: int
+        """
+        _hash_str = _hash.toString().decode()
+        _hash_int = int(_hash_str, 16)
+        for i in range(int(-count/2), int(count/2)+1):
+            _hash_str = '{:40x}'.format(_hash_int + i)
+            config = DhtConfig()
+            config.setNodeId(InfoHash(_hash_str.encode()))
+            n = DhtRunner()
+            n.run(config=config)
+            n.bootstrap(self.bootstrap.ip4,
+                        str(self.bootstrap.port))
+            DhtNetwork.log('Node','['+_hash_str+']',
+                           'started around', _hash.toString().decode()
+                           if n.isRunning() else
+                           'failed to start...'
+            )
+            trigger_nodes.append(n)
 
     def _result(self, local_values, new_nodes):
         bootstrap = self.bootstrap
@@ -180,7 +315,6 @@ class PersistenceTest(DhtFeatureTest):
                 if self._dump_storage:
                     DhtNetwork.log('Dumping all storage log from '\
                                   'hosting nodes.')
-
                     for proc in self._workbench.procs:
                         proc.sendNodesRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes)
             else:
@@ -188,21 +322,104 @@ class PersistenceTest(DhtFeatureTest):
 
     def run(self):
         try:
-            if self._test == 'delete':
+            if self._test == 'normal':
+                self._totallyNormalTest()
+            elif self._test == 'delete':
                 self._deleteTest()
             elif self._test == 'replace':
                 self._replaceClusterTest()
             elif self._test == 'mult_time':
                 self._multTimeTest()
+            else:
+                raise NameError("This test is not defined '" + self._test + "'")
         except Exception as e:
             print(e)
         finally:
+            if self._traffic_plot or self._op_plot:
+                plot_fname = "traffic-plot"
+                print('plot saved to', plot_fname)
+                plt.savefig(plot_fname)
             self.bootstrap.resize(1)
 
     ###########
     #  Tests  #
     ###########
 
+    @reset_before_test
+    def _totallyNormalTest(self):
+        """
+        Reproduces a network in a realistic state.
+        """
+        trigger_nodes = []
+        wb = self._workbench
+        bootstrap = self.bootstrap
+        # Value representing an ICE packet. Each ICE packet is around 1KB.
+        VALUE_SIZE = 1024
+        NUM_VALUES = self._num_values/wb.node_num if self._num_values else 50
+        nr_values = NUM_VALUES * wb.node_num
+        nr_nodes = wb.node_num
+        nr_nodes_cv = threading.Condition()
+
+        values = [b''.join(random.choice(string.hexdigits).encode() for _ in range(VALUE_SIZE)) for __ in range(NUM_VALUES)]
+        hashes = [random_hash() for _ in range(wb.node_num)]
+
+        # initial set of values
+        for h in hashes:
+           self._dhtPut(bootstrap.front(), h, *[Value(v) for v in values])
+
+        def normalBehavior(do, t, log=None):
+            nonlocal nr_values
+            while True:
+                do()
+                time.sleep(random.choice(range(t)))
+
+        def putRequest():
+            nonlocal hashes, values, nr_values
+            nr_values += 1
+            DhtNetwork.log("Random value put on the DHT.", "(now "+ str(nr_values)+" values on the dht)")
+            random.choice(wb.procs).sendNodePutRequest(random.choice(hashes).toString(), random.choice(values))
+        puts = threading.Thread(target=normalBehavior, args=(putRequest, 30))
+        puts.daemon = True
+        puts.start()
+        def newNodeRequest():
+            nonlocal nr_nodes
+            with nr_nodes_cv:
+                nr_nodes += 1
+                DhtNetwork.log("Node joining the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)")
+                nr_nodes_cv.notify()
+            random.choice(wb.procs).sendNodesRequest(DhtNetworkSubProcess.NEW_NODE_REQ)
+        connections = threading.Thread(target=normalBehavior, args=(newNodeRequest, 1*60))
+        connections.daemon = True
+        connections.start()
+        def shutdownNodeRequest():
+            nonlocal nr_nodes
+            with nr_nodes_cv:
+                nr_nodes -= 1
+                DhtNetwork.log("Node quitting the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)")
+                nr_nodes_cv.notify()
+            random.choice(wb.procs).sendNodesRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ)
+        shutdowns = threading.Thread(target=normalBehavior, args=(shutdownNodeRequest, 1*60))
+        shutdowns.daemon = True
+        shutdowns.start()
+
+        for h in hashes:
+           self._trigger_dp(trigger_nodes, h)
+
+        if self._traffic_plot:
+            ydata = []
+            xdata = []
+            # warning: infinite loop
+            interval = 2
+            for rate in iftop_traffic_data("br"+wb.ifname, interval=interval):
+                ydata.append(rate)
+                xdata.append((xdata[-1] if len(xdata) > 0 else 0) + interval)
+                display_plot(ydata, xvals=xdata, yformatter=Kbit_format, color='blue')
+        else:
+            # blocks in matplotlib thread
+            while True:
+                plt.pause(3600)
+
+
     @reset_before_test
     def _deleteTest(self):
         """
@@ -252,8 +469,8 @@ class PersistenceTest(DhtFeatureTest):
                     DhtNetwork.log('sending message stats request')
                     stats = proc.sendGetMessageStats()
                     cluster_ops_count += sum(stats[1:])
-                    DhtNetwork.log("3 seconds wait...")
-                    time.sleep(3)
+                    DhtNetwork.log("5 seconds wait...")
+                    time.sleep(5)
                 ops_count.append(cluster_ops_count/self._workbench.node_num)
 
                 # checking if values were transfered to new nodes
@@ -264,23 +481,17 @@ class PersistenceTest(DhtFeatureTest):
 
                 self._result(local_values, new_nodes)
 
-            if self._plot:
-                plt.plot(ops_count, color='blue')
-                plt.draw()
-                plt.ioff()
-                plt.show()
+            if self._op_plot:
+                display_plot(ops_count, color='blue')
         else:
             DhtNetwork.log("[GET]: either couldn't fetch values or nodes hosting values...")
 
-    #TODO: complete this test.
     @reset_before_test
     def _replaceClusterTest(self):
         """
         It replaces all clusters one after the other.
         """
-
-        #clusters = opts['clusters'] if 'clusters' in opts else 5
-        clusters = 5
+        clusters = 8
 
         bootstrap = self.bootstrap
 
@@ -310,7 +521,6 @@ class PersistenceTest(DhtFeatureTest):
 
         self._result(local_values, new_nodes)
 
-    #TODO: complete this test.
     @reset_before_test
     def _multTimeTest(self):
         """
@@ -319,75 +529,59 @@ class PersistenceTest(DhtFeatureTest):
         enable storage maintenance each nodes. Therefor, this tests will wait 10
         minutes for the nodes to trigger storage maintenance.
         """
+        trigger_nodes = []
         bootstrap = self.bootstrap
 
-        N_PRODUCERS = 16
+        N_PRODUCERS = self._num_producers if self._num_values else 16
+        DP_TIMEOUT = 1
 
         hashes = []
-        values = [Value(b'foo')]
-        nodes = set([])
-
-        # prevents garbage collecting of unused flood nodes during the test.
-        flood_nodes = []
-
-        def gottaGetThemAllPokeNodes(nodes=None):
-            nonlocal consumer, hashes
-            for h in hashes:
-                self._dhtGet(consumer, h)
-                if nodes is not None:
-                    for n in DhtFeatureTest.foreignNodes:
-                        nodes.add(n)
-
-        def createNodesAroundHash(_hash, radius=4):
-            nonlocal flood_nodes
-
-            _hash_str = _hash.toString().decode()
-            _hash_int = int(_hash_str, 16)
-            for i in range(-radius, radius+1):
-                _hash_str = '{:40x}'.format(_hash_int + i)
-                config = DhtConfig()
-                config.setNodeId(InfoHash(_hash_str.encode()))
-                n = DhtRunner()
-                n.run(config=config)
-                n.bootstrap(self.bootstrap.ip4,
-                            str(self.bootstrap.port))
-                flood_nodes.append(n)
+
+        # Generating considerable amount of values of size 1KB.
+        VALUE_SIZE = 1024
+        NUM_VALUES = self._num_values if self._num_values else 50
+        values = [Value(''.join(random.choice(string.hexdigits) for _ in range(VALUE_SIZE)).encode()) for _ in range(NUM_VALUES)]
 
         bootstrap.resize(N_PRODUCERS+2)
-        consumer = bootstrap.get(1)
-        producers = (bootstrap.get(n) for n in range(2,N_PRODUCERS+2))
+        consumer = bootstrap.get(N_PRODUCERS+1)
+        producers = (bootstrap.get(n) for n in range(1,N_PRODUCERS+1))
         for p in producers:
             hashes.append(random_hash())
             self._dhtPut(p, hashes[-1], *values)
 
-        gottaGetThemAllPokeNodes(nodes=nodes)
+        once = True
+        while self._keep_alive or once:
+            nodes = set([])
+            self._gottaGetThemAllPokeNodes(consumer, hashes, nodes=nodes)
 
-        DhtNetwork.log("Values are found on:")
-        for n in nodes:
-            DhtNetwork.log(n)
+            DhtNetwork.log("Values are found on:")
+            for n in nodes:
+                DhtNetwork.log(n)
 
-        DhtNetwork.log("Creating 8 nodes around all of these nodes...")
-        for _hash in hashes:
-            createNodesAroundHash(_hash)
+            DhtNetwork.log("Creating 8 nodes around all of these hashes...")
+            for _hash in hashes:
+                self._trigger_dp(trigger_nodes, _hash, count=8)
 
-        DhtNetwork.log('Waiting 10 minutes for normal storage maintenance.')
-        time.sleep(10*60)
+            DhtNetwork.log('Waiting', DP_TIMEOUT+1, 'minutes for normal storage maintenance.')
+            time.sleep((DP_TIMEOUT+1)*60)
 
-        DhtNetwork.log('Deleting old nodes from previous search.')
-        for proc in self._workbench.procs:
-            DhtNetwork.log('[REMOVE]: sending shutdown request to', proc)
-            proc.sendNodesRequest(
-                DhtNetworkSubProcess.REMOVE_NODE_REQ,
-                nodes
-            )
+            DhtNetwork.log('Deleting old nodes from previous search.')
+            for proc in self._workbench.procs:
+                DhtNetwork.log('[REMOVE]: sending delete request to', proc)
+                proc.sendNodesRequest(
+                    DhtNetworkSubProcess.REMOVE_NODE_REQ,
+                    nodes)
+
+            # new consumer (fresh cache)
+            bootstrap.resize(N_PRODUCERS+1)
+            bootstrap.resize(N_PRODUCERS+2)
+            consumer = bootstrap.get(N_PRODUCERS+1)
 
-        # new consumer (fresh cache)
-        bootstrap.resize(N_PRODUCERS+3)
-        consumer = bootstrap.get(N_PRODUCERS+2)
+            nodes_after_time = set([])
+            self._gottaGetThemAllPokeNodes(consumer, hashes, nodes=nodes_after_time)
+            self._result(values, nodes_after_time - nodes)
 
-        nodes_after_time = set([])
-        gottaGetThemAllPokeNodes(nodes=nodes_after_time)
-        self._result(values, nodes_after_time - nodes)
+            once = False
 
 
 class PerformanceTest(DhtFeatureTest):
@@ -395,7 +589,7 @@ class PerformanceTest(DhtFeatureTest):
     Tests for general performance of dht operations.
     """
 
-    def __init__(self, test, workbench, *opts):
+    def __init__(self, test, workbench, opts):
         """
         @param test: is one of the following:
                      - 'gets': multiple get operations and statistical results.
@@ -412,6 +606,8 @@ class PerformanceTest(DhtFeatureTest):
                 self._getsTimesTest()
             elif self._test == 'delete':
                 self._delete()
+            else:
+                raise NameError("This test is not defined '" + self._test + "'")
         except Exception as e:
             print(e)
         finally:
-- 
GitLab