generic functions
determine the channel's control character.
check for ‘chan #dunkbots’ like string.
check whether permissions on a dir are set to umode.
remove all .pyc files.
remove filename’s .pyc file.
copy file from src to target.
decode a string character for character, ignoring those that cannot be decoded.
stop the bot.
apply sedstring to filename.
return the file size of a file.
make sure bold chars are balanced.
convert from encoding.
return a filename with increasing numbering.
search for a free socket to listen on.
return a pseudo random nick.
return the version of the bot.
get userhost from the bot's userhost cache.
Bases: str
str as a class (can set attributes on them).
remove unwanted chars from string.
convert tuples into lists so the result is JSON compatible.
load all eggs in the eggdir.
create ievent.args and ievent.rest.
make datadir sub directories.
return the plugin file.
return all plugin names in a directory.
set the default encoding.
split a string into separate portions.
check whether (sub) string in a list.
remove bold chars from string.
strip ident char from userhost
strip ident char from list of userhosts
remove everything after the /
strip not allowed chars from the string.
convert to ascii.
convert to encoding.
convert to latin-1.
touch a file.
filter duplicate elements from list.
wait for a queue to be filled with results.
wait for output from the user.
# gozerbot/utils/generic.py # # """ generic functions """ __status__ = "seen"
from gozerbot.persist.persist import Persist from exception import handle_exception from trace import calledfrom from stat import ST_UID, ST_MODE, S_IMODE from gozerbot.config import config from log import rlog from lazydict import LazyDict
from simplejson import dumps
import time import sys import re import getopt import types import os import random import socket import Queue
def loadeggs(eggdir, version=None):
    """ load all eggs in the eggdir.

    every entry of eggdir ending in '.egg' is activated through
    pkg_resources.require() when pkg_resources can be imported and is able
    to resolve the egg. in all other cases the egg is put in front of
    sys.path so it can be imported directly.

    eggdir  -- directory that is scanned for eggs.
    version -- not used, kept so existing callers keep working.

    """
    rlog(10, 'booting', 'scanning %s' % eggdir)
    try:
        import pkg_resources
    except ImportError:
        pkg_resources = None
    for f in os.listdir(eggdir):
        if not f.endswith('.egg'):
            continue
        rlog(10, 'booting', 'loading %s' % f)
        if pkg_resources is not None:
            # NOTE(review): assumes the setuptools egg naming convention
            # <project>-<version>-<pyver>.egg .. confirm for the eggs shipped.
            project = f.split('-')[0]
            try:
                pkg_resources.require("%s>0.0" % project)
                continue
            except (pkg_resources.ResolutionError, ValueError):
                # distribution unknown/conflicting or name not parsable ..
                # fall back to the plain sys.path approach below.
                pass
        sys.path.insert(0, eggdir + os.sep + f)
def jsonstring(s):
    """ dump s to a json string, a plain tuple is handed over as a list. """
    if type(s) is tuple:
        return dumps(list(s))
    return dumps(s)
def stripident(userhost):
    """ strip ident char from userhost.

    objects that provide a getNode() method are returned as str(userhost)
    without further processing. for strings a leading '~', '-', '+' or '^'
    is removed, or else a leading '<char>=' pair.

    returns None for an empty userhost, otherwise the stripped string.

    """
    try:
        userhost.getNode()
    except AttributeError:
        pass
    else:
        return str(userhost)
    if not userhost:
        return None
    if userhost[0] in "~-+^":
        return userhost[1:]
    # slice instead of index so a one character userhost doesn't raise
    # IndexError.
    if userhost[1:2] == '=':
        return userhost[2:]
    return userhost
def stripidents(ulist):
    """ run stripident() over every userhost in ulist and return the results as a new list. """
    return [stripident(userhost) for userhost in ulist]
def makedirs(datadir):
    """ create datadir and its states, db and configs sub directories when they are missing. """
    # the empty suffix stands for datadir itself, which has to exist first.
    for suffix in ('', '/states/', '/db/', '/configs/'):
        wanted = datadir + suffix
        if not os.path.isdir(wanted):
            os.mkdir(wanted)
