this module contains the core xmpp handling functions.
Bases: gozerbot.utils.lazydict.LazyDict
dict to store xml stanza attributes.
get an attribute by name.
convert to string.
convert the dictionary to xml.
Bases: gozerbot.contrib.xmlstream.NodeBuilder
XMLStream.
add a namespace handler.
connect to the server.
delete a namespace handler.
handler called on disconnect.
dispatch a dom to the appropriate handler.
convert the socket to a ssl connection.
stop the stream handling.
finish processing of an xml stanza.
get a namespace handler.
node data handler.
default stream handler.
default stream error handler.
default stream features handler.
called upon logon on the server.
handle one xml stanza.
reconnect to the server.
handler called by self._parser when it meets an unknown end tag.
handler called by self._parser when it meets an unknown start tag.
# gozerbot/xmpp/core.py # # """ this module contains the core xmpp handling functions. """ __status__ = "seen"
from gozerbot.datadir import datadir from gozerbot.config import Config, config from gozerbot.utils.generic import toenc, jabberstrip from gozerbot.utils.log import rlog from gozerbot.utils.lazydict import LazyDict from gozerbot.utils.exception import handle_exception from gozerbot.utils.locking import lockdec from gozerbot.threads.thr import start_new_thread from gozerbot.utils.trace import whichmodule from gozerbot.contrib.xmlstream import NodeBuilder, XMLescape from namespace import attributes, subelements
import xml.parsers.expat import socket import os import time import copy import logging import thread
# module level locks, each paired with the decorator lockdec() builds from it.
# @inlocked is applied to XMLStream._doprocess (the read loop); presumably a
# decorated call runs while holding the matching lock -- see lockdec() in
# gozerbot.utils.locking to confirm.

outlock = thread.allocate_lock()
outlocked = lockdec(outlock)

inlock = thread.allocate_lock()
inlocked = lockdec(inlock)

connectlock = thread.allocate_lock()
connectlocked = lockdec(connectlock)
class XMLDict(LazyDict):

    """
        dict to store xml stanza attributes.

        on construction the 'from' entry is mirrored under the key 'fromm'
        ('from' is a python keyword and can not be used as an attribute
        name); 'fromm' is '' when there is no usable 'from' entry.

    """

    def __init__(self, input=None):
        """
            initialise the dict.

            input: optional mapping with the initial content; None gives an
            empty dict.

        """
        if input is None:
            LazyDict.__init__(self)
        else:
            LazyDict.__init__(self, input)
        try:
            self['fromm'] = self['from']
        except (KeyError, TypeError):
            self['fromm'] = ''

    def __getattr__(self, name):
        """
            override getattribute so nodes in payload can be accessed.

            when name is not a key of the dict itself, the first entry of
            self['subelements'] that contains name is returned. otherwise
            the lookup is left to LazyDict.__getattr__ with "" as default.

        """
        if name not in self and 'subelements' in self:
            for sub in self['subelements']:
                if name in sub:
                    return sub[name]
        return LazyDict.__getattr__(self, name, default="")

    def get(self, name):
        """
            get an attribute by name.

            entries of self['subelements'] are searched first, then the
            dict itself. an empty LazyDict is returned when the name is not
            found. note that, unlike dict.get, no default can be passed.

        """
        if 'subelements' in self:
            for sub in self['subelements']:
                if name in sub:
                    return sub[name]
        if name in self:
            return self[name]
        return LazyDict()

    def toxml(self):
        """
            convert the dictionary to xml.

            self['element'] is the tag name. entries named in
            attributes[element] become xml attributes, 'txt' becomes a
            <body> child and entries named in subelements[element] become
            child elements. empty values are skipped and all values go
            through XMLescape. without children a self closing tag is
            returned.

            raises Exception when the dict is empty; a missing 'element'
            entry or an element unknown to the namespace tables fails on
            the lookup (presumably with KeyError).

        """
        res = dict(self)
        if not res:
            raise Exception("%s .. toxml() can't convert empty dict" % self.name)
        rlog(-1, self.name, u"toxml input: %s" % res)
        elem = self['element']
        parts = ["<%s" % elem]
        for attribute in attributes[elem]:
            value = res.get(attribute)
            if value:
                parts.append(u" %s='%s'" % (attribute, XMLescape(value)))
        children = []
        txt = res.get('txt')
        if txt:
            children.append(u"<body>%s</body>" % XMLescape(txt))
        for subelement in subelements[elem]:
            data = res.get(subelement)
            if data:
                children.append("<%s>%s</%s>" % (subelement, XMLescape(data), subelement))
        if children:
            parts.append(">")
            parts.extend(children)
            parts.append("</%s>" % elem)
        else:
            # nothing to nest .. emit <elem attr='..'/>
            parts.append("/>")
        return "".join(parts)

    def str(self):
        """
            convert to string.

            returns the entries that are attributes or subelements of
            self['element'] (plus 'txt') as a run of "key='value' " pairs;
            values are not escaped.

        """
        elem = self['element']
        return "".join(
            "%s='%s' " % (item, value)
            for item, value in dict(self).items()
            if item in attributes[elem] or item in subelements[elem] or item == 'txt'
        )
