gozerbot tests framework.
Bases: object
a test object.
run the test on bot with event.
Bases: object
collection of all tests.
activate tests.
add a test.
unload tests.
fire all tests.
optional end function.
call bot.fakein().
sleep nr of seconds.
optional start function.
unload tests.
# gozerbot/tests.py # # """ gozerbot tests framework. """ __status__ = "seen"
from config import config from threads.thr import start_new_thread from utils.locking import lockdec from utils.log import rlog from utils.trace import calledfrom, whichmodule from utils.exception import exceptionmsg from utils.dol import Dol from eventbase import EventBase
import sys import re import thread import copy import time import threading import random
cpy = copy.deepcopy
testlock = threading.RLock() locked = lockdec(testlock)
class Test(object): """ a test object. """ def __init__(self, execstring="", expect="", descr="", where="", fakein=""): self.plugin = calledfrom(sys._getframe(1)) if not self.plugin: self.plugin = calledfrom(sys._getframe(2)) self.descr = cpy(descr) self.execstring = cpy(execstring) self.expect = cpy(expect) self.response = "" self.groups = [] self.error = "" self.where = cpy(where) self.fakein = cpy(fakein) self.start = None self.end = None self.prev = None self.activate = False def __str__(self): return "test %s (%s) %s ==> %s (%s)" % (self.descr, self.where, self.execstring, self.response, self.error) def begin(self): if self.start: self.start() return self def run(self, bot, event): """ run the test on bot with event. """ if not self.activate: return if config['loadlist'] and self.plugin not in config['loadlist']: return mevent = copy.deepcopy(event) mevent.onlyqueues = False mevent.channel = '#dunkbots' bot.userhosts['bottest'] = 'bottest@test' bot.userhosts['mtest'] = 'mekker@test' bot.userhosts['exec'] = 'exec@gozerbot' if 'exec@gozerbot' not in bot.state['joinedchannels']: bot.state['joinedchannels'].append('exec@gozerbot') bot.channels['exec@gozerbot'] = {} if '#dunkbots' not in bot.state['joinedchannels']: bot.state['joinedchannels'].append('#dunkbots') bot.channels['#dunkbots'] = {'cc': '!'} self.error = "" self.response = "" self.groups = [] origexec = self.execstring origexpect = self.expect if self.start: self.start() ; return self if self.end: self.end() ; return self if self.fakein: bot.fakein(self.fakein) ; return self if self.prev and self.prev.groups: try: execstring = self.execstring % self.prev.groups self.execstring = execstring except TypeError: pass try: expect = self.expect % self.prev.groups self.expect = expect except TypeError: pass self.execstring = self.execstring.replace('{{ me }}', mevent.nick) mevent.txt = mevent.origtxt = str(self.execstring) rlog(100, 'tests', 'launching %s' % mevent.txt) from gozerbot.plugins import plugins self.response = plugins.cmnd(bot, mevent) if self.response and self.expect: self.expect = self.expect.replace('{{ me }}', mevent.nick) expects = self.expect.split('|') got = False for expect in expects: regex = re.compile(expect) result = regex.search(str(self.response)) if result: got = True ; break if not got: self.error = 'invalid response' else: self.groups = result.groups() self.execstring = origexec self.expect = origexpect return self
class Tests(object): """ collection of all tests. """ def __init__(self): self.tests = [] self.err = Dol() self.toolate = Dol() self.teller = 0 def add(self, execstr, expect=None, descr="", fakein=""): """ add a test. """ where = whichmodule(1) if not where: where = whichmodule(2) if not where: where = whichmodule(3) test = Test(execstr, expect, descr, where, fakein) self.tests.append(test) return self def fakein(self, execstr, expect=None, descr=""): """ call bot.fakein(). """ where = whichmodule(1) if not where: where = whichmodule(2) if not where: where = whichmodule(3) test = Test(execstr, expect, descr, where, execstr) test.where = where self.tests.append(test) return self def start(self, func): """ optional start function. """ where = whichmodule(1) if not where: where = whichmodule(2) if not where: where = whichmodule(3) test = Test() test.start = func test.where = where test.execstring = 'start' self.tests.append(test) return self def end(self, func): """ optional end function. """ where = whichmodule(1) if not where: where = whichmodule(2) if not where: where = whichmodule(3) test = Test() test.end = func test.where = where test.execstring = 'end' self.tests.append(test) return self def unload(self, plugname): """ unload tests. """ for i in range(len(self.tests)-1, -1, -1): if self.tests[i].plugin == plugname: del self.tests[i] return self def activate(self, plugname): """ activate tests. """ for i in range(len(self.tests)-1, -1, -1): if self.tests[i].plugin == plugname: self.tests[i].activate = True return self def disable(self, plugname): """ unload tests. """ for i in range(len(self.tests)-1, -1, -1): if self.tests[i].plugin == plugname: self.tests[i].activate = False return self def dorun(self, bot, event, tests, where, plug=None): teller = 0 err = {} toolate = [] prev = None for test in tests: if event.rest and event.rest not in test.plugin: continue if prev: test.prev = prev prev = test if test.expect: teller += 1 try: starttime = time.time() self.teller += 1 e = copy.deepcopy(event) result = test.run(bot, e) finished = time.time() if finished - starttime > 10: self.toolate[test.execstring] = finished - starttime if not result: continue if not result.error: event.reply("OK %s (%s) ==> %s" % (test.execstring, test.where, result.response)) else: self.err[test.execstring] = test event.reply('ERROR %s (%s): %s ==> %s (%s)' % (test.error, test.where, test.execstring, test.response, test.expect)) except Exception, ex: test.error = exceptionmsg() self.err[test.execstring] = test event.reply(test.error) return self def dotests(self, bot, event, threaded=False, plug=None): """ fire all tests. """ groups = Dol() for test in self.tests: groups.add(test.where, test) threads = [] testlist = [] for where, tests in groups.iteritems(): testlist.append((where, tests)) random.shuffle(testlist) for t in testlist: where, tests = t if plug and plug not in where: continue event.reply("running tests on %s" % where) if threaded: thread = start_new_thread(self.dorun, (bot, event, tests, where, plug)) threads.append(thread) else: self.dorun(bot, event, tests, where, plug) for thread in threads: thread.join() event.done() def sleep(self, seconds): """ sleep nr of seconds. """ time.sleep(seconds) return self
expect = {} tests = Tests()