Skip to content
Snippets Groups Projects
Commit 8de1d809 authored by Guillaume Roguez's avatar Guillaume Roguez
Browse files

test: fix pep8 compliance in doombot script

Change-Id: I810248de65c38e47bcf117d0f020b9392cb57ed9
parent c035cae6
No related branches found
No related tags found
No related merge requests found
SFL DoomBot
===========
This collection of script make it easy to run a native application in a `gdb`
shell. This allow unit, integration, fuzzy and stress tests to monitor the
process itself in case of a crash or assert. The DoomBot will then create a
report the developers can use to fix the issues.
We created this script to test `sflphone`, a network facing application. The
`SIP` protocol has some corner case and possible race conditions that may leave
the application in an unstable state.
## Installation
**Require:**
* python 3.0+
* gdb 7.7+
* flask-python3
* sqlite3 (+python3 bindings)
## Usage
### Common use case
#### Unit test executable
#### DBus interface
Make a script that call dbus methods until it crash
#### Network packet injection
Make a script that send packets to a port
#### File loading
Add an "load file and exit command line args"
## Web service API
The API is quite simple for now
```sh
# GDB session
curl http://127.0.0.1:5000/run/
# Kill the current GDB session
curl http://127.0.0.1:5000/kill/
```
## Roadmap
* The application is still tightly integrated with sflphone, a better separation is needed.
* Add valgrind support
* Add an API to add metadata
* Add sqlite backend
* Add md5 checksumming
* Use the gdb python API to check frames instead of `threads apply all bt full`
* Add a basic graph of the last 52 weeks
* Add the ability to execute a `gdb` script the next time this happen
## FAQ
### Is it possible to add accounts and passwords?
No, this service is not designed to be published on the internet. It allow
remote code execution by design. Keep this service in your intranet or add
some kind of HTTP authentication.
### Your code look like PHP4!
Yes, it does, the DoomBot frontend was designed as simple and as quick to develop
as possible. It doesn't have huge abstraction or any kind of template system. It
does what it does and that's it. If this ever take of, it would be one of the
first thing to fix.
#!/usr/bin/python3
import sys
import os
import argparse
from config import DefaultConfig
__version__ = "0.3"
config = DefaultConfig()
args = {}
for i in range(len(sys.argv)):
args[sys.argv[i]] = i
########################################################
# #
# Options validation #
# #
########################################################
_USAGE = """Usage: doombot [options] executable_path [executable_args]
Options:
--help (-h) : Show options and exit
--directory (-d) path : Directory full of scripts to run (optional)
--script (-s) path : Run a single script (optional)
--port (-p) port number : Run the server on a specific port (default:5000)
--interface (-i) interface_name : Run the server on a specific network interface
--continue ( ) : Continue execution after a SIGABRT (not recommended)
--version (-v) : Print the version and exit
"""
parser = argparse.ArgumentParser(description='')
parser.add_argument('--version',
help='Print the version and exit',
action='version',
version='SFL DoomBot %s' % __version__)
parser.add_argument('--directory', '-d',
help='Directory full of scripts to run (optional)',
dest='directory')
parser.add_argument('--script', '-s',
help='Run a single script (optional)',
dest='script')
parser.add_argument('--port', '-p',
help='Run the server on a specific port',
dest='port',
type=int,
default=config.port)
parser.add_argument('--interface', '-i',
help='Run the server on a specific network interface',
dest='interface')
parser.add_argument('--continue', '-c',
help='Continue execution after a SIGABRT (not recommended)',
dest='continue',
action='store_true',
default=config.cont)
config.command = config.command.strip()
config.args = sub_range
doombot_path = os.path.dirname(os.path.realpath(__file__)) + "/"
if not os.path.exists(doombot_path+"gdb_wrapper.py"):
print("Wrapper script not found")
exit(1)
########################################################
# #
# Start the server #
# #
########################################################
print("Starting the DoomBot server")
print("Executable : " + config.command )
print("Port : " + str( config.port ))
print("Continue on Asserts : " + str( config.cont ))
print("Using a script directory : " + str( config.directory != ""))
print("Using a script : " + str( config.script != "" ))
print()
# Start the server
import server
server.app.run()
#!/usr/bin/python3
import time
import os
# This file is used to wrap the GDB instances
#
# Doombot master server
# |-----> process_watcher threads
# |----> GDB process
# |----->Doombot wrapper <=== You are here
# -----> Real process
#
# This may seem a little overkill, but using the same
# process for both GDB and the Web server had too many
# limitations.
backtrace_collecton = {}
total_sig = 0
max_run = 10
class Bt_info:
content = ""
count = 1
bt_hash = ""
bt_type = ""
def to_xml(self):
result = ""
result += " <backtrace>\n"
result += " <signature>" + self.bt_hash+"</signature>\n"
result += " <type>" + self.bt_type+"</type>\n"
result += " <count>" + str(self.count) +"</count>\n"
result += " <content>" + self.content +" </content>\n"
result += " </backtrace>\n"
return result
#Generate output
def to_xml():
result = ""
result += "<doombot>\n"
result += " <backtraces>\n"
for key,bt in backtrace_collecton.items():
result += bt.to_xml()
result += " </backtraces>\n"
result += "</doombot>\n"
print(result)
f = open('/tmp/dommbot','w')
f.write(result)
f.close()
def run():
if total_sig <= max_run:
time.sleep(4)
gdb.execute("run 2>&1 > /dev/null")#,False,True)
else:
to_xml()
def get_backtrace_identity(trace):
result = ""
counter = 0
for line in trace.split('\n'):
fields = line.split()
if fields[3][0:2] != "__":
result = result + fields[-1]
counter += 1
if counter >= 3:
break
return result
def get_backtrace(bt_type):
output = gdb.execute("bt",False,True)
bt_hash = get_backtrace_identity(output)
if not bt_hash in backtrace_collecton:
print("\n\n\nADDING "+bt_type+ " "+ bt_hash+" to list")
info = Bt_info()
info.content = output
info.bt_hash = bt_hash
info.bt_type = bt_type
backtrace_collecton[bt_hash] = info
else:
backtrace_collecton[bt_hash].count += 1
print("\n\n\nEXISTING " +bt_type+ " ("+ str(backtrace_collecton[bt_hash].count)+")")
run()
def stop_handler (event):
if isinstance(event,gdb.SignalEvent):
global total_sig
if event.stop_signal == "SIGSEGV":
total_sig +=1
get_backtrace(event.stop_signal)
if event.stop_signal == "SIGABRT":
print("SIGABRT")
total_sig +=1
get_backtrace(event.stop_signal)
elif isinstance(event,gdb.BreakpointEvent):
print("BREAKPOINT "+ str(event.breakpoint.expression) +" " \
+str(event.breakpoint.location)+" "\
+str(event.breakpoint.condition) + " " \
+str(event.breakpoint.commands))
for k,v in event.breakpoints:
print("HERE "+str(k)+" "+str(v))
#Force restart
def exit_handler(event):
if (ExitedEvent.exit_code == 0):
gdb.run("quit")
gdb.events.stop.connect (stop_handler)
gdb.events.exited.connect (exit_handler)
gdb.execute("set confirm off")
gdb.execute("set height 0")
os.system('export MALLOC_CHECK=2')
#gdb.execute("file /home/etudiant/prefix/lib/sflphone/sflphoned")
run()
\ No newline at end of file
import threading
import subprocess
import os
# This file is used to wrap the GDB instances
#
# Doombot master server
# |-----> process_watcher threads <=== You are here
# |----> GDB process
# |----->Doombot wrapper
# -----> Real process
#
# This may seem a little overkill, but using the same
# process for both GDB and the Web server had too many
# limitations.
def launch_process(config):
"""
Runs the given args in a subprocess.Popen, and then calls the function
onExit when the subprocess completes.
onExit is a callable object, and popenArgs is a list/tuple of args that
would give to subprocess.Popen.
"""
def runInThread(onExit, popenArgs):
print(popenArgs)
#print("starting "+popenArgs.executable)
proc = subprocess.Popen(**popenArgs)
proc.wait()
#output = proc.communicate()[0]
#error = proc.communicate()[2]
onExit(output,error)
return
t = { 'args' : ['gdb', '-x', os.path.dirname(os.path.realpath(__file__)) + "/gdb_wrapper.py" ,'--args'] + config.args}#,
#'stdout' : subprocess.PIPE ,
#'stderr' : subprocess.PIPE }
print("foo")
def onExit():
return
thread = threading.Thread(target=runInThread, args=(onExit, t) )
thread.start()
return 4 #session_id TODO
#!/usr/bin/python
# -*- coding: utf-8 -*-
from flask import Flask
import config
import process_watcher
app = Flask(__name__)
# This file is used to wrap the GDB instances
#
# Doombot master server
# |-----> process_watcher threads
# |----> GDB process <=== You are here
# |----->Doombot wrapper
# -----> Real process
#
# This may seem a little overkill, but using the same
# process for both GDB and the Web server had too many
# limitations.
# This file is a hardcoded Web UI for the DoomBot. It print tons inflexible HTML
# is it ugly?: Yes, of course it is. Does it work?: Well enough
def print_banner():
return "<pre>\
##############################################################################\n\
# --SFL-- #\n\
# /¯¯¯¯\ /¯¯¯¯\ /¯¯¯¯\ /¯¯¯\_/¯¯¯\ /¯¯¯¯¯\ /¯¯¯¯\ |¯¯¯¯¯¯¯| #\n\
# / /¯\ | / /\ \ /\ \| /¯\ /¯\ | | |¯| | | /\ | ¯¯| |¯¯ #\n\
# / / / |/ | | | | | || | | | | | | ¯ < | | | | | | #\n\
# / /__/ / | ¯ | ¯ || | | | | | | |¯| | | |_| | | | #\n\
# |______/ \_____/ \____/ |_| |_| |_| \_____/ \____/ |_| #\n\
# MASTER #\n\
##############################################################################\n\
</pre>"
def print_head(body):
return "<html><head><title>DoomBot</title></head><body>%s\
<br /><div>Copyright Savoir-faire Linux (2012-2014)</div></body></html>" \
% body
def print_options():
return "<table>\n\
<tr><td>directory </td><td><code>%s</code></td></tr>\n\
<tr><td>script </td><td><code>%s</code></td></tr>\n\
<tr><td>cont </td><td><code>%s</code></td>/tr>\n\
<tr><td>command </td><td><code>%s</code></td></tr>\n\
</table>\n" % (str(config.directory), str(config.script and "true" or "false")
, str(config.cont and "true" or "false"), str(config.command))
def print_actions():
return "<a href='http://127.0.0.1:5000/run/'>Run now</a>\n\
<a href='http://127.0.0.1:5000/kill/'>Kill</a>"
@app.route("/")
def dashboard():
return print_head(
#Print the banner
print_banner () +
#Print the current options values
print_options() +
#Print the possible action
print_actions()
#Print the recent issues
)
@app.route("/run/")
def db_run():
print("BOB")
process_watcher.launch_process(config)
return "Starting the DoomBot"
@app.route("/kill/")
def db_kill():
return "Killing the DoomBot"
#!/usr/bin/python
# -*- coding: utf-8 -*-
print "\
# --SFLPhone-- #\n\
# /¯¯¯¯\ /¯¯¯¯\ /¯¯¯¯\ /¯¯¯\_/¯¯¯\ /¯¯¯¯¯\ /¯¯¯¯\ |¯¯¯¯¯¯¯| #\n\
# / /¯\ | / /\ \ /\ \| /¯\ /¯\ | | |¯| | | /\ | ¯¯| |¯¯ #\n\
# / / / |/ | | | | | || | | | | | | ¯ < | | | | | | #\n\
# / /__/ / | ¯ | ¯ || | | | | | | |¯| | | |_| | | | #\n\
# |______/ \_____/ \____/ |_| |_| |_| \_____/ \____/ |_| #\n\
# #\n\
# copyright: Savoir-Faire Linux (2012) #\n\
# author: Emmanuel Lepage Vallee <emmanuel.lepage@savoirfairelinux.com> #\n\
# description: This script perform stress tests to trigger rare race #\n\
# conditions or ASSERT caused by excessive load. This script #\n\
# should, in theory, never crash or end the sflphone daemon #\n"
import dbus
import time
import sys
from random import randint
#Initialise DBUS
bus = dbus.SessionBus()
callManagerBus = bus.get_object('org.sflphone.SFLphone', '/org/sflphone/SFLphone/CallManager')
callManager = dbus.Interface(callManagerBus, dbus_interface='org.sflphone.SFLphone.CallManager')
configurationManagerBus = bus.get_object('org.sflphone.SFLphone', '/org/sflphone/SFLphone/ConfigurationManager')
configurationManager = dbus.Interface(configurationManagerBus, dbus_interface='org.sflphone.SFLphone.ConfigurationManager')
#---------------------------------------------------------------------#
# #
# Tools #
# #
#---------------------------------------------------------------------#
#Get the first non-IP2IP account
def get_first_account():
accounts = configurationManager.getAccountList()
for i, v in enumerate(accounts):
if v != "IP2IP":
details = configurationManager.getAccountDetails(v)
if details["Account.type"] == True or details["Account.type"] == "SIP":
return v
return "IP2IP"
#Get the first IAX account
def get_first_iax_account():
accounts = configurationManager.getAccountList()
for i, v in enumerate(accounts):
if v != "IP2IP":
details = configurationManager.getAccountDetails(v)
if details["Account.type"] != True and details["Account.type"] != "SIP":
return v
return "IP2IP"
def get_account_number(account):
details = configurationManager.getAccountDetails(account)
return details["Account.username"]
def answer_all_calls():
calls = callManager.getCallList()
for i, v in enumerate(calls):
details = callManager.getCallDetails(v)
if details["CALL_STATE"] == "INCOMING":
callManager.accept(v)
#Return true is the account is registered
def check_account_state(account):
details = configurationManager.getAccountDetails(account)
#details = {'test':1,'test2':2,'registrationStatus':3}
return details['Account.registrationStatus'] == "REGISTERED"
#Meta test, common for all tests
def meta_test(test_func):
try:
for y in range(0,15):
for x in range(0,10):
ret = test_func()
if ret['code'] > 0:
sys.stdout.write(' \033[91m(Failure)\033[0m\n')
print " \033[0;33m"+ret['error']+"\033[0m"
return 1
sys.stdout.write('#')
sys.stdout.flush()
sys.stdout.write(' \033[92m(Success)\033[0m\n')
except dbus.exceptions.DBusException:
sys.stdout.write(' \033[91m(Failure)\033[0m\n')
print " \033[0;33mUnit test \"stress_answer_hangup_server\" failed: Timeout, the daemon is unreachable, it may be a crash, a lock or an assert\033[0m"
return 1
#except Exception:
#sys.stdout.write(' \033[91m(Failure)\033[0m\n')
#print " \033[0;33mUnit test \"stress_answer_hangup_server\" failed: Unknown error, disable 'except Exception' for details\033[0m"
#return 1
return 0
#Add a new test
suits = {}
def add_to_suit(test_suite_name,test_name,test_func):
if not test_suite_name in suits:
suits[test_suite_name] = []
suits[test_suite_name].append({'test_name':test_name,'test_func':test_func})
# Run tests
def run():
counter = 1
results = {}
for k in suits.keys():
print "\n\033[1mExecuting \""+str(k)+"\" tests suit:\033[0m ("+str(counter)+"/"+str(len(suits))+")"
for i, v in enumerate(suits[k]):
sys.stdout.write(" ["+str(i+1)+"/"+str(len(suits[k]))+"] Testing \""+v['test_name']+"\": ")
sys.stdout.flush()
retval = meta_test(v['test_func'])
if not k in results:
results[k] = 0
if retval > 0:
results[k]= results[k] + 1
counter = counter + 1
print "\n\n\033[1mSummary:\033[0m"
totaltests = 0
totalsuccess = 0
for k in suits.keys():
print " Suit \""+k+"\": "+str(len(suits[k])-results[k])+"/"+str(len(suits[k]))
totaltests = totaltests + len(suits[k])
totalsuccess = totalsuccess + len(suits[k])-results[k]
print "\nTotal: "+str(totalsuccess)+"/"+str(totaltests)+", "+str(totaltests-totalsuccess)+" failures"
#---------------------------------------------------------------------#
# #
# Variables #
# #
#---------------------------------------------------------------------#
first_account = get_first_account()
first_iax_account = get_first_iax_account()
first_account_number = get_account_number(first_account)
#---------------------------------------------------------------------#
# #
# Unit Tests #
# #
#---------------------------------------------------------------------#
# This unit case test the basic senario of calling asterisk/freeswitch and then hanging up
# It call itself to make the test simpler, this also test answering up as a side bonus
def stress_answer_hangup_server():
callManager.placeCall(first_account,str(randint(100000000,100000000000)),first_account_number)
time.sleep(0.05)
calls = callManager.getCallList()
# Check if the call worked
if len(calls) < 2:
if not check_account_state(first_account):
#TODO Try to register again instead of failing
return {'code':2,'error':"Unit test \"stress_answer_hangup_server\" failed: Account went unregistered"}
else:
return {'code':1,'error':"Unit test \"stress_answer_hangup_server\" failed: Error while placing call, there is "+str(len(calls))+" calls"}
else:
#Accept the calls
for i, v in enumerate(calls):
time.sleep(0.05)
#callManager.accept(v)
#Hang up
callManager.hangUp(calls[0])
return {'code':0,'error':""}
add_to_suit("Place call",'Place, answer and hangup',stress_answer_hangup_server)
# This test is similar to stress_answer_hangup_server, but test using IP2IP calls
def stress_answer_hangup_IP2IP():
callManager.placeCall(first_account,str(randint(100000000,100000000000)),"sip:127.0.0.1")
time.sleep(0.05)
calls = callManager.getCallList()
# Check if the call worked
if len(calls) < 2:
if not check_account_state(first_account):
#TODO Try to register again instead of failing
return {'code':2,'error':"\nUnit test \"stress_answer_hangup_server\" failed: Account went unregistered"}
else:
return {'code':1,'error':"\nUnit test \"stress_answer_hangup_server\" failed: Error while placing call, there is "+str(len(calls))+" calls"}
else:
#Accept the calls
for i, v in enumerate(calls):
time.sleep(0.05)
#callManager.accept(v)
#Hang up
callManager.hangUp(calls[0])
return {'code':0,'error':""}
add_to_suit("Place call",'Place, answer and hangup (IP2IP)',stress_answer_hangup_IP2IP)
# Test various type of transfers between various type of calls
# Use both localhost and
def stress_transfers():
for i in range(0,50): #repeat the tests
for j in range(0,3): # alternate between IP2IP, SIP and IAX
for k in range(0,2): #alternate between classic transfer and attended one
acc1 = ""
if j == 0:
acc1 = first_account
elif j == 1:
acc1 = "IP2IP"
else:
acc1 = first_iax_account
acc2 = ""
if i%3 == 0: #Use the first loop to shuffle second account type
acc2 = first_account
elif i%3 == 1:
acc2 = "IP2IP"
else:
acc2 = first_iax_account
print "ACC1"+acc1+" ACC2 "+acc2+ " FIRST IAX "+ first_iax_account +" FISRT "+first_account
destination_number = ""
if acc2 == "IP2IP":
destination_number = "sip:127.0.0.1"
else:
destination_number = configurationManager.getAccountDetails(acc2)["Account.username"]
callManager.placeCall(acc1,str(randint(100000000,100000000000)),destination_number)
second_call = None
if k == 1:
callManager.placeCall(acc1,str(randint(100000000,100000000000)),"188")
answer_all_calls()
if k == 1:
first_call = None
calls = callManager.getCallList()
for i, v in enumerate(calls):
if first_call == None:
first_call = v
else:
callManager.attendedTransfer(v,first_call)
else:
calls = callManager.getCallList()
for i, v in enumerate(calls):
callManager.transfer(v,destination_number)
# This test make as tons or calls, then hangup them all as fast as it can
def stress_concurent_calls():
for i in range(0,50): #repeat the tests
for j in range(0,3): # alternate between IP2IP, SIP and IAX
for k in range(0,2): #alternate between classic transfer and attended one
acc1 = ""
if j == 0:
acc1 = first_account
elif j == 1:
acc1 = "IP2IP"
else:
acc1 = first_iax_account
acc2 = ""
if i%3 == 0: #Use the first loop to shuffle second account type
acc2 = first_account
elif i%3 == 1:
acc2 = "IP2IP"
else:
acc2 = first_iax_account
print "ACC1"+acc1+" ACC2 "+acc2+ " FIRST IAX "+ first_iax_account +" FISRT "+first_account
destination_number = ""
if acc2 == "IP2IP":
destination_number = "sip:127.0.0.1"
else:
destination_number = configurationManager.getAccountDetails(acc2)["Account.username"]
callManager.placeCall(acc1,str(randint(100000000,100000000000)),destination_number)
calls = callManager.getCallList()
for i, v in enumerate(calls):
callManager.hangUp(v)
add_to_suit("Place call",'Many simultanious calls (IP2IP)',stress_concurent_calls)
# Test if SFLPhone can handle more than 50 simultanious IP2IP call over localhost
# Using localhost to save bandwidth, this is about concurent calls, not network load
#def stress_concurent_calls():
## Create 50 calls
#for i in range(0,50):
#callManager.placeCall(first_account,str(randint(100000000,100000000000)),"sip:127.0.0.1")
##TODO check if the could is right
## Accept all calls that worked
#calls = callManager.getCallList()
#for i, v in enumerate(calls):
#callManager.accept(v)
## Hang up all calls
#for i, v in enumerate(calls):
#callManager.hangUp(v)
#return {'code':0,'error':""}
#add_to_suit("Place call",'Many simultanious calls (IP2IP)',stress_concurent_calls)
# Test if a call can be put and removed from hold multiple time
def stress_hold_unhold_server():
# Hang up everything left
calls = callManager.getCallList()
for i, v in enumerate(calls):
callManager.hangUp(v)
#Place a call
callManager.placeCall(first_account,str(randint(100000000,100000000000)),first_account_number)
calls = callManager.getCallList()
if len(calls) < 1:
return {'code':5,'error':"\nUnit test \"stress_hold_unhold\" failed: The call is gone"}
call = calls[0]
#Hold and unhold it
for i in range(0,10):
callManager.hold(call)
details = callManager.getCallDetails(call)
if not 'CALL_STATE' in details:
return {'code':1,'error':"\nUnit test \"stress_hold_unhold\" failed: The call is gone (hold)"}
if not details['CALL_STATE'] == "HOLD":
return {'code':2,'error':"\nUnit test \"stress_hold_unhold\" failed: The call should be on hold, but is "+details['CALL_STATE']}
callManager.unhold(call)
details = callManager.getCallDetails(call)
if not 'CALL_STATE' in details:
return {'code':3,'error':"\nUnit test \"stress_hold_unhold\" failed: The call is gone (unhold)"}
if not details['CALL_STATE'] == "CURRENT":
return {'code':4,'error':"\nUnit test \"stress_hold_unhold\" failed: The call should be current, but is "+details['CALL_STATE']}
return {'code':0,'error':""}
#add_to_suit("Hold call",'Hold and unhold',stress_hold_unhold_server)
#Run the tests
#run()
stress_transfers()
#kate: space-indent off; tab-indents on; mixedindent off; indent-width 4;tab-width 4;
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment