gozerbot.utils.generic

generic functions

gozerbot.utils.generic.cchar(bot, ievent)

determine the channels control character.

gozerbot.utils.generic.checkchan(bot, item)

check for ‘chan #dunkbots’ like string.

gozerbot.utils.generic.checkpermissions(ddir, umode)

check whether permissions on a dir are set to umode.

gozerbot.utils.generic.cleanpyc()

remove all .pyc files.

gozerbot.utils.generic.cleanpycfile(filename)

remove filename’s .pyc file.

gozerbot.utils.generic.copyfile(src, target)

copy file from src to target.

gozerbot.utils.generic.decodeperchar(txt, encoding='utf-8', what='')

decode a string character for character, ignoring those that cannot be decoded.

gozerbot.utils.generic.die()

stop the bot.

gozerbot.utils.generic.dosed(filename, sedstring)

apply sedstring to filename.

gozerbot.utils.generic.filesize(path)

return the file size of a file.

gozerbot.utils.generic.fix_format(s)

make sure bold char are in balance.

gozerbot.utils.generic.fromenc(txt, encoding='utf-8', what='')

convert from encoding.

gozerbot.utils.generic.gethighest(ddir, ffile)

return a filename with increasing numbering.

gozerbot.utils.generic.getlistensocket(listenip)

search for a free socket to listen on.

gozerbot.utils.generic.getrandomnick()

return a pseudo random nick.

gozerbot.utils.generic.getversion()

return the version of the bot.

gozerbot.utils.generic.getwho(bot, who)

get userhost from bots userhost cache

class gozerbot.utils.generic.istr

Bases: str

str as a class (can set attributes on them).

gozerbot.utils.generic.jabberstrip(text, allowed=[])

remove unwanted chars from string.

gozerbot.utils.generic.jsonstring(s)

convert tuples into lists so it is JSON compatible.

gozerbot.utils.generic.loadeggs(eggdir, version=None)

load all eggs in the eggdir.

gozerbot.utils.generic.makeargrest(ievent)

create ievent.args and ievent.rest.

gozerbot.utils.generic.makedirs(datadir)

make datadir sub directories.

gozerbot.utils.generic.makeoptions(ievent, options={})
gozerbot.utils.generic.plugfile(datadir)

return the plugin file.

gozerbot.utils.generic.plugnames(dirname)

return all plugin names in a directory.

gozerbot.utils.generic.setdefenc(encoding)

set the default encoding.

gozerbot.utils.generic.splittxt(what, l=375)

split a string into separate portions.

gozerbot.utils.generic.stringinlist(s, l)

check whether (sub) string in a list.

gozerbot.utils.generic.stripbold(s)

remove bold chars from string.

gozerbot.utils.generic.stripident(userhost)

strip ident char from userhost

gozerbot.utils.generic.stripidents(ulist)

strip ident char from list of userhosts

gozerbot.utils.generic.stripped(userhost)

remove everything after the /

gozerbot.utils.generic.strippedtxt(what, allowed=[])

strip not allowed chars from the string.

gozerbot.utils.generic.toascii(what)

convert to ascii.

gozerbot.utils.generic.toenc(what, encoding='utf-8')

convert to encoding.

gozerbot.utils.generic.tolatin1(what)

convert to latin-1.

gozerbot.utils.generic.touch(fname)

touch a file.

gozerbot.utils.generic.uniqlist(l)

filter duplicate elements from list.

gozerbot.utils.generic.waitforqueue(queue, timeout=10, maxitems=None)

wait for a queue to be filled with results.

gozerbot.utils.generic.waitforuser(bot, userhost, timeout=15)

wait for output from the user.

CODE

# gozerbot/utils/generic.py
#
#

""" generic functions """

__status__ = "seen"

gozerbot imports

from gozerbot.persist.persist import Persist
from exception import handle_exception
from trace import calledfrom
from stat import ST_UID, ST_MODE, S_IMODE
from gozerbot.config import config
from log import rlog
from lazydict import LazyDict

simplejson import

from simplejson import dumps

basic imports

import time
import sys
import re
import getopt
import types
import os
import random
import socket
import Queue

loadeggs function

