diff --git a/python/tools/benchmark.py b/python/tools/benchmark.py index eb1064eb32b9fe7a436e325e50c05a379ff74f2e..80fd567620b7e2332f8616e2f37d8e4aac4f1904 100755 --- a/python/tools/benchmark.py +++ b/python/tools/benchmark.py @@ -66,7 +66,11 @@ class WorkBench(): def create_virtual_net(self): if self.virtual_locs > 1: - cmd = ["python3", os.path.abspath(virtual_network_builder.__file__), "-i", self.ifname, "-n", str(self.clusters), '-l', str(self.loss), '-d', str(self.delay)] + cmd = ["python3", os.path.abspath(virtual_network_builder.__file__), + "-i", self.ifname, + "-n", str(self.clusters), + '-l', str(self.loss), + '-d', str(self.delay)] if not self.disable_ipv4: cmd.append('-4') if not self.disable_ipv6: @@ -90,10 +94,17 @@ class WorkBench(): cmd.extend(['-b', self.local_bootstrap.ip4]) if not self.disable_ipv6 and self.local_bootstrap.ip6: cmd.extend(['-b6', self.local_bootstrap.ip6]) - self.procs[i] = DhtNetworkSubProcess('node'+str(i), cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) - while DhtNetworkSubProcess.NOTIFY_TOKEN not in self.procs[i].getline(): - # waiting for process to spawn - time.sleep(0.5) + lock = threading.Condition() + def dcb(success): + nonlocal lock + if not success: + DhtNetwork.Log.err("Failed to initialize network...") + with lock: + lock.notify() + with lock: + self.procs[i] = DhtNetworkSubProcess('node'+str(i), cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) + self.procs[i].sendPing(done_cb=dcb) + lock.wait() else: raise Exception('First create bootstrap.') diff --git a/python/tools/dht/network.py b/python/tools/dht/network.py index fd340f35f1ce2079e76266e6b8042c9cb470d714..cb379b6bd1eaf0277f08330444f7ca4edb8a46a2 100755 --- a/python/tools/dht/network.py +++ b/python/tools/dht/network.py @@ -25,11 +25,13 @@ import time import threading import queue import re +import traceback import ipaddress import netifaces import numpy as np from pyroute2.netns.process.proxy import NSPopen +import msgpack from opendht import * @@ -37,8 +39,6 @@ from opendht import * # useful functions b_space_join = lambda *l: b' '.join(map(bytes, l)) -# TODO: find where token "notifyend" gets printed... Or switch to MSGPACK for -# serialisation of packets between both processes. class DhtNetworkSubProcess(NSPopen): """ Handles communication with DhtNetwork sub process. @@ -47,20 +47,21 @@ class DhtNetworkSubProcess(NSPopen): process' stdout until it finds 'DhtNetworkSubProcess.NOTIFY_TOKEN' token, therefor, waits for the sub process to spawn. """ - # requests - NODE_PUT_REQ = b"np" - NEW_NODE_REQ = b"nn" - REMOVE_NODE_REQ = b"rn" - SHUTDOWN_NODE_REQ = b"sdn" - SHUTDOWN_REPLACE_NODE_REQ = b'sdrn' - SHUTDOWN_CLUSTER_REQ = b"sdc" - DUMP_STORAGE_REQ = b"strl" - MESSAGE_STATS = b"gms" - + # Packet types + REQUEST = 'DhtNetworkSubProcess.request' + ANSWER = 'DhtNetworkSubProcess.answer' + OUT = 'DhtNetworkSubProcess.out' - # tokens - NOTIFY_TOKEN = 'notify' - NOTIFY_END_TOKEN = 'notifyend' + # requests + PING_REQ = "p" + NODE_PUT_REQ = "np" # "np <hash> <value>" + NEW_NODE_REQ = "nn" # "nn" + REMOVE_NODE_REQ = "rn" # "rn <id0>[ <id1>[ id2[...]]]" + SHUTDOWN_NODE_REQ = "sdn" # "sdn <id0>[ <id1>[ id2[...]]]" + SHUTDOWN_REPLACE_NODE_REQ = "sdrn" # "sdn <id0>[ <id1>[ id2[...]]]" + SHUTDOWN_CLUSTER_REQ = "sdc" # "sdc" + DUMP_STORAGE_REQ = "strl" # "strl" + MESSAGE_STATS = "gms" # "gms" def __init__(self, ns, cmd, quit=False, **kwargs): super(DhtNetworkSubProcess, self).__init__(ns, cmd, **kwargs) @@ -70,11 +71,11 @@ class DhtNetworkSubProcess(NSPopen): self._quit = quit self._lock = threading.Condition() self._in_queue = queue.Queue() - self._out_queue = queue.Queue() + self._callbacks = {} + self._tid = 0 # starting thread self._thread = threading.Thread(target=self._communicate) - self._thread.daemon = True self._thread.start() def __repr__(self): @@ -92,42 +93,38 @@ class DhtNetworkSubProcess(NSPopen): """ Communication thread. This reads and writes to the sub process. """ - ENCODING = 'utf-8' sleep_time = 0.1 - stdin_line, stdout_line = '', '' - - # first read of process living. Expecting NOTIFY_TOKEN - while DhtNetworkSubProcess.NOTIFY_TOKEN not in stdout_line: - stdout_line = self.stdout.readline().decode() - time.sleep(sleep_time) - - with self._lock: - self._out_queue.put(stdout_line) while not self._quit: with self._lock: try: - stdin_line = self._in_queue.get_nowait() + packet = self._in_queue.get_nowait() # sending data to sub process - self.stdin.write(stdin_line if isinstance(stdin_line, bytes) else - bytes(str(stdin_line), encoding=ENCODING)) + self.stdin.write(packet) self.stdin.flush() except queue.Empty: - #waiting for next stdin req to send - self._lock.wait(timeout=sleep_time) + pass - # reading response from sub process - for stdout_line in iter(self.stdout.readline, b''): - stdout_line = stdout_line.decode().replace('\n', '') - if stdout_line: - with self._lock: - self._out_queue.put(stdout_line) + # reading from sub process + out_string = '' + for p in msgpack.Unpacker(self.stdout): + if isinstance(p, dict): + self._process_packet(p) + else: + # Some non-msgpack data could slip into the stream. We + # have to treat those as characters. + out_string += chr(p) + if out_string: + print(out_string) + + #waiting for next stdin req to send + self._lock.wait(timeout=sleep_time) with self._lock: self._lock.notify() - def stop_communicating(self): + def _stop_communicating(self): """ Stops the I/O thread from communicating with the subprocess. """ @@ -142,12 +139,12 @@ class DhtNetworkSubProcess(NSPopen): Notifies thread and sub process to terminate. This is blocking call until the sub process finishes. """ - self.stop_communicating() + self._stop_communicating() self.send_signal(signal.SIGINT); self.wait() self.release() - def send(self, msg): + def _send(self, msg): """ Send data to sub process. """ @@ -155,83 +152,76 @@ class DhtNetworkSubProcess(NSPopen): self._in_queue.put(msg) self._lock.notify() - def getline(self): + def _process_packet(self, p): """ - Read line from sub process. - - @return: A line on sub process' stdout. - @rtype : str + Process msgpack packet received from """ - line = '' - with self._lock: - try: - line = self._out_queue.get_nowait() - except queue.Empty: - pass - return line + if not b'tid' in p: + DhtNetwork.Log.err('Bad packet...') + try: + self._callbacks[p[b'tid']](p) + except KeyError: + DhtNetwork.Log.err('Unknown tid...') - def getlinesUntilNotify(self, answer_cb=None): - """ - Reads the stdout queue until a proper notification is given by the sub - process. - @param answer_cb: Callback to call when an answer is given after notify. - The function takes a list of strings as argument. - @type answer_cb: function - """ - notified = False - answer = [] - while True: - out = self.getline() - if out.split(' ')[0] == DhtNetworkSubProcess.NOTIFY_TOKEN: - notified = True - elif notified and out.split(' ')[0] == DhtNetworkSubProcess.NOTIFY_END_TOKEN: - if answer_cb: - answer_cb(answer) - break - elif notified: - answer.append(out) - elif out: - yield out - else: - time.sleep(0.1) - - def _sendRequest(self, request, answer_cb=None): + def _sendRequest(self, request, tid, done_cb): """ Sends a request to the sub network and wait for output. @param request: The serialized request. - @type request: bytes + @type request: Msgpack object """ - self.send(request + b'\n') - for line in self.getlinesUntilNotify(answer_cb=answer_cb): - DhtNetwork.log(line) + self._callbacks[tid] = done_cb + self._send(request) + + def sendPing(self, done_cb=None): + """Sends a ping request to the DhtNetworkSubProcess. - def sendGetMessageStats(self): + @param done_cb: The callback to be executed when we get a response. This + function takes a boolean "success" as parameter. + @type done_cb: Function + """ + self._tid += 1 + def dcb(packet): + try: + done_cb(packet[b'success']) + except KeyError: + done_cb(False) + self._sendRequest(msgpack.packb({ + DhtNetworkSubProcess.REQUEST : True, + 'tid' : self._tid, + 'req' : DhtNetworkSubProcess.PING_REQ + }), self._tid, dcb) + + def sendGetMessageStats(self, done_cb=None): """ Sends DhtNetwork sub process statistics request about nodes messages sent. + @param done_cb: A function taking as parameter the returned list of + stats. + @type done_cb: function + @return: A list [num_nodes, ping, find, get, put, listen]. @rtype : list """ - stats = [] - def cb(answer): - """ - Callback fed to getlinesUntilNotify made to recover answer from the - DhtNetwork sub process. - - @param answer: the list of lines answered by the sub process. - @type answer: function - """ - nonlocal stats - if answer: - stats = [int(v) for v in re.findall("[0-9]+", answer.pop())] - - self._sendRequest(DhtNetworkSubProcess.MESSAGE_STATS, answer_cb=cb) - return stats - - def sendClusterPutRequest(self, _hash, value): + self._tid += 1 + def dcb(packet): + nonlocal done_cb + if not done_cb: + return + try: + stats = packet[b'stats'] + done_cb([] if not isinstance(stats, list) else done_cb(stats)) + except KeyError: + done_cb([]) + self._sendRequest(msgpack.packb({ + DhtNetworkSubProcess.REQUEST : True, + 'tid' : self._tid, + 'req' : DhtNetworkSubProcess.MESSAGE_STATS + }), self._tid, dcb) + + def sendClusterPutRequest(self, _hash, value, done_cb=None): """ Sends a put operation request. @@ -239,40 +229,89 @@ class DhtNetworkSubProcess(NSPopen): @type _hash: bytes. @param value: the value. @type value: bytes. + @param done_cb: A function taking as parameter a boolean "success". + @type done_cb: function """ - self._sendRequest(b_space_join(DhtNetworkSubProcess.NODE_PUT_REQ, _hash, - value)) - - def sendClusterRequest(self, request, ids=b''): + self._tid += 1 + def dcb(packet): + nonlocal done_cb + if not done_cb: + return + try: + done_cb(packet[b'success']) + except KeyError: + done_cb(False) + self._sendRequest(msgpack.packb({ + DhtNetworkSubProcess.REQUEST : True, + 'tid' : self._tid, + 'req' : DhtNetworkSubProcess.NODE_PUT_REQ, + 'hash' : _hash, + 'value' : value + }), self._tid, dcb) + + def sendClusterRequest(self, request, ids=[], done_cb=None): """ Send request to a list of nodes or the whole cluster. @param request: The request. Possible values are: - DhtNetworkSubProcess.NODE_PUT_REQ DhtNetworkSubProcess.REMOVE_NODE_REQ DhtNetworkSubProcess.SHUTDOWN_NODE_REQ DhtNetworkSubProcess.SHUTDOWN_REPLACE_NODE_REQ DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ DhtNetworkSubProcess.DUMP_STORAGE_REQ - DhtNetworkSubProcess.MESSAGE_STATS @type request: bytes @param ids: The list of ids concerned by the request. @type ids: list """ - self._sendRequest(b_space_join(request, b_space_join(*ids))) + self._tid += 1 + def dcb(packet): + nonlocal done_cb + if not done_cb: + return + try: + done_cb(packet[b'success']) + except KeyError: + done_cb(False) + self._sendRequest(msgpack.packb({ + DhtNetworkSubProcess.REQUEST : True, + 'tid' : self._tid, + 'req' : request, + 'ids' : ids + }), self._tid, dcb) + class DhtNetwork(object): nodes = [] - @staticmethod - def log(*to_print): + class Log(object): BOLD = "\033[1m" NORMAL = "\033[0m" - print('%s[DhtNetwork-%s]%s' % (BOLD, DhtNetwork.iface, NORMAL), *to_print, file=sys.stderr) + WHITE = "\033[97m" + RED = "\033[31m" + YELLOW = "\033[33m" + + @staticmethod + def _log_with_color(*to_print, color=None): + color = color if color else DhtNetwork.Log.WHITE + print('%s%s[DhtNetwork-%s]%s%s' % + (DhtNetwork.Log.BOLD, color, DhtNetwork.iface, DhtNetwork.Log.NORMAL, color), + *to_print, DhtNetwork.Log.NORMAL, file=sys.stderr) + + @staticmethod + def log(*to_print): + DhtNetwork.Log._log_with_color(*to_print, color=DhtNetwork.Log.WHITE) + + @staticmethod + def warn(*to_print): + DhtNetwork.Log._log_with_color(*to_print, color=DhtNetwork.Log.YELLOW) + + @staticmethod + def err(*to_print): + DhtNetwork.Log._log_with_color(*to_print, color=DhtNetwork.Log.RED) @staticmethod def run_node(ip4, ip6, p, bootstrap=[], is_bootstrap=False): - DhtNetwork.log("run_node", ip4, ip6, p, bootstrap) + DhtNetwork.Log.log("run_node", ip4, ip6, p, bootstrap) n = DhtRunner() n.run(ipv4=ip4 if ip4 else "", ipv6=ip6 if ip6 else "", port=p, is_bootstrap=is_bootstrap) for b in bootstrap: @@ -297,7 +336,7 @@ class DhtNetwork(object): self.ip6 = ip6 if ip6 else ips[1] self.bootstrap = bootstrap if first_bootstrap: - DhtNetwork.log("Starting bootstrap node") + DhtNetwork.Log.log("Starting bootstrap node") self.nodes.append(DhtNetwork.run_node(self.ip4, self.ip6, self.port, self.bootstrap, is_bootstrap=True)) self.bootstrap = [(self.ip4, str(self.port))] self.port += 1 @@ -309,6 +348,9 @@ class DhtNetwork(object): return self.nodes[0][1] def get(self, i=None): + if not self.nodes: + return None + if i is None: l = list(self.nodes) random.shuffle(l) @@ -321,14 +363,13 @@ class DhtNetwork(object): for n in self.nodes: if n[1].getNodeId() == id: return n - else: - return None + return None def launch_node(self): n = DhtNetwork.run_node(self.ip4, self.ip6, self.port, self.bootstrap) self.nodes.append(n) if not self.bootstrap: - DhtNetwork.log("Using fallback bootstrap", self.ip4, self.port) + DhtNetwork.Log.log("Using fallback bootstrap", self.ip4, self.port) self.bootstrap = [(self.ip4, str(self.port))] self.port += 1 return n @@ -346,25 +387,25 @@ class DhtNetwork(object): lock = threading.Condition() def shutdown_cb(): nonlocal lock - DhtNetwork.log('Done.') + DhtNetwork.Log.log('Done.') with lock: lock.notify() if not self.nodes: return - elif id is not None: + elif id: n = self.getNodeInfoById(id) if n: if shutdown: with lock: - DhtNetwork.log('Waiting for node to shutdown... ') + DhtNetwork.Log.log('Waiting for node to shutdown... ') n[1].shutdown(shutdown_cb) lock.wait() if last_msg_stats: last_msg_stats.append(self.getMessageStats()) n[1].join() self.nodes.remove(n) - DhtNetwork.log(id, 'deleted !') + DhtNetwork.Log.log(id, 'deleted !') return True else: return False @@ -385,11 +426,11 @@ class DhtNetwork(object): if n == l: return if n > l: - DhtNetwork.log("Launching", n-l, "nodes", self.ip4, self.ip6) + DhtNetwork.Log.log("Launching", n-l, "nodes", self.ip4, self.ip6) for i in range(l, n): self.launch_node() else: - DhtNetwork.log("Ending", l-n, "nodes", self.ip4, self.ip6) + DhtNetwork.Log.log("Ending", l-n, "nodes", self.ip4, self.ip6) #random.shuffle(self.nodes) for i in range(n, l): self.end_node() @@ -409,30 +450,39 @@ if __name__ == '__main__': lock = threading.Condition() quit = False - def notify_benchmark(answer=[]): - sys.stdout.write('%s\n' % DhtNetworkSubProcess.NOTIFY_TOKEN) - for line in answer: - sys.stdout.write(str(line)+'\n') - sys.stdout.write('%s\n' % DhtNetworkSubProcess.NOTIFY_END_TOKEN) - sys.stdout.flush() - - def listen_to_mother_nature(stdin, q): - global quit + def send_msgpack_packet(packet): + sys.stdout.buffer.write(packet) + sys.stdout.buffer.flush() - def parse_req(req): - split_req = req.split(' ') - - op = split_req[0] - hashes = [this_hash.replace('\n', '').encode() for this_hash in split_req[1:]] - - return (op, hashes) + def notify_benchmark(packet, success): + """Notifies the benchmark when an operation has been completed. + @param success: If the operation has been successful + @type success: boolean + @param packet: The packet we are providing an answer for. + @type packet: dict + """ + send_msgpack_packet(msgpack.packb({ + DhtNetworkSubProcess.ANSWER : True, + 'tid' : packet[b'tid'], + 'success' : success + })) + + def send_stats(packet, stats): + send_msgpack_packet(msgpack.packb({ + DhtNetworkSubProcess.ANSWER : True, + 'tid' : packet[b'tid'], + 'stats' : stats + })) + + def listen_to_mother_nature(q): + global quit while not quit: - req = stdin.readline() - parsed_req = parse_req(req) - q.put(parsed_req) - with lock: - lock.notify() + for p in msgpack.Unpacker(sys.stdin.buffer.raw): + if isinstance(p, dict) and DhtNetworkSubProcess.REQUEST.encode() in p: + with lock: + q.put(p) + lock.notify() def handler(signum, frame): global quit @@ -466,48 +516,52 @@ if __name__ == '__main__': net.resize(args.node_num) q = queue.Queue() - t = threading.Thread(target=listen_to_mother_nature, args=(sys.stdin, q)) + t = threading.Thread(target=listen_to_mother_nature, args=tuple([q])) t.daemon = True t.start() - notify_benchmark() - msg_stats = [] with lock: while not quit: - lock.wait() try: - req,req_args = q.get_nowait() + packet = q.get_nowait() except queue.Empty: - pass + lock.wait() else: - NODE_PUT_REQ = DhtNetworkSubProcess.NODE_PUT_REQ.decode() - NEW_NODE_REQ = DhtNetworkSubProcess.NEW_NODE_REQ.decode() - REMOVE_NODE_REQ = DhtNetworkSubProcess.REMOVE_NODE_REQ.decode() - SHUTDOWN_NODE_REQ = DhtNetworkSubProcess.SHUTDOWN_NODE_REQ.decode() - SHUTDOWN_REPLACE_NODE_REQ = DhtNetworkSubProcess.SHUTDOWN_REPLACE_NODE_REQ.decode() - SHUTDOWN_CLUSTER_REQ = DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ.decode() - DUMP_STORAGE_REQ = DhtNetworkSubProcess.DUMP_STORAGE_REQ.decode() - MESSAGE_STATS = DhtNetworkSubProcess.MESSAGE_STATS.decode() - - if req in [SHUTDOWN_NODE_REQ, - SHUTDOWN_REPLACE_NODE_REQ, - REMOVE_NODE_REQ]: - def delete_request(req, n): + NODE_PUT_REQ = DhtNetworkSubProcess.NODE_PUT_REQ + NEW_NODE_REQ = DhtNetworkSubProcess.NEW_NODE_REQ + REMOVE_NODE_REQ = DhtNetworkSubProcess.REMOVE_NODE_REQ + SHUTDOWN_NODE_REQ = DhtNetworkSubProcess.SHUTDOWN_NODE_REQ + SHUTDOWN_REPLACE_NODE_REQ = DhtNetworkSubProcess.SHUTDOWN_REPLACE_NODE_REQ + SHUTDOWN_CLUSTER_REQ = DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ + DUMP_STORAGE_REQ = DhtNetworkSubProcess.DUMP_STORAGE_REQ + MESSAGE_STATS = DhtNetworkSubProcess.MESSAGE_STATS + + req = packet[b'req'].decode() + success = True + if req in [SHUTDOWN_NODE_REQ, SHUTDOWN_REPLACE_NODE_REQ, REMOVE_NODE_REQ]: + def delete_request(req, nid): global msg_stats + if not nid: + return if req == SHUTDOWN_NODE_REQ: - net.end_node(id=n, shutdown=True, last_msg_stats=msg_stats) + net.end_node(id=nid, shutdown=True, last_msg_stats=msg_stats) elif req == SHUTDOWN_REPLACE_NODE_REQ: - net.replace_node(id=n, shutdown=True, last_msg_stats=msg_stats) + net.replace_node(id=nid, shutdown=True, last_msg_stats=msg_stats) elif req == REMOVE_NODE_REQ: - net.end_node(id=n, last_msg_stats=msg_stats) + net.end_node(id=nid, last_msg_stats=msg_stats) - if len(req) > 0: - for n in req_args: - delete_request(req, n) + nodes = packet[b'ids'] + if nodes: + for nid in nodes: + delete_request(req, nid) else: - delete_request(req, net.get().getNodeId()) + n = net.get() + if n: + delete_request(req, n.getNodeId()) + else: + success = False elif req == SHUTDOWN_CLUSTER_REQ: for n in net.nodes: net.end_node(id=n[2], shutdown=True, last_msg_stats=msg_stats) @@ -515,20 +569,26 @@ if __name__ == '__main__': elif req == NEW_NODE_REQ: net.launch_node() elif req == NODE_PUT_REQ: - _hash, v = req_args[:2] - net.get().put(InfoHash(_hash), Value(v)) - + _hash = packet[b'hash'] + v = packet[b'value'] + n = net.get() + if n: + n.put(InfoHash(_hash), Value(v)) + else: + success = False elif req == DUMP_STORAGE_REQ: - for n in [m[1] for m in net.nodes if m[1].getNodeId() in req_args]: + hashes = packet[b'ids'] + for n in [m[1] for m in net.nodes if m[1].getNodeId() in hashes]: net.log(n.getStorageLog()) - elif MESSAGE_STATS in req: + elif req == MESSAGE_STATS: stats = sum([np.array(x) for x in [net.getMessageStats()]+msg_stats]) - notify_benchmark(answer=[stats]) + send_stats(packet, [int(_) for _ in stats]) msg_stats.clear() continue - notify_benchmark() + notify_benchmark(packet, success) except Exception as e: - DhtNetwork.log(e) + traceback.print_tb(e.__traceback__) + print(type(e).__name__+':', e, file=sys.stderr) finally: if net: net.resize(0) diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py index f1d2e70c17fb484c2fe287eb8c75141186513ca4..a4a216ab99117cfbd14d4cdb444074c91c6c8681 100644 --- a/python/tools/dht/tests.py +++ b/python/tools/dht/tests.py @@ -3,6 +3,7 @@ # Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com> # Simon Désaulniers <sim.desaulniers@gmail.com> +import sys import os import threading import random @@ -10,6 +11,7 @@ import string import time import subprocess import re +import traceback import numpy as np import matplotlib.pyplot as plt @@ -212,7 +214,7 @@ class DhtFeatureTest(FeatureTest): @staticmethod def getcb(value): vstr = value.__str__()[:100] - DhtNetwork.log('[GET]: %s' % vstr + ("..." if len(vstr) > 100 else "")) + DhtNetwork.Log.log('[GET]: %s' % vstr + ("..." if len(vstr) > 100 else "")) DhtFeatureTest.foreignValues.append(value) return True @@ -220,7 +222,7 @@ class DhtFeatureTest(FeatureTest): def putDoneCb(ok, nodes): with FeatureTest.lock: if not ok: - DhtNetwork.log("[PUT]: failed!") + DhtNetwork.Log.log("[PUT]: failed!") FeatureTest.done -= 1 FeatureTest.lock.notify() @@ -228,7 +230,7 @@ class DhtFeatureTest(FeatureTest): def getDoneCb(ok, nodes): with FeatureTest.lock: if not ok: - DhtNetwork.log("[GET]: failed!") + DhtNetwork.Log.log("[GET]: failed!") else: for node in nodes: if not node.getNode().isExpired(): @@ -240,7 +242,7 @@ class DhtFeatureTest(FeatureTest): with FeatureTest.lock: for val in values: vstr = val.__str__()[:100] - DhtNetwork.log('[PUT]:', _hash.toString(), '->', vstr + ("..." if len(vstr) > 100 else "")) + DhtNetwork.Log.log('[PUT]:', _hash.toString(), '->', vstr + ("..." if len(vstr) > 100 else "")) FeatureTest.done += 1 producer.put(_hash, val, DhtFeatureTest.putDoneCb) while FeatureTest.done > 0: @@ -251,7 +253,7 @@ class DhtFeatureTest(FeatureTest): DhtFeatureTest.foreignNodes = [] with FeatureTest.lock: FeatureTest.done += 1 - DhtNetwork.log('[GET]:', _hash.toString()) + DhtNetwork.Log.log('[GET]:', _hash.toString()) consumer.get(_hash, DhtFeatureTest.getcb, DhtFeatureTest.getDoneCb) while FeatureTest.done > 0: FeatureTest.lock.wait() @@ -323,7 +325,7 @@ class PersistenceTest(DhtFeatureTest): n.run(config=config) n.bootstrap(self.bootstrap.ip4, str(self.bootstrap.port)) - DhtNetwork.log('Node','['+_hash_str+']', + DhtNetwork.Log.log('Node','['+_hash_str+']', 'started around', _hash.toString().decode() if n.isRunning() else 'failed to start...' @@ -333,22 +335,22 @@ class PersistenceTest(DhtFeatureTest): def _result(self, local_values, new_nodes): bootstrap = self.bootstrap if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues): - DhtNetwork.log('[GET]: Only %s on %s values persisted.' % + DhtNetwork.Log.log('[GET]: Only %s on %s values persisted.' % (len(DhtFeatureTest.foreignValues), len(local_values))) else: - DhtNetwork.log('[GET]: All values successfully persisted.') + DhtNetwork.Log.log('[GET]: All values successfully persisted.') if DhtFeatureTest.foreignValues: if new_nodes: - DhtNetwork.log('Values are newly found on:') + DhtNetwork.Log.log('Values are newly found on:') for node in new_nodes: - DhtNetwork.log(node) + DhtNetwork.Log.log(node) if self._dump_storage: - DhtNetwork.log('Dumping all storage log from '\ + DhtNetwork.Log.log('Dumping all storage log from '\ 'hosting nodes.') for proc in self._workbench.procs: proc.sendClusterRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes) else: - DhtNetwork.log("Values didn't reach new hosting nodes after shutdown.") + DhtNetwork.Log.log("Values didn't reach new hosting nodes after shutdown.") def run(self): try: @@ -363,7 +365,8 @@ class PersistenceTest(DhtFeatureTest): else: raise NameError("This test is not defined '" + self._test + "'") except Exception as e: - print(e) + traceback.print_tb(e.__traceback__) + print(type(e).__name__+':', e, file=sys.stderr) finally: if self._traffic_plot or self._op_plot: plot_fname = "traffic-plot" @@ -391,8 +394,6 @@ class PersistenceTest(DhtFeatureTest): # nodes and values counters total_nr_values = 0 nr_nodes = wb.node_num - nr_nodes_cv = threading.Condition() - op_cv = threading.Condition() # values string in string format. Used for sending cluster request. @@ -401,40 +402,67 @@ class PersistenceTest(DhtFeatureTest): def normalBehavior(do, t): nonlocal total_nr_values, op_cv while True: - # makes sure we receive our response from the subprocess and - # avoids using transaction numbers with op_cv: do() time.sleep(random.uniform(0.0, float(t))) def putRequest(): nonlocal hashes, VALUE_SIZE, total_nr_values - total_nr_values += 1 - DhtNetwork.log("Random value put on the DHT.", "(now "+ str(total_nr_values)+" values on the dht)") - random.choice(wb.procs).sendClusterPutRequest(random.choice(hashes).toString(), - random_str_val(size=VALUE_SIZE).encode()) + lock = threading.Condition() + def dcb(success): + nonlocal total_nr_values, lock + if success: + total_nr_values += 1 + DhtNetwork.Log.log("INFO: "+ str(total_nr_values)+" values put on the dht since begining") + with lock: + lock.notify() + with lock: + DhtNetwork.Log.warn("Random value put on the DHT...") + random.choice(wb.procs).sendClusterPutRequest(random.choice(hashes).toString(), + random_str_val(size=VALUE_SIZE).encode(), + done_cb=dcb) + lock.wait() + puts = threading.Thread(target=normalBehavior, args=(putRequest, 30.0/wb.node_num)) puts.daemon = True puts.start() def newNodeRequest(): - nonlocal nr_nodes, nr_nodes_cv - with nr_nodes_cv: + nonlocal nr_nodes + lock = threading.Condition() + def dcb(success): + nonlocal nr_nodes, lock nr_nodes += 1 - DhtNetwork.log("Node joining the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)") - nr_nodes_cv.notify() - random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.NEW_NODE_REQ) + DhtNetwork.Log.log("INFO: now "+str(nr_nodes)+" nodes on the dht") + with lock: + lock.notify() + with lock: + DhtNetwork.Log.warn("Node joining...") + random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.NEW_NODE_REQ, done_cb=dcb) + lock.wait() + connections = threading.Thread(target=normalBehavior, args=(newNodeRequest, 1*50.0/wb.node_num)) connections.daemon = True connections.start() def shutdownNodeRequest(): - nonlocal nr_nodes, nr_nodes_cv - with nr_nodes_cv: - nr_nodes -= 1 - DhtNetwork.log("Node quitting the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)") - nr_nodes_cv.notify() - random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ) + nonlocal nr_nodes + lock = threading.Condition() + def dcb(success): + nonlocal nr_nodes, lock + if success: + nr_nodes -= 1 + DhtNetwork.Log.log("INFO: now "+str(nr_nodes)+" nodes on the dht") + else: + DhtNetwork.Log.err("Oops.. No node to shutodwn.") + + with lock: + lock.notify() + with lock: + DhtNetwork.Log.warn("Node shutting down...") + random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, done_cb=dcb) + lock.wait() + shutdowns = threading.Thread(target=normalBehavior, args=(shutdownNodeRequest, 1*60.0/wb.node_num)) shutdowns.daemon = True shutdowns.start() @@ -476,36 +504,54 @@ class PersistenceTest(DhtFeatureTest): self._dhtGet(consumer, myhash) if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues): if DhtFeatureTest.foreignValues: - DhtNetwork.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ', + DhtNetwork.Log.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ', len(local_values), ' values successfully put.') else: - DhtNetwork.log('[GET]: 0 values successfully put') + DhtNetwork.Log.log('[GET]: 0 values successfully put') if DhtFeatureTest.foreignValues and DhtFeatureTest.foreignNodes: - DhtNetwork.log('Values are found on :') + DhtNetwork.Log.log('Values are found on :') for node in DhtFeatureTest.foreignNodes: - DhtNetwork.log(node) + DhtNetwork.Log.log(node) for _ in range(max(1, int(self._workbench.node_num/32))): - DhtNetwork.log('Removing all nodes hosting target values...') + DhtNetwork.Log.log('Removing all nodes hosting target values...') cluster_ops_count = 0 for proc in self._workbench.procs: - DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) - proc.sendClusterRequest( + DhtNetwork.Log.log('[REMOVE]: sending shutdown request to', proc) + lock = threading.Condition() + def dcb(success): + nonlocal lock + if not success: + DhtNetwork.Log.err("Failed to shutdown.") + with lock: + lock.notify() + + with lock: + proc.sendClusterRequest( DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, - DhtFeatureTest.foreignNodes - ) - DhtNetwork.log('sending message stats request') - stats = proc.sendGetMessageStats() - cluster_ops_count += sum(stats[1:]) - DhtNetwork.log("5 seconds wait...") + DhtFeatureTest.foreignNodes, + done_cb=dcb + ) + lock.wait() + DhtNetwork.Log.log('sending message stats request') + def msg_dcb(stats): + nonlocal cluster_ops_count, lock + if stats: + cluster_ops_count += sum(stats[1:]) + with lock: + lock.notify() + with lock: + proc.sendGetMessageStats(done_cb=msg_dcb) + lock.wait() + DhtNetwork.Log.log("5 seconds wait...") time.sleep(5) ops_count.append(cluster_ops_count/self._workbench.node_num) # checking if values were transfered to new nodes foreignNodes_before_delete = DhtFeatureTest.foreignNodes - DhtNetwork.log('[GET]: trying to fetch persistent values') + DhtNetwork.Log.log('[GET]: trying to fetch persistent values') self._dhtGet(consumer, myhash) new_nodes = set(DhtFeatureTest.foreignNodes) - set(foreignNodes_before_delete) @@ -514,7 +560,7 @@ class PersistenceTest(DhtFeatureTest): if self._op_plot: display_plot(ops_count, color='blue') else: - DhtNetwork.log("[GET]: either couldn't fetch values or nodes hosting values...") + DhtNetwork.Log.log("[GET]: either couldn't fetch values or nodes hosting values...") if traffic_plot_thread: print("Traffic plot running for ever. Ctrl-c for stopping it.") @@ -540,16 +586,16 @@ class PersistenceTest(DhtFeatureTest): self._dhtGet(consumer, myhash) initial_nodes = DhtFeatureTest.foreignNodes - DhtNetwork.log('Replacing', clusters, 'random clusters successively...') + DhtNetwork.Log.log('Replacing', clusters, 'random clusters successively...') for n in range(clusters): i = random.randint(0, len(self._workbench.procs)-1) proc = self._workbench.procs[i] - DhtNetwork.log('Replacing', proc) + DhtNetwork.Log.log('Replacing', proc) proc.sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ) self._workbench.stop_cluster(i) self._workbench.start_cluster(i) - DhtNetwork.log('[GET]: trying to fetch persistent values') + DhtNetwork.Log.log('[GET]: trying to fetch persistent values') self._dhtGet(consumer, myhash) new_nodes = set(DhtFeatureTest.foreignNodes) - set(initial_nodes) @@ -589,20 +635,20 @@ class PersistenceTest(DhtFeatureTest): nodes = set([]) self._gottaGetThemAllPokeNodes(consumer, hashes, nodes=nodes) - DhtNetwork.log("Values are found on:") + DhtNetwork.Log.log("Values are found on:") for n in nodes: - DhtNetwork.log(n) + DhtNetwork.Log.log(n) - DhtNetwork.log("Creating 8 nodes around all of these hashes...") + DhtNetwork.Log.log("Creating 8 nodes around all of these hashes...") for _hash in hashes: self._trigger_dp(trigger_nodes, _hash, count=8) - DhtNetwork.log('Waiting', DP_TIMEOUT+1, 'minutes for normal storage maintenance.') + DhtNetwork.Log.log('Waiting', DP_TIMEOUT+1, 'minutes for normal storage maintenance.') time.sleep((DP_TIMEOUT+1)*60) - DhtNetwork.log('Deleting old nodes from previous search.') + DhtNetwork.Log.log('Deleting old nodes from previous search.') for proc in self._workbench.procs: - DhtNetwork.log('[REMOVE]: sending delete request to', proc) + DhtNetwork.Log.log('[REMOVE]: sending delete request to', proc) proc.sendClusterRequest( DhtNetworkSubProcess.REMOVE_NODE_REQ, nodes) @@ -644,7 +690,8 @@ class PerformanceTest(DhtFeatureTest): else: raise NameError("This test is not defined '" + self._test + "'") except Exception as e: - print(e) + traceback.print_tb(e.__traceback__) + print(type(e).__name__+':', e, file=sys.stderr) finally: self.bootstrap.resize(1) @@ -683,7 +730,7 @@ class PerformanceTest(DhtFeatureTest): def getcb(v): nonlocal bootstrap - DhtNetwork.log("found", v) + DhtNetwork.Log.log("found", v) return True def donecb(ok, nodes, start): @@ -691,7 +738,7 @@ class PerformanceTest(DhtFeatureTest): t = time.time()-start with lock: if not ok: - DhtNetwork.log("failed !") + DhtNetwork.Log.log("failed !") times.append(t) done -= 1 lock.notify() @@ -723,7 +770,7 @@ class PerformanceTest(DhtFeatureTest): for n in range(10): self._workbench.replace_cluster() plt.pause(2) - DhtNetwork.log("Getting 50 random hashes succesively.") + DhtNetwork.Log.log("Getting 50 random hashes succesively.") for i in range(50): with lock: for _ in range(1): @@ -759,27 +806,27 @@ class PerformanceTest(DhtFeatureTest): for _ in range(max(1, int(self._workbench.node_num/32))): self._dhtGet(consumer, myhash) - DhtNetwork.log("Waiting 15 seconds...") + DhtNetwork.Log.log("Waiting 15 seconds...") time.sleep(15) self._dhtPut(producer, myhash, *local_values) #checking if values were transfered self._dhtGet(consumer, myhash) - DhtNetwork.log('Values are found on :') + DhtNetwork.Log.log('Values are found on :') for node in DhtFeatureTest.foreignNodes: - DhtNetwork.log(node) + DhtNetwork.Log.log(node) if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues): if DhtFeatureTest.foreignValues: - DhtNetwork.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ', + DhtNetwork.Log.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ', len(local_values), ' values successfully put.') else: - DhtNetwork.log('[GET]: 0 values successfully put') + DhtNetwork.Log.log('[GET]: 0 values successfully put') - DhtNetwork.log('Removing all nodes hosting target values...') + DhtNetwork.Log.log('Removing all nodes hosting target values...') for proc in self._workbench.procs: - DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) + DhtNetwork.Log.log('[REMOVE]: sending shutdown request to', proc) proc.sendClusterRequest( DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, DhtFeatureTest.foreignNodes