test_bz2.py 16 KB
Newer Older
1
#!/usr/bin/python
2
from test import test_support
3
from test.test_support import TESTFN, import_module
4

5 6 7
import unittest
from cStringIO import StringIO
import os
8
import subprocess
9
import sys
10
import threading
11

12
bz2 = import_module('bz2')
13
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
14

15
# True unless running on one of the listed platforms; BaseTest uses this flag
# to choose between piping data through the external "bunzip2" command and
# calling bz2.decompress() directly.
has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx", "riscos")
16

17
class BaseTest(unittest.TestCase):
Tim Peters's avatar
Tim Peters committed
18 19 20 21 22
    "Base for other testcases."
    TEXT = 'root:x:0:0:root:/root:/bin/bash\nbin:x:1:1:bin:/bin:\ndaemon:x:2:2:daemon:/sbin:\nadm:x:3:4:adm:/var/adm:\nlp:x:4:7:lp:/var/spool/lpd:\nsync:x:5:0:sync:/sbin:/bin/sync\nshutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\nhalt:x:7:0:halt:/sbin:/sbin/halt\nmail:x:8:12:mail:/var/spool/mail:\nnews:x:9:13:news:/var/spool/news:\nuucp:x:10:14:uucp:/var/spool/uucp:\noperator:x:11:0:operator:/root:\ngames:x:12:100:games:/usr/games:\ngopher:x:13:30:gopher:/usr/lib/gopher-data:\nftp:x:14:50:FTP User:/var/ftp:/bin/bash\nnobody:x:65534:65534:Nobody:/home:\npostfix:x:100:101:postfix:/var/spool/postfix:\nniemeyer:x:500:500::/home/niemeyer:/bin/bash\npostgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\nmysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\nwww:x:103:104::/var/www:/bin/false\n'
    DATA = 'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
    DATA_CRLF = 'BZh91AY&SY\xaez\xbbN\x00\x01H\xdf\x80\x00\x12@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe0@\x01\xbc\xc6`\x86*\x8d=M\xa9\x9a\x86\xd0L@\x0fI\xa6!\xa1\x13\xc8\x88jdi\x8d@\x03@\x1a\x1a\x0c\x0c\x83 \x00\xc4h2\x19\x01\x82D\x84e\t\xe8\x99\x89\x19\x1ah\x00\r\x1a\x11\xaf\x9b\x0fG\xf5(\x1b\x1f?\t\x12\xcf\xb5\xfc\x95E\x00ps\x89\x12^\xa4\xdd\xa2&\x05(\x87\x04\x98\x89u\xe40%\xb6\x19\'\x8c\xc4\x89\xca\x07\x0e\x1b!\x91UIFU%C\x994!DI\xd2\xfa\xf0\xf1N8W\xde\x13A\xf5\x9cr%?\x9f3;I45A\xd1\x8bT\xb1<l\xba\xcb_\xc00xY\x17r\x17\x88\x08\x08@\xa0\ry@\x10\x04$)`\xf2\xce\x89z\xb0s\xec\x9b.iW\x9d\x81\xb5-+t\x9f\x1a\'\x97dB\xf5x\xb5\xbe.[.\xd7\x0e\x81\xe7\x08\x1cN`\x88\x10\xca\x87\xc3!"\x80\x92R\xa1/\xd1\xc0\xe6mf\xac\xbd\x99\xcca\xb3\x8780>\xa4\xc7\x8d\x1a\\"\xad\xa1\xabyBg\x15\xb9l\x88\x88\x91k"\x94\xa4\xd4\x89\xae*\xa6\x0b\x10\x0c\xd6\xd4m\xe86\xec\xb5j\x8a\x86j\';\xca.\x01I\xf2\xaaJ\xe8\x88\x8cU+t3\xfb\x0c\n\xa33\x13r2\r\x16\xe0\xb3(\xbf\x1d\x83r\xe7M\xf0D\x1365\xd8\x88\xd3\xa4\x92\xcb2\x06\x04\\\xc1\xb0\xea//\xbek&\xd8\xe6+t\xe5\xa1\x13\xada\x16\xder5"w]\xa2i\xb7[\x97R \xe2IT\xcd;Z\x04dk4\xad\x8a\t\xd3\x81z\x10\xf1:^`\xab\x1f\xc5\xdc\x91N\x14$+\x9e\xae\xd3\x80'