def loadeggs(eggdir, version=None):
    """ load all eggs in the eggdir.

        every file in eggdir whose name ends in '.egg' is activated with
        pkg_resources.require() when setuptools is installed. when setuptools
        is missing, or when the requirement can't be resolved, the egg file
        itself is put at the front of sys.path.

        eggdir  -- directory to scan for .egg files.
        version -- not used by this function.

    """
    rlog(10, 'booting', 'scanning %s' % eggdir)
    # setuptools is optional; import it once instead of once per egg
    try: from pkg_resources import require, ResolutionError
    except ImportError: require = None
    for f in os.listdir(eggdir):
        if not f.endswith('.egg'): continue
        rlog(10, 'booting', 'loading %s' % f)
        if require:
            # egg files are named <project>-<version>-py<X.Y>.egg, so the
            # project name is everything before the first dash
            try:
                require("%s>0.0" % f.split('-')[0])
                continue
            except (ResolutionError, ValueError):
                rlog(10, 'booting', "can't require %s, adding it to sys.path" % f)
        sys.path.insert(0, eggdir + os.sep + f)

jsonstring function

def jsonstring(s):
    """ dump s as a json string, a plain tuple is turned into a list first. """
    if type(s) is tuple: return dumps(list(s))
    return dumps(s)

stripident function

def stripident(userhost):
    """ strip ident char from userhost.

        an object that provides a getNode() method is returned as
        str(userhost) without further changes. for anything else one leading
        ident marker is removed: either a single '~', '-', '+' or '^', or a
        two character prefix whose second character is '='.

        returns None when userhost is empty, otherwise the stripped userhost.

    """
    try: userhost.getNode() ; return str(userhost)
    except AttributeError: pass
    if not userhost: return None
    if userhost[0] in "~-+^": return userhost[1:]
    # slice instead of index so a one character userhost can't raise IndexError
    if userhost[1:2] == '=': return userhost[2:]
    return userhost

stripidents function

def stripidents(ulist):
    """ return a new list holding stripident(userhost) for every userhost in ulist. """
    return [stripident(entry) for entry in ulist]

makedirs function

def makedirs(datadir):
    """ create datadir and its states, db and configs sub directories when missing. """
    wanted = [datadir]
    for sub in ('/states/', '/db/', '/configs/'): wanted.append(datadir + sub)
    # datadir comes first, so the sub directories always have a parent
    for path in wanted:
        if os.path.isdir(path): continue
        os.mkdir(path)

cleanpyc function

def cleanpyc():
    """ remove all .pyc files.

        scans the gplugs, gplugs/olddb and gplugs/alchemy directories
        (relative to the current working directory) and unlinks every entry
        whose name ends in '.pyc'. a directory that can't be listed is
        skipped, and so is the rest of a directory once an unlink fails.

        returns the list of removed file names (without directory part).

    """
    removed = []
    for ddir in ('gplugs', 'gplugs/olddb', 'gplugs/alchemy'):
        try:
            for fname in os.listdir(ddir):
                if fname.endswith('.pyc'):
                    os.unlink(ddir + os.sep + fname)
                    removed.append(fname)
        # best effort cleanup: filesystem errors are ignored, anything else is a bug
        except OSError: pass
    return removed

cleanpycfile function

def cleanpycfile(filename):
    """ unlink filename when its name ends in '.pyc', every error is silently ignored. """
    try:
        if not filename.endswith('.pyc'): return
        os.unlink(filename)
        rlog(10, 'generic', 'cleaned %s' % filename)
    except: pass

getversion function

def getversion():
    """ return the version of the bot.

        the result is config['version'], the upper cased config['dbtype']
        and, when config has a "db_driver" value, the upper cased driver
        name, separated by single spaces.

    """
    parts = [config['version'], config['dbtype'].upper()]
    driver = config.get("db_driver")
    # the driver is optional; the old "' ' + driver or ''" expression applied
    # "or" to the whole concatenation and raised TypeError when driver was None
    if driver: parts.append(driver.upper())
    return ' '.join(parts)

makeoptions function

