gozerbot.utils.url

url related helper functions

class gozerbot.utils.url.CBURLopener(version, *args)

Bases: urllib.FancyURLopener

our URLOpener

class gozerbot.utils.url.Stripper

Bases: sgmllib.SGMLParser

html stripper

handle_data(data)

data handler

strip(some_html)

strip html

gozerbot.utils.url.decode_html_entities(s)

smart decoding of html entities to utf-8

gozerbot.utils.url.deleteurl(url, myheaders={}, postdata={}, keyfile='', certfile='', port=80)

very basic HTTP DELETE url deleter

gozerbot.utils.url.get_encoding(data)

get encoding from web data

gozerbot.utils.url.getpostdata(event)

postdata extractor for events.

gozerbot.utils.url.geturl(url, version=None)

fetch an url

gozerbot.utils.url.geturl2(url, decode=False)

use urllib2 to fetch an url

gozerbot.utils.url.geturl4(url, myheaders={}, postdata={}, keyfile='', certfile='', port=80)

fetch url using the http library.

class gozerbot.utils.url.istr

Bases: str

gozerbot.utils.url.posturl(url, myheaders, postdata, keyfile=None, certfile='', port=80)

very basic HTTP POST url poster

gozerbot.utils.url.puturl(url, myheaders={}, postdata={}, keyfile='', certfile='', port=80)

very basic HTTP PUT url retriever

gozerbot.utils.url.striphtml(txt)

strip html from txt

gozerbot.utils.url.useragent()

provide useragent string

CODE

# gozerbot/utils/url.py
#
#

""" url related helper functions """

__status__ = "seen"

# gozerbot imports

from log import rlog
from generic import fromenc
from gozerbot.datadir import datadir
from gozerbot.config import config

# basic imports

import time
import sys
import re
import urllib
import urllib2
import urlparse
import socket
import random
import os
import sgmllib
import thread
import types
import httplib
import StringIO
import htmlentitydefs
import tempfile
import cgi

# defines

try: import chardet
except ImportError: chardet = None

class istr(str):

    """ plain str subclass; unlike str its instances accept extra attributes (geturl2 stores the response headers on one as .info). """

# useragent function

def useragent():
    """ build the User-Agent string from the first two words of config['version'] (used as name and version). """
    words = config['version'].split()
    # unpacking raises ValueError when the version string has fewer than two words
    name, version = words[0:2]
    return 'Mozilla/5.0 (compatible; %s %s; http://gozerbot.googlecode.com)' % (name, version)

# CBURLopener class

class CBURLopener(urllib.FancyURLopener):

    """ FancyURLopener that announces itself with our own user agent string. """

    def __init__(self, version, *args):
        """ version is the user agent to send; an empty value selects useragent(). remaining args go to FancyURLopener. """
        # set before the base constructor runs: urllib documents that the
        # opener takes its User-Agent from self.version at construction time
        self.version = version or useragent()
        urllib.FancyURLopener.__init__(self, *args)

# geturl function

def geturl(url, version=None):
    """ fetch an url with urllib and return the response body.

        url     -- the url to fetch
        version -- user agent string to send; when empty useragent() is used

        side effect: installs a new CBURLopener as the module wide
        urllib._urlopener, so later urllib.urlopen() calls use it too.
    """
    urllib._urlopener = CBURLopener(version)
    rlog(10, 'url', 'fetching %s' % url)
    result = urllib.urlopen(url)
    # close the response even when reading it fails
    try: return result.read()
    finally: result.close()

# geturl2 function