def cleanpyc():
    """ remove all .pyc files.

    looks in the gplugs, gplugs/olddb and gplugs/alchemy directories
    (relative to the current working directory). directories that can't be
    listed and files that can't be removed are skipped.

    returns the list of file names (without directory) that were removed.

    """
    removed = []
    for plugdir in ('gplugs', 'gplugs/olddb', 'gplugs/alchemy'):
        try:
            names = os.listdir(plugdir)
        except OSError:
            continue
        for name in names:
            if not name.endswith('.pyc'):
                continue
            try:
                os.unlink(plugdir + os.sep + name)
            except OSError:
                # one file that can't be removed should not keep the rest
                # of the directory from being cleaned.
                continue
            removed.append(name)
    return removed
def cleanpycfile(filename):
    """ remove filename's .pyc file.

    filename is only removed when it ends with '.pyc'. a successful removal
    is logged, a failing one (missing file, no permission) is ignored.

    """
    if not filename or not filename.endswith('.pyc'):
        return
    try:
        os.unlink(filename)
    except OSError:
        return
    rlog(10, 'generic', 'cleaned %s' % filename)
def getversion():
    """ return the version of the bot.

    the result is config['version'] followed by the upper cased
    config['dbtype'] and, when a db_driver is configured, the upper cased
    driver name.

    """
    version = config['version']
    driver = config.get("db_driver")
    version += ' ' + config['dbtype'].upper()
    # only add the driver when there is one .. the former
    # "... + driver or ''" concatenated None because + binds tighter than or.
    if driver:
        version += ' ' + driver.upper()
    return version
def makeoptions(ievent, options={}):
    """ parse getopt style options out of ievent.txt.

    options maps option names ('-x' or '--name') to their default values.
    defaults for --filter, --to, --chan, --how and --speed are filled in
    when the caller did not provide them.

    on success ievent.optionset, ievent.filter, ievent.speed,
    ievent.channel, ievent.options and ievent.txt are updated and
    makeargrest() is called to recompute args/rest.

    returns ievent.options on success, 0 when getopt rejects the arguments
    and None when an unexpected exception was passed to handle_exception().

    """
    # work on a LazyDict built from the argument
    # NOTE(review): presumably a copy, so the shared default {} is not
    # mutated .. confirm in lazydict.
    options = LazyDict(options)
    try:
        optargs = ""
        optlist = []
        # fill in the defaults of the options every command understands
        if not options.has_key('--filter'):
            options['--filter'] = ""
        if not options.has_key('--to'):
            options['--to'] = None
        if not options.has_key('--chan'):
            options['--chan'] = ievent.channel
        if not options.has_key('--how'):
            options['--how'] = "msg"
        if not options.has_key('--speed'):
            options['--speed'] = str(ievent.speed)
        else:
            options['--speed'] = str(options['--speed'])
        # build the short (optargs) and long (optlist) option specs for getopt
        for i, j in options.iteritems():
            if '--' in i:
                optlist.append("%s=" % i[2:])
                if j:
                    optlist.append(j)
                continue
            if '-' in i:
                if j:
                    optargs += ":%s" % i[1:]
                else:
                    optargs += i[1:]
        args = ievent.txt.split()
        try:
            # args[0] is the command itself, only what follows is parsed
            (opts, rest) = getopt.getopt(args[1:], optargs, optlist)
        except AttributeError, ex:
            print "option not allowed: %s" % str(ex), ievent.txt, options
            return 0
        except getopt.GetoptError, ex:
            return 0
        if opts:
            # remember which options were explicitly given
            for item in opts:
                ievent.optionset.append(item[0])
        # defaults first, options given on the command line override them
        o = dict(options)
        o.update(dict(opts))
        try:
            filter = o['--filter']
            if filter and filter not in ievent.filter:
                ievent.filter.append(filter)
        except KeyError:
            pass
        try:
            speed = o['--speed']
            ievent.speed = int(speed)
        except KeyError:
            pass
        try:
            ievent.channel = o['--chan'] or ievent.channel
        except KeyError:
            pass
        ievent.options.update(o)
        if args:
            # rebuild txt as the command plus the non-option arguments
            ievent.txt = args[0] + ' ' + ' '.join(rest)
        makeargrest(ievent)
    except Exception, ex:
        handle_exception() ; return
    return ievent.options
def makeargrest(ievent):
    """ derive ievent.args, ievent.rest and ievent.command from ievent.txt. """
    ievent.args = ievent.txt.split()[1:]
    # rest is everything after the first space, empty when there is none.
    pieces = ievent.txt.split(' ', 1)
    if len(pieces) == 2:
        ievent.rest = pieces[1]
    else:
        ievent.rest = ""
    ievent.command = ievent.txt.split(' ')[0]
