From 77c480e2cfe2e6391ea221d323e616e25b81b938 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20D=C3=A9saulniers?= <rostydela@gmail.com> Date: Fri, 8 Jan 2016 17:44:19 -0500 Subject: [PATCH] refactored python benchmark in separate modules --- python/tools/benchmark.py | 724 +----------------- python/tools/dht/__init__.py | 0 .../tools/{dhtnetwork.py => dht/network.py} | 226 +++++- python/tools/dht/tests.py | 505 ++++++++++++ python/tools/dht/virtual_network_builder.py | 120 +++ python/tools/virtual_network_builder.py | 119 --- 6 files changed, 859 insertions(+), 835 deletions(-) create mode 100644 python/tools/dht/__init__.py rename python/tools/{dhtnetwork.py => dht/network.py} (58%) create mode 100644 python/tools/dht/tests.py create mode 100755 python/tools/dht/virtual_network_builder.py delete mode 100755 python/tools/virtual_network_builder.py diff --git a/python/tools/benchmark.py b/python/tools/benchmark.py index fe3bfc99..21a32956 100755 --- a/python/tools/benchmark.py +++ b/python/tools/benchmark.py @@ -19,22 +19,22 @@ import os import sys import subprocess -import time -import random -import string -import threading -import queue import signal import argparse -import re +import time -from pyroute2.netns.process.proxy import NSPopen import numpy as np import matplotlib.pyplot as plt -from dhtnetwork import DhtNetwork +from dht.network import DhtNetwork +from dht.network import DhtNetworkSubProcess +from dht.tests import PerformanceTest, PersistenceTest +from dht import virtual_network_builder +from dht import network as dhtnetwork + from opendht import * + class WorkBench(): """ This contains the initialisation information, such as ipv4/ipv6, number of @@ -67,7 +67,7 @@ class WorkBench(): def create_virtual_net(self): if self.virtual_locs > 1: - cmd = ["python3", "virtual_network_builder.py", "-i", self.ifname, "-n", str(self.clusters), '-l', str(self.loss), '-d', str(self.delay)] + cmd = ["python3", os.path.abspath(virtual_network_builder.__file__), "-i", self.ifname, "-n", str(self.clusters), '-l', str(self.loss), '-d', str(self.delay)] if not self.disable_ipv4: cmd.append('-4') if not self.disable_ipv6: @@ -79,11 +79,11 @@ class WorkBench(): def destroy_virtual_net(self): print('Shuting down the virtual IP network.') - subprocess.call(["python3", "virtual_network_builder.py", "-i", self.ifname, "-n", str(self.clusters), "-r"]) + subprocess.call(["python3", os.path.abspath(virtual_network_builder.__file__), "-i", self.ifname, "-n", str(self.clusters), "-r"]) def start_cluster(self, i): if self.local_bootstrap: - cmd = ["python3", "dhtnetwork.py", "-n", str(self.node_per_loc), '-I', self.ifname+str(i)+'.1'] + cmd = ["python3", os.path.abspath(dhtnetwork.__file__), "-n", str(self.node_per_loc), '-I', self.ifname+str(i)+'.1'] if self.remote_bootstrap: cmd.extend(['-b', self.remote_bootstrap, '-bp', "5000"]) else: @@ -132,708 +132,6 @@ class WorkBench(): self.stop_cluster(procs_count-i-1) -class DhtNetworkSubProcess(NSPopen): - """ - Handles communication with DhtNetwork sub process. - - When instanciated, the object's thread is started and will read the sub - process' stdout until it finds 'DhtNetworkSubProcess.NOTIFY_TOKEN' token, - therefor, waits for the sub process to spawn. - """ - # requests - REMOVE_NODE_REQ = b"rn" - SHUTDOWN_NODE_REQ = b"sdn" - SHUTDOWN_REPLACE_NODE_REQ = b'sdrn' - SHUTDOWN_CLUSTER_REQ = b"sdc" - DUMP_STORAGE_REQ = b"strl" - MESSAGE_STATS = b"gms" - - - # tokens - NOTIFY_TOKEN = 'notify' - NOTIFY_END_TOKEN = 'notifyend' - - def __init__(self, ns, cmd, quit=False, **kwargs): - super(DhtNetworkSubProcess, self).__init__(ns, cmd, **kwargs) - self._setStdoutFlags() - self._virtual_ns = ns - - self._quit = quit - self._lock = threading.Condition() - self._in_queue = queue.Queue() - self._out_queue = queue.Queue() - - # starting thread - self._thread = threading.Thread(target=self._communicate) - self._thread.daemon = True - self._thread.start() - - def __repr__(self): - return 'DhtNetwork on virtual namespace "%s"' % self._virtual_ns - - def _setStdoutFlags(self): - """ - Sets non-blocking read flags for subprocess stdout file descriptor. - """ - import fcntl - flags = self.stdout.fcntl(fcntl.F_GETFL) - self.stdout.fcntl(fcntl.F_SETFL, flags | os.O_NDELAY) - - def _communicate(self): - """ - Communication thread. This reads and writes to the sub process. - """ - ENCODING = 'utf-8' - sleep_time = 0.1 - stdin_line, stdout_line = '', '' - - # first read of process living. Expecting NOTIFY_TOKEN - while DhtNetworkSubProcess.NOTIFY_TOKEN not in stdout_line: - stdout_line = self.stdout.readline().decode() - time.sleep(sleep_time) - - with self._lock: - self._out_queue.put(stdout_line) - - while not self._quit: - with self._lock: - try: - stdin_line = self._in_queue.get_nowait() - - # sending data to sub process - self.stdin.write(stdin_line if isinstance(stdin_line, bytes) else - bytes(str(stdin_line), encoding=ENCODING)) - self.stdin.flush() - except queue.Empty: - #waiting for next stdin req to send - self._lock.wait(timeout=sleep_time) - - # reading response from sub process - for stdout_line in iter(self.stdout.readline, b''): - stdout_line = stdout_line.decode().replace('\n', '') - if stdout_line: - with self._lock: - self._out_queue.put(stdout_line) - - with self._lock: - self._lock.notify() - - def stop_communicating(self): - """ - Stops the I/O thread from communicating with the subprocess. - """ - if not self._quit: - self._quit = True - with self._lock: - self._lock.notify() - self._lock.wait() - - def quit(self): - """ - Notifies thread and sub process to terminate. This is blocking call - until the sub process finishes. - """ - self.stop_communicating() - self.send_signal(signal.SIGINT); - self.wait() - self.release() - - def send(self, msg): - """ - Send data to sub process. - """ - with self._lock: - self._in_queue.put(msg) - self._lock.notify() - - def getline(self): - """ - Read line from sub process. - - @return: A line on sub process' stdout. - @rtype : str - """ - line = '' - with self._lock: - try: - line = self._out_queue.get_nowait() - except queue.Empty: - pass - return line - - def getlinesUntilNotify(self, answer_cb=None): - """ - Reads the stdout queue until a proper notification is given by the sub - process. - - @param answer_cb: Callback to call when an answer is given after notify. - The function takes a list of lines as argument. - @type answer_cb: function - """ - notified = False - answer = [] - while True: - out = self.getline() - if out.split(' ')[0] == DhtNetworkSubProcess.NOTIFY_TOKEN: - notified = True - elif notified and out.split(' ')[0] == DhtNetworkSubProcess.NOTIFY_END_TOKEN: - if answer_cb: - answer_cb(answer) - break - elif notified: - answer.append(out) - elif out: - yield out - else: - time.sleep(0.1) - - def sendGetMessageStats(self): - """ - Sends DhtNetwork sub process statistics request about nodes messages - sent. - - @return: A list [num_nodes, ping, find, get, put, listen]. - @rtype : list - """ - stats = [] - def cb(answer): - """ - Callback fed to getlinesUntilNotify made to recover answer from the - DhtNetwork sub process. - - :answer: the list of lines answered by the sub process. - """ - nonlocal stats - if answer: - stats = [int(v) for v in re.findall("[0-9]+", answer.pop())] - - self.send(DhtNetworkSubProcess.MESSAGE_STATS + b'\n') - for line in self.getlinesUntilNotify(answer_cb=cb): - DhtNetwork.log(line) - - return stats - - def sendNodesRequest(self, request, ids): - """ - Shutsdown nodes on the DhtNetwork sub process. - - @param request: The request - @type request: bytes - @param ids: ids of nodes concerned by the request. - @type ids: list - """ - serialized_req = request + b' ' + b' '.join(map(bytes, ids)) - self.send(serialized_req + b'\n') - for line in self.getlinesUntilNotify(): - DhtNetwork.log(line) - - def sendShutdown(self): - """ - Shutdown the whole cluster. This does not terminate comunicating thread; - use quit(). - """ - self.send(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ + b'\n') - for line in self.getlinesUntilNotify(): - DhtNetwork.log(line) - - def sendDumpStorage(self, ids): - """ - Dumps storage log from nodes with id in `ids`. - """ - serialized_req = DhtNetworkSubProcess.DUMP_STORAGE_REQ + b' ' + \ - b' '.join(map(bytes, ids)) - self.send(serialized_req + b'\n') - for line in self.getlinesUntilNotify(): - DhtNetwork.log(line) - - -def random_hash(): - return InfoHash(''.join(random.SystemRandom().choice(string.hexdigits) for _ in range(40)).encode()) - -class FeatureTest(object): - """ - This is base test. A method run() implementation is required. - """ - #static variables used by class callbacks - successfullTransfer = lambda lv,fv: len(lv) == len(fv) - done = 0 - lock = None - foreign_nodes = None - foreign_values = None - - def __init__(self, test, workbench): - self._test = test - self.wb = workbench - self.bootstrap = self.wb.get_bootstrap() - - @staticmethod - def getcb(value): - DhtNetwork.log('[GET]: %s' % value) - FeatureTest.foreign_values.append(value) - return True - - @staticmethod - def putDoneCb(ok, nodes): - if not ok: - DhtNetwork.log("[PUT]: failed!") - with FeatureTest.lock: - FeatureTest.done -= 1 - FeatureTest.lock.notify() - - @staticmethod - def getDoneCb(ok, nodes): - with FeatureTest.lock: - if not ok: - DhtNetwork.log("[GET]: failed!") - else: - for node in nodes: - if not node.getNode().isExpired(): - FeatureTest.foreign_nodes.append(node.getId().toString()) - FeatureTest.done -= 1 - FeatureTest.lock.notify() - - def _dhtPut(self, producer, _hash, *values): - for val in values: - with FeatureTest.lock: - DhtNetwork.log('[PUT]: %s' % val) - FeatureTest.done += 1 - producer.put(_hash, val, FeatureTest.putDoneCb) - while FeatureTest.done > 0: - FeatureTest.lock.wait() - - def _dhtGet(self, consumer, _hash): - FeatureTest.foreign_values = [] - FeatureTest.foreign_nodes = [] - with FeatureTest.lock: - FeatureTest.done += 1 - consumer.get(_hash, FeatureTest.getcb, FeatureTest.getDoneCb) - while FeatureTest.done > 0: - FeatureTest.lock.wait() - - - def run(self): - raise NotImplementedError('This method must be implemented.') - -class PersistenceTest(FeatureTest): - """ - This tests persistence of data on the network. - """ - - def __init__(self, test, workbench, *opts): - """ - @param test: is one of the following: - - 'mult_time': test persistence of data based on internal - OpenDHT storage maintenance timings. - - 'delete': test persistence of data upon deletion of - nodes. - - 'replace': replacing cluster successively. - @type test: string - - - OPTIONS - - - dump_str_log: enables storage log at test ending. - """ - - # opts - super(PersistenceTest, self).__init__(test, workbench) - self._dump_storage = True if 'dump_str_log' in opts else False - self._plot = True if 'plot' in opts else False - - def _result(self, local_values, new_nodes): - bootstrap = self.bootstrap - if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): - DhtNetwork.log('[GET]: Only %s on %s values persisted.' % - (len(FeatureTest.foreign_values), len(local_values))) - else: - DhtNetwork.log('[GET]: All values successfully persisted.') - if FeatureTest.foreign_values: - if new_nodes: - DhtNetwork.log('Values are newly found on:') - for node in new_nodes: - DhtNetwork.log(node) - if self._dump_storage: - DhtNetwork.log('Dumping all storage log from '\ - 'hosting nodes.') - - for proc in self.wb.procs: - proc.sendDumpStorage(FeatureTest.foreign_nodes) - else: - DhtNetwork.log("Values didn't reach new hosting nodes after shutdown.") - - def run(self): - try: - if self._test == 'delete': - self._deleteTest() - elif self._test == 'replace': - self._resplaceClusterTest() - elif self._test == 'mult_time': - self._multTimeTest() - except Exception as e: - print(e) - finally: - bootstrap.resize(1) - - #----------- - #- Tests - - #----------- - - def _deleteTest(self): - """ - It uses Dht shutdown call from the API to gracefuly finish the nodes one - after the other. - """ - FeatureTest.done = 0 - FeatureTest.lock = threading.Condition() - FeatureTest.foreign_nodes = [] - FeatureTest.foreign_values = [] - - bootstrap = self.bootstrap - - ops_count = [] - - bootstrap.resize(3) - consumer = bootstrap.get(1) - producer = bootstrap.get(2) - - myhash = random_hash() - local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')] - - self._dhtPut(producer, myhash, *local_values) - - #checking if values were transfered - self._dhtGet(consumer, myhash) - if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): - if FeatureTest.foreign_values: - DhtNetwork.log('[GET]: Only ', len(FeatureTest.foreign_values) ,' on ', - len(local_values), ' values successfully put.') - else: - DhtNetwork.log('[GET]: 0 values successfully put') - - - if FeatureTest.foreign_values and FeatureTest.foreign_nodes: - DhtNetwork.log('Values are found on :') - for node in FeatureTest.foreign_nodes: - DhtNetwork.log(node) - - DhtNetwork.log("Waiting a minute for the network to settle down.") - time.sleep(60) - - for _ in range(max(1, int(self.wb.node_num/32))): - DhtNetwork.log('Removing all nodes hosting target values...') - cluster_ops_count = 0 - for proc in self.wb.procs: - DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) - proc.sendNodesRequest( - DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, - FeatureTest.foreign_nodes - ) - DhtNetwork.log('sending message stats request') - stats = proc.sendGetMessageStats() - cluster_ops_count += sum(stats[1:]) - #DhtNetwork.log("Waiting 15 seconds for packets to work their way effectively.") - #time.sleep(15) - ops_count.append(cluster_ops_count/self.wb.node_num) - - # checking if values were transfered to new nodes - foreign_nodes_before_delete = FeatureTest.foreign_nodes - DhtNetwork.log('[GET]: trying to fetch persistent values') - self._dhtGet(consumer, myhash) - new_nodes = set(FeatureTest.foreign_nodes) - set(foreign_nodes_before_delete) - - self._result(local_values, new_nodes) - - if self._plot: - plt.plot(ops_count, color='blue') - plt.draw() - plt.ioff() - plt.show() - else: - DhtNetwork.log("[GET]: either couldn't fetch values or nodes hosting values...") - - def _resplaceClusterTest(self): - """ - It replaces all clusters one after the other. - """ - FeatureTest.done = 0 - FeatureTest.lock = threading.Condition() - FeatureTest.foreign_nodes = [] - FeatureTest.foreign_values = [] - - clusters = opts['clusters'] if 'clusters' in opts else 5 - - bootstrap = self.bootstrap - - bootstrap.resize(3) - consumer = bootstrap.get(1) - producer = bootstrap.get(2) - - myhash = random_hash() - local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')] - - self._dhtPut(producer, myhash, *local_values) - self._dhtGet(consumer, myhash) - initial_nodes = FeatureTest.foreign_nodes - - DhtNetwork.log('Replacing', clusters, 'random clusters successively...') - for n in range(clusters): - i = random.randint(0, len(self.wb.procs)-1) - proc = self.wb.procs[i] - DhtNetwork.log('Replacing', proc) - proc.sendShutdown() - self.wb.stop_cluster(i) - self.wb.start_cluster(i) - - DhtNetwork.log('[GET]: trying to fetch persistent values') - self._dhtGet(consumer, myhash) - new_nodes = set(FeatureTest.foreign_nodes) - set(initial_nodes) - - self._result(local_values, new_nodes) - - def _multTimeTest(self): - """ - Multiple put() calls are made from multiple nodes to multiple hashes - after what a set of 8 nodes is created around each hashes in order to - enable storage maintenance each nodes. Therefor, this tests will wait 10 - minutes for the nodes to trigger storage maintenance. - """ - FeatureTest.done = 0 - FeatureTest.lock = threading.Condition() - FeatureTest.foreign_nodes = [] - FeatureTest.foreign_values = [] - bootstrap = self.bootstrap - - N_PRODUCERS = 16 - - hashes = [] - values = [Value(b'foo')] - nodes = set([]) - - # prevents garbage collecting of unused flood nodes during the test. - flood_nodes = [] - - def gottaGetThemAllPokeNodes(nodes=None): - nonlocal consumer, hashes - for h in hashes: - self._dhtGet(consumer, h) - if nodes is not None: - for n in FeatureTest.foreign_nodes: - nodes.add(n) - - def createNodesAroundHash(_hash, radius=4): - nonlocal flood_nodes - - _hash_str = _hash.toString().decode() - _hash_int = int(_hash_str, 16) - for i in range(-radius, radius+1): - _hash_str = '{:40x}'.format(_hash_int + i) - config = DhtConfig() - config.setNodeId(InfoHash(_hash_str.encode())) - n = DhtRunner() - n.run(config=config) - n.bootstrap(self.bootstrap.ip4, - str(self.bootstrap.port)) - flood_nodes.append(n) - - bootstrap.resize(N_PRODUCERS+2) - consumer = bootstrap.get(1) - producers = (bootstrap.get(n) for n in range(2,N_PRODUCERS+2)) - for p in producers: - hashes.append(random_hash()) - self._dhtPut(p, hashes[-1], *values) - - gottaGetThemAllPokeNodes(nodes=nodes) - - DhtNetwork.log("Values are found on:") - for n in nodes: - DhtNetwork.log(n) - - DhtNetwork.log("Creating 8 nodes around all of these nodes...") - for _hash in hashes: - createNodesAroundHash(_hash) - - DhtNetwork.log('Waiting 10 minutes for normal storage maintenance.') - time.sleep(10*60) - - DhtNetwork.log('Deleting old nodes from previous search.') - for proc in self.wb.procs: - DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) - proc.sendNodesRequest( - DhtNetworkSubProcess.REMOVE_NODE_REQ, - nodes - ) - - # new consumer (fresh cache) - bootstrap.resize(N_PRODUCERS+3) - consumer = bootstrap.get(N_PRODUCERS+2) - - nodes_after_time = set([]) - gottaGetThemAllPokeNodes(nodes=nodes_after_time) - self._result(values, nodes_after_time - nodes) - -class PerformanceTest(FeatureTest): - """ - Tests for general performance of dht operations. - """ - - def __init__(self, test, workbench, *opts): - """ - @param test: is one of the following: - - 'gets': multiple get operations and statistical results. - - 'delete': perform multiple put() operations followed - by targeted deletion of nodes hosting the values. Doing - so until half of the nodes on the network remain. - @type test: string - """ - super(PerformanceTest, self).__init__(test, workbench) - - def run(self): - try: - if self._test == 'gets': - self._getsTimesTest() - elif self._test == 'delete': - self._delete() - except Exception as e: - print(e) - finally: - self.bootstrap.resize(1) - - - ########### - # Tests # - ########### - - def _getsTimesTest(self): - """ - Tests for performance of the DHT doing multiple get() operation. - """ - bootstrap = self.bootstrap - - plt.ion() - - fig, axes = plt.subplots(2, 1) - fig.tight_layout() - - lax = axes[0] - hax = axes[1] - - lines = None#ax.plot([]) - #plt.ylabel('time (s)') - hax.set_ylim(0, 2) - - # let the network stabilise - plt.pause(60) - - #start = time.time() - times = [] - - lock = threading.Condition() - done = 0 - - def getcb(v): - nonlocal bootstrap - DhtNetwork.log("found", v) - return True - - def donecb(ok, nodes): - nonlocal bootstrap, lock, done, times - t = time.time()-start - with lock: - if not ok: - DhtNetwork.log("failed !") - times.append(t) - done -= 1 - lock.notify() - - def update_plot(): - nonlocal lines - while lines: - l = lines.pop() - l.remove() - del l - lines = plt.plot(times, color='blue') - plt.draw() - - def run_get(): - nonlocal done - done += 1 - start = time.time() - bootstrap.front().get(InfoHash.getRandom(), getcb, lambda ok, nodes: donecb(ok, nodes, start)) - - plt.pause(5) - - plt.show() - update_plot() - - times = [] - for n in range(10): - self.wb.replace_cluster() - plt.pause(2) - DhtNetwork.log("Getting 50 random hashes succesively.") - for i in range(50): - with lock: - done += 1 - start = time.time() - bootstrap.front().get(InfoHash.getRandom(), getcb, donecb) - while done > 0: - lock.wait() - update_plot() - update_plot() - print("Took", np.sum(times), "mean", np.mean(times), "std", np.std(times), "min", np.min(times), "max", np.max(times)) - - print('GET calls timings benchmark test : DONE. ' \ - 'Close Matplotlib window for terminating the program.') - plt.ioff() - plt.show() - - def _delete(self): - """ - Tests for performance of get() and put() operations on the network while - deleting around the target hash. - """ - - FeatureTest.done = 0 - FeatureTest.lock = threading.Condition() - FeatureTest.foreign_nodes = [] - FeatureTest.foreign_values = [] - - bootstrap = self.bootstrap - - bootstrap.resize(3) - consumer = bootstrap.get(1) - producer = bootstrap.get(2) - - myhash = random_hash() - local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')] - - for _ in range(max(1, int(self.wb.node_num/32))): - self._dhtGet(consumer, myhash) - DhtNetwork.log("Waiting 15 seconds...") - time.sleep(15) - - self._dhtPut(producer, myhash, *local_values) - - #checking if values were transfered - self._dhtGet(consumer, myhash) - DhtNetwork.log('Values are found on :') - for node in FeatureTest.foreign_nodes: - DhtNetwork.log(node) - - if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): - if FeatureTest.foreign_values: - DhtNetwork.log('[GET]: Only ', len(FeatureTest.foreign_values) ,' on ', - len(local_values), ' values successfully put.') - else: - DhtNetwork.log('[GET]: 0 values successfully put') - - DhtNetwork.log('Removing all nodes hosting target values...') - for proc in self.wb.procs: - DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) - proc.sendNodesRequest( - DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, - FeatureTest.foreign_nodes - ) - if __name__ == '__main__': parser = argparse.ArgumentParser(description='Run, test and benchmark a '\ diff --git a/python/tools/dht/__init__.py b/python/tools/dht/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/tools/dhtnetwork.py b/python/tools/dht/network.py similarity index 58% rename from python/tools/dhtnetwork.py rename to python/tools/dht/network.py index c3dda601..46dafab2 100755 --- a/python/tools/dhtnetwork.py +++ b/python/tools/dht/network.py @@ -16,20 +16,239 @@ # You should have received a copy of the GNU General Public License # along with this program; If not, see <http://www.gnu.org/licenses/>. +import os import sys import signal import random import time import threading import queue +import re import ipaddress import netifaces - import numpy as np +from pyroute2.netns.process.proxy import NSPopen from opendht import * + +# TODO: find where token "notifyend" gets printed... +class DhtNetworkSubProcess(NSPopen): + """ + Handles communication with DhtNetwork sub process. + + When instanciated, the object's thread is started and will read the sub + process' stdout until it finds 'DhtNetworkSubProcess.NOTIFY_TOKEN' token, + therefor, waits for the sub process to spawn. + """ + # requests + REMOVE_NODE_REQ = b"rn" + SHUTDOWN_NODE_REQ = b"sdn" + SHUTDOWN_REPLACE_NODE_REQ = b'sdrn' + SHUTDOWN_CLUSTER_REQ = b"sdc" + DUMP_STORAGE_REQ = b"strl" + MESSAGE_STATS = b"gms" + + + # tokens + NOTIFY_TOKEN = 'notify' + NOTIFY_END_TOKEN = 'notifyend' + + def __init__(self, ns, cmd, quit=False, **kwargs): + super(DhtNetworkSubProcess, self).__init__(ns, cmd, **kwargs) + self._setStdoutFlags() + self._virtual_ns = ns + + self._quit = quit + self._lock = threading.Condition() + self._in_queue = queue.Queue() + self._out_queue = queue.Queue() + + # starting thread + self._thread = threading.Thread(target=self._communicate) + self._thread.daemon = True + self._thread.start() + + def __repr__(self): + return 'DhtNetwork on virtual namespace "%s"' % self._virtual_ns + + def _setStdoutFlags(self): + """ + Sets non-blocking read flags for subprocess stdout file descriptor. + """ + import fcntl + flags = self.stdout.fcntl(fcntl.F_GETFL) + self.stdout.fcntl(fcntl.F_SETFL, flags | os.O_NDELAY) + + def _communicate(self): + """ + Communication thread. This reads and writes to the sub process. + """ + ENCODING = 'utf-8' + sleep_time = 0.1 + stdin_line, stdout_line = '', '' + + # first read of process living. Expecting NOTIFY_TOKEN + while DhtNetworkSubProcess.NOTIFY_TOKEN not in stdout_line: + stdout_line = self.stdout.readline().decode() + time.sleep(sleep_time) + + with self._lock: + self._out_queue.put(stdout_line) + + while not self._quit: + with self._lock: + try: + stdin_line = self._in_queue.get_nowait() + + # sending data to sub process + self.stdin.write(stdin_line if isinstance(stdin_line, bytes) else + bytes(str(stdin_line), encoding=ENCODING)) + self.stdin.flush() + except queue.Empty: + #waiting for next stdin req to send + self._lock.wait(timeout=sleep_time) + + # reading response from sub process + for stdout_line in iter(self.stdout.readline, b''): + stdout_line = stdout_line.decode().replace('\n', '') + if stdout_line: + with self._lock: + self._out_queue.put(stdout_line) + + with self._lock: + self._lock.notify() + + def stop_communicating(self): + """ + Stops the I/O thread from communicating with the subprocess. + """ + if not self._quit: + self._quit = True + with self._lock: + self._lock.notify() + self._lock.wait() + + def quit(self): + """ + Notifies thread and sub process to terminate. This is blocking call + until the sub process finishes. + """ + self.stop_communicating() + self.send_signal(signal.SIGINT); + self.wait() + self.release() + + def send(self, msg): + """ + Send data to sub process. + """ + with self._lock: + self._in_queue.put(msg) + self._lock.notify() + + def getline(self): + """ + Read line from sub process. + + @return: A line on sub process' stdout. + @rtype : str + """ + line = '' + with self._lock: + try: + line = self._out_queue.get_nowait() + except queue.Empty: + pass + return line + + def getlinesUntilNotify(self, answer_cb=None): + """ + Reads the stdout queue until a proper notification is given by the sub + process. + + @param answer_cb: Callback to call when an answer is given after notify. + The function takes a list of lines as argument. + @type answer_cb: function + """ + notified = False + answer = [] + while True: + out = self.getline() + if out.split(' ')[0] == DhtNetworkSubProcess.NOTIFY_TOKEN: + notified = True + elif notified and out.split(' ')[0] == DhtNetworkSubProcess.NOTIFY_END_TOKEN: + if answer_cb: + answer_cb(answer) + break + elif notified: + answer.append(out) + elif out: + yield out + else: + time.sleep(0.1) + + def sendGetMessageStats(self): + """ + Sends DhtNetwork sub process statistics request about nodes messages + sent. + + @return: A list [num_nodes, ping, find, get, put, listen]. + @rtype : list + """ + stats = [] + def cb(answer): + """ + Callback fed to getlinesUntilNotify made to recover answer from the + DhtNetwork sub process. + + :answer: the list of lines answered by the sub process. + """ + nonlocal stats + if answer: + stats = [int(v) for v in re.findall("[0-9]+", answer.pop())] + + self.send(DhtNetworkSubProcess.MESSAGE_STATS + b'\n') + for line in self.getlinesUntilNotify(answer_cb=cb): + DhtNetwork.log(line) + + return stats + + def sendNodesRequest(self, request, ids): + """ + Shutsdown nodes on the DhtNetwork sub process. + + @param request: The request + @type request: bytes + @param ids: ids of nodes concerned by the request. + @type ids: list + """ + serialized_req = request + b' ' + b' '.join(map(bytes, ids)) + self.send(serialized_req + b'\n') + for line in self.getlinesUntilNotify(): + DhtNetwork.log(line) + + def sendShutdown(self): + """ + Shutdown the whole cluster. This does not terminate comunicating thread; + use quit(). + """ + self.send(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ + b'\n') + for line in self.getlinesUntilNotify(): + DhtNetwork.log(line) + + def sendDumpStorage(self, ids): + """ + Dumps storage log from nodes with id in `ids`. + """ + serialized_req = DhtNetworkSubProcess.DUMP_STORAGE_REQ + b' ' + \ + b' '.join(map(bytes, ids)) + self.send(serialized_req + b'\n') + for line in self.getlinesUntilNotify(): + DhtNetwork.log(line) + + class DhtNetwork(object): nodes = [] @@ -94,10 +313,10 @@ class DhtNetwork(object): def end_node(self, id=None, shutdown=False, last_msg_stats=None): """ Ends a running node. - + @param id: The 40 hex chars id of the node. @type id: bytes - + @return: If a node was deleted or not. @rtype : boolean """ @@ -159,6 +378,7 @@ class DhtNetwork(object): stats_list.extend(stats.tolist()) return stats_list + if __name__ == '__main__': import argparse diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py new file mode 100644 index 00000000..b94c3794 --- /dev/null +++ b/python/tools/dht/tests.py @@ -0,0 +1,505 @@ +# Copyright (C) 2015 Savoir-Faire Linux Inc. +# Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com> +# Simon Désaulniers <sim.desaulniers@gmail.com> + +import threading +import random +import string +import time + +from opendht import * +from dht.network import DhtNetwork, DhtNetworkSubProcess + + +def random_hash(): + return InfoHash(''.join(random.SystemRandom().choice(string.hexdigits) for _ in range(40)).encode()) + + +class FeatureTest(object): + """ + This is base test. A method run() implementation is required. + """ + #static variables used by class callbacks + successfullTransfer = lambda lv,fv: len(lv) == len(fv) + done = 0 + lock = None + foreign_nodes = None + foreign_values = None + + def __init__(self, test, workbench): + self._test = test + self.wb = workbench + self.bootstrap = self.wb.get_bootstrap() + + @staticmethod + def getcb(value): + DhtNetwork.log('[GET]: %s' % value) + FeatureTest.foreign_values.append(value) + return True + + @staticmethod + def putDoneCb(ok, nodes): + if not ok: + DhtNetwork.log("[PUT]: failed!") + with FeatureTest.lock: + FeatureTest.done -= 1 + FeatureTest.lock.notify() + + @staticmethod + def getDoneCb(ok, nodes): + with FeatureTest.lock: + if not ok: + DhtNetwork.log("[GET]: failed!") + else: + for node in nodes: + if not node.getNode().isExpired(): + FeatureTest.foreign_nodes.append(node.getId().toString()) + FeatureTest.done -= 1 + FeatureTest.lock.notify() + + def _dhtPut(self, producer, _hash, *values): + for val in values: + with FeatureTest.lock: + DhtNetwork.log('[PUT]: %s' % val) + FeatureTest.done += 1 + producer.put(_hash, val, FeatureTest.putDoneCb) + while FeatureTest.done > 0: + FeatureTest.lock.wait() + + def _dhtGet(self, consumer, _hash): + FeatureTest.foreign_values = [] + FeatureTest.foreign_nodes = [] + with FeatureTest.lock: + FeatureTest.done += 1 + consumer.get(_hash, FeatureTest.getcb, FeatureTest.getDoneCb) + while FeatureTest.done > 0: + FeatureTest.lock.wait() + + + def run(self): + raise NotImplementedError('This method must be implemented.') + + +class PersistenceTest(FeatureTest): + """ + This tests persistence of data on the network. + """ + + def __init__(self, test, workbench, *opts): + """ + @param test: is one of the following: + - 'mult_time': test persistence of data based on internal + OpenDHT storage maintenance timings. + - 'delete': test persistence of data upon deletion of + nodes. + - 'replace': replacing cluster successively. + @type test: string + + + OPTIONS + + - dump_str_log: enables storage log at test ending. + """ + + # opts + super(PersistenceTest, self).__init__(test, workbench) + self._dump_storage = True if 'dump_str_log' in opts else False + self._plot = True if 'plot' in opts else False + + def _result(self, local_values, new_nodes): + bootstrap = self.bootstrap + if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): + DhtNetwork.log('[GET]: Only %s on %s values persisted.' % + (len(FeatureTest.foreign_values), len(local_values))) + else: + DhtNetwork.log('[GET]: All values successfully persisted.') + if FeatureTest.foreign_values: + if new_nodes: + DhtNetwork.log('Values are newly found on:') + for node in new_nodes: + DhtNetwork.log(node) + if self._dump_storage: + DhtNetwork.log('Dumping all storage log from '\ + 'hosting nodes.') + + for proc in self.wb.procs: + proc.sendDumpStorage(FeatureTest.foreign_nodes) + else: + DhtNetwork.log("Values didn't reach new hosting nodes after shutdown.") + + def run(self): + try: + if self._test == 'delete': + self._deleteTest() + elif self._test == 'replace': + self._replaceClusterTest() + elif self._test == 'mult_time': + self._multTimeTest() + except Exception as e: + print(e) + finally: + self.bootstrap.resize(1) + + #----------- + #- Tests - + #----------- + + def _deleteTest(self): + """ + It uses Dht shutdown call from the API to gracefuly finish the nodes one + after the other. + """ + FeatureTest.done = 0 + FeatureTest.lock = threading.Condition() + FeatureTest.foreign_nodes = [] + FeatureTest.foreign_values = [] + + bootstrap = self.bootstrap + + ops_count = [] + + bootstrap.resize(3) + consumer = bootstrap.get(1) + producer = bootstrap.get(2) + + myhash = random_hash() + local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')] + + self._dhtPut(producer, myhash, *local_values) + + #checking if values were transfered + self._dhtGet(consumer, myhash) + if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): + if FeatureTest.foreign_values: + DhtNetwork.log('[GET]: Only ', len(FeatureTest.foreign_values) ,' on ', + len(local_values), ' values successfully put.') + else: + DhtNetwork.log('[GET]: 0 values successfully put') + + + if FeatureTest.foreign_values and FeatureTest.foreign_nodes: + DhtNetwork.log('Values are found on :') + for node in FeatureTest.foreign_nodes: + DhtNetwork.log(node) + + #DhtNetwork.log("Waiting a minute for the network to settle down.") + #time.sleep(60) + + for _ in range(max(1, int(self.wb.node_num/32))): + DhtNetwork.log('Removing all nodes hosting target values...') + cluster_ops_count = 0 + for proc in self.wb.procs: + DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) + proc.sendNodesRequest( + DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, + FeatureTest.foreign_nodes + ) + DhtNetwork.log('sending message stats request') + stats = proc.sendGetMessageStats() + cluster_ops_count += sum(stats[1:]) + DhtNetwork.log("3 seconds wait...") + time.sleep(3) + ops_count.append(cluster_ops_count/self.wb.node_num) + + # checking if values were transfered to new nodes + foreign_nodes_before_delete = FeatureTest.foreign_nodes + DhtNetwork.log('[GET]: trying to fetch persistent values') + self._dhtGet(consumer, myhash) + new_nodes = set(FeatureTest.foreign_nodes) - set(foreign_nodes_before_delete) + + self._result(local_values, new_nodes) + + if self._plot: + plt.plot(ops_count, color='blue') + plt.draw() + plt.ioff() + plt.show() + else: + DhtNetwork.log("[GET]: either couldn't fetch values or nodes hosting values...") + + #TODO: complete this test. + def _replaceClusterTest(self): + """ + It replaces all clusters one after the other. + """ + FeatureTest.done = 0 + FeatureTest.lock = threading.Condition() + FeatureTest.foreign_nodes = [] + FeatureTest.foreign_values = [] + + #clusters = opts['clusters'] if 'clusters' in opts else 5 + clusters = 5 + + bootstrap = self.bootstrap + + bootstrap.resize(3) + consumer = bootstrap.get(1) + producer = bootstrap.get(2) + + myhash = random_hash() + local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')] + + self._dhtPut(producer, myhash, *local_values) + self._dhtGet(consumer, myhash) + initial_nodes = FeatureTest.foreign_nodes + + DhtNetwork.log('Replacing', clusters, 'random clusters successively...') + for n in range(clusters): + i = random.randint(0, len(self.wb.procs)-1) + proc = self.wb.procs[i] + DhtNetwork.log('Replacing', proc) + proc.sendShutdown() + self.wb.stop_cluster(i) + self.wb.start_cluster(i) + + DhtNetwork.log('[GET]: trying to fetch persistent values') + self._dhtGet(consumer, myhash) + new_nodes = set(FeatureTest.foreign_nodes) - set(initial_nodes) + + self._result(local_values, new_nodes) + + #TODO: complete this test. + def _multTimeTest(self): + """ + Multiple put() calls are made from multiple nodes to multiple hashes + after what a set of 8 nodes is created around each hashes in order to + enable storage maintenance each nodes. Therefor, this tests will wait 10 + minutes for the nodes to trigger storage maintenance. + """ + FeatureTest.done = 0 + FeatureTest.lock = threading.Condition() + FeatureTest.foreign_nodes = [] + FeatureTest.foreign_values = [] + bootstrap = self.bootstrap + + N_PRODUCERS = 16 + + hashes = [] + values = [Value(b'foo')] + nodes = set([]) + + # prevents garbage collecting of unused flood nodes during the test. + flood_nodes = [] + + def gottaGetThemAllPokeNodes(nodes=None): + nonlocal consumer, hashes + for h in hashes: + self._dhtGet(consumer, h) + if nodes is not None: + for n in FeatureTest.foreign_nodes: + nodes.add(n) + + def createNodesAroundHash(_hash, radius=4): + nonlocal flood_nodes + + _hash_str = _hash.toString().decode() + _hash_int = int(_hash_str, 16) + for i in range(-radius, radius+1): + _hash_str = '{:40x}'.format(_hash_int + i) + config = DhtConfig() + config.setNodeId(InfoHash(_hash_str.encode())) + n = DhtRunner() + n.run(config=config) + n.bootstrap(self.bootstrap.ip4, + str(self.bootstrap.port)) + flood_nodes.append(n) + + bootstrap.resize(N_PRODUCERS+2) + consumer = bootstrap.get(1) + producers = (bootstrap.get(n) for n in range(2,N_PRODUCERS+2)) + for p in producers: + hashes.append(random_hash()) + self._dhtPut(p, hashes[-1], *values) + + gottaGetThemAllPokeNodes(nodes=nodes) + + DhtNetwork.log("Values are found on:") + for n in nodes: + DhtNetwork.log(n) + + DhtNetwork.log("Creating 8 nodes around all of these nodes...") + for _hash in hashes: + createNodesAroundHash(_hash) + + DhtNetwork.log('Waiting 10 minutes for normal storage maintenance.') + time.sleep(10*60) + + DhtNetwork.log('Deleting old nodes from previous search.') + for proc in self.wb.procs: + DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) + proc.sendNodesRequest( + DhtNetworkSubProcess.REMOVE_NODE_REQ, + nodes + ) + + # new consumer (fresh cache) + bootstrap.resize(N_PRODUCERS+3) + consumer = bootstrap.get(N_PRODUCERS+2) + + nodes_after_time = set([]) + gottaGetThemAllPokeNodes(nodes=nodes_after_time) + self._result(values, nodes_after_time - nodes) + + +class PerformanceTest(FeatureTest): + """ + Tests for general performance of dht operations. + """ + + def __init__(self, test, workbench, *opts): + """ + @param test: is one of the following: + - 'gets': multiple get operations and statistical results. + - 'delete': perform multiple put() operations followed + by targeted deletion of nodes hosting the values. Doing + so until half of the nodes on the network remain. + @type test: string + """ + super(PerformanceTest, self).__init__(test, workbench) + + def run(self): + try: + if self._test == 'gets': + self._getsTimesTest() + elif self._test == 'delete': + self._delete() + except Exception as e: + print(e) + finally: + self.bootstrap.resize(1) + + + ########### + # Tests # + ########### + + def _getsTimesTest(self): + """ + Tests for performance of the DHT doing multiple get() operation. + """ + bootstrap = self.bootstrap + + plt.ion() + + fig, axes = plt.subplots(2, 1) + fig.tight_layout() + + lax = axes[0] + hax = axes[1] + + lines = None#ax.plot([]) + #plt.ylabel('time (s)') + hax.set_ylim(0, 2) + + # let the network stabilise + plt.pause(60) + + #start = time.time() + times = [] + + lock = threading.Condition() + done = 0 + + def getcb(v): + nonlocal bootstrap + DhtNetwork.log("found", v) + return True + + def donecb(ok, nodes): + nonlocal bootstrap, lock, done, times + t = time.time()-start + with lock: + if not ok: + DhtNetwork.log("failed !") + times.append(t) + done -= 1 + lock.notify() + + def update_plot(): + nonlocal lines + while lines: + l = lines.pop() + l.remove() + del l + lines = plt.plot(times, color='blue') + plt.draw() + + def run_get(): + nonlocal done + done += 1 + start = time.time() + bootstrap.front().get(InfoHash.getRandom(), getcb, lambda ok, nodes: donecb(ok, nodes, start)) + + plt.pause(5) + + plt.show() + update_plot() + + times = [] + for n in range(10): + self.wb.replace_cluster() + plt.pause(2) + DhtNetwork.log("Getting 50 random hashes succesively.") + for i in range(50): + with lock: + done += 1 + start = time.time() + bootstrap.front().get(InfoHash.getRandom(), getcb, donecb) + while done > 0: + lock.wait() + update_plot() + update_plot() + print("Took", np.sum(times), "mean", np.mean(times), "std", np.std(times), "min", np.min(times), "max", np.max(times)) + + print('GET calls timings benchmark test : DONE. ' \ + 'Close Matplotlib window for terminating the program.') + plt.ioff() + plt.show() + + def _delete(self): + """ + Tests for performance of get() and put() operations on the network while + deleting around the target hash. + """ + + FeatureTest.done = 0 + FeatureTest.lock = threading.Condition() + FeatureTest.foreign_nodes = [] + FeatureTest.foreign_values = [] + + bootstrap = self.bootstrap + + bootstrap.resize(3) + consumer = bootstrap.get(1) + producer = bootstrap.get(2) + + myhash = random_hash() + local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')] + + for _ in range(max(1, int(self.wb.node_num/32))): + self._dhtGet(consumer, myhash) + DhtNetwork.log("Waiting 15 seconds...") + time.sleep(15) + + self._dhtPut(producer, myhash, *local_values) + + #checking if values were transfered + self._dhtGet(consumer, myhash) + DhtNetwork.log('Values are found on :') + for node in FeatureTest.foreign_nodes: + DhtNetwork.log(node) + + if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): + if FeatureTest.foreign_values: + DhtNetwork.log('[GET]: Only ', len(FeatureTest.foreign_values) ,' on ', + len(local_values), ' values successfully put.') + else: + DhtNetwork.log('[GET]: 0 values successfully put') + + DhtNetwork.log('Removing all nodes hosting target values...') + for proc in self.wb.procs: + DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) + proc.sendNodesRequest( + DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, + FeatureTest.foreign_nodes + ) diff --git a/python/tools/dht/virtual_network_builder.py b/python/tools/dht/virtual_network_builder.py new file mode 100755 index 00000000..a48fd3eb --- /dev/null +++ b/python/tools/dht/virtual_network_builder.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +# Copyright (c) 2015-2016 Savoir-faire Linux Inc. +# Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; If not, see <http://www.gnu.org/licenses/>. + +import argparse, subprocess + +from pyroute2 import IPDB, NetNS +from pyroute2.netns.process.proxy import NSPopen + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Creates a virtual network topology for testing') + parser.add_argument('-i', '--ifname', help='interface name', default='ethdht') + parser.add_argument('-n', '--ifnum', type=int, help='number of isolated interfaces to create', default=1) + parser.add_argument('-r', '--remove', help='remove instead of adding network interfaces', action="store_true") + parser.add_argument('-l', '--loss', help='simulated packet loss (percent)', type=int, default=0) + parser.add_argument('-d', '--delay', help='simulated latency (ms)', type=int, default=0) + parser.add_argument('-4', '--ipv4', help='Enable IPv4', action="store_true") + parser.add_argument('-6', '--ipv6', help='Enable IPv6', action="store_true") + + args = parser.parse_args() + + local_addr4 = '10.0.42.' + local_addr6 = '2001:db9::' + brige_name = 'br'+args.ifname + + ip = None + try: + ip = IPDB() + if args.remove: + # cleanup interfaces + for ifn in range(args.ifnum): + iface = args.ifname+str(ifn) + if iface in ip.interfaces: + with ip.interfaces[iface] as i: + i.remove() + if 'tap'+args.ifname in ip.interfaces: + with ip.interfaces['tap'+args.ifname] as i: + i.remove() + if brige_name in ip.interfaces: + with ip.interfaces[brige_name] as i: + i.remove() + for ifn in range(args.ifnum): + netns = NetNS('node'+str(ifn)) + netns.close() + netns.remove() + else: + for ifn in range(args.ifnum): + iface = args.ifname+str(ifn) + if not iface in ip.interfaces: + ip.create(kind='veth', ifname=iface, peer=iface+'.1').commit() + + ip.create(kind='tuntap', ifname='tap'+args.ifname, mode='tap').commit() + + with ip.create(kind='bridge', ifname=brige_name) as i: + for ifn in range(args.ifnum): + iface = args.ifname+str(ifn) + i.add_port(ip.interfaces[iface]) + i.add_port(ip.interfaces['tap'+args.ifname]) + if args.ipv4: + i.add_ip(local_addr4+'1/24') + if args.ipv6: + i.add_ip(local_addr6+'1/64') + i.up() + + with ip.interfaces['tap'+args.ifname] as tap: + tap.up() + + for ifn in range(args.ifnum): + iface = args.ifname+str(ifn) + + nns = NetNS('node'+str(ifn)) + iface1 = iface+'.1' + with ip.interfaces[iface1] as i: + i.net_ns_fd = nns.netns + + with ip.interfaces[iface] as i: + i.up() + + ip_ns = IPDB(nl=nns) + try: + with ip_ns.interfaces.lo as lo: + lo.up() + with ip_ns.interfaces[iface1] as i: + if args.ipv4: + i.add_ip(local_addr4+str(ifn+8)+'/24') + if args.ipv6: + i.add_ip(local_addr6+str(ifn+8)+'/64') + i.up() + finally: + ip_ns.release() + + nsp = NSPopen(nns.netns, ["tc", "qdisc", "add", "dev", iface1, "root", "netem", "delay", str(args.delay)+"ms", str(int(args.delay/2))+"ms", "loss", str(args.loss)+"%", "25%"], stdout=subprocess.PIPE) + #print(nsp.communicate()[0].decode()) + nsp.communicate() + nsp.wait() + nsp.release() + + if args.ipv4: + subprocess.call(["sysctl", "-w", "net.ipv4.conf."+brige_name+".forwarding=1"]) + if args.ipv6: + subprocess.call(["sysctl", "-w", "net.ipv6.conf."+brige_name+".forwarding=1"]) + + except Exception as e: + print('Error',e) + finally: + if ip: + ip.release() diff --git a/python/tools/virtual_network_builder.py b/python/tools/virtual_network_builder.py deleted file mode 100755 index 682d2351..00000000 --- a/python/tools/virtual_network_builder.py +++ /dev/null @@ -1,119 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) 2015-2016 Savoir-faire Linux Inc. -# Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; If not, see <http://www.gnu.org/licenses/>. - -import argparse, subprocess - -from pyroute2 import IPDB, NetNS -from pyroute2.netns.process.proxy import NSPopen - -parser = argparse.ArgumentParser(description='Creates a virtual network topology for testing') -parser.add_argument('-i', '--ifname', help='interface name', default='ethdht') -parser.add_argument('-n', '--ifnum', type=int, help='number of isolated interfaces to create', default=1) -parser.add_argument('-r', '--remove', help='remove instead of adding network interfaces', action="store_true") -parser.add_argument('-l', '--loss', help='simulated packet loss (percent)', type=int, default=0) -parser.add_argument('-d', '--delay', help='simulated latency (ms)', type=int, default=0) -parser.add_argument('-4', '--ipv4', help='Enable IPv4', action="store_true") -parser.add_argument('-6', '--ipv6', help='Enable IPv6', action="store_true") - -args = parser.parse_args() - -local_addr4 = '10.0.42.' -local_addr6 = '2001:db9::' -brige_name = 'br'+args.ifname - -ip = None -try: - ip = IPDB() - if args.remove: - # cleanup interfaces - for ifn in range(args.ifnum): - iface = args.ifname+str(ifn) - if iface in ip.interfaces: - with ip.interfaces[iface] as i: - i.remove() - if 'tap'+args.ifname in ip.interfaces: - with ip.interfaces['tap'+args.ifname] as i: - i.remove() - if brige_name in ip.interfaces: - with ip.interfaces[brige_name] as i: - i.remove() - for ifn in range(args.ifnum): - netns = NetNS('node'+str(ifn)) - netns.close() - netns.remove() - else: - for ifn in range(args.ifnum): - iface = args.ifname+str(ifn) - if not iface in ip.interfaces: - ip.create(kind='veth', ifname=iface, peer=iface+'.1').commit() - - ip.create(kind='tuntap', ifname='tap'+args.ifname, mode='tap').commit() - - with ip.create(kind='bridge', ifname=brige_name) as i: - for ifn in range(args.ifnum): - iface = args.ifname+str(ifn) - i.add_port(ip.interfaces[iface]) - i.add_port(ip.interfaces['tap'+args.ifname]) - if args.ipv4: - i.add_ip(local_addr4+'1/24') - if args.ipv6: - i.add_ip(local_addr6+'1/64') - i.up() - - with ip.interfaces['tap'+args.ifname] as tap: - tap.up() - - for ifn in range(args.ifnum): - iface = args.ifname+str(ifn) - - nns = NetNS('node'+str(ifn)) - iface1 = iface+'.1' - with ip.interfaces[iface1] as i: - i.net_ns_fd = nns.netns - - with ip.interfaces[iface] as i: - i.up() - - ip_ns = IPDB(nl=nns) - try: - with ip_ns.interfaces.lo as lo: - lo.up() - with ip_ns.interfaces[iface1] as i: - if args.ipv4: - i.add_ip(local_addr4+str(ifn+8)+'/24') - if args.ipv6: - i.add_ip(local_addr6+str(ifn+8)+'/64') - i.up() - finally: - ip_ns.release() - - nsp = NSPopen(nns.netns, ["tc", "qdisc", "add", "dev", iface1, "root", "netem", "delay", str(args.delay)+"ms", str(int(args.delay/2))+"ms", "loss", str(args.loss)+"%", "25%"], stdout=subprocess.PIPE) - #print(nsp.communicate()[0].decode()) - nsp.communicate() - nsp.wait() - nsp.release() - - if args.ipv4: - subprocess.call(["sysctl", "-w", "net.ipv4.conf."+brige_name+".forwarding=1"]) - if args.ipv6: - subprocess.call(["sysctl", "-w", "net.ipv6.conf."+brige_name+".forwarding=1"]) - -except Exception as e: - print('Error',e) -finally: - if ip: - ip.release() -- GitLab