def geturl2(url, decode=False):
    """ use urllib2 to fetch an url.

        url    -- the url to fetch
        decode -- when true the body is passed through fromenc() using the
                  encoding reported by get_encoding()

        returns an istr holding the body, with the response headers attached
        as its .info attribute.
    """
    rlog(10, 'url', 'fetching %s' % url)
    request = urllib2.Request(url)
    request.add_header('User-Agent', useragent())
    result = urllib2.build_opener().open(request)
    # close the response even when reading it fails
    try:
        tmp = result.read()
        info = result.info()
    finally: result.close()
    if decode:
        # get_encoding() checks the .info attribute of its argument first, so
        # give it the headers together with the body; the bare string read
        # from the socket has no .info and the header charset was never seen
        probe = istr(tmp)
        probe.info = info
        encoding = get_encoding(probe)
        rlog(0, 'url', '%s encoding: %s' % (url, encoding))
        res = istr(fromenc(tmp, encoding, url))
    else: res = istr(tmp)
    res.info = info
    return res

# geturl4 function

def geturl4(url, myheaders={}, postdata={}, keyfile="", certfile="", port=80):
    """ fetch url using the http library.

        url       -- the url to GET
        myheaders -- extra request headers, merged over the defaults
        postdata  -- accepted so the signature matches posturl/puturl/deleteurl;
                     a GET carries no body, so it is not sent
        keyfile   -- when set a HTTPS connection is made with this key file
        certfile  -- certificate file going with keyfile
        port      -- port to connect to when the url does not name one

        returns the httplib response object; the caller reads and closes it.
    """
    headers = {'Content-Type': 'text/html', 'Accept': 'text/plain; text/html', 'User-Agent': useragent()}
    headers.update(myheaders)
    urlparts = urlparse.urlparse(url)
    netloc = urlparts[1]
    secure = 'https' in urlparts[0]
    # a port in the url wins: with port=None httplib splits "host:port" itself
    # (and raises httplib.InvalidURL when that port is not a number)
    if ':' in netloc: port = None
    # 80 is just the signature default; a https url without a port means 443
    elif secure and port == 80: port = 443
    if keyfile: connection = httplib.HTTPSConnection(netloc, port, keyfile, certfile)
    elif secure: connection = httplib.HTTPSConnection(netloc, port)
    else: connection = httplib.HTTPConnection(netloc, port)
    # request target is path plus params and query; an empty path is sent as "/"
    target = urlparts[2] or '/'
    if urlparts[3]: target = '%s;%s' % (target, urlparts[3])
    if urlparts[4]: target = '%s?%s' % (target, urlparts[4])
    rlog(10, 'url', 'fetching %s' % url)
    # the headers built above were never sent before
    connection.request('GET', target, None, headers)
    return connection.getresponse()

# posturl function

def posturl(url, myheaders, postdata, keyfile=None, certfile="", port=80):
    """ very basic HTTP POST url poster.

        url       -- the url to POST to
        myheaders -- extra request headers, merged over the defaults
        postdata  -- request body; a dict is urlencoded first
        keyfile   -- when set a HTTPS connection is made with this key file
        certfile  -- certificate file going with keyfile
        port      -- port to connect to when the url does not name one

        returns the httplib response object; the caller reads and closes it.
    """
    headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'text/plain; text/html', 'User-Agent': useragent()}
    headers.update(myheaders)
    urlparts = urlparse.urlparse(url)
    netloc = urlparts[1]
    secure = 'https' in urlparts[0]
    # a port in the url wins: with port=None httplib splits "host:port" itself
    if ':' in netloc: port = None
    # 80 is just the signature default; a https url without a port means 443
    elif secure and port == 80: port = 443
    if keyfile: connection = httplib.HTTPSConnection(netloc, port, keyfile, certfile)
    elif secure: connection = httplib.HTTPSConnection(netloc, port)
    else: connection = httplib.HTTPConnection(netloc, port)
    if isinstance(postdata, dict): postdata = urllib.urlencode(postdata)
    # request target is path plus params and query; an empty path is sent as "/"
    target = urlparts[2] or '/'
    if urlparts[3]: target = '%s;%s' % (target, urlparts[3])
    if urlparts[4]: target = '%s?%s' % (target, urlparts[4])
    rlog(10, 'url', 'fetching %s' % url)
    connection.request('POST', target, postdata, headers)
    return connection.getresponse()

