diff --git a/python/tools/benchmark.py b/python/tools/benchmark.py index 1840d662c91947f26c324c13c3f1483aa5cba157..0d20a93a0f7073896c909cc358702edc16ebd71d 100755 --- a/python/tools/benchmark.py +++ b/python/tools/benchmark.py @@ -23,6 +23,7 @@ import subprocess import signal import argparse import time +import random from dht.network import DhtNetwork from dht.network import DhtNetworkSubProcess @@ -150,17 +151,22 @@ if __name__ == '__main__': testArgs.add_argument('-t', '--test', type=str, default=None, required=True, help='Specifies the test.') testArgs.add_argument('-o', '--opt', type=str, default=[], nargs='+', help='Options passed to tests routines.') + testArgs.add_argument('-m', type=int, default=None, help='Generic size option passed to tests.') + testArgs.add_argument('-e', type=int, default=None, help='Generic size option passed to tests.') featureArgs = parser.add_mutually_exclusive_group(required=True) - featureArgs.add_argument('--performance', action='store_true', default=0, + featureArgs.add_argument('--performance', action='store_true', default=False, help='Launches performance benchmark test. Available args for "-t" are: gets.') featureArgs.add_argument('--data-persistence', action='store_true', default=0, help='Launches data persistence benchmark test. '\ - 'Available args for "-t" are: delete, replace, mult_time. '\ - 'Available args for "-o" are : dump_str_log') + 'Available args for "-t" are: delete, replace, mult_time. '\ + 'Available args for "-o" are : dump_str_log, keep_alive, trigger, traffic_plot, op_plot. '\ + 'Use "-m" to specify the number of producers on the DHT.'\ + 'Use "-e" to specify the number of values to put on the DHT.') args = parser.parse_args() + test_opt = { o : True for o in args.opt } wb = WorkBench(args.ifname, args.virtual_locs, args.node_num, loss=args.loss, delay=args.delay, disable_ipv4=args.disable_ipv4, @@ -175,10 +181,17 @@ if __name__ == '__main__': for i in range(wb.clusters): wb.start_cluster(i) + # recover -e and -m values. + if args.e: + test_opt.update({ 'num_values' : args.e }) + if args.m: + test_opt.update({ 'num_producers' : args.m }) + + # run the test if args.performance: - PerformanceTest(args.test, wb, *args.opt).run() + PerformanceTest(args.test, wb, test_opt).run() elif args.data_persistence: - PersistenceTest(args.test, wb, *args.opt).run() + PersistenceTest(args.test, wb, test_opt).run() except Exception as e: print(e) diff --git a/python/tools/dht/network.py b/python/tools/dht/network.py index b0b68baf0af07321451c1c1f9af75a54af9d7c44..7a84c1a5979f52ef92958d5ababffbee1b062aa6 100755 --- a/python/tools/dht/network.py +++ b/python/tools/dht/network.py @@ -37,7 +37,8 @@ from opendht import * # useful functions b_space_join = lambda *l: b' '.join(map(bytes, l)) -# TODO: find where token "notifyend" gets printed... +# TODO: find where token "notifyend" gets printed... Or switch to MSGPACK for +# serialisation of packets between both processes. class DhtNetworkSubProcess(NSPopen): """ Handles communication with DhtNetwork sub process. @@ -47,6 +48,8 @@ class DhtNetworkSubProcess(NSPopen): therefor, waits for the sub process to spawn. """ # requests + NODE_PUT_REQ = b"np" + NEW_NODE_REQ = b"nn" REMOVE_NODE_REQ = b"rn" SHUTDOWN_NODE_REQ = b"sdn" SHUTDOWN_REPLACE_NODE_REQ = b'sdrn' @@ -173,7 +176,7 @@ class DhtNetworkSubProcess(NSPopen): process. @param answer_cb: Callback to call when an answer is given after notify. - The function takes a list of lines as argument. + The function takes a list of strings as argument. @type answer_cb: function """ notified = False @@ -228,6 +231,18 @@ class DhtNetworkSubProcess(NSPopen): self._sendRequest(DhtNetworkSubProcess.MESSAGE_STATS, answer_cb=cb) return stats + def sendNodePutRequest(self, _hash, value): + """ + Sends a put operation request. + + @param _hash: the hash of the value. + @type _hash: bytes. + @param value: the value. + @type value: bytes. + """ + self._sendRequest(b_space_join(DhtNetworkSubProcess.NODE_PUT_REQ, _hash, + value)) + def sendNodesRequest(self, request, ids=b''): """ Send request to a list of nodes or the whole cluster. @@ -469,6 +484,8 @@ if __name__ == '__main__': except queue.Empty: pass else: + NODE_PUT_REQ = DhtNetworkSubProcess.NODE_PUT_REQ.decode() + NEW_NODE_REQ = DhtNetworkSubProcess.NEW_NODE_REQ.decode() REMOVE_NODE_REQ = DhtNetworkSubProcess.REMOVE_NODE_REQ.decode() SHUTDOWN_NODE_REQ = DhtNetworkSubProcess.SHUTDOWN_NODE_REQ.decode() SHUTDOWN_REPLACE_NODE_REQ = DhtNetworkSubProcess.SHUTDOWN_REPLACE_NODE_REQ.decode() @@ -479,18 +496,30 @@ if __name__ == '__main__': if req in [SHUTDOWN_NODE_REQ, SHUTDOWN_REPLACE_NODE_REQ, REMOVE_NODE_REQ]: - DhtNetwork.log('got node deletion request.') - for n in req_args: + def delete_request(req, n): + global msg_stats if req == SHUTDOWN_NODE_REQ: net.end_node(id=n, shutdown=True, last_msg_stats=msg_stats) elif req == SHUTDOWN_REPLACE_NODE_REQ: net.replace_node(id=n, shutdown=True, last_msg_stats=msg_stats) elif req == REMOVE_NODE_REQ: net.end_node(id=n, last_msg_stats=msg_stats) + + if len(req) > 0: + for n in req_args: + delete_request(req, n) + else: + delete_request(req, net.get().getNodeId()) elif req == SHUTDOWN_CLUSTER_REQ: for n in net.nodes: - n.end_node(shutdown=True, last_msg_stats=msg_stats) + net.end_node(id=n[2], shutdown=True, last_msg_stats=msg_stats) quit = True + elif req == NEW_NODE_REQ: + net.launch_node() + elif req == NODE_PUT_REQ: + _hash, v = req_args[:2] + net.get().put(InfoHash(_hash), Value(v)) + elif req == DUMP_STORAGE_REQ: for n in [m[1] for m in net.nodes if m[1].getNodeId() in req_args]: net.log(n.getStorageLog()) diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py index 7fa31cb7dcf2aecc7a4ca506dd79d365064b6802..894c7c0e72c44df4a7f03074bcaa21d69387909f 100644 --- a/python/tools/dht/tests.py +++ b/python/tools/dht/tests.py @@ -3,25 +3,33 @@ # Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com> # Simon Désaulniers <sim.desaulniers@gmail.com> +import os import threading import random import string import time +import subprocess +import re import numpy as np import matplotlib.pyplot as plt +from matplotlib.ticker import FuncFormatter from opendht import * from dht.network import DhtNetwork, DhtNetworkSubProcess -###################### -# Common functions # -###################### +############ +# Common # +############ + +# matplotlib display format for bits (b, Kb, Mb) +bit_format = None +Kbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-1) + 'Kb') +Mbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-2) + 'Mb') def random_hash(): return InfoHash(''.join(random.SystemRandom().choice(string.hexdigits) for _ in range(40)).encode()) - def reset_before_test(featureTestMethod): """ This is a decorator for all test methods needing reset(). @@ -37,6 +45,84 @@ def reset_before_test(featureTestMethod): return featureTestMethod(*args, **kwargs) return call +def display_plot(yvals, xvals=None, yformatter=None, display_time=3, **kwargs): + """ + Displays a plot of data in interactive mode. This method is made to be + called successively for plot refreshing. + + @param yvals: Ordinate values (float). + @type yvals: list + @param xvals: Abscissa values (float). + @type xvals: list + @param yformatter: The matplotlib FuncFormatter to use for y values. + @type yformatter: matplotlib.ticker.FuncFormatter + @param displaytime: The time matplotlib can take to refresht the plot. + @type displaytime: int + """ + plt.ion() + plt.clf() + plt.show() + if yformatter: + plt.axes().yaxis.set_major_formatter(Kbit_format) + if xvals: + plt.plot(xvals, yvals, **kwargs) + else: + plt.plot(yvals, **kwargs) + plt.pause(display_time) + +def iftop_traffic_data(ifname, interval=2, rate_type='send_receive'): + """ + Generator (yields data) function collecting traffic data from iftop + subprocess. + + @param ifname: Interface to listen to. + @type ifname: string + @param interval: Interval of time between to data collections. Possible + values are 2, 10 or 40. + @type interval: int + @param rates: (default: send_receive) Wether to pick "send", "receive" + or "send and receive" rates. Possible values : "send", + "receive" and "send_receive". + @type rates: string + @param _format: Format in which to display data on the y axis. + Possible values: Mb, Kb or b. + @type _format: string + """ + # iftop stdout string format + SEND_RATE_STR = "Total send rate" + RECEIVE_RATE_STR = "Total receive rate" + SEND_RECEIVE_RATE_STR = "Total send and receive rate" + RATE_STR = { + "send" : SEND_RATE_STR, + "receive" : RECEIVE_RATE_STR, + "send_receive" : SEND_RECEIVE_RATE_STR + } + TWO_SECONDS_RATE_COL = 0 + TEN_SECONDS_RATE_COL = 1 + FOURTY_SECONDS_RATE_COL = 2 + COLS = { + 2 : TWO_SECONDS_RATE_COL, + 10 : TEN_SECONDS_RATE_COL, + 40 : FOURTY_SECONDS_RATE_COL + } + FLOAT_REGEX = "[0-9]+[.]*[0-9]*" + BIT_REGEX = "[KM]*b" + + iftop = subprocess.Popen(["iftop", "-i", ifname, "-t"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + while True: + line = iftop.stdout.readline().decode() + if RATE_STR[rate_type] in line: + rate, unit = re.findall("("+FLOAT_REGEX+")("+BIT_REGEX+")", line)[COLS[interval]] + rate = float(rate) + if unit == "Kb": + rate *= 1024 + elif unit == "Mb": + rate *= 1024**2 + yield rate + +########### +# Tests # +########### class FeatureTest(object): done = 0 @@ -96,7 +182,8 @@ class DhtFeatureTest(FeatureTest): @staticmethod def getcb(value): - DhtNetwork.log('[GET]: %s' % value) + vstr = value.__str__()[:100] + DhtNetwork.log('[GET]: %s' % vstr + ("..." if len(vstr) > 100 else "")) DhtFeatureTest.foreignValues.append(value) return True @@ -123,10 +210,11 @@ class DhtFeatureTest(FeatureTest): def _dhtPut(self, producer, _hash, *values): for val in values: with FeatureTest.lock: - DhtNetwork.log('[PUT]: %s' % val) + vstr = val.__str__()[:100] + DhtNetwork.log('[PUT]: %s' % vstr + ("..." if len(vstr) > 100 else "")) FeatureTest.done += 1 - producer.put(_hash, val, DhtFeatureTest.putDoneCb) while FeatureTest.done > 0: + producer.put(_hash, val, DhtFeatureTest.putDoneCb) FeatureTest.lock.wait() def _dhtGet(self, consumer, _hash): @@ -134,17 +222,24 @@ class DhtFeatureTest(FeatureTest): DhtFeatureTest.foreignNodes = [] with FeatureTest.lock: FeatureTest.done += 1 - consumer.get(_hash, DhtFeatureTest.getcb, DhtFeatureTest.getDoneCb) while FeatureTest.done > 0: + consumer.get(_hash, DhtFeatureTest.getcb, DhtFeatureTest.getDoneCb) FeatureTest.lock.wait() + def _gottaGetThemAllPokeNodes(self, consumer, hashes, nodes=None): + for h in hashes: + self._dhtGet(consumer, h) + if nodes is not None: + for n in DhtFeatureTest.foreignNodes: + nodes.add(n) + class PersistenceTest(DhtFeatureTest): """ This tests persistence of data on the network. """ - def __init__(self, test, workbench, *opts): + def __init__(self, test, workbench, opts): """ @param test: is one of the following: - 'mult_time': test persistence of data based on internal @@ -157,13 +252,53 @@ class PersistenceTest(DhtFeatureTest): OPTIONS - - dump_str_log: enables storage log at test ending. + - dump_str_log: Enables storage log at test ending. + - keep_alive: Keeps the test running indefinately. This may be useful + to manually analyse the network traffic during a longer + period. + - num_producers: Number of producers of data during a DHT test. + - num_values: Number of values to initialize the DHT with. """ # opts super(PersistenceTest, self).__init__(test, workbench) - self._dump_storage = True if 'dump_str_log' in opts else False - self._plot = True if 'plot' in opts else False + self._traffic_plot = True if 'traffic_plot' in opts else False + self._dump_storage = True if 'dump_str_log' in opts else False + self._op_plot = True if 'op_plot' in opts else False + self._keep_alive = True if 'keep_alive' in opts else False + self._num_producers = opts['num_producers'] if 'num_producers' in opts else None + self._num_values = opts['num_values'] if 'num_values' in opts else None + + def _trigger_dp(self, trigger_nodes, _hash, count=1): + """ + Triggers the data persistence over time. In order to this, `count` nodes + are created with an id around the hash of a value. + + @param trigger_nodes: List of created nodes. The nodes created in this + function are append to this list. + @type trigger_nodes: list + @param _hash: Is the id of the value around which creating nodes. + @type _hash: InfoHash + @param count: The number of nodes to create with id around the id of + value. + @type count: int + """ + _hash_str = _hash.toString().decode() + _hash_int = int(_hash_str, 16) + for i in range(int(-count/2), int(count/2)+1): + _hash_str = '{:40x}'.format(_hash_int + i) + config = DhtConfig() + config.setNodeId(InfoHash(_hash_str.encode())) + n = DhtRunner() + n.run(config=config) + n.bootstrap(self.bootstrap.ip4, + str(self.bootstrap.port)) + DhtNetwork.log('Node','['+_hash_str+']', + 'started around', _hash.toString().decode() + if n.isRunning() else + 'failed to start...' + ) + trigger_nodes.append(n) def _result(self, local_values, new_nodes): bootstrap = self.bootstrap @@ -180,7 +315,6 @@ class PersistenceTest(DhtFeatureTest): if self._dump_storage: DhtNetwork.log('Dumping all storage log from '\ 'hosting nodes.') - for proc in self._workbench.procs: proc.sendNodesRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes) else: @@ -188,21 +322,104 @@ class PersistenceTest(DhtFeatureTest): def run(self): try: - if self._test == 'delete': + if self._test == 'normal': + self._totallyNormalTest() + elif self._test == 'delete': self._deleteTest() elif self._test == 'replace': self._replaceClusterTest() elif self._test == 'mult_time': self._multTimeTest() + else: + raise NameError("This test is not defined '" + self._test + "'") except Exception as e: print(e) finally: + if self._traffic_plot or self._op_plot: + plot_fname = "traffic-plot" + print('plot saved to', plot_fname) + plt.savefig(plot_fname) self.bootstrap.resize(1) ########### # Tests # ########### + @reset_before_test + def _totallyNormalTest(self): + """ + Reproduces a network in a realistic state. + """ + trigger_nodes = [] + wb = self._workbench + bootstrap = self.bootstrap + # Value representing an ICE packet. Each ICE packet is around 1KB. + VALUE_SIZE = 1024 + NUM_VALUES = self._num_values/wb.node_num if self._num_values else 50 + nr_values = NUM_VALUES * wb.node_num + nr_nodes = wb.node_num + nr_nodes_cv = threading.Condition() + + values = [b''.join(random.choice(string.hexdigits).encode() for _ in range(VALUE_SIZE)) for __ in range(NUM_VALUES)] + hashes = [random_hash() for _ in range(wb.node_num)] + + # initial set of values + for h in hashes: + self._dhtPut(bootstrap.front(), h, *[Value(v) for v in values]) + + def normalBehavior(do, t, log=None): + nonlocal nr_values + while True: + do() + time.sleep(random.choice(range(t))) + + def putRequest(): + nonlocal hashes, values, nr_values + nr_values += 1 + DhtNetwork.log("Random value put on the DHT.", "(now "+ str(nr_values)+" values on the dht)") + random.choice(wb.procs).sendNodePutRequest(random.choice(hashes).toString(), random.choice(values)) + puts = threading.Thread(target=normalBehavior, args=(putRequest, 30)) + puts.daemon = True + puts.start() + def newNodeRequest(): + nonlocal nr_nodes + with nr_nodes_cv: + nr_nodes += 1 + DhtNetwork.log("Node joining the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)") + nr_nodes_cv.notify() + random.choice(wb.procs).sendNodesRequest(DhtNetworkSubProcess.NEW_NODE_REQ) + connections = threading.Thread(target=normalBehavior, args=(newNodeRequest, 1*60)) + connections.daemon = True + connections.start() + def shutdownNodeRequest(): + nonlocal nr_nodes + with nr_nodes_cv: + nr_nodes -= 1 + DhtNetwork.log("Node quitting the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)") + nr_nodes_cv.notify() + random.choice(wb.procs).sendNodesRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ) + shutdowns = threading.Thread(target=normalBehavior, args=(shutdownNodeRequest, 1*60)) + shutdowns.daemon = True + shutdowns.start() + + for h in hashes: + self._trigger_dp(trigger_nodes, h) + + if self._traffic_plot: + ydata = [] + xdata = [] + # warning: infinite loop + interval = 2 + for rate in iftop_traffic_data("br"+wb.ifname, interval=interval): + ydata.append(rate) + xdata.append((xdata[-1] if len(xdata) > 0 else 0) + interval) + display_plot(ydata, xvals=xdata, yformatter=Kbit_format, color='blue') + else: + # blocks in matplotlib thread + while True: + plt.pause(3600) + + @reset_before_test def _deleteTest(self): """ @@ -252,8 +469,8 @@ class PersistenceTest(DhtFeatureTest): DhtNetwork.log('sending message stats request') stats = proc.sendGetMessageStats() cluster_ops_count += sum(stats[1:]) - DhtNetwork.log("3 seconds wait...") - time.sleep(3) + DhtNetwork.log("5 seconds wait...") + time.sleep(5) ops_count.append(cluster_ops_count/self._workbench.node_num) # checking if values were transfered to new nodes @@ -264,23 +481,17 @@ class PersistenceTest(DhtFeatureTest): self._result(local_values, new_nodes) - if self._plot: - plt.plot(ops_count, color='blue') - plt.draw() - plt.ioff() - plt.show() + if self._op_plot: + display_plot(ops_count, color='blue') else: DhtNetwork.log("[GET]: either couldn't fetch values or nodes hosting values...") - #TODO: complete this test. @reset_before_test def _replaceClusterTest(self): """ It replaces all clusters one after the other. """ - - #clusters = opts['clusters'] if 'clusters' in opts else 5 - clusters = 5 + clusters = 8 bootstrap = self.bootstrap @@ -310,7 +521,6 @@ class PersistenceTest(DhtFeatureTest): self._result(local_values, new_nodes) - #TODO: complete this test. @reset_before_test def _multTimeTest(self): """ @@ -319,75 +529,59 @@ class PersistenceTest(DhtFeatureTest): enable storage maintenance each nodes. Therefor, this tests will wait 10 minutes for the nodes to trigger storage maintenance. """ + trigger_nodes = [] bootstrap = self.bootstrap - N_PRODUCERS = 16 + N_PRODUCERS = self._num_producers if self._num_values else 16 + DP_TIMEOUT = 1 hashes = [] - values = [Value(b'foo')] - nodes = set([]) - - # prevents garbage collecting of unused flood nodes during the test. - flood_nodes = [] - - def gottaGetThemAllPokeNodes(nodes=None): - nonlocal consumer, hashes - for h in hashes: - self._dhtGet(consumer, h) - if nodes is not None: - for n in DhtFeatureTest.foreignNodes: - nodes.add(n) - - def createNodesAroundHash(_hash, radius=4): - nonlocal flood_nodes - - _hash_str = _hash.toString().decode() - _hash_int = int(_hash_str, 16) - for i in range(-radius, radius+1): - _hash_str = '{:40x}'.format(_hash_int + i) - config = DhtConfig() - config.setNodeId(InfoHash(_hash_str.encode())) - n = DhtRunner() - n.run(config=config) - n.bootstrap(self.bootstrap.ip4, - str(self.bootstrap.port)) - flood_nodes.append(n) + + # Generating considerable amount of values of size 1KB. + VALUE_SIZE = 1024 + NUM_VALUES = self._num_values if self._num_values else 50 + values = [Value(''.join(random.choice(string.hexdigits) for _ in range(VALUE_SIZE)).encode()) for _ in range(NUM_VALUES)] bootstrap.resize(N_PRODUCERS+2) - consumer = bootstrap.get(1) - producers = (bootstrap.get(n) for n in range(2,N_PRODUCERS+2)) + consumer = bootstrap.get(N_PRODUCERS+1) + producers = (bootstrap.get(n) for n in range(1,N_PRODUCERS+1)) for p in producers: hashes.append(random_hash()) self._dhtPut(p, hashes[-1], *values) - gottaGetThemAllPokeNodes(nodes=nodes) + once = True + while self._keep_alive or once: + nodes = set([]) + self._gottaGetThemAllPokeNodes(consumer, hashes, nodes=nodes) - DhtNetwork.log("Values are found on:") - for n in nodes: - DhtNetwork.log(n) + DhtNetwork.log("Values are found on:") + for n in nodes: + DhtNetwork.log(n) - DhtNetwork.log("Creating 8 nodes around all of these nodes...") - for _hash in hashes: - createNodesAroundHash(_hash) + DhtNetwork.log("Creating 8 nodes around all of these hashes...") + for _hash in hashes: + self._trigger_dp(trigger_nodes, _hash, count=8) - DhtNetwork.log('Waiting 10 minutes for normal storage maintenance.') - time.sleep(10*60) + DhtNetwork.log('Waiting', DP_TIMEOUT+1, 'minutes for normal storage maintenance.') + time.sleep((DP_TIMEOUT+1)*60) - DhtNetwork.log('Deleting old nodes from previous search.') - for proc in self._workbench.procs: - DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) - proc.sendNodesRequest( - DhtNetworkSubProcess.REMOVE_NODE_REQ, - nodes - ) + DhtNetwork.log('Deleting old nodes from previous search.') + for proc in self._workbench.procs: + DhtNetwork.log('[REMOVE]: sending delete request to', proc) + proc.sendNodesRequest( + DhtNetworkSubProcess.REMOVE_NODE_REQ, + nodes) + + # new consumer (fresh cache) + bootstrap.resize(N_PRODUCERS+1) + bootstrap.resize(N_PRODUCERS+2) + consumer = bootstrap.get(N_PRODUCERS+1) - # new consumer (fresh cache) - bootstrap.resize(N_PRODUCERS+3) - consumer = bootstrap.get(N_PRODUCERS+2) + nodes_after_time = set([]) + self._gottaGetThemAllPokeNodes(consumer, hashes, nodes=nodes_after_time) + self._result(values, nodes_after_time - nodes) - nodes_after_time = set([]) - gottaGetThemAllPokeNodes(nodes=nodes_after_time) - self._result(values, nodes_after_time - nodes) + once = False class PerformanceTest(DhtFeatureTest): @@ -395,7 +589,7 @@ class PerformanceTest(DhtFeatureTest): Tests for general performance of dht operations. """ - def __init__(self, test, workbench, *opts): + def __init__(self, test, workbench, opts): """ @param test: is one of the following: - 'gets': multiple get operations and statistical results. @@ -412,6 +606,8 @@ class PerformanceTest(DhtFeatureTest): self._getsTimesTest() elif self._test == 'delete': self._delete() + else: + raise NameError("This test is not defined '" + self._test + "'") except Exception as e: print(e) finally: