"""Macintosh binhex compression/decompression.

easy interface:
binhex(inputfilename, outputfilename)
hexbin(inputfilename, outputfilename)
"""

#
# Jack Jansen, CWI, August 1995.
#
# The module is supposed to be as compatible as possible. Especially the
# easy interface should work "as expected" on any platform.
# XXXX Note: currently, textfiles appear in mac-form on all platforms.
# We seem to lack a simple character-translate in python.
# (we should probably use ISO-Latin-1 on all but the mac platform).
# XXXX The simple routines are too simple: they expect to hold the complete
# files in-core. Should be fixed.
# XXXX It would be nice to handle AppleDouble format on unix
# (for servers serving macs).
# XXXX I don't understand what happens when you get 0x90 times the same byte on
# input. The resulting code (xx 90 90) would appear to be interpreted as an
# escaped *value* of 0x90. All coders I've seen appear to ignore this nicety...
#
import io
import os
import sys
import struct
import binascii

__all__ = ["binhex","hexbin","Error"]

class Error(Exception):
    """Exception raised by this module for encoding and decoding failures."""

# States (what have we written)
_DID_HEADER, _DID_DATA, _DID_RSRC = range(3)

# Various constants
REASONABLY_LARGE = 32768  # Minimal amount we pass the rle-coder
LINELEN = 64  # Number of encoded characters per output line
RUNCHAR = b"\x90"  # Marker byte the RLE decoder watches for

#
# This code is no longer byte-order dependent


class FInfo:
    """Plain record holding a file's Type, Creator and Flags values."""

    def __init__(self):
        # Type and Creator both start out as the '????' placeholder;
        # no flag bits are set.
        self.Type = self.Creator = '????'
        self.Flags = 0

def getfileinfo(name):
    """Gather the values needed to write a binhex header for file *name*.

    Returns a 4-tuple ``(filename, finfo, data_size, resource_size)``:

    * filename -- last path component of *name*, with its first ':'
      replaced by '-'
    * finfo -- an FInfo; its Type is set to 'TEXT' when the first 512 bytes
      of the file contain no NUL byte
    * data_size -- size of the file in bytes
    * resource_size -- always 0
    """
    finfo = FInfo()
    # The context manager closes the file even if read() or seek() raises.
    with io.open(name, 'rb') as fp:
        # Quick check for textfile
        data = fp.read(512)
        if 0 not in data:
            finfo.Type = 'TEXT'
        # Seek to the end to find out the size of the data.
        fp.seek(0, 2)
        dsize = fp.tell()
    fname = os.path.basename(name)
    fname = fname.replace(':', '-', 1)
    return fname, finfo, dsize, 0

class openrsrc:
    """File-like stand-in that holds no data.

    Reads always come back empty and writes are discarded.
    """

    def __init__(self, *args):
        """Accept and ignore any arguments."""

    def read(self, *args):
        """Return empty bytes regardless of the arguments."""
        return bytes()

    def write(self, *args):
        """Discard whatever is passed in."""
        return None

    def close(self):
        """Do nothing; there is nothing to release."""
        return None


