"""Various tools used by MIME-reading or MIME-writing programs."""


import os
import rfc822
import tempfile

# Names exported by "from mimetools import *".
__all__ = [
    "Message",
    "choose_boundary",
    "encode",
    "decode",
    "copyliteral",
    "copybinary",
]

class Message(rfc822.Message):
    """A derived class of rfc822.Message that knows about MIME headers and
    contains some hooks for decoding encoded and multipart messages.

    Instance attributes set by the constructor:

    encodingheader -- value of the Content-Transfer-Encoding header as
                      returned by getheader()
    typeheader     -- value of the Content-Type header as returned by
                      getheader()
    type           -- lower-cased "maintype/subtype" string
    maintype       -- part of the type before the first '/'
    subtype        -- part of the type after the first '/'
    plisttext      -- raw parameter text, starting at the first ';' of
                      the Content-Type header ('' when there is none)
    plist          -- list of parameter strings; "name=value" entries
                      have a lower-cased name and a stripped value
    """

    def __init__(self, fp, seekable = 1):
        """Read the headers from fp and parse the MIME-specific ones.

        fp and seekable are passed through to rfc822.Message.__init__()."""
        rfc822.Message.__init__(self, fp, seekable)
        self.encodingheader = \
                self.getheader('content-transfer-encoding')
        self.typeheader = \
                self.getheader('content-type')
        self.parsetype()
        self.parseplist()

    def parsetype(self):
        """Split the Content-Type value into type, subtype and raw params.

        Sets self.type, self.maintype, self.subtype and self.plisttext.
        When self.typeheader is None the type defaults to 'text/plain'."""
        text = self.typeheader
        if text is None:
            text = 'text/plain'
        if ';' in text:
            # Everything from the first ';' on is the parameter list.
            i = text.index(';')
            self.plisttext = text[i:]
            text = text[:i]
        else:
            self.plisttext = ''
        fields = [field.strip().lower() for field in text.split('/')]
        self.type = '/'.join(fields)
        self.maintype = fields[0]
        self.subtype = '/'.join(fields[1:])

    def parseplist(self):
        """Parse self.plisttext into the list self.plist.

        Parameters are separated by ';'.  A ';' that appears inside a
        double-quoted value does not end the parameter.  For entries of
        the form name=value the name is lower-cased and whitespace around
        the name and the value is removed; the value keeps its quotes."""
        text = self.plisttext
        self.plist = []
        while text[:1] == ';':
            text = text[1:]
            end = text.find(';')
            # An odd number of unescaped double quotes before the ';'
            # means it lies inside a quoted string: try the next one.
            while end > 0 and (text.count('"', 0, end) -
                               text.count('\\"', 0, end)) % 2:
                end = text.find(';', end + 1)
            if end < 0:
                end = len(text)
            f = text[:end]
            if '=' in f:
                i = f.index('=')
                f = f[:i].strip().lower() + \
                        '=' + f[i+1:].strip()
            self.plist.append(f.strip())
            text = text[end:]

    def getplist(self):
        """Return the list of parsed parameter strings (self.plist)."""
        return self.plist

    def getparam(self, name):
        """Return the value of the first parameter called name, or None.

        name is matched case-insensitively; the value is passed through
        rfc822.unquote() before being returned."""
        name = name.lower() + '='
        n = len(name)
        for p in self.plist:
            if p[:n] == name:
                return rfc822.unquote(p[n:])
        return None

    def getparamnames(self):
        """Return the lower-cased names of all name=value parameters."""
        return [p.split('=', 1)[0].lower() for p in self.plist if '=' in p]

    def getencoding(self):
        """Return the lower-cased transfer encoding, '7bit' if not given."""
        if self.encodingheader is None:
            return '7bit'
        return self.encodingheader.lower()

    def gettype(self):
        """Return the full content type, e.g. 'text/plain'."""
        return self.type

    def getmaintype(self):
        """Return the main content type, e.g. 'text'."""
        return self.maintype

    def getsubtype(self):
        """Return the content subtype, e.g. 'plain'."""
        return self.subtype




