gozerbot.plugs.throttle

gozerbot throggle plugin.

gozerbot.plugs.throttle.handle_throttleget(bot, ievent)

<nick> - get the throttle state of a user.

gozerbot.plugs.throttle.handle_throttlelist(bot, ievent)

show the throttle list.

gozerbot.plugs.throttle.handle_throttleremove(bot, ievent)

untrottle a user.

gozerbot.plugs.throttle.handle_throttleset(bot, ievent)

<nick> <commands per minute> - set throttle parameters of an user.

gozerbot.plugs.throttle.init()
gozerbot.plugs.throttle.throttlecb(bot, ievent)

check the ievent if it needs to be throttled.

gozerbot.plugs.throttle.throttlepre(bot, ievent)

CODE

# gozerbot/plugs/throttle.py
#
#

""" gozerbot throggle plugin. """

__status__ = "seen"

gozerbot imports

from gozerbot.persist.persiststate import PlugState
from gozerbot.persist.persistconfig import PersistConfig
from gozerbot.callbacks import callbacks
from gozerbot.commands import cmnds
from gozerbot.examples import examples
from gozerbot.plughelp import plughelp
from gozerbot.users import users
from gozerbot.generic import getwho, rlog
from gozerbot.tests import tests
from gozerbot.datadir import makedirs

basic imports

import time

plughelp

plughelp.add('throttle', 'throttle user commands per minute')

defines

makedirs()

state = PlugState()
state.define('lasttime', {})
state.define('level', {})
state.define('cpm', {}) # commands per minute

cfg = PersistConfig()
cfg.define('enable', 1)

throttlepre precondition

def throttlepre(bot, ievent):
    if cfg.get('enable'): return 1

throttlecb callback

def throttlecb(bot, ievent):
    """ check the ievent if it needs to be throttled. """
    try:
        cpm = state['cpm']
        uh = ievent.userhost
        if ievent.usercmnd:
            if cpm.has_key(uh): cpm[uh] += 1
            else: cpm[uh] = 1
        if uh in bot.throttle:
            if time.time() - state['lasttime'][uh] > 60:
                bot.throttle.remove(uh)
                cpm[uh] = 0
                ievent.reply('next command is unthrottled (%s)' % uh)
                rlog(10, 'throttle', 'unignoring %s' % uh)
        else:
            if ievent.usercmnd and (cpm[uh] > state['level'][uh]):
                perms = users.getperms(uh)
                if perms and 'OPER' in perms: cpm[uh] = 0 ; return
                bot.throttle.append(uh)
                ievent.reply('%s throttled' % uh)
                rlog(10, 'throttle', 'ignoring %s' % uh)
                state['lasttime'][uh] = time.time()
    except (ValueError, KeyError): pass

plugin initialisation

def init():
    callbacks.add('PRIVMSG', throttlecb, throttlepre)

throttle-get command

def handle_throttleget(bot, ievent):
    """ <nick> - get the throttle state of a user. """
    if not ievent.rest: ievent.missing("<nick>") ; return
    nick = ievent.rest
    uh = getwho(bot, nick)
    if not uh: ievent.reply("can't find userhost of %s" % nick) ; return
    try: ievent.reply("cpm of %s is %s" % (uh, state['level'][uh]))
    except KeyError: pass

cmnds.add('throttle-get', handle_throttleget, 'OPER')
examples.add('throttle-get', 'get commands per minute of <nick>', 'throttle-get dunker')
tests.add('throttle-get exec')

throttle-set command

def handle_throttleset(bot, ievent):
    """ <nick> <commands per minute> - set throttle parameters of an user. """
    try: (nick, cpm) = ievent.args
    except ValueError: ievent.missing('<nick> <commands per minute>') ; return
    uh = getwho(bot, nick)
    if not uh: ievent.reply("can't find userhost of %s" % nick) ; return
    perms = users.getperms(uh)
    if perms and 'OPER' in perms: ievent.reply("can't throttle a OPER") ; return
    try:
        cpm = float(cpm)
        if cpm == 0: ievent.reply("cpm can't be zero") ; return
        state['level'][uh] = cpm
        state['cpm'][uh] = 0
        state.save()
    except ValueError: ievent.reply('%s is not an integer' % cpm) ; return
    try: bot.throttle.remove(uh)
    except ValueError: pass
    ievent.reply('cpm set to %s for %s' % (cpm, uh))

cmnds.add('throttle-set', handle_throttleset, 'OPER')
examples.add('throttle-set', 'set allowed <cpm> commands per minute for <nick>' , 'throttle-set dunker 10')
tests.add('throttle-set mekker 10')

throttle-remove

def handle_throttleremove(bot, ievent):
    """ untrottle a user. """
    if not ievent.rest: ievent.missing('<nick>') ; return
    uh = getwho(bot, ievent.rest)
    if not uh: ievent.reply("can't find userhost of %s" % ievent.rest) ; return
    try:
        bot.throttle.remove(uh)
        state['cpm'][uh] = 0
        state.save()
        ievent.reply('throttle on %s removed' % uh)
    except (KeyError, ValueError): pass

cmnds.add('throttle-remove', handle_throttleremove, 'OPER')
examples.add('throttle-remove', 'remove throttle from <nick>', 'throttle-remove dunker')
tests.add('throttle-remove exec')

throttle-list command

def handle_throttlelist(bot, ievent):
    """ show the throttle list. """
    ievent.reply(bot.throttle)

cmnds.add('throttle-list', handle_throttlelist, 'OPER')
tests.add('throttle-list')