class _Hqxcoderengine:
    """Write data to the coder in 3-byte chunks"""

    def __init__(self, ofp):
        """Wrap *ofp*, the writable binary file that receives the hqx text."""
        self.ofp = ofp
        self.data = b''  # raw input not yet encoded
        self.hqxdata = b''  # encoded output not yet written out
        # The first line is one character shorter than the following ones;
        # presumably this leaves room for the ':' the caller writes in front
        # of the encoded data.
        self.linelen = LINELEN - 1

    def write(self, data):
        """Buffer *data* and encode all complete 3-byte groups held so far."""
        self.data = self.data + data
        datalen = len(self.data)
        # Only whole groups of three bytes are passed to b2a_hqx; the
        # remainder stays buffered for the next write() or for close().
        todo = (datalen // 3) * 3
        data = self.data[:todo]
        self.data = self.data[todo:]
        if not data:
            return
        self.hqxdata = self.hqxdata + binascii.b2a_hqx(data)
        self._flush(0)

    def _flush(self, force):
        """Write every complete line of encoded data to the output file.

        If *force* is true, the remaining partial line is written as well,
        followed by the closing ':'.
        """
        first = 0
        while first <= len(self.hqxdata) - self.linelen:
            last = first + self.linelen
            self.ofp.write(self.hqxdata[first:last] + b'\n')
            # Every line after the first one has the full length.
            self.linelen = LINELEN
            first = last
        self.hqxdata = self.hqxdata[first:]
        if force:
            self.ofp.write(self.hqxdata + b':\n')

    def close(self):
        """Encode leftover bytes, flush everything and close the output file."""
        if self.data:
            self.hqxdata = self.hqxdata + binascii.b2a_hqx(self.data)
        self._flush(1)
        self.ofp.close()
        del self.ofp

class _Rlecoderengine:
    """Write data to the RLE-coder in suitably large chunks"""

    def __init__(self, ofp):
        """Wrap *ofp*, the object whose write() receives the RLE-coded bytes."""
        self.ofp = ofp
        self.data = b''  # input collected until it is large enough to encode

    def write(self, data):
        """Buffer *data*; encode and pass on the buffer once it is large enough."""
        self.data = self.data + data
        if len(self.data) < REASONABLY_LARGE:
            return
        rledata = binascii.rlecode_hqx(self.data)
        self.ofp.write(rledata)
        self.data = b''

    def close(self):
        """Encode and pass on any buffered data, then close the wrapped object."""
        if self.data:
            rledata = binascii.rlecode_hqx(self.data)
            self.ofp.write(rledata)
        self.ofp.close()
        del self.ofp

class BinHex:
    """Encoder that writes a header, a data part and a resource part.

    All bytes pass through the RLE coder and then the hqx coder on their way
    to the output file. Each of the three parts is followed by its CRC.
    """

    def __init__(self, name_finfo_dlen_rlen, ofp):
        """Start a binhex stream on *ofp* and write the header.

        name_finfo_dlen_rlen -- tuple (name, finfo, dlen, rlen); finfo may be
            None, in which case a default FInfo() is used. dlen and rlen are
            the number of data and resource bytes that will be written.
        ofp -- a file name (str), which is opened for binary writing, or an
            object with write() and close() methods.

        Raises Error if the name is longer than 63 characters.
        """
        name, finfo, dlen, rlen = name_finfo_dlen_rlen
        close_output = False
        if isinstance(ofp, str):
            ofname = ofp
            ofp = io.open(ofname, 'wb')
            close_output = True
        try:
            ofp.write(b'(This file must be converted with BinHex 4.0)\r\r:')
            hqxer = _Hqxcoderengine(ofp)
            self.ofp = _Rlecoderengine(hqxer)
            self.crc = 0
            if finfo is None:
                finfo = FInfo()
            self.dlen = dlen
            self.rlen = rlen
            self._writeinfo(name, finfo)
            self.state = _DID_HEADER
        except BaseException:
            # Do not leak a file we opened ourselves when the header cannot
            # be written; a file object passed in stays the caller's business.
            if close_output:
                ofp.close()
            raise

    def _writeinfo(self, name, finfo):
        """Write the header fields followed by the header CRC.

        The header consists of the name length, the name, a NUL byte, type,
        creator, flags, data length and resource length.
        """
        nl = len(name)
        if nl > 63:
            raise Error('Filename too long')
        d = bytes([nl]) + name.encode("latin-1") + b'\0'
        tp, cr = finfo.Type, finfo.Creator
        if isinstance(tp, str):
            tp = tp.encode("latin-1")
        if isinstance(cr, str):
            cr = cr.encode("latin-1")
        d2 = tp + cr

        # Force all structs to be packed with big-endian
        d3 = struct.pack('>h', finfo.Flags)
        d4 = struct.pack('>ii', self.dlen, self.rlen)
        info = d + d2 + d3 + d4
        self._write(info)
        self._writecrc()

    def _write(self, data):
        """Add *data* to the running CRC and pass it to the coder chain."""
        self.crc = binascii.crc_hqx(data, self.crc)
        self.ofp.write(data)

    def _writecrc(self):
        """Write the running CRC as a big-endian 16-bit value and reset it."""
        # XXXX Should this be here??
        # self.crc = binascii.crc_hqx('\0\0', self.crc)
        if self.crc < 0:
            fmt = '>h'
        else:
            fmt = '>H'
        self.ofp.write(struct.pack(fmt, self.crc))
        self.crc = 0

    def write(self, data):
        """Write bytes of the data part.

        Raises Error if the data part has already been closed.
        """
        if self.state != _DID_HEADER:
            raise Error('Writing data at the wrong time')
        # dlen counts down the data bytes that are still expected.
        self.dlen = self.dlen - len(data)
        self._write(data)

    def close_data(self):
        """Finish the data part and write its CRC.

        Raises Error if the number of bytes written differs from the data
        length announced to the constructor.
        """
        if self.dlen != 0:
            # Report the data-size difference (this used to print self.rlen).
            raise Error('Incorrect data size, diff=%r' % (self.dlen,))
        self._writecrc()
        self.state = _DID_DATA

    def write_rsrc(self, data):
        """Write bytes of the resource part, closing the data part if needed.

        Raises Error if the stream is not in a state that accepts resource
        data.
        """
        if self.state < _DID_DATA:
            self.close_data()
        if self.state != _DID_DATA:
            raise Error('Writing resource data at the wrong time')
        # rlen counts down the resource bytes that are still expected.
        self.rlen = self.rlen - len(data)
        self._write(data)

    def close(self):
        """Finish the stream: write the last CRC and close the output.

        Calling close() again afterwards does nothing. The output is closed
        even when an Error is raised because the amount of data written does
        not match the announced lengths.
        """
        if self.state is None:
            return
        try:
            if self.state < _DID_DATA:
                self.close_data()
            if self.state != _DID_DATA:
                raise Error('Close at the wrong time')
            if self.rlen != 0:
                raise Error("Incorrect resource-datasize, diff=%r" % (self.rlen,))
            self._writecrc()
        finally:
            self.state = None
            ofp = self.ofp
            del self.ofp
            ofp.close()


def binhex(inp, out):
    """binhex(infilename, outfilename): create binhex-encoded copy of a file

    inp -- name of the file to encode.
    out -- output file name, or anything else BinHex() accepts as output.
    """
    finfo = getfileinfo(inp)
    ofp = BinHex(finfo, out)

    # The context manager closes the input file even if encoding fails.
    with io.open(inp, 'rb') as ifp:
        # XXXX Do textfile translation on non-mac systems
        while True:
            d = ifp.read(128000)
            if not d:
                break
            ofp.write(d)
        ofp.close_data()

    ifp = openrsrc(inp, 'rb')
    while True:
        d = ifp.read(128000)
        if not d:
            break
        ofp.write_rsrc(d)
    ofp.close()
    ifp.close()

class _Hqxdecoderengine:
    """Read data via the decoder in 4-byte chunks"""

    def __init__(self, ifp):
        """Wrap *ifp*, the readable binary file holding the hqx text."""
        self.ifp = ifp
        # Second value returned by binascii.a2b_hqx(); once it is true,
        # read() stops fetching input.
        self.eof = 0

    def read(self, totalwtd):
        """Read at least wtd bytes (or until EOF)"""
        decdata = b''
        wtd = totalwtd
        #
        # The loop here is convoluted, since we don't really know how
        # much to decode: there may be newlines in the incoming data.
        while wtd > 0:
            if self.eof:
                return decdata
            # Every 3 decoded bytes take 4 encoded characters.
            wtd = ((wtd + 2) // 3) * 4
            data = self.ifp.read(wtd)
            #
            # Next problem: there may not be a complete number of
            # bytes in what we pass to a2b. Solve by yet another
            # loop.
            #
            while True:
                try:
                    decdatacur, self.eof = binascii.a2b_hqx(data)
                    break
                except binascii.Incomplete:
                    pass
                # Feed one more input byte and try again.
                newdata = self.ifp.read(1)
                if not newdata:
                    raise Error('Premature EOF on binhex file')
                data = data + newdata
            decdata = decdata + decdatacur
            wtd = totalwtd - len(decdata)
            if not decdata and not self.eof:
                raise Error('Premature EOF on binhex file')
        return decdata

    def close(self):
        """Close the wrapped input file."""
        self.ifp.close()

class _Rledecoderengine:
    """Read data via the RLE-coder"""

    def __init__(self, ifp):
        """Wrap *ifp*, an object providing read(n), close() and an eof flag."""
        self.ifp = ifp
        self.pre_buffer = b''  # bytes read from ifp, not yet RLE-decoded
        self.post_buffer = b''  # decoded bytes not yet handed to the caller
        self.eof = 0

    def read(self, wtd):
        """Return up to *wtd* decoded bytes, decoding more input if needed."""
        if wtd > len(self.post_buffer):
            self._fill(wtd - len(self.post_buffer))
        rv = self.post_buffer[:wtd]
        self.post_buffer = self.post_buffer[wtd:]
        return rv

    def _fill(self, wtd):
        """Read more input and move what can safely be decoded to post_buffer."""
        self.pre_buffer = self.pre_buffer + self.ifp.read(wtd + 4)
        if self.ifp.eof:
            # Nothing more will arrive, so everything can be decoded.
            self.post_buffer = self.post_buffer + \
                binascii.rledecode_hqx(self.pre_buffer)
            self.pre_buffer = b''
            return

        #
        # Obfuscated code ahead. We have to take care that we don't
        # end up with an orphaned RUNCHAR later on. So, we keep a couple
        # of bytes in the buffer, depending on what the end of
        # the buffer looks like:
        # '\220\0\220' - Keep 3 bytes: repeated \220 (escaped as \220\0)
        # '?\220' - Keep 2 bytes: repeated something-else
        # '\220\0' - Escaped \220: Keep 2 bytes.
        # '?\220?' - Complete repeat sequence: decode all
        # otherwise: keep 1 byte.
        #
        mark = len(self.pre_buffer)
        if self.pre_buffer[-3:] == RUNCHAR + b'\0' + RUNCHAR:
            mark = mark - 3
        elif self.pre_buffer[-1:] == RUNCHAR:
            mark = mark - 2
        elif self.pre_buffer[-2:] == RUNCHAR + b'\0':
            mark = mark - 2
        elif self.pre_buffer[-2:-1] == RUNCHAR:
            pass # Decode all
        else:
            mark = mark - 1

        self.post_buffer = self.post_buffer + \
            binascii.rledecode_hqx(self.pre_buffer[:mark])
        self.pre_buffer = self.pre_buffer[mark:]

    def close(self):
        """Close the wrapped input object."""
        self.ifp.close()

class HexBin:
    """Decoder that reads a header, a data part and a resource part.

    After construction, FName holds the stored file name (bytes), FInfo the
    stored type, creator and flags, and dlen / rlen the number of bytes left
    to read in the data and resource parts.
    """

    def __init__(self, ifp):
        """Open the binhex stream *ifp* and read its header.

        ifp -- a file name (str), which is opened for binary reading, or an
            object with read() and close() methods.

        Raises Error if no ':' is found or the header CRC does not match.
        """
        close_input = False
        if isinstance(ifp, str):
            ifp = io.open(ifp, 'rb')
            close_input = True
        try:
            #
            # Find initial colon.
            #
            while True:
                ch = ifp.read(1)
                if not ch:
                    raise Error("No binhex data found")
                # Cater for \r\n terminated lines (which show up as \n\r, hence
                # all lines start with \r)
                if ch == b'\r':
                    continue
                if ch == b':':
                    break

            hqxifp = _Hqxdecoderengine(ifp)
            self.ifp = _Rledecoderengine(hqxifp)
            self.crc = 0
            self._readheader()
        except BaseException:
            # Do not leak a file we opened ourselves; a file object passed in
            # stays the caller's business.
            if close_input:
                ifp.close()
            raise

    def _read(self, len):
        """Read up to *len* decoded bytes and add them to the running CRC."""
        data = self.ifp.read(len)
        self.crc = binascii.crc_hqx(data, self.crc)
        return data

    def _checkcrc(self):
        """Read the stored 16-bit CRC, compare it with the running CRC, reset.

        Raises Error if the two values differ.
        """
        filecrc = struct.unpack('>h', self.ifp.read(2))[0] & 0xffff
        #self.crc = binascii.crc_hqx('\0\0', self.crc)
        # XXXX Is this needed??
        self.crc = self.crc & 0xffff
        if filecrc != self.crc:
            raise Error('CRC error, computed %x, read %x'
                        % (self.crc, filecrc))
        self.crc = 0

    def _readheader(self):
        """Read the header and its CRC, then fill in FName, FInfo, dlen, rlen."""
        namelen = self._read(1)
        fname = self._read(ord(namelen))
        # One separator byte, type (4), creator (4), flags (2),
        # data length (4) and resource length (4).
        rest = self._read(1 + 4 + 4 + 2 + 4 + 4)
        self._checkcrc()

        ftype = rest[1:5]
        creator = rest[5:9]
        flags = struct.unpack('>h', rest[9:11])[0]
        self.dlen = struct.unpack('>l', rest[11:15])[0]
        self.rlen = struct.unpack('>l', rest[15:19])[0]

        self.FName = fname
        self.FInfo = FInfo()
        self.FInfo.Creator = creator
        self.FInfo.Type = ftype
        self.FInfo.Flags = flags

        self.state = _DID_HEADER

    def read(self, *n):
        """Read data bytes: at most n[0] of them, or all that remain.

        Raises Error if the data part has already been closed, or if the
        stream ends before the requested bytes are available.
        """
        if self.state != _DID_HEADER:
            raise Error('Read data at wrong time')
        if n:
            n = n[0]
            n = min(n, self.dlen)
        else:
            n = self.dlen
        rv = b''
        while len(rv) < n:
            chunk = self._read(n - len(rv))
            if not chunk:
                # The stream ended although the header promised more data;
                # without this check the loop would never terminate.
                raise Error('Premature EOF on binhex file')
            rv = rv + chunk
        self.dlen = self.dlen - n
        return rv

    def close_data(self):
        """Skip any unread data bytes and verify the data CRC.

        Raises Error if called when the data part is not current, or if the
        CRC does not match.
        """
        if self.state != _DID_HEADER:
            raise Error('close_data at wrong time')
        if self.dlen:
            dummy = self._read(self.dlen)
        self._checkcrc()
        self.state = _DID_DATA

    def read_rsrc(self, *n):
        """Read resource bytes: at most n[0] of them, or all that remain.

        The data part is closed first if that has not happened yet. Raises
        Error if the stream is not in a state that allows resource reads.
        """
        if self.state == _DID_HEADER:
            self.close_data()
        if self.state != _DID_DATA:
            raise Error('Read resource data at wrong time')
        if n:
            n = n[0]
            n = min(n, self.rlen)
        else:
            n = self.rlen
        self.rlen = self.rlen - n
        return self._read(n)

    def close(self):
        """Skip unread resource bytes, verify the last CRC, close the input.

        Calling close() again afterwards does nothing. The input is closed
        even when the CRC check raises Error.
        """
        if self.state == _DID_RSRC:
            return
        try:
            if self.rlen:
                dummy = self.read_rsrc(self.rlen)
            self._checkcrc()
        finally:
            self.state = _DID_RSRC
            self.ifp.close()


def hexbin(inp, out):
    """hexbin(infilename, outfilename) - Decode binhexed file

    inp -- name of the binhex file, or anything else HexBin() accepts.
    out -- name of the file to create; if false, the name stored in the
        binhex header is used.
    """
    ifp = HexBin(inp)
    if not out:
        out = ifp.FName

    # The context manager closes the output file even if decoding fails.
    with io.open(out, 'wb') as ofp:
        # XXXX Do translation on non-mac systems
        while True:
            d = ifp.read(128000)
            if not d:
                break
            ofp.write(d)
    ifp.close_data()

    # Only create the resource output when there is resource data at all.
    d = ifp.read_rsrc(128000)
    if d:
        ofp = openrsrc(out, 'wb')
        ofp.write(d)
        while True:
            d = ifp.read_rsrc(128000)
            if not d:
                break
            ofp.write(d)
        ofp.close()

    ifp.close()