diff --git a/python/tools/benchmark.py b/python/tools/benchmark.py index accf83111c80ff19e4cf8cdad8b2d07028451306..1840d662c91947f26c324c13c3f1483aa5cba157 100755 --- a/python/tools/benchmark.py +++ b/python/tools/benchmark.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- #!/usr/bin/env python3 # Copyright (C) 2015-2016 Savoir-faire Linux Inc. # Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com> diff --git a/python/tools/dht/network.py b/python/tools/dht/network.py index 46dafab2df6b9858690fe6081e27b5a3df1b36fb..b0b68baf0af07321451c1c1f9af75a54af9d7c44 100755 --- a/python/tools/dht/network.py +++ b/python/tools/dht/network.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- #!/usr/bin/env python3 # Copyright (c) 2015-2016 Savoir-faire Linux Inc. # Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com> @@ -33,6 +34,9 @@ from pyroute2.netns.process.proxy import NSPopen from opendht import * +# useful functions +b_space_join = lambda *l: b' '.join(map(bytes, l)) + # TODO: find where token "notifyend" gets printed... class DhtNetworkSubProcess(NSPopen): """ @@ -189,6 +193,17 @@ class DhtNetworkSubProcess(NSPopen): else: time.sleep(0.1) + def _sendRequest(self, request, answer_cb=None): + """ + Sends a request to the sub network and wait for output. + + @param request: The serialized request. + @type request: bytes + """ + self.send(request + b'\n') + for line in self.getlinesUntilNotify(answer_cb=answer_cb): + DhtNetwork.log(line) + def sendGetMessageStats(self): """ Sends DhtNetwork sub process statistics request about nodes messages @@ -203,51 +218,33 @@ class DhtNetworkSubProcess(NSPopen): Callback fed to getlinesUntilNotify made to recover answer from the DhtNetwork sub process. - :answer: the list of lines answered by the sub process. + @param answer: the list of lines answered by the sub process. + @type answer: function """ nonlocal stats if answer: stats = [int(v) for v in re.findall("[0-9]+", answer.pop())] - self.send(DhtNetworkSubProcess.MESSAGE_STATS + b'\n') - for line in self.getlinesUntilNotify(answer_cb=cb): - DhtNetwork.log(line) - + self._sendRequest(DhtNetworkSubProcess.MESSAGE_STATS, answer_cb=cb) return stats - def sendNodesRequest(self, request, ids): - """ - Shutsdown nodes on the DhtNetwork sub process. - - @param request: The request - @type request: bytes - @param ids: ids of nodes concerned by the request. - @type ids: list - """ - serialized_req = request + b' ' + b' '.join(map(bytes, ids)) - self.send(serialized_req + b'\n') - for line in self.getlinesUntilNotify(): - DhtNetwork.log(line) - - def sendShutdown(self): + def sendNodesRequest(self, request, ids=b''): """ - Shutdown the whole cluster. This does not terminate comunicating thread; - use quit(). + Send request to a list of nodes or the whole cluster. + + @param request: The request. Possible values are: + DhtNetworkSubProcess.NODE_PUT_REQ + DhtNetworkSubProcess.REMOVE_NODE_REQ + DhtNetworkSubProcess.SHUTDOWN_NODE_REQ + DhtNetworkSubProcess.SHUTDOWN_REPLACE_NODE_REQ + DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ + DhtNetworkSubProcess.DUMP_STORAGE_REQ + DhtNetworkSubProcess.MESSAGE_STATS + @type request: bytes + @param ids: The list of ids concerned by the request. + @type ids: list """ - self.send(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ + b'\n') - for line in self.getlinesUntilNotify(): - DhtNetwork.log(line) - - def sendDumpStorage(self, ids): - """ - Dumps storage log from nodes with id in `ids`. - """ - serialized_req = DhtNetworkSubProcess.DUMP_STORAGE_REQ + b' ' + \ - b' '.join(map(bytes, ids)) - self.send(serialized_req + b'\n') - for line in self.getlinesUntilNotify(): - DhtNetwork.log(line) - + self._sendRequest(b_space_join(request, b_space_join(*ids))) class DhtNetwork(object): nodes = [] @@ -298,8 +295,21 @@ class DhtNetwork(object): return None return self.nodes[0][1] - def get(self, n): - return self.nodes[n][1] + def get(self, i=None): + if i is None: + l = list(self.nodes) + random.shuffle(l) + return l[0][1] + else: + return self.nodes[i][1] + + def getNodeInfoById(self, id=None): + if id: + for n in self.nodes: + if n[1].getNodeId() == id: + return n + else: + return None def launch_node(self): n = DhtNetwork.run_node(self.ip4, self.ip6, self.port, self.bootstrap) @@ -329,21 +339,22 @@ class DhtNetwork(object): if not self.nodes: return - if id is not None: - for n in self.nodes: - if n[1].getNodeId() == id: - if shutdown: - with lock: - DhtNetwork.log('Waiting for node to shutdown... ') - n[1].shutdown(shutdown_cb) - lock.wait() - if last_msg_stats: - last_msg_stats.append(self.getMessageStats()) - n[1].join() - self.nodes.remove(n) - DhtNetwork.log(id, 'deleted !') - return True - return False + elif id is not None: + n = self.getNodeInfoById(id) + if n: + if shutdown: + with lock: + DhtNetwork.log('Waiting for node to shutdown... ') + n[1].shutdown(shutdown_cb) + lock.wait() + if last_msg_stats: + last_msg_stats.append(self.getMessageStats()) + n[1].join() + self.nodes.remove(n) + DhtNetwork.log(id, 'deleted !') + return True + else: + return False else: n = self.nodes.pop() n[1].join() @@ -385,14 +396,11 @@ if __name__ == '__main__': lock = threading.Condition() quit = False - def notify_benchmark(answer=None): - NOTIFY_TOKEN = 'notify' - NOTIFY_END_TOKEN = 'notifyend' - - sys.stdout.write('%s\n' % NOTIFY_TOKEN) - for line in answer if answer else []: + def notify_benchmark(answer=[]): + sys.stdout.write('%s\n' % DhtNetworkSubProcess.NOTIFY_TOKEN) + for line in answer: sys.stdout.write(str(line)+'\n') - sys.stdout.write('%s\n' % NOTIFY_END_TOKEN) + sys.stdout.write('%s\n' % DhtNetworkSubProcess.NOTIFY_END_TOKEN) sys.stdout.flush() def listen_to_mother_nature(stdin, q): @@ -461,12 +469,12 @@ if __name__ == '__main__': except queue.Empty: pass else: - REMOVE_NODE_REQ = 'rn' - SHUTDOWN_NODE_REQ = 'sdn' - SHUTDOWN_REPLACE_NODE_REQ = 'sdrn' - SHUTDOWN_CLUSTER_REQ = 'sdc' - DUMP_STORAGE_REQ = 'strl' - MESSAGE_STATS = 'gms' + REMOVE_NODE_REQ = DhtNetworkSubProcess.REMOVE_NODE_REQ.decode() + SHUTDOWN_NODE_REQ = DhtNetworkSubProcess.SHUTDOWN_NODE_REQ.decode() + SHUTDOWN_REPLACE_NODE_REQ = DhtNetworkSubProcess.SHUTDOWN_REPLACE_NODE_REQ.decode() + SHUTDOWN_CLUSTER_REQ = DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ.decode() + DUMP_STORAGE_REQ = DhtNetworkSubProcess.DUMP_STORAGE_REQ.decode() + MESSAGE_STATS = DhtNetworkSubProcess.MESSAGE_STATS.decode() if req in [SHUTDOWN_NODE_REQ, SHUTDOWN_REPLACE_NODE_REQ, diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py index 46f984b717de7810f237a26fd8e270252428563e..7fa31cb7dcf2aecc7a4ca506dd79d365064b6802 100644 --- a/python/tools/dht/tests.py +++ b/python/tools/dht/tests.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright (C) 2015 Savoir-Faire Linux Inc. # Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com> # Simon Désaulniers <sim.desaulniers@gmail.com> @@ -181,7 +182,7 @@ class PersistenceTest(DhtFeatureTest): 'hosting nodes.') for proc in self._workbench.procs: - proc.sendDumpStorage(DhtFeatureTest.foreignNodes) + proc.sendNodesRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes) else: DhtNetwork.log("Values didn't reach new hosting nodes after shutdown.") @@ -299,7 +300,7 @@ class PersistenceTest(DhtFeatureTest): i = random.randint(0, len(self._workbench.procs)-1) proc = self._workbench.procs[i] DhtNetwork.log('Replacing', proc) - proc.sendShutdown() + proc.sendNodesRequest(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ) self._workbench.stop_cluster(i) self._workbench.start_cluster(i) diff --git a/python/tools/dht/virtual_network_builder.py b/python/tools/dht/virtual_network_builder.py index a48fd3eb0b1260360f423725dddddd7401a3be30..3198267a677d18baf727c1a7d6b3ebabe07627be 100755 --- a/python/tools/dht/virtual_network_builder.py +++ b/python/tools/dht/virtual_network_builder.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- #!/usr/bin/env python3 # Copyright (c) 2015-2016 Savoir-faire Linux Inc. # Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>