# deleteurl function

def deleteurl(url, myheaders={}, postdata={}, keyfile="", certfile="", port=80):
    """ very basic HTTP DELETE url deleter.

        url       -- the url to send the DELETE to
        myheaders -- extra request headers, merged over the defaults
        postdata  -- request body; a dict is urlencoded first
        keyfile   -- key file for a HTTPS connection, only used together with certfile
        certfile  -- certificate file going with keyfile
        port      -- port to connect to when the url does not name one

        returns the httplib response object; the caller reads and closes it.
    """
    headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'text/plain; text/html', 'User-Agent': useragent()}
    headers.update(myheaders)
    urlparts = urlparse.urlparse(url)
    netloc = urlparts[1]
    secure = 'https' in urlparts[0]
    # a port in the url wins: with port=None httplib splits "host:port" itself
    if ':' in netloc: port = None
    # 80 is just the signature default; a https url without a port means 443
    elif secure and port == 80: port = 443
    if keyfile and certfile: connection = httplib.HTTPSConnection(netloc, port, keyfile, certfile)
    elif secure: connection = httplib.HTTPSConnection(netloc, port)
    else: connection = httplib.HTTPConnection(netloc, port)
    if isinstance(postdata, dict): postdata = urllib.urlencode(postdata)
    # request target is path plus params and query; an empty path is sent as "/"
    target = urlparts[2] or '/'
    if urlparts[3]: target = '%s;%s' % (target, urlparts[3])
    if urlparts[4]: target = '%s?%s' % (target, urlparts[4])
    rlog(10, 'url', 'fetching %s' % url)
    connection.request('DELETE', target, postdata, headers)
    return connection.getresponse()

# puturl function

def puturl(url, myheaders={}, postdata={}, keyfile="", certfile="", port=80):
    """ very basic HTTP PUT url sender.

        url       -- the url to PUT to
        myheaders -- extra request headers, merged over the defaults
        postdata  -- request body; a dict is urlencoded first
        keyfile   -- when set a HTTPS connection is made with this key file
        certfile  -- certificate file going with keyfile
        port      -- port to connect to when the url does not name one

        returns the httplib response object; the caller reads and closes it.
    """
    headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'text/plain; text/html', 'User-Agent': useragent()}
    headers.update(myheaders)
    urlparts = urlparse.urlparse(url)
    netloc = urlparts[1]
    secure = 'https' in urlparts[0]
    # a port in the url wins: with port=None httplib splits "host:port" itself
    if ':' in netloc: port = None
    # 80 is just the signature default; a https url without a port means 443
    elif secure and port == 80: port = 443
    if keyfile: connection = httplib.HTTPSConnection(netloc, port, keyfile, certfile)
    elif secure: connection = httplib.HTTPSConnection(netloc, port)
    else: connection = httplib.HTTPConnection(netloc, port)
    if isinstance(postdata, dict): postdata = urllib.urlencode(postdata)
    # request target is path plus params and query; an empty path is sent as "/"
    target = urlparts[2] or '/'
    if urlparts[3]: target = '%s;%s' % (target, urlparts[3])
    if urlparts[4]: target = '%s?%s' % (target, urlparts[4])
    rlog(10, 'url', 'fetching %s' % url)
    connection.request('PUT', target, postdata, headers)
    return connection.getresponse()

# getpostdata function

def getpostdata(event):
    """ postdata extractor for events.

        event -- object with the request headers in .headers and the request
                 body readable from .rfile

        returns a dict mapping every posted field name to the first value
        posted for it; blank values are kept.
    """
    # FieldStorage reads the content type from the headers itself, so the
    # separate cgi.parse_header() call (unused, and failing on requests
    # without a Content-Type header) is gone
    body = cgi.FieldStorage(fp=event.rfile, headers=event.headers, environ = {'REQUEST_METHOD':'POST'}, keep_blank_values = 1)
    result = {}
    for name in body.keys(): result[name] = body.getfirst(name)
    return result