def makeoptions(ievent, options={}):
    """ parse getopt style options out of ievent.txt.

        options maps option names to default values. names containing '--'
        are registered as long options, other names containing '-' as short
        options. defaults for --filter, --to, --chan, --how and --speed are
        added when missing.

        the first word of ievent.txt is kept as the command, the remaining
        words are handed to getopt.getopt(). on success ievent.optionset,
        ievent.filter, ievent.speed, ievent.channel, ievent.options and
        ievent.txt are updated and makeargrest() is called.

        returns ievent.options on success, 0 when getopt rejects the
        arguments and None when any other exception occurred (it is passed
        to handle_exception() first).

    """
    # NOTE(review): presumably LazyDict(options) copies its argument, so the
    # shared default {} is never mutated -- confirm against LazyDict
    options = LazyDict(options)
    try:
        optargs = ""
        optlist = []
        # fill in the defaults of the standard options
        if not options.has_key('--filter'): options['--filter'] = ""
        if not options.has_key('--to'): options['--to'] = None
        if not options.has_key('--chan'): options['--chan'] = ievent.channel
        if not options.has_key('--how'): options['--how'] = "msg"
        if not options.has_key('--speed'): options['--speed'] = str(ievent.speed)
        else: options['--speed'] = str(options['--speed'])
        # build the short (optargs) and long (optlist) option specs for getopt
        for i, j in options.iteritems():
            if '--' in i:
                optlist.append("%s=" % i[2:])
                # NOTE(review): this registers the default VALUE as an extra
                # long option name -- looks unintended, confirm
                if j: optlist.append(j)
                continue
            if '-' in i:
                # NOTE(review): getopt expects the colon AFTER the option
                # letter ("a:"), here it is put in front -- confirm
                if j: optargs += ":%s" % i[1:]
                else: optargs += i[1:]
        args = ievent.txt.split()
        # args[0] is the command itself, only the rest is parsed
        try: (opts, rest) = getopt.getopt(args[1:], optargs, optlist)
        except AttributeError, ex:
            print "option not allowed: %s" % str(ex), ievent.txt, options
            return 0
        except getopt.GetoptError, ex: return 0
        # remember which options were given explicitly
        if opts:
             for item in opts: ievent.optionset.append(item[0])
        # parsed options override the defaults
        o = dict(options)
        o.update(dict(opts))
        try:
            filter = o['--filter']
            if filter and filter not in ievent.filter: ievent.filter.append(filter)
        except KeyError: pass
        try:
            speed = o['--speed']
            ievent.speed = int(speed)
        except KeyError: pass
        try: ievent.channel = o['--chan'] or ievent.channel
        except KeyError: pass
        ievent.options.update(o)
        # rebuild txt without the options that were consumed
        if args: ievent.txt = args[0] + ' ' + ' '.join(rest)
        makeargrest(ievent)
    except Exception, ex: handle_exception() ; return
    return ievent.options

makeargrest function

def makeargrest(ievent):
    """ derive ievent.args, ievent.rest and ievent.command from ievent.txt.

        args is the whitespace split txt without its first word, rest is
        everything after the first space ("" when there is none) and command
        is everything before the first space.

    """
    txt = ievent.txt
    try: words = txt.split()
    except ValueError: words = []
    ievent.args = words[1:]
    cut = txt.find(' ')
    if cut == -1:
        ievent.rest = ""
        ievent.command = txt
    else:
        ievent.rest = txt[cut + 1:]
        ievent.command = txt[:cut]

setdefenc function

def setdefenc(encoding):
    """ make encoding the default encoding of the sys module. """
    import sys
    # NOTE(review): presumably sys is reloaded because sys.setdefaultencoding
    # is removed at interpreter startup -- confirm against the python 2 docs
    module = reload(sys)
    module.setdefaultencoding(encoding)

plugfile function

def plugfile(datadir):
    """ return datadir, os.sep and the result of calledfrom() for this frame glued together. """
    frame = sys._getframe()
    name = calledfrom(frame)
    return datadir + os.sep + name

cchar function

def cchar(bot, ievent):
    """ return the 'cc' entry of bot.channels[ievent.channel].

        when that lookup fails with a LookupError or TypeError the
        config['defaultcc'] value is used, or '!' when that is empty.

    """
    try: return bot.channels[ievent.channel]['cc']
    except (LookupError, TypeError): return config['defaultcc'] or '!'

splittxt function