def setdefenc(encoding):
    """ make encoding the interpreter wide default string encoding. """
    import sys
    # NOTE(review): reload() presumably brings back sys.setdefaultencoding,
    # which python 2's site module removes at startup .. confirm.
    reload(sys).setdefaultencoding(encoding)
def plugfile(datadir):
    """ return datadir joined with whatever calledfrom() reports for the current frame. """
    origin = calledfrom(sys._getframe())
    return datadir + os.sep + origin
def cchar(bot, ievent):
    """ return the control character of the channel the event came from, falling back to config['defaultcc'] or '!'. """
    try:
        return bot.channels[ievent.channel]['cc']
    except (LookupError, TypeError):
        # channel unknown or channel data not subscriptable
        return config['defaultcc'] or '!'
def splittxt(what, l=375):
    """ split a string into separate portions.

    every portion runs up to the first space found at or after l
    characters, so words are never cut in half and a portion can be longer
    than l. the space itself opens the next portion.

    what -- the string to split.
    l    -- minimum length of a portion, values below 1 are treated as 1.

    returns a list of non empty strings that joined together give what.

    """
    # guard against l <= 0 which used to raise ZeroDivisionError
    size = max(l, 1)
    txtlist = []
    length = len(what)
    start = 0
    # loop on the actual position instead of a precomputed
    # range(length/l+1), which breaks under true division.
    while start < length:
        endword = what.find(' ', start + size)
        if endword == -1:
            endword = length
        txtlist.append(what[start:endword])
        start = endword
    return txtlist
class istr(str):
    """ plain str subclass, unlike str its instances accept arbitrary attributes. """
def die():
    """ end the bot by sending signal 9 to our own process. """
    own_pid = os.getpid()
    os.kill(own_pid, 9)