23 24
    if has_cmdline_bunzip2:
        def decompress(self, data):
            """Decompress *data* by piping it through the bunzip2 command.

            Returns the bytes bunzip2 wrote to stdout (stderr is merged into
            the same pipe).  If bunzip2 exits with a non-zero status, the
            result of bz2.decompress(data) is returned instead.
            """
            pop = subprocess.Popen("bunzip2", shell=True,
                                   stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
            # communicate() feeds stdin and drains stdout together, so a
            # payload larger than the pipe buffer cannot deadlock the way a
            # blocking write followed by a blocking read can.  It also closes
            # the pipes and waits for the child to exit.
            ret, _ = pop.communicate(data)
            if pop.returncode != 0:
                ret = bz2.decompress(data)
            return ret

    else:
        # bunzip2 isn't run on the platforms excluded by has_cmdline_bunzip2.
        def decompress(self, data):
            """Decompress *data* in-process with bz2.decompress()."""
            return bz2.decompress(data)

42
class BZ2FileTest(BaseTest):
43
    "Test BZ2File type miscellaneous methods."
Tim Peters's avatar
Tim Peters committed
44 45

    def setUp(self):
        # Every test operates on the scratch path imported from test_support.
        self.filename = TESTFN
Tim Peters's avatar
Tim Peters committed
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64

    def tearDown(self):
        # Delete the scratch file when the test left one behind.
        path = self.filename
        if not os.path.isfile(path):
            return
        os.unlink(path)

    def createTempFile(self, crlf=0):
        # Write a ready-made bz2 stream to self.filename: DATA_CRLF when
        # crlf is true, DATA otherwise.
        payload = self.DATA_CRLF if crlf else self.DATA
        out = open(self.filename, "wb")
        out.write(payload)
        out.close()

    def testRead(self):
        # "Test BZ2File.read()"
        self.createTempFile()
        # The with-block closes the file even when an assertion fails, so a
        # failing test does not leave the handle open.
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.read, None)
            self.assertEqual(bz2f.read(), self.TEXT)

    def testReadChunk10(self):
        # "Test BZ2File.read() in chunks of 10 bytes"
        self.createTempFile()
        chunks = []
        # The with-block guarantees the file is closed even if a read fails.
        with BZ2File(self.filename) as bz2f:
            while 1:
                chunk = bz2f.read(10)
                if not chunk:
                    break
                chunks.append(chunk)
        text = ''.join(chunks)
        # Compare with the reference text; comparing the result with itself
        # could never fail.
        self.assertEqual(text, self.TEXT)

    def testRead100(self):
        # "Test BZ2File.read(100)"
        self.createTempFile()
        # Context manager: the file is closed even if the assertion fails.
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(100), self.TEXT[:100])

    def testReadLine(self):
        # "Test BZ2File.readline()"
        self.createTempFile()
        # Context manager: the file is closed even if an assertion fails.
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readline, None)
            sio = StringIO(self.TEXT)
            # Each line read from the compressed file must match the
            # corresponding line of the reference text.
            for line in sio.readlines():
                self.assertEqual(bz2f.readline(), line)

    def testReadLines(self):
        # "Test BZ2File.readlines()"
        self.createTempFile()
        # Context manager: the file is closed even if an assertion fails.
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readlines, None)
            sio = StringIO(self.TEXT)
            self.assertEqual(bz2f.readlines(), sio.readlines())

    def testIterator(self):
        # "Test iter(BZ2File)"
        self.createTempFile()
        # Context manager: the file is closed even if the assertion fails.
        with BZ2File(self.filename) as bz2f:
            sio = StringIO(self.TEXT)
            self.assertEqual(list(iter(bz2f)), sio.readlines())

116 117 118 119 120 121 122 123 124 125 126
    def testClosedIteratorDeadlock(self):
        # "Test that iteration on a closed bz2file releases the lock."
        # http://bugs.python.org/issue3309
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        # Close first: both calls below are made on an already-closed file.
        bz2f.close()
        self.assertRaises(ValueError, bz2f.next)
        # This call will deadlock if the above .next call failed to
        # release the lock.
        self.assertRaises(ValueError, bz2f.readlines)