def splittxt(what, l=375):
    """ split a string into separate portions.

        what is cut at the first space found at or after every l characters,
        so words are never broken and every portion but the last is at least
        l characters long. a portion after the first starts with the space it
        was cut at, so joining the portions gives back what.

        what -- string to split.
        l    -- minimum portion length, must not be 0.

        returns the list of portions, empty when what is empty.

    """
    txtlist = []
    start = 0
    end = l
    length = len(what)
    # floor division: a plain / gives a float under true division and
    # range() refuses floats
    for i in range(length // l + 1):
        endword = what.find(' ', end)
        # no space left, take everything that remains
        if endword == -1: endword = length
        res = what[start:endword]
        if res: txtlist.append(res)
        start = endword
        end = start + l
    return txtlist

istr class

class istr(str):
    """ plain str subclass, instances accept arbitrary attributes. """

die function

def die():
    """ send signal 9 to the bot's own process. """
    pid = os.getpid()
    os.kill(pid, 9)

getlistensocket function

def getlistensocket(listenip):
    """ search for a free socket to listen on.

        tries the ports 5001 up to and including 65000 in order, pausing
        0.01 seconds before every attempt. an AF_INET6 socket is used when
        listenip contains a ':', an AF_INET socket otherwise.

        returns a (port, sock) tuple of the first port that could be bound;
        sock is a blocking stream socket that is already listening.

        raises socket.error when none of the ports could be bound (for
        instance because listenip is not an address of this host).

    """
    if ':' in listenip: family = socket.AF_INET6
    else: family = socket.AF_INET
    for port in range(5001, 65001):
        time.sleep(0.01)
        sock = socket.socket(family, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(1)
            sock.bind((listenip, port))
            sock.listen(1)
            return (port, sock)
        except socket.error:
            # port not usable: release the socket before trying the next one
            sock.close()
    raise socket.error("no free port to listen on for %s" % listenip)

checkchan function

def checkchan(bot, item):
    """ check for 'chan #dunkbots' like string.

        looks for ' chan <name>' in item, where <name> is a run of non
        whitespace characters. when found a (channel, rest) tuple is
        returned: the lower cased name and item with every occurrence of
        that ' chan <name>' text removed.

        returns None when item has no such part. bot is not used.

    """
    # raw string: \S is not a valid escape sequence in a normal literal
    chanre = re.search(r' chan (\S+)', item)
    if not chanre: return None
    chan = str(chanre.group(1))
    item = re.sub(' chan ' + re.escape(chan), '', item)
    return (chan.lower(), item)

getwho function

def getwho(bot, who):
    """ look who up in bot.userhosts.data.

        the stored userhost is passed through stripident() when
        bot.cfg['stripident'] is set. returns None when a KeyError is raised
        along the way.

    """
    try:
        userhost = bot.userhosts.data[who]
        if not bot.cfg['stripident']: return userhost
        rlog(10, 'getwho', 'removed ident from %s' % userhost)
        return stripident(userhost)
    except KeyError:
        return None

waitforuser function

def waitforuser(bot, userhost, timeout=15):
    """ register a fresh queue with bot.privwait for userhost and return the first item put on it.

        timeout is only handed to bot.privwait.register(); the queue read
        itself blocks without a timeout. the registration is deleted again
        before returning.

    """
    answers = Queue.Queue()
    ticket = bot.privwait.register(userhost, answers, timeout)
    reply = answers.get()
    bot.privwait.delete(ticket)
    return reply

getrandomnick function

def getrandomnick():
    """ return 'gbot' followed by a random number from 0 up to and including 100. """
    number = random.randint(0, 100)
    return "gbot%d" % number

waitforqueue function

def waitforqueue(queue, timeout=10, maxitems=None):
    """ collect items from queue until it stays empty for timeout seconds.

        collecting also stops on the first false item (which is not included)
        and as soon as maxitems items are gathered. returns the list of
        collected items.

    """
    collected = []
    while 1:
        try: item = queue.get(1, timeout)
        except Queue.Empty: return collected
        if not item: return collected
        collected.append(item)
        if maxitems and len(collected) == maxitems: return collected

decodeperchar function

def decodeperchar(txt, encoding='utf-8', what=""):
    """ decode txt one element at a time, dropping whatever can't be decoded.

        the distinct elements that failed are reported through rlog, with
        what as a prefix when it is given. returns the decoded elements
        joined into one unicode string.

    """
    decoded = []
    failed = []
    for char in txt:
        try: decoded.append(char.decode(encoding))
        except UnicodeDecodeError:
            if char not in failed: failed.append(char)
    if failed and what: rlog(10, 'generic', "%s: can't decode %s characters to %s" % (what, failed, encoding))
    elif failed: rlog(10, 'generic', "can't decode %s characters to %s" % (failed, encoding))
    return u"".join(decoded)

toenc function

def toenc(what, encoding='utf-8'):
    """ return unicode(what) encoded to encoding.

        an empty unicode string is returned when what is false or when a
        UnicodeEncodeError is raised (which is logged through rlog).

    """
    if what:
        try: return unicode(what).encode(encoding)
        except UnicodeEncodeError:
            rlog(10, 'generic', "can't encode %s to %s" % (what, encoding))
    return u""

fromenc function

def fromenc(txt, encoding='utf-8', what=""):
    """ convert from encoding.

        txt      -- value to convert.
        encoding -- encoding byte strings are decoded with.
        what     -- passed on to decodeperchar() for its log message.

        returns a unicode string: u"" for a false txt, txt itself when it
        already is unicode, the decoded text for a byte string and
        unicode(txt) for any other object. when a UnicodeDecodeError is
        raised the result of decodeperchar(txt, encoding, what) is returned,
        which drops whatever can't be decoded.

    """
    if not txt: return u""
    # already decoded: the old encode/decode round trip went through the
    # default encoding and raised UnicodeEncodeError on non-ascii text
    if isinstance(txt, unicode): return txt
    try:
        if not isinstance(txt, str): return unicode(txt)
        # decode the bytes directly instead of relying on the default encoding
        return txt.decode(encoding)
    except UnicodeDecodeError:
        return decodeperchar(txt, encoding, what)

toascii function

def toascii(what):
    """ return what encoded to ascii, characters that don't fit are replaced. """
    return what.encode('ascii', 'replace')

tolatin1 function

def tolatin1(what):
    """ return what encoded to latin-1, characters that don't fit are replaced. """
    return what.encode('latin-1', 'replace')

strippedtxt function

def strippedtxt(what, allowed=[]):
    """ drop the characters below chr(32) from what.

        the characters in the allowed list are kept, and so are chr(1),
        chr(2), chr(3) and tab. returns the remaining characters joined
        together.

    """
    keep = allowed + ['\001', '\002', '\003', '\t']
    return ''.join([char for char in what if ord(char) > 31 or char in keep])

uniqlist function

def uniqlist(l):
    """ filter duplicate elements from list.

        every element is stripped of surrounding whitespace first, so
        elements that only differ in such whitespace count as duplicates.

        returns a new list of the stripped elements in order of first
        appearance.

    """
    result = []
    # set membership instead of searching the result list for every element
    seen = set()
    for i in l:
        j = i.strip()
        if j in seen: continue
        seen.add(j)
        result.append(j)
    return result

fix_format function

def fix_format(s):
    """ append chr(2) and/or chr(3) to s when it holds an odd number of them. """
    # kept as a dict so the markers are appended in the same order as before
    tally = {
        chr(2): s.count(chr(2)),
        chr(3): s.count(chr(3))
        }
    for marker in tally:
        if tally[marker] % 2: s += marker
    return s

stripbold function

def stripbold(s):
    """ return s without any chr(2) and chr(3) characters. """
    for marker in (chr(2), chr(3)): s = s.replace(marker, '')
    return s

jabberstrip function

def jabberstrip(text, allowed=[]):
    """ drop the characters below chr(32) from text.

        the characters in the allowed list are kept, and so are newline and
        tab. returns the remaining characters joined together.

    """
    keep = allowed + ['\n', '\t']
    chars = []
    for char in text:
        if ord(char) < 32 and char not in keep: continue
        chars.append(char)
    return ''.join(chars)

plugnames function

def plugnames(dirname):
    """ list the plugin names found in dirname.

        a sub directory counts when it holds an __init__.py, any other entry
        when its name ends in '.py' (reported without that suffix). one
        '__init__' entry is removed from the result. returns [] when dirname
        is not a directory.

    """
    if not os.path.isdir(dirname): return []
    names = []
    for entry in os.listdir(dirname):
        path = dirname + os.sep + entry
        if os.path.isdir(path):
            if os.path.exists(path + os.sep + '__init__.py'): names.append(entry)
        elif entry.endswith('.py'): names.append(entry[:-3])
    if '__init__' in names: names.remove('__init__')
    return names

filesize function

def filesize(path):
    """ return the size that os.stat() reports for path. """
    info = os.stat(path)
    return info.st_size

touch function

def touch(fname):
    """ open fname write-only, creating it when it doesn't exist, and close it right away. """
    flags = os.O_CREAT | os.O_WRONLY
    os.close(os.open(fname, flags))

stringinlist function

def stringinlist(s, l):
    """ return 1 when s is contained in at least one element of l, None otherwise. """
    for candidate in l:
        if s not in candidate: continue
        return 1

stripped function

def stripped(userhost):
    """ return the part of userhost in front of the first '/'. """
    pieces = userhost.split('/', 1)
    return pieces[0]

checkpermissions function

def checkpermissions(ddir, umode):
    """ make ddir owned by the current user and give it permission bits umode.

        nothing is done when os has no getuid/getgid or when ddir can't be
        stat'ed. a failing chown is ignored, a failing chmod is passed to
        handle_exception().

    """
    try: owner = (os.getuid(), os.getgid())
    except AttributeError: return
    try: info = os.stat(ddir)
    except OSError: return
    if info[ST_UID] != owner[0]:
        try: os.chown(ddir, owner[0], owner[1])
        except: pass
    if S_IMODE(info[ST_MODE]) == umode: return
    try: os.chmod(ddir, umode)
    except: handle_exception()

gethighest function

def gethighest(ddir, ffile):
    """ return ffile with '.<number>' appended.

        number is one above the highest sequence number in use, 1 when there
        is none. sequence numbers are taken from the third dot separated part
        of the names of the sub directories of ddir that contain ffile.

    """
    numbers = [0]
    for name in os.listdir(ddir):
        if not (os.path.isdir(ddir + os.sep + name) and ffile in name): continue
        parts = name.split('.')
        if len(parts) < 3: continue
        try: numbers.append(int(parts[2]))
        except ValueError: pass
    return ffile + '.' + str(max(numbers) + 1)

copyfile function

def copyfile(src, target):
    """ copy file from src to target.

        the parent directory of target is created when it doesn't exist
        (one level only). nothing is copied when target has no directory
        part. progress is reported on stdout.

        an IOError is silently ignored, any other exception is passed to
        handle_exception(). always returns None.

    """
    # imported here because the module level imports don't provide shutil
    import shutil
    try:
        pdir = os.sep.join(target.split(os.sep)[:-1])
        # the parent dir is derived from target, so that is what gets reported
        if not pdir: print("can't determine parent dir of %s" % target) ; return
        if not os.path.isdir(pdir): print("making %s dir" % pdir) ; os.mkdir(pdir)
        print("copying %s to %s" % (src, target))
        shutil.copy(src, target)
    except IOError: return
    except Exception: handle_exception()

dosed function

def dosed(filename, sedstring):
    """ apply sedstring to filename.

        sedstring has the form 's/<from>/<to>/': it is split on '/', the
        second and third part are taken as search and replacement text and
        backslashes are removed from both. every occurrence of the search
        text is replaced literally (no regular expressions).

        the result is written to filename + '.tmp', which then replaces
        filename. nothing is done when filename is a directory.

        raises IndexError on a sedstring with less than two '/', IOError
        when a file can't be opened and OSError when the rename fails.

    """
    # directories can't be edited
    if os.path.isdir(filename): return
    # parse first, so a malformed sedstring doesn't leave a .tmp file behind
    seds = sedstring.split('/')
    fr = seds[1].replace('\\', '')
    to = seds[2].replace('\\', '')
    tmp = filename + '.tmp'
    f = open(filename, 'r')
    try:
        fout = open(tmp, 'w')
        try:
            for line in f: fout.write(line.replace(fr, to))
        finally: fout.close()
    finally: f.close()
    try: os.rename(tmp, filename)
    except OSError:
        # only windows refuses to rename over an existing file
        if os.name != 'nt': raise
        os.remove(filename)
        os.rename(tmp, filename)