def getlistensocket(listenip):
    """ search for a free socket to listen on.

    ports are tried one after the other starting at 5001, wrapping around
    to 5000 once 65000 has been passed, with a 0.01 second pause before
    every attempt. the search goes on until a bind succeeds.

    listenip -- address to bind to, an address containing ':' is treated
                as ipv6.

    returns a (port, sock) tuple, sock is a blocking, listening tcp socket.

    """
    if ':' in listenip:
        family = socket.AF_INET6
    else:
        family = socket.AF_INET
    port = 5000
    while 1:
        time.sleep(0.01)
        port += 1
        if port > 65000:
            port = 5000
        sock = None
        try:
            sock = socket.socket(family, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(1)
            sock.bind((listenip, port))
            sock.listen(1)
            return (port, sock)
        except socket.error:
            # port taken (or socket not usable) .. release the descriptor
            # before trying the next port so failed attempts don't leak.
            if sock is not None:
                sock.close()
def checkchan(bot, item):
    """ check for 'chan #dunkbots' like string.

    bot  -- not used.
    item -- the text to search for ' chan <channel>'.

    returns a (channel, item) tuple with the channel lower cased and every
    ' chan <channel>' occurrence removed from item, or None when item does
    not contain a channel.

    """
    chanre = re.search(r' chan (\S+)', item)
    if not chanre:
        return None
    chan = str(chanre.group(1))
    item = re.sub(' chan ' + re.escape(chan), '', item)
    return (chan.lower(), item)
def getwho(bot, who):
    """ look who up in the userhost cache of the bot, returns None when a key is missing. """
    try:
        userhost = bot.userhosts.data[who]
        if not bot.cfg['stripident']:
            return userhost
        rlog(10, 'getwho', 'removed ident from %s' % userhost)
        return stripident(userhost)
    except KeyError:
        return None
def waitforuser(bot, userhost, timeout=15):
    """ register a waiter for userhost on bot.privwait and block until something arrives on its queue. """
    inbox = Queue.Queue()
    ticket = bot.privwait.register(userhost, inbox, timeout)
    reply = inbox.get()
    # the waiter is only needed for this one reply
    bot.privwait.delete(ticket)
    return reply
def getrandomnick():
    """ return 'gbot' followed by a random number between 0 and 100 (both included). """
    number = random.randint(0, 100)
    return "gbot%s" % number
def waitforqueue(queue, timeout=10, maxitems=None):
    """ collect results from queue until it stays empty for timeout seconds, a false item shows up or maxitems results are in. """
    collected = []
    while True:
        try:
            item = queue.get(1, timeout)
        except Queue.Empty:
            return collected
        # a false value (None, "", 0 ..) ends the collecting
        if not item:
            return collected
        collected.append(item)
        if maxitems and len(collected) == maxitems:
            return collected
def decodeperchar(txt, encoding='utf-8', what=""):
    """ decode txt one character at a time, characters that fail to decode are dropped and logged once. """
    decoded = []
    failed = []
    for char in txt:
        try:
            piece = char.decode(encoding)
        except UnicodeDecodeError:
            if char not in failed:
                failed.append(char)
        else:
            decoded.append(piece)
    if failed:
        # what, when given, prefixes the log line
        if what:
            rlog(10, 'generic', "%s: can't decode %s characters to %s" % (what, failed, encoding))
        else:
            rlog(10, 'generic', "can't decode %s characters to %s" % (failed, encoding))
    return u"".join(decoded)
def toenc(what, encoding='utf-8'):
    """ convert to encoding.

    what is turned into unicode first and then encoded.

    returns the encoded string, or u"" when what is empty or the
    conversion fails (the failure is logged).

    """
    if not what:
        return u""
    try:
        return unicode(what).encode(encoding)
    except UnicodeError:
        # UnicodeError covers both the encode step and the implicit ascii
        # decode unicode() does on byte strings.
        rlog(10, 'generic', "can't encode %s to %s" % (what, encoding))
        return u""
def fromenc(txt, encoding='utf-8', what=""):
    """ convert from encoding.

    txt      -- the text to convert, unicode is returned as is.
    encoding -- encoding used to decode byte strings.
    what     -- passed on to decodeperchar() for its log message.

    returns unicode. byte strings that can't be decoded as a whole are
    decoded character by character with decodeperchar(), dropping the
    characters that fail.

    """
    if not txt:
        return u""
    # already decoded .. pushing unicode through decode() again would go
    # through the ascii codec and fail on anything non ascii.
    if isinstance(txt, unicode):
        return txt
    if not isinstance(txt, str):
        txt = str(txt)
    try:
        return txt.decode(encoding)
    except UnicodeDecodeError:
        return decodeperchar(txt, encoding, what)
def toascii(what):
    """ encode what as ascii, characters outside ascii are replaced. """
    return what.encode('ascii', 'replace')
def tolatin1(what):
    """ encode what as latin-1, characters outside latin-1 are replaced. """
    return what.encode('latin-1', 'replace')
def strippedtxt(what, allowed=[]):
    """ drop the characters below chr(32) from what, except \\001, \\002, \\003, tab and the ones in allowed. """
    permitted = allowed + ['\001', '\002', '\003', '\t']
    return ''.join(char for char in what if ord(char) > 31 or char in permitted)
def uniqlist(l):
    """ filter duplicate elements from list.

    every element is stripped of surrounding whitespace before it is
    compared, the first occurrence wins and the original order is kept.

    l -- iterable of strings.

    returns a new list with the stripped, unique elements.

    """
    seen = set()
    result = []
    for item in l:
        clean = item.strip()
        # set lookup instead of scanning the result list for every element
        if clean in seen:
            continue
        seen.add(clean)
        result.append(clean)
    return result
def fix_format(s):
    """ append a closing chr(2) and/or chr(3) to s when that character occurs an odd number of times. """
    counters = { chr(2): s.count(chr(2)), chr(3): s.count(chr(3)) }
    for char in counters:
        if counters[char] % 2 != 0:
            s = s + char
    return s
def stripbold(s):
    """ return s with every chr(2) and chr(3) taken out. """
    for marker in (chr(2), chr(3)):
        s = s.replace(marker, '')
    return s
def jabberstrip(text, allowed=[]):
    """ drop the characters below chr(32) from text, except newline, tab and the ones in allowed. """
    permitted = allowed + ['\n', '\t']
    return ''.join(char for char in text if ord(char) > 31 or char in permitted)
def plugnames(dirname):
    """ list the plugins in dirname: .py files (without extension) and sub directories holding an __init__.py. """
    if not os.path.isdir(dirname):
        return []
    found = []
    for entry in os.listdir(dirname):
        full = dirname + os.sep + entry
        if os.path.isdir(full):
            # a directory only counts when it is a package
            if os.path.exists(full + os.sep + '__init__.py'):
                found.append(entry)
        elif entry.endswith('.py'):
            found.append(entry[:-3])
    # __init__ is not a plugin
    if '__init__' in found:
        found.remove('__init__')
    return found
def filesize(path):
    """ return the size in bytes os.stat() reports for path. """
    info = os.stat(path)
    return info.st_size
def touch(fname):
    """ make sure fname exists by opening it for writing with O_CREAT and closing it again. """
    handle = os.open(fname, os.O_CREAT | os.O_WRONLY)
    os.close(handle)
def stringinlist(s, l):
    """ return 1 when s is contained in at least one element of l, otherwise None. """
    if any(s in candidate for candidate in l):
        return 1
def stripped(userhost):
    """ return the part of userhost in front of the first / """
    return userhost.split('/', 1)[0]
def checkpermissions(ddir, umode):
    """ check whether permissions on a dir are set to umode.

    when ddir is not owned by the current user a chown to the current
    uid/gid is attempted, when its mode differs from umode it is chmod'ed
    to umode. nothing is done when the platform has no os.getuid() or when
    ddir can't be stat'ed.

    ddir  -- path to check.
    umode -- wanted permission bits, for example 0700.

    """
    try:
        uid = os.getuid()
        gid = os.getgid()
    except AttributeError:
        # no getuid/getgid in the os module (presumably windows)
        return
    try:
        info = os.stat(ddir)
    except OSError:
        return
    if info[ST_UID] != uid:
        try:
            os.chown(ddir, uid, gid)
        except OSError:
            # best effort .. changing ownership usually needs privileges
            pass
    if S_IMODE(info[ST_MODE]) != umode:
        try:
            os.chmod(ddir, umode)
        except Exception:
            handle_exception()
def gethighest(ddir, ffile):
    """ return ffile with '.<n>' appended, n being one above the highest number found on matching directories in ddir. """
    top = 0
    for entry in os.listdir(ddir):
        if ffile not in entry or not os.path.isdir(ddir + os.sep + entry):
            continue
        # the sequence number is taken from the third dot separated field
        fields = entry.split('.')
        if len(fields) < 3:
            continue
        try:
            number = int(fields[2])
        except ValueError:
            continue
        if number > top:
            top = number
    return ffile + '.' + str(top + 1)
def copyfile(src, target):
    """ copy file from src to target.

    the parent directory of target is created when it does not exist yet.
    progress is printed to stdout.

    src    -- path of the file to copy.
    target -- path to copy to, needs a directory part.

    IOErrors are silently ignored, other exceptions are passed to
    handle_exception().

    """
    # shutil is not imported at module level, get it here
    import shutil
    try:
        pdir = os.sep.join(target.split(os.sep)[:-1])
        if not pdir:
            print("can't determine parent dir of %s" % target)
            return
        if not os.path.isdir(pdir):
            print("making %s dir" % pdir)
            # makedirs so a target more than one level deep works as well
            os.makedirs(pdir)
        print("copying %s to %s" % (src, target))
        shutil.copy(src, target)
    except IOError:
        return
    except Exception:
        handle_exception()
def dosed(filename, sedstring):
    """ apply sedstring to filename.

    filename  -- file to edit in place, directories are skipped.
    sedstring -- 's/from/to/' like string. it is split on '/', backslashes
                 are removed from both parts and every occurrence of from
                 is replaced literally (no regular expressions).

    the result is written to filename + '.tmp' which is then renamed over
    filename. raises IOError when filename can't be opened and IndexError
    when sedstring has less than two '/' in it.

    """
    if os.path.isdir(filename):
        return
    tmp = filename + '.tmp'
    f = open(filename, 'r')
    try:
        # parse before the tmp file is created so a bad sedstring leaves
        # nothing behind.
        seds = sedstring.split('/')
        fr = seds[1].replace('\\', '')
        to = seds[2].replace('\\', '')
        fout = open(tmp, 'w')
        try:
            for line in f:
                fout.write(line.replace(fr, to))
        finally:
            fout.close()
    finally:
        f.close()
    try:
        os.rename(tmp, filename)
    except OSError:
        # only on windows is the failure taken to mean that rename won't
        # overwrite an existing file .. elsewhere removing the original
        # would just lose it.
        if os.name != 'nt':
            raise
        os.remove(filename)
        os.rename(tmp, filename)