# Utility functions
# -----------------

# Prefer the real thread module; when it cannot be imported fall back to
# dummy_thread (assumed to provide the same allocate_lock() interface).
try:
    import thread
except ImportError:
    import dummy_thread as thread
# Lock held by _get_next_counter() while it updates _counter.
_counter_lock = thread.allocate_lock()
# Only the lock object is needed; remove the module name from this namespace.
del thread

# Module-wide counter; only modified while holding _counter_lock.
_counter = 0
def _get_next_counter():
    """Increment the module-wide counter and return its new value.

    The update is done while holding _counter_lock.  The lock is released
    in a finally clause so that an exception raised while it is held (for
    example an interrupt) cannot leave it locked forever."""
    global _counter
    _counter_lock.acquire()
    try:
        _counter += 1
        return _counter
    finally:
        _counter_lock.release()

# Cached "hostaddress.uid.pid" part of the boundary; built on first use.
_prefix = None

def choose_boundary():
    """Return a string usable as a multipart boundary.

    The string chosen is unique within a single program run, and
    incorporates the host's IP address, the user id (if available),
    process id (if available), and current time.  So it's very unlikely
    the returned string appears in message text, but there's no guarantee.

    If the local host name cannot be resolved, '127.0.0.1' is used as the
    host part instead of letting the lookup error propagate.

    The boundary contains dots so you have to quote it in the header."""

    global _prefix
    import time
    if _prefix is None:
        import socket
        try:
            hostid = socket.gethostbyname(socket.gethostname())
        except socket.gaierror:
            # Host name does not resolve (e.g. misconfigured machine);
            # the boundary only needs to be unlikely, not routable.
            hostid = '127.0.0.1'
        # os.getuid / os.getpid do not exist on every platform.
        try:
            uid = repr(os.getuid())
        except AttributeError:
            uid = '1'
        try:
            pid = repr(os.getpid())
        except AttributeError:
            pid = '1'
        _prefix = hostid + '.' + uid + '.' + pid
    return "%s.%.3f.%d" % (_prefix, time.time(), _get_next_counter())


# Subroutines for decoding some common content-transfer-types

def decode(input, output, encoding):
    """Decode common content-transfer-encodings (base64, quopri, uuencode).

    Arguments:
    input    -- file-like object the encoded data is read from
    output   -- file-like object the decoded data is written to
    encoding -- content-transfer-encoding name; it is compared exactly
                against the names below, so pass it in lower case

    'base64', 'quoted-printable' and the uuencode aliases are handled by
    the base64, quopri and uu modules; '7bit' and '8bit' copy the data
    unchanged.  Any other name is looked up in decodetab and, when found,
    the data is piped through that external command.

    Raises ValueError when the encoding is not recognized."""
    if encoding == 'base64':
        import base64
        return base64.decode(input, output)
    if encoding == 'quoted-printable':
        import quopri
        return quopri.decode(input, output)
    if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
        import uu
        return uu.decode(input, output)
    if encoding in ('7bit', '8bit'):
        return output.write(input.read())
    if encoding in decodetab:
        pipethrough(input, decodetab[encoding], output)
    else:
        # Call form of raise: same behavior as "raise ValueError, msg"
        # but valid in every Python version.
        raise ValueError(
              'unknown Content-Transfer-Encoding: %s' % encoding)