Tim Peters's avatar
Tim Peters committed
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
    def testXReadLines(self):
        # "Test BZ2File.xreadlines()"
        self.createTempFile()
        # Context manager: the file is closed even if the assertion fails.
        with BZ2File(self.filename) as bz2f:
            sio = StringIO(self.TEXT)
            self.assertEqual(list(bz2f.xreadlines()), sio.readlines())

    def testUniversalNewlinesLF(self):
        # "Test BZ2File.read() with universal newlines (\\n)"
        self.createTempFile()
        # Context manager: the file is closed even if an assertion fails.
        with BZ2File(self.filename, "rU") as bz2f:
            self.assertEqual(bz2f.read(), self.TEXT)
            # Only "\n" line endings were seen in the LF fixture.
            self.assertEqual(bz2f.newlines, "\n")

    def testUniversalNewlinesCRLF(self):
        # "Test BZ2File.read() with universal newlines (\\r\\n)"
        self.createTempFile(crlf=1)
        # Context manager: the file is closed even if an assertion fails.
        with BZ2File(self.filename, "rU") as bz2f:
            # The data read back must equal the LF reference text even
            # though the CRLF fixture was written to disk.
            self.assertEqual(bz2f.read(), self.TEXT)
            self.assertEqual(bz2f.newlines, "\r\n")

    def testWrite(self):
        # "Test BZ2File.write()"
        # Both files are handled by with-blocks so they are closed even when
        # an assertion fails.
        with BZ2File(self.filename, "w") as bz2f:
            self.assertRaises(TypeError, bz2f.write)
            bz2f.write(self.TEXT)
        # Read the raw compressed bytes back and check that they decompress
        # to the original text.
        with open(self.filename, 'rb') as f:
            self.assertEqual(self.decompress(f.read()), self.TEXT)

    def testWriteChunks10(self):
        # "Test BZ2File.write() with chunks of 10 bytes"
        # Both files are handled by with-blocks so they are closed even when
        # a write or an assertion fails.
        with BZ2File(self.filename, "w") as bz2f:
            # Feed the reference text to the writer ten bytes at a time.
            for start in range(0, len(self.TEXT), 10):
                bz2f.write(self.TEXT[start:start + 10])
        with open(self.filename, 'rb') as f:
            self.assertEqual(self.decompress(f.read()), self.TEXT)

    def testWriteLines(self):
        # "Test BZ2File.writelines()"
        # The with-block closes the file even when an assertion fails.
        with BZ2File(self.filename, "w") as bz2f:
            self.assertRaises(TypeError, bz2f.writelines)
            sio = StringIO(self.TEXT)
            bz2f.writelines(sio.readlines())
        # patch #1535500
        # writelines() on the now-closed file must raise ValueError.
        self.assertRaises(ValueError, bz2f.writelines, ["a"])
        with open(self.filename, 'rb') as f:
            self.assertEqual(self.decompress(f.read()), self.TEXT)

189 190 191 192 193 194 195 196 197
    def testWriteMethodsOnReadOnlyFile(self):
        # Write methods must raise IOError on a file opened for reading.
        with BZ2File(self.filename, "w") as bz2f:
            bz2f.write("abc")

        # The read-only handle is closed by the with-block; previously it was
        # never closed at all.
        with BZ2File(self.filename, "r") as bz2f:
            self.assertRaises(IOError, bz2f.write, "a")
            self.assertRaises(IOError, bz2f.writelines, ["a"])

Tim Peters's avatar
Tim Peters committed
198 199 200 201
    def testSeekForward(self):
        # "Test BZ2File.seek(150, 0)"
        self.createTempFile()
        # Context manager: the file is closed even if an assertion fails.
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.seek)
            bz2f.seek(150)
            self.assertEqual(bz2f.read(), self.TEXT[150:])
Tim Peters's avatar
Tim Peters committed
206 207 208 209 210 211 212 213

    def testSeekBackwards(self):
        # "Test BZ2File.seek(-150, 1)"
        self.createTempFile()
        # Context manager: the file is closed even if the assertion fails.
        with BZ2File(self.filename) as bz2f:
            # Advance to offset 500, then step back 150 relative to there.
            bz2f.read(500)
            bz2f.seek(-150, 1)
            self.assertEqual(bz2f.read(), self.TEXT[500-150:])