class XMLStream(NodeBuilder):

    """
        XMLStream .. reads xml stanzas from a connection, parses them with
        the NodeBuilder machinery and passes every stanza as a XMLDict to
        the handler registered for its element name.

        the class reads attributes it never initialises itself (stopped,
        encoding, server, user) and registers handle_message,
        handle_presence and handle_iq, which are not defined here ..
        presumably a subclass or mixin partner provides them. self.error
        is only assigned once something went wrong.

    """

    def __init__(self, host, port, name='gozerxmpp'):
        """
            set up parser state and register the default handlers.

            host: server to connect to.
            port: port to connect to (connect() falls back to 5222 when
            this is false).
            name: name used in log messages.

        """
        self.name = name
        self.connection = None
        # NOTE(review): the loops and guards below test self.stopped, not
        # self.stop .. self.stop is only written in this class.
        self.stop = False
        self.result = LazyDict()
        self.final = LazyDict()
        self.subelements = []
        self.reslist = []
        self.cur = u""
        self.tags = []
        self.host = host
        self.port = port
        self.handlers = LazyDict()
        self.addHandler('message', self.handle_message)
        self.addHandler('presence', self.handle_presence)
        self.addHandler('iq', self.handle_iq)
        self.addHandler('stream:stream', self.handle_stream)
        self.addHandler('stream:error', self.handle_streamerror)
        self.addHandler('stream:features', self.handle_streamfeatures)

    def handle_stream(self, data):
        """ default stream handler .. just logs the stanza. """
        rlog(1, self.name, "STREAM: %s" % data)

    def handle_streamerror(self, data):
        """ default stream error handler .. just logs the stanza. """
        rlog(10, self.name, "STREAMERROR: %s" % data)

    def handle_streamfeatures(self, data):
        """ default stream features handler .. just logs the stanza. """
        rlog(1, self.name, "STREAMFEATURES: %s" % data)

    def addHandler(self, namespace, func):
        """ add a namespace handler (replaces an existing one). """
        self.handlers[namespace] = func

    def delHandler(self, namespace):
        """ delete a namespace handler. """
        del self.handlers[namespace]

    def getHandler(self, namespace):
        """ get a namespace handler .. None when nothing is registered. """
        try:
            return self.handlers[namespace]
        except KeyError:
            return None

    def loop_one(self, data):
        """
            handle one xml stanza.

            re-runs NodeBuilder.__init__ (presumably to get a fresh parser),
            feeds the stripped data to self._parser and returns the result
            of finish(). returns {} when the data is not well formed or an
            unexpected exception occurs.

        """
        NodeBuilder.__init__(self)
        self._dispatch_depth = 2
        try:
            self._parser.Parse(data.strip())
        except xml.parsers.expat.ExpatError as ex:
            if 'not well-formed' in str(ex):
                rlog(10, self.name, "data is not well formed: %s" % str(data))
                return {}
            # any other parser error is only logged .. whatever has been
            # collected so far is still passed on to finish()
            rlog(10, self.name, "ALERT: %s - %s" % (str(ex), data))
        except Exception:
            handle_exception()
            return {}
        return self.finish(data)

    @inlocked
    def _doprocess(self):
        """
            process all incoming data.

            read loop: runs until self.stopped is set or an error occurs.
            incoming data is collected in self.buffer, which is only
            cleared once loop_one() returns a true result, so a stanza that
            could not be processed is retried with the next chunk appended.
            an empty read or any exception sets self.error, calls
            disconnectHandler() and ends the loop.

        """
        rlog(1, self.name, 'starting readloop')
        self.buffer = ""
        while not self.stopped:
            try:
                data = self.connection.read()
                if data == "":
                    rlog(10, self.name, 'remote disconnected')
                    self.error = 'disconnected'
                    self.disconnectHandler(Exception('remote %s disconnected' % self.host))
                    break
                if not data:
                    continue
                self.buffer += data
                self.buffer = self.buffer.strip()
                rlog(9, "<", self.buffer)
                if not self.loop_one(self.buffer):
                    rlog(10, self.name, 'failed to process %s' % self.buffer)
                else:
                    self.buffer = ""
            except xml.parsers.expat.ExpatError as ex:
                rlog(10, self.name, u"%s: %s" % (str(ex), data))
                self.buffer = ""
                self.error = str(ex)
                self.disconnectHandler(ex)
                break
            except Exception as ex:
                handle_exception()
                self.error = str(ex)
                self.disconnectHandler(ex)
                break
        rlog(1, self.name, 'stopping readloop .. %s' % (self.error or 'error not set'))

    def _raw(self, stanza):
        """
            output a xml stanza to the socket.

            the stanza is stripped, passed through jabberstrip() and toenc()
            and only sent when it looks like a <stream, <message, <presence
            or <iq element. nothing is sent when the bot is stopped.
            errors are logged and stored in self.error, never raised; a
            broken pipe is ignored.

        """
        try:
            if self.stopped:
                rlog(9, self.name, 'bot is stopped .. not sending')
                return
            what = toenc(jabberstrip(stanza.strip()), self.encoding)
            if not what.endswith('>') or not what.startswith('<'):
                rlog(100, self.name, 'invalid stanza: %s' % what)
                return
            if what.startswith(('<stream', '<message', '<presence', '<iq')):
                try:
                    # plain "\r\n" on purpose: what comes out of toenc()
                    # (presumably an encoded byte string) and appending a
                    # unicode literal forces an implicit ascii decode that
                    # fails on every non-ascii stanza.
                    self.connection.send(what + "\r\n")
                except AttributeError:
                    # connection object without send() ..
                    # NOTE(review): this path writes no trailing CRLF.
                    self.connection.write(what)
                rlog(9, ">", what)
            else:
                rlog(100, self.name, 'invalid stanza: %s' % what)
        except socket.error as ex:
            if 'Broken pipe' in str(ex):
                rlog(10, self.name, 'broken pipe .. ignoring')
                time.sleep(0.01)
                return
            self.error = str(ex)
            handle_exception()
        except Exception as ex:
            self.error = str(ex)
            handle_exception()

    def connect(self):
        """
            connect to the server.

            opens a tcp socket to self.server (when set) or self.host,
            sends the stream header for the domain part of self.user,
            requests starttls and hands over to dossl(). the replies of the
            server are only logged, not inspected.

            returns the result of dossl(), or None when the bot is stopped.
            socket errors are not caught here.

        """
        if self.stopped:
            rlog(10, self.name, 'bot is stopped not connecting to %s' % self.host)
            return
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(30)
        if not self.port:
            self.port = 5222
        if self.server:
            self.host = self.server
        rlog(10, self.name, "connecting to %s:%s" % (self.host, self.port))
        self.sock.connect((self.host, self.port))
        self.sock.setblocking(False)
        self.sock.settimeout(60)
        time.sleep(1)
        rlog(10, self.name, "starting stream")
        self.sock.send('<stream:stream to="%s" xmlns="jabber:client" xmlns:stream="http://etherx.jabber.org/streams" version="1.0">\r\n' % self.user.split('@')[1])
        time.sleep(3)
        result = self.sock.recv(1500)
        rlog(3, self.name, result)
        self.sock.send('<starttls xmlns="urn:ietf:params:xml:ns:xmpp-tls"/>\r\n')
        time.sleep(3)
        rlog(3, self.name, self.sock.recv(1500))
        self.sock.settimeout(60)
        return self.dossl()

    def dossl(self):
        """
            convert the socket to a ssl connection.

            uses the ssl module when it can be imported and falls back to
            socket.ssl otherwise. returns True when self.connection is set
            to something true, False if not.

        """
        try:
            import ssl
            # NOTE(review): no certificate options are passed .. check
            # whether the server certificate should be verified here.
            self.connection = ssl.wrap_socket(self.sock)
        except ImportError:
            self.connection = socket.ssl(self.sock)
        return bool(self.connection)

    def logon(self):
        """ called upon logon on the server .. starts the read loop in a new thread. """
        start_new_thread(self._doprocess, ())

    def finish(self, data):
        """
            finish processing of an xml stanza.

            the element name is the first tag seen ('presence' when no tag
            was recorded). the collected data is wrapped in a XMLDict with
            bot, orig (the raw data) and jabber set, and passed to the
            handler registered for the element. the parser state is reset
            afterwards.

            returns the XMLDict, or {} when there is no handler or the
            handler raised.

        """
        self.final['subelements'] = self.subelements
        if self.tags:
            element = self.tags[0]
            rlog(3, self.name, "setting element: %s" % element)
        else:
            element = 'presence'
        self.final['element'] = element
        method = self.getHandler(element)
        if method:
            try:
                result = XMLDict(self.final)
                result.bot = self
                result.orig = data
                result.jabber = True
                method(result)
            except Exception:
                handle_exception()
                result = {}
        else:
            rlog(10, self.name, "can't find handler for %s" % element)
            result = {}
        self.final = {}
        self.reslist = []
        self.tags = []
        self.subelements = []
        return result

    def unknown_starttag(self, tag, attrs):
        """
            handler called by the self._parser on an unknown start tag.

            the attributes of the first tag of a stanza are merged into
            self.final, those of later tags are kept in self.result.

        """
        NodeBuilder.unknown_starttag(self, tag, attrs)
        self.cur = tag
        if not self.tags:
            self.final.update(attrs)
        else:
            self.result[tag] = attrs
        self.tags.append(tag)

    def unknown_endtag(self, tag):
        """ handler called by the self._parser on an unknown end tag. """
        NodeBuilder.unknown_endtag(self, tag)
        self.result = {}
        self.cur = u""

    def handle_data(self, data):
        """ node data handler .. passes the data on to NodeBuilder. """
        NodeBuilder.handle_data(self, data)

    def dispatch(self, dom):
        """
            dispatch a dom to the appropriate handler.

            flattens the node and its direct children: non empty node data
            is stored in self.final under the node name (body data also
            under 'txt'), the attributes of the children are merged into
            self.final, and a nested dict with data, attributes and xmlns
            per node is appended to self.subelements.

        """
        res = LazyDict()
        parentname = dom.getName()
        data = dom.getData()
        if data:
            self.final[parentname] = data
            if parentname == 'body':
                self.final['txt'] = data
        attrs = dom.getAttributes()
        ns = dom.getNamespace()
        res[parentname] = LazyDict()
        res[parentname]['data'] = data
        res[parentname].update(attrs)
        if ns:
            res[parentname]['xmlns'] = ns
        for child in dom.getChildren():
            name = child.getName()
            data = child.getData()
            if data:
                self.final[name] = data
            attrs = child.getAttributes()
            ns = child.getNamespace()
            res[parentname][name] = LazyDict()
            res[parentname][name]['data'] = data
            res[parentname][name].update(attrs)
            self.final.update(attrs)
            if ns:
                res[parentname][name]['xmlns'] = ns
        self.subelements.append(res)

    def exit(self):
        """ stop the stream handling .. sets self.stopped. """
        self.stopped = True

    def reconnect(self):
        """
            reconnect to the server.

            calls exit(), sleeps 15 seconds and returns the result of
            connect().

            NOTE(review): exit() sets self.stopped and connect() returns
            early when self.stopped is set, so unless exit()/connect() are
            overridden or the flag is cleared elsewhere this never gets to
            the actual connect.

        """
        rlog(10, self.name, 'reconnect')
        self.exit()
        rlog(10, self.name, 'sleeping 15 seconds')
        time.sleep(15)
        return self.connect()

    def disconnectHandler(self, ex):
        """
            handler called on disconnect.

            ex: the exception that describes why the connection was lost.
            sets self.stop (not self.stopped), logs and calls reconnect().

        """
        self.stop = True
        rlog(10, self.name, 'disconnected: %s' % str(ex))
        self.reconnect()