def encode(input, output, encoding):
    """Encode common content-transfer-encodings (base64, quopri, uuencode).

    Arguments:
    input    -- file-like object the raw data is read from
    output   -- file-like object the encoded data is written to
    encoding -- content-transfer-encoding name; it is compared exactly
                against the names below, so pass it in lower case

    'base64', 'quoted-printable' and the uuencode aliases are handled by
    the base64, quopri and uu modules (quopri is called with quotetabs
    set to 0); '7bit' and '8bit' copy the data unchanged.  Any other name
    is looked up in encodetab and, when found, the data is piped through
    that external command.

    Raises ValueError when the encoding is not recognized."""
    if encoding == 'base64':
        import base64
        return base64.encode(input, output)
    if encoding == 'quoted-printable':
        import quopri
        return quopri.encode(input, output, 0)
    if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
        import uu
        return uu.encode(input, output)
    if encoding in ('7bit', '8bit'):
        return output.write(input.read())
    if encoding in encodetab:
        pipethrough(input, encodetab[encoding], output)
    else:
        # Call form of raise: same behavior as "raise ValueError, msg"
        # but valid in every Python version.
        raise ValueError(
              'unknown Content-Transfer-Encoding: %s' % encoding)

# The following is no longer used for standard encodings

# XXX This requires that uudecode and mmencode are in $PATH

# Shell snippet that uudecodes its standard input to standard output: sed
# rewrites the "begin" line so uudecode writes to a temp file, which is
# then printed and removed.
# NOTE(review): the temp file name /tmp/@uu.<shell pid> is predictable;
# confirm this is acceptable before using it on a shared machine.
uudecode_pipe = '''(
TEMP=/tmp/@uu.$$
sed "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode
cat $TEMP
rm $TEMP
)'''

# Maps a content-transfer-encoding name to the shell command that decodes
# it (data on stdin, decoded result on stdout).
decodetab = {
    'uuencode':             uudecode_pipe,
    'x-uuencode':           uudecode_pipe,
    'uue':                  uudecode_pipe,
    'x-uue':                uudecode_pipe,
    'quoted-printable':     'mmencode -u -q',
    'base64':               'mmencode -u -b',
}

# Maps a content-transfer-encoding name to the shell command that encodes
# it (data on stdin, encoded result on stdout).
encodetab = {
    'x-uuencode':           'uuencode tempfile',
    'uuencode':             'uuencode tempfile',
    'x-uue':                'uuencode tempfile',
    'uue':                  'uuencode tempfile',
    'quoted-printable':     'mmencode -q',
    'base64':               'mmencode -b',
}

def pipeto(input, command):
    """Copy input, line by line, to the standard input of a shell command.

    input is a file-like object with readline(); command is run through
    os.popen().  The pipe is closed even when copying raises."""
    pipe = os.popen(command, 'w')
    try:
        copyliteral(input, pipe)
    finally:
        pipe.close()

def pipethrough(input, command, output):
    """Filter input through a shell command and write the result to output.

    input is first copied line by line into a temporary file; command is
    then run via os.popen() with its standard input redirected from that
    file, and everything it prints is copied to output.  The temporary
    file is removed afterwards, also when an error occurs."""
    (fd, tempname) = tempfile.mkstemp()
    try:
        temp = os.fdopen(fd, 'w')
        try:
            copyliteral(input, temp)
        finally:
            temp.close()
        # NOTE(review): tempname is inserted into the shell command
        # unquoted; a temp directory containing shell metacharacters
        # would break the redirection.
        pipe = os.popen(command + ' <' + tempname, 'r')
        try:
            copybinary(pipe, output)
        finally:
            pipe.close()
    finally:
        os.unlink(tempname)

def copyliteral(input, output):
    """Copy input to output one line at a time until input is exhausted.

    input must provide readline() and output must provide write().  An
    empty result from readline() is taken as end of input."""
    while 1:
        line = input.readline()
        if not line:
            break
        output.write(line)

def copybinary(input, output):
    """Copy input to output in fixed-size chunks until input is exhausted.

    input must provide read(size) and output must provide write().  An
    empty result from read() is taken as end of input."""
    BUFSIZE = 8192
    while 1:
        chunk = input.read(BUFSIZE)
        if not chunk:
            break
        output.write(chunk)