Skip to content
Snippets Groups Projects
Commit 5b3b22ed authored by Simon Désaulniers's avatar Simon Désaulniers
Browse files

benchmark: refactor and enhance network.py IPC

parent 059e0a5f
No related branches found
No related tags found
No related merge requests found
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
# Copyright (C) 2015-2016 Savoir-faire Linux Inc.
# Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com>
......
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
# Copyright (c) 2015-2016 Savoir-faire Linux Inc.
# Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com>
......@@ -33,6 +34,9 @@ from pyroute2.netns.process.proxy import NSPopen
from opendht import *
# useful functions
b_space_join = lambda *l: b' '.join(map(bytes, l))
# TODO: find where token "notifyend" gets printed...
class DhtNetworkSubProcess(NSPopen):
"""
......@@ -189,6 +193,17 @@ class DhtNetworkSubProcess(NSPopen):
else:
time.sleep(0.1)
def _sendRequest(self, request, answer_cb=None):
"""
Sends a request to the sub network and wait for output.
@param request: The serialized request.
@type request: bytes
"""
self.send(request + b'\n')
for line in self.getlinesUntilNotify(answer_cb=answer_cb):
DhtNetwork.log(line)
def sendGetMessageStats(self):
"""
Sends DhtNetwork sub process statistics request about nodes messages
......@@ -203,51 +218,33 @@ class DhtNetworkSubProcess(NSPopen):
Callback fed to getlinesUntilNotify made to recover answer from the
DhtNetwork sub process.
:answer: the list of lines answered by the sub process.
@param answer: the list of lines answered by the sub process.
@type answer: function
"""
nonlocal stats
if answer:
stats = [int(v) for v in re.findall("[0-9]+", answer.pop())]
self.send(DhtNetworkSubProcess.MESSAGE_STATS + b'\n')
for line in self.getlinesUntilNotify(answer_cb=cb):
DhtNetwork.log(line)
self._sendRequest(DhtNetworkSubProcess.MESSAGE_STATS, answer_cb=cb)
return stats
def sendNodesRequest(self, request, ids):
def sendNodesRequest(self, request, ids=b''):
"""
Shutsdown nodes on the DhtNetwork sub process.
@param request: The request
Send request to a list of nodes or the whole cluster.
@param request: The request. Possible values are:
DhtNetworkSubProcess.NODE_PUT_REQ
DhtNetworkSubProcess.REMOVE_NODE_REQ
DhtNetworkSubProcess.SHUTDOWN_NODE_REQ
DhtNetworkSubProcess.SHUTDOWN_REPLACE_NODE_REQ
DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ
DhtNetworkSubProcess.DUMP_STORAGE_REQ
DhtNetworkSubProcess.MESSAGE_STATS
@type request: bytes
@param ids: ids of nodes concerned by the request.
@param ids: The list of ids concerned by the request.
@type ids: list
"""
serialized_req = request + b' ' + b' '.join(map(bytes, ids))
self.send(serialized_req + b'\n')
for line in self.getlinesUntilNotify():
DhtNetwork.log(line)
def sendShutdown(self):
"""
Shutdown the whole cluster. This does not terminate comunicating thread;
use quit().
"""
self.send(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ + b'\n')
for line in self.getlinesUntilNotify():
DhtNetwork.log(line)
def sendDumpStorage(self, ids):
"""
Dumps storage log from nodes with id in `ids`.
"""
serialized_req = DhtNetworkSubProcess.DUMP_STORAGE_REQ + b' ' + \
b' '.join(map(bytes, ids))
self.send(serialized_req + b'\n')
for line in self.getlinesUntilNotify():
DhtNetwork.log(line)
self._sendRequest(b_space_join(request, b_space_join(*ids)))
class DhtNetwork(object):
nodes = []
......@@ -298,8 +295,21 @@ class DhtNetwork(object):
return None
return self.nodes[0][1]
def get(self, n):
return self.nodes[n][1]
def get(self, i=None):
if i is None:
l = list(self.nodes)
random.shuffle(l)
return l[0][1]
else:
return self.nodes[i][1]
def getNodeInfoById(self, id=None):
if id:
for n in self.nodes:
if n[1].getNodeId() == id:
return n
else:
return None
def launch_node(self):
n = DhtNetwork.run_node(self.ip4, self.ip6, self.port, self.bootstrap)
......@@ -329,9 +339,9 @@ class DhtNetwork(object):
if not self.nodes:
return
if id is not None:
for n in self.nodes:
if n[1].getNodeId() == id:
elif id is not None:
n = self.getNodeInfoById(id)
if n:
if shutdown:
with lock:
DhtNetwork.log('Waiting for node to shutdown... ')
......@@ -343,6 +353,7 @@ class DhtNetwork(object):
self.nodes.remove(n)
DhtNetwork.log(id, 'deleted !')
return True
else:
return False
else:
n = self.nodes.pop()
......@@ -385,14 +396,11 @@ if __name__ == '__main__':
lock = threading.Condition()
quit = False
def notify_benchmark(answer=None):
NOTIFY_TOKEN = 'notify'
NOTIFY_END_TOKEN = 'notifyend'
sys.stdout.write('%s\n' % NOTIFY_TOKEN)
for line in answer if answer else []:
def notify_benchmark(answer=[]):
sys.stdout.write('%s\n' % DhtNetworkSubProcess.NOTIFY_TOKEN)
for line in answer:
sys.stdout.write(str(line)+'\n')
sys.stdout.write('%s\n' % NOTIFY_END_TOKEN)
sys.stdout.write('%s\n' % DhtNetworkSubProcess.NOTIFY_END_TOKEN)
sys.stdout.flush()
def listen_to_mother_nature(stdin, q):
......@@ -461,12 +469,12 @@ if __name__ == '__main__':
except queue.Empty:
pass
else:
REMOVE_NODE_REQ = 'rn'
SHUTDOWN_NODE_REQ = 'sdn'
SHUTDOWN_REPLACE_NODE_REQ = 'sdrn'
SHUTDOWN_CLUSTER_REQ = 'sdc'
DUMP_STORAGE_REQ = 'strl'
MESSAGE_STATS = 'gms'
REMOVE_NODE_REQ = DhtNetworkSubProcess.REMOVE_NODE_REQ.decode()
SHUTDOWN_NODE_REQ = DhtNetworkSubProcess.SHUTDOWN_NODE_REQ.decode()
SHUTDOWN_REPLACE_NODE_REQ = DhtNetworkSubProcess.SHUTDOWN_REPLACE_NODE_REQ.decode()
SHUTDOWN_CLUSTER_REQ = DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ.decode()
DUMP_STORAGE_REQ = DhtNetworkSubProcess.DUMP_STORAGE_REQ.decode()
MESSAGE_STATS = DhtNetworkSubProcess.MESSAGE_STATS.decode()
if req in [SHUTDOWN_NODE_REQ,
SHUTDOWN_REPLACE_NODE_REQ,
......
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Savoir-Faire Linux Inc.
# Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com>
# Simon Désaulniers <sim.desaulniers@gmail.com>
......@@ -181,7 +182,7 @@ class PersistenceTest(DhtFeatureTest):
'hosting nodes.')
for proc in self._workbench.procs:
proc.sendDumpStorage(DhtFeatureTest.foreignNodes)
proc.sendNodesRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes)
else:
DhtNetwork.log("Values didn't reach new hosting nodes after shutdown.")
......@@ -299,7 +300,7 @@ class PersistenceTest(DhtFeatureTest):
i = random.randint(0, len(self._workbench.procs)-1)
proc = self._workbench.procs[i]
DhtNetwork.log('Replacing', proc)
proc.sendShutdown()
proc.sendNodesRequest(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ)
self._workbench.stop_cluster(i)
self._workbench.start_cluster(i)
......
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
# Copyright (c) 2015-2016 Savoir-faire Linux Inc.
# Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment