threads management to run jobs.
Bases: gozerbot.runner.Runner
schedule a bot command.
Bases: gozerbot.threads.threadloop.RunnerLoop
a runner is a thread with a queue on which jobs can be pushed. jobs scheduled should not take too long since only one job can be executed in a Runner at the same time.
schedule a job.
Bases: object
runners is a collection of runner objects.
clean up idle runners.
create a new runner.
put a job on a free runner.
return sizes of runner objects.
return list of running jobs.
overload this if needed.
stop runners.
# gozerbot/runner.py # # """ threads management to run jobs. """ __status__ = "seen"
from gozerbot.threads.thr import getname, start_new_thread, start_bot_command from gozerbot.utils.generic import handle_exception from gozerbot.utils.trace import callstack from gozerbot.threads.threadloop import RunnerLoop from gozerbot.utils.log import rlog
import Queue import time import thread import random import logging import sys
class Runner(RunnerLoop): """ a runner is a thread with a queue on which jobs can be pushed. jobs scheduled should not take too long since only one job can be executed in a Runner at the same time. """ def __init__(self, name="runner", doready=True): RunnerLoop.__init__(self, name) self.working = False self.starttime = time.time() self.elapsed = self.starttime self.finished = time.time() self.doready = doready def handle(self, descr, func, *args, **kwargs): """ schedule a job. """ self.working = True name = getname(str(func)) try: name = getname(str(func)) self.name = name rlog(9, "runner", 'running %s: %s' % (descr, name)) self.starttime = time.time() func(*args, **kwargs) self.finished = time.time() self.elapsed = self.finished - self.starttime except Exception, ex: handle_exception() self.working = False
class BotEventRunner(Runner): def handle(self, descr, func, bot, ievent, *args, **kwargs): """ schedule a bot command. """ try: self.starttime = time.time() name = getname(str(func)) self.name = name self.working = True rlog(9, "runner", "now running %s" % name) func(bot, ievent, *args, **kwargs) for queue in ievent.queues: queue.put_nowait(None) self.finished = time.time() self.elapsed = self.finished - self.starttime except Exception, ex: handle_exception(ievent) self.working = False self.name = "finished"
class Runners(object): """ runners is a collection of runner objects. """ def __init__(self, max=100, runnertype=Runner, doready=True): self.max = max self.runners = [] self.runnertype = runnertype self.doready = doready def runnersizes(self): """ return sizes of runner objects. """ result = [] for runner in self.runners: result.append("%s - %s" % (runner.queue.qsize(), runner.name)) return result def stop(self): """ stop runners. """ for runner in self.runners: runner.stop() def start(self): """ overload this if needed. """ pass def put(self, *data): """ put a job on a free runner. """ for runner in self.runners: if not runner.queue.qsize(): runner.put(*data) ; return runner = self.makenew() runner.put(*data) def running(self): """ return list of running jobs. """ result = [] for runner in self.runners: if runner.queue.qsize(): result.append(runner.nowrunning) return result def makenew(self): """ create a new runner. """ runner = None for i in self.runners: if not i.queue.qsize(): return i if len(self.runners) < self.max: runner = self.runnertype(self.doready) runner.start() self.runners.append(runner) else: runner = random.choice(self.runners) return runner def cleanup(self): """ clean up idle runners. """ for index in range(len(self.runners)-1, -1, -1): runner = self.runners[index] if not runner.queue.qsize(): try: runner.stop() ; del self.runners[index] except IndexError: pass except: handle_exception()
cmndrunners = defaultrunner = longrunner = Runners(50, BotEventRunner) cbrunners = Runners(100, BotEventRunner, doready=False) waitrunners = Runners(10, Runner)
def runners_start(): pass def runners_stop(): pass