# decode_html_entities function

def decode_html_entities(s):
    """ smart decoding of html entities to utf-8 """
    re_ent_match = re.compile(u'&([^;]+);')
    re_entn_match = re.compile(u'&#([^x;]+);')
    re_entnx_match = re.compile(u'&#x([^;]+);')
    s = s.decode('utf-8', 'replace')

    def to_entn(match):
        """ convert to entities """
        if htmlentitydefs.entitydefs.has_key(match.group(1)):
            return htmlentitydefs.entitydefs[match.group(1)].decode('utf-8', 'replace')
        return match.group(0)

    def to_utf8(match):
        """ convert to utf-8 """
        return unichr(long(match.group(1)))

    def to_utf8x(match):
        """ convert to utf-8 """
        return unichr(long(match.group(1), 16))

    s = re_ent_match.sub(to_entn, s)
    s = re_entn_match.sub(to_utf8, s)
    s = re_entnx_match.sub(to_utf8x, s)
    return s

# get_encoding function

def get_encoding(data):
    """ get encoding from web data.

        data -- the page content; when it carries the response headers in an
                .info attribute (as the result of geturl2 does) these are
                looked at first

        the sources are tried in this order:
          1. charset parameter of the Content-Type header in data.info
             (returned lowercased)
          2. charset of a <meta http-equiv="content-type" ...> tag in data
          3. chardet detection, when the chardet module is available
          4. sys.getdefaultencoding()
    """
    # first we try if we have the .info attribute to determine the encoding from
    info = getattr(data, 'info', None)
    if info is not None and 'content-type' in info:
        ctype = info['content-type'].lower()
        if 'charset' in ctype:
            charset = ctype.split('charset', 1)[1].strip()
            if charset.startswith('='):
                # keep only this parameter and drop optional quotes around it
                charset = charset[1:].split(';')[0].strip().strip('\'"')
                # an empty value is no answer, go on with the other sources
                if charset: return charset
    # <meta http-equiv="content-type" content="text/html; charset=..." />
    if '<meta' in data.lower():
        for meta in re.findall(u'<meta[^>]+>', data, re.I | re.M):
            test_http_equiv = re.search(r'http-equiv\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I)
            if not test_http_equiv or test_http_equiv.group(1).lower() != 'content-type': continue
            if not re.search(r'content\s*=\s*[\'"]([^\'"]+)[\'"]', meta, re.I): continue
            # stop at ; / > too, so an unquoted value does not swallow the tag end
            test_charset = re.search(r'charset\s*=\s*([^\s\'";/>]+)', meta, re.I)
            if test_charset: return test_charset.group(1)
    if chardet:
        test = chardet.detect(data)
        # chardet reports an encoding of None when it cannot tell
        if test.get('encoding'): return test['encoding']
    return sys.getdefaultencoding()

# Stripper class

class Stripper(sgmllib.SGMLParser):

    """ html stripper: collects the character data of a document and drops the markup. """

    def __init__(self):
        sgmllib.SGMLParser.__init__(self)
        # collected text; created here so handle_data() also works when
        # feed() is used directly instead of strip()
        self.theString = ""

    def strip(self, some_html):
        """ strip html from some_html and return the remaining text. """
        # start from a clean parser so the instance can be used more than once
        self.reset()
        self.theString = ""
        self.feed(some_html)
        self.close()
        return self.theString

    def handle_data(self, data):
        """ data handler, called by the parser for every run of text between tags. """
        self.theString += fromenc(data)

# striphtml function

def striphtml(txt):
    """ return txt with the html markup removed; txt goes through fromenc() before a fresh Stripper parses it. """
    return Stripper().strip(fromenc(txt))