Tim Peters's avatar
Tim Peters committed
215 216 217 218

    def testSeekBackwardsFromEnd(self):
        # "Test BZ2File.seek(-150, 2)"
        self.createTempFile()
        # Context manager: the file is closed even if the assertion fails.
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-150, 2)
            self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])
Tim Peters's avatar
Tim Peters committed
223 224 225 226 227 228 229 230

    def testSeekPostEnd(self):
        # "Test BZ2File.seek(150000)"
        self.createTempFile()
        # Context manager: the file is closed even if an assertion fails.
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            # The expected position is the end of the decompressed data,
            # with nothing left to read.
            self.assertEqual(bz2f.tell(), len(self.TEXT))
            self.assertEqual(bz2f.read(), "")
Tim Peters's avatar
Tim Peters committed
232 233 234 235 236 237 238 239 240

    def testSeekPostEndTwice(self):
        # "Test BZ2File.seek(150000) twice"
        self.createTempFile()
        # Context manager: the file is closed even if an assertion fails.
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            bz2f.seek(150000)
            # After the repeated seek the position must still be the end of
            # the decompressed data.
            self.assertEqual(bz2f.tell(), len(self.TEXT))
            self.assertEqual(bz2f.read(), "")
Tim Peters's avatar
Tim Peters committed
242 243 244 245 246 247 248 249

    def testSeekPreStart(self):
        # "Test BZ2File.seek(-150, 0)"
        self.createTempFile()
        # Context manager: the file is closed even if an assertion fails.
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-150)
            # The expected position after the negative seek is offset 0,
            # with the whole text still readable.
            self.assertEqual(bz2f.tell(), 0)
            self.assertEqual(bz2f.read(), self.TEXT)
251

252
    def testOpenDel(self):
        # "Test opening and deleting a file many times"
        self.createTempFile()
        attempts = 10000
        for _ in xrange(attempts):
            # Open the file and immediately drop the only reference to it.
            handle = BZ2File(self.filename)
            del handle

259 260 261 262
    def testOpenNonexistent(self):
        # "Test opening a nonexistent file"
        missing_path = "/non/existent"
        self.assertRaises(IOError, BZ2File, missing_path)

263 264 265 266 267 268 269 270 271 272
    def testModeU(self):
        # Bug #1194181: bz2.BZ2File opened for write with mode "U"
        self.createTempFile()
        bz2f = BZ2File(self.filename, "U")
        bz2f.close()
        # After opening and closing in "U" mode the file on disk must still
        # be as long as the fixture written by createTempFile().  open() in
        # a with-block replaces the bare file() call so the handle is closed
        # even if the assertion fails.
        with open(self.filename) as f:
            f.seek(0, 2)
            self.assertEqual(f.tell(), len(self.DATA))

273 274 275 276 277 278 279 280 281 282 283
    def testBug1191043(self):
        # readlines() for files containing no newline
        payload = 'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
        raw = open(self.filename, "wb")
        raw.write(payload)
        raw.close()

        # First pass: readlines().
        reader = BZ2File(self.filename)
        got_lines = reader.readlines()
        reader.close()
        self.assertEqual(got_lines, ['Test'])

        # Second pass on a fresh handle: xreadlines().
        reader = BZ2File(self.filename)
        got_xlines = list(reader.xreadlines())
        reader.close()
        self.assertEqual(got_xlines, ['Test'])
287

288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
    def testContextProtocol(self):
        # BZ2File supports the context management protocol
        f = None
        with BZ2File(self.filename, "wb") as f:
            f.write(b"xxx")
        f = BZ2File(self.filename, "rb")
        f.close()

        # Entering an already-closed file is expected to raise ValueError.
        rejected = False
        try:
            with f:
                pass
        except ValueError:
            rejected = True
        if not rejected:
            self.fail("__enter__ on a closed file didn't raise an exception")

        # An exception raised inside the with-body must reach the caller.
        propagated = False
        try:
            with BZ2File(self.filename, "wb") as f:
                1/0
        except ZeroDivisionError:
            propagated = True
        if not propagated:
            self.fail("1/0 didn't raise an exception")

310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
    def testThreading(self):
        # Using a BZ2File from several threads doesn't deadlock (issue #7205).
        payload = "1" * 2**20
        worker_count = 10
        shared = bz2.BZ2File(self.filename, 'wb')

        def writer():
            # Each worker writes the payload five times to the shared file.
            for _ in range(5):
                shared.write(payload)

        try:
            workers = []
            for _ in range(worker_count):
                workers.append(threading.Thread(target=writer))
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()
        finally:
            shared.close()

327

328
class BZ2CompressorTest(BaseTest):
    # Round-trip checks for the incremental BZ2Compressor object: whatever it
    # emits must decompress (via BaseTest.decompress) back to TEXT.

    def testCompress(self):
        # "Test BZ2Compressor.compress()/flush()"
        compressor = BZ2Compressor()
        self.assertRaises(TypeError, compressor.compress)
        pieces = [compressor.compress(self.TEXT)]
        pieces.append(compressor.flush())
        self.assertEqual(self.decompress(''.join(pieces)), self.TEXT)

    def testCompressChunks10(self):
        # "Test BZ2Compressor.compress()/flush() with chunks of 10 bytes"
        compressor = BZ2Compressor()
        pieces = []
        # Feed the text ten bytes at a time, collecting whatever is emitted.
        for start in range(0, len(self.TEXT), 10):
            pieces.append(compressor.compress(self.TEXT[start:start + 10]))
        pieces.append(compressor.flush())
        self.assertEqual(self.decompress(''.join(pieces)), self.TEXT)
350 351

class BZ2DecompressorTest(BaseTest):
    # Checks for the incremental BZ2Decompressor object against the DATA
    # fixture and its expected plaintext TEXT.

    def test_Constructor(self):
        # The constructor must reject a positional argument.
        self.assertRaises(TypeError, BZ2Decompressor, 42)

    def testDecompress(self):
        # "Test BZ2Decompressor.decompress()"
        decompressor = BZ2Decompressor()
        self.assertRaises(TypeError, decompressor.decompress)
        self.assertEqual(decompressor.decompress(self.DATA), self.TEXT)

    def testDecompressChunks10(self):
        # "Test BZ2Decompressor.decompress() with chunks of 10 bytes"
        decompressor = BZ2Decompressor()
        pieces = []
        # Feed the compressed stream ten bytes at a time.
        for start in range(0, len(self.DATA), 10):
            pieces.append(decompressor.decompress(self.DATA[start:start + 10]))
        self.assertEqual(''.join(pieces), self.TEXT)

    def testDecompressUnusedData(self):
        # "Test BZ2Decompressor.decompress() with unused data"
        decompressor = BZ2Decompressor()
        trailer = "this is unused data"
        recovered = decompressor.decompress(self.DATA + trailer)
        self.assertEqual(recovered, self.TEXT)
        # Bytes following the end of the stream must show up in unused_data.
        self.assertEqual(decompressor.unused_data, trailer)

    def testEOFError(self):
        # "Calling BZ2Decompressor.decompress() after EOS must raise EOFError"
        decompressor = BZ2Decompressor()
        decompressor.decompress(self.DATA)
        self.assertRaises(EOFError, decompressor.decompress, "anything")
388 389 390


class FuncTest(BaseTest):
    "Test module functions"

    def testCompress(self):
        # "Test compress() function"
        compressed = bz2.compress(self.TEXT)
        roundtrip = self.decompress(compressed)
        self.assertEqual(roundtrip, self.TEXT)

    def testDecompress(self):
        # "Test decompress() function"
        self.assertEqual(bz2.decompress(self.DATA), self.TEXT)

    def testDecompressEmpty(self):
        # "Test decompress() function with empty string"
        self.assertEqual(bz2.decompress(""), "")

    def testDecompressIncomplete(self):
        # "Test decompress() function with incomplete data"
        truncated = self.DATA[:-10]
        self.assertRaises(ValueError, bz2.decompress, truncated)
411 412

def test_main():
    # Run all four test case classes, then call test_support.reap_children()
    # (presumably to collect the bunzip2 subprocesses -- confirm against
    # test_support).
    cases = (BZ2FileTest, BZ2CompressorTest, BZ2DecompressorTest, FuncTest)
    test_support.run_unittest(*cases)
    test_support.reap_children()
420 421 422 423 424

# Run the whole suite when this module is executed directly as a script.
if __name__ == '__main__':
    test_main()

# vim:ts=4:sw=4