Kaydet (Commit) f6b16a4b authored tarafından Martin v. Löwis's avatar Martin v. Löwis

Issue #14371: Support bzip2 in zipfile module.

Patch by Serhiy Storchaka.
üst 9acbb607
...@@ -87,7 +87,22 @@ The module defines the following items: ...@@ -87,7 +87,22 @@ The module defines the following items:
.. data:: ZIP_DEFLATED .. data:: ZIP_DEFLATED
The numeric constant for the usual ZIP compression method. This requires the The numeric constant for the usual ZIP compression method. This requires the
zlib module. No other compression methods are currently supported. zlib module.
.. data:: ZIP_BZIP2
The numeric constant for the BZIP2 compression method. This requires the
bz2 module.
.. versionadded:: 3.3
.. note::
The ZIP file format specification has included support for bzip2 compression
since 2001. However, some tools (including older Python releases) do not
support it, and may either refuse to process the ZIP file altogether, or
fail to extract individual files.
.. seealso:: .. seealso::
...@@ -118,9 +133,11 @@ ZipFile Objects ...@@ -118,9 +133,11 @@ ZipFile Objects
adding a ZIP archive to another file (such as :file:`python.exe`). If adding a ZIP archive to another file (such as :file:`python.exe`). If
*mode* is ``a`` and the file does not exist at all, it is created. *mode* is ``a`` and the file does not exist at all, it is created.
*compression* is the ZIP compression method to use when writing the archive, *compression* is the ZIP compression method to use when writing the archive,
and should be :const:`ZIP_STORED` or :const:`ZIP_DEFLATED`; unrecognized and should be :const:`ZIP_STORED`, :const:`ZIP_DEFLATED`; or
values will cause :exc:`RuntimeError` to be raised. If :const:`ZIP_DEFLATED` :const:`ZIP_DEFLATED`; unrecognized
is specified but the :mod:`zlib` module is not available, :exc:`RuntimeError` values will cause :exc:`RuntimeError` to be raised. If :const:`ZIP_DEFLATED` or
:const:`ZIP_BZIP2` is specified but the corresponded module
(:mod:`zlib` or :mod:`bz2`) is not available, :exc:`RuntimeError`
is also raised. The default is :const:`ZIP_STORED`. If *allowZip64* is is also raised. The default is :const:`ZIP_STORED`. If *allowZip64* is
``True`` zipfile will create ZIP files that use the ZIP64 extensions when ``True`` zipfile will create ZIP files that use the ZIP64 extensions when
the zipfile is larger than 2 GB. If it is false (the default) :mod:`zipfile` the zipfile is larger than 2 GB. If it is false (the default) :mod:`zipfile`
...@@ -143,6 +160,9 @@ ZipFile Objects ...@@ -143,6 +160,9 @@ ZipFile Objects
.. versionadded:: 3.2 .. versionadded:: 3.2
Added the ability to use :class:`ZipFile` as a context manager. Added the ability to use :class:`ZipFile` as a context manager.
.. versionchanged:: 3.3
Added support for :mod:`bzip2` compression.
.. method:: ZipFile.close() .. method:: ZipFile.close()
......
...@@ -40,6 +40,11 @@ try: ...@@ -40,6 +40,11 @@ try:
except ImportError: except ImportError:
zlib = None zlib = None
try:
import bz2
except ImportError:
bz2 = None
__all__ = [ __all__ = [
"Error", "TestFailed", "ResourceDenied", "import_module", "Error", "TestFailed", "ResourceDenied", "import_module",
"verbose", "use_resources", "max_memuse", "record_original_stdout", "verbose", "use_resources", "max_memuse", "record_original_stdout",
...@@ -57,7 +62,7 @@ __all__ = [ ...@@ -57,7 +62,7 @@ __all__ = [
"get_attribute", "swap_item", "swap_attr", "requires_IEEE_754", "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754",
"TestHandler", "Matcher", "can_symlink", "skip_unless_symlink", "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink",
"import_fresh_module", "requires_zlib", "PIPE_MAX_SIZE", "failfast", "import_fresh_module", "requires_zlib", "PIPE_MAX_SIZE", "failfast",
"anticipate_failure", "run_with_tz" "anticipate_failure", "run_with_tz", "requires_bz2"
] ]
class Error(Exception): class Error(Exception):
...@@ -506,6 +511,8 @@ requires_IEEE_754 = unittest.skipUnless( ...@@ -506,6 +511,8 @@ requires_IEEE_754 = unittest.skipUnless(
requires_zlib = unittest.skipUnless(zlib, 'requires zlib') requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')
is_jython = sys.platform.startswith('java') is_jython = sys.platform.startswith('java')
# Filename used for testing # Filename used for testing
......
...@@ -13,7 +13,7 @@ from tempfile import TemporaryFile ...@@ -13,7 +13,7 @@ from tempfile import TemporaryFile
from random import randint, random from random import randint, random
from unittest import skipUnless from unittest import skipUnless
from test.support import TESTFN, run_unittest, findfile, unlink, requires_zlib from test.support import TESTFN, run_unittest, findfile, unlink, requires_zlib, requires_bz2
TESTFN2 = TESTFN + "2" TESTFN2 = TESTFN + "2"
TESTFNDIR = TESTFN + "d" TESTFNDIR = TESTFN + "d"
...@@ -313,6 +313,54 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -313,6 +313,54 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(openobj.read(1), b'1') self.assertEqual(openobj.read(1), b'1')
self.assertEqual(openobj.read(1), b'2') self.assertEqual(openobj.read(1), b'2')
@requires_bz2
def test_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_BZIP2)
@requires_bz2
def test_open_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_open_test(f, zipfile.ZIP_BZIP2)
@requires_bz2
def test_random_open_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_random_open_test(f, zipfile.ZIP_BZIP2)
@requires_bz2
def test_readline_read_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readline_read_test(f, zipfile.ZIP_BZIP2)
@requires_bz2
def test_readline_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readline_test(f, zipfile.ZIP_BZIP2)
@requires_bz2
def test_readlines_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readlines_test(f, zipfile.ZIP_BZIP2)
@requires_bz2
def test_iterlines_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_iterlines_test(f, zipfile.ZIP_BZIP2)
@requires_bz2
def test_low_compression_bzip2(self):
"""Check for cases where compressed data is larger than original."""
# Create the ZIP archive
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_BZIP2) as zipfp:
zipfp.writestr("strfile", '12')
# Get an open object for strfile
with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_BZIP2) as zipfp:
with zipfp.open("strfile") as openobj:
self.assertEqual(openobj.read(1), b'1')
self.assertEqual(openobj.read(1), b'2')
def test_absolute_arcnames(self): def test_absolute_arcnames(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, "/absolute") zipfp.write(TESTFN, "/absolute")
...@@ -453,6 +501,13 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -453,6 +501,13 @@ class TestsWithSourceFile(unittest.TestCase):
info = zipfp.getinfo('b.txt') info = zipfp.getinfo('b.txt')
self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED) self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED)
@requires_bz2
def test_writestr_compression_bzip2(self):
zipfp = zipfile.ZipFile(TESTFN2, "w")
zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_BZIP2)
info = zipfp.getinfo('b.txt')
self.assertEqual(info.compress_type, zipfile.ZIP_BZIP2)
def zip_test_writestr_permissions(self, f, compression): def zip_test_writestr_permissions(self, f, compression):
# Make sure that writestr creates files with mode 0600, # Make sure that writestr creates files with mode 0600,
# when it is passed a name rather than a ZipInfo instance. # when it is passed a name rather than a ZipInfo instance.
...@@ -626,6 +681,11 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -626,6 +681,11 @@ class TestZip64InSmallFiles(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_DEFLATED) self.zip_test(f, zipfile.ZIP_DEFLATED)
@requires_bz2
def test_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_BZIP2)
def test_absolute_arcnames(self): def test_absolute_arcnames(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
allowZip64=True) as zipfp: allowZip64=True) as zipfp:
...@@ -754,6 +814,18 @@ class OtherTests(unittest.TestCase): ...@@ -754,6 +814,18 @@ class OtherTests(unittest.TestCase):
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'), b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'),
zipfile.ZIP_BZIP2: (
b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
b'ileBZh91AY&SY\xd4\xa8\xca'
b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
b'\x00\x00\x00\x00'),
} }
def test_unicode_filenames(self): def test_unicode_filenames(self):
...@@ -1007,6 +1079,10 @@ class OtherTests(unittest.TestCase): ...@@ -1007,6 +1079,10 @@ class OtherTests(unittest.TestCase):
def test_testzip_with_bad_crc_deflated(self): def test_testzip_with_bad_crc_deflated(self):
self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED) self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED)
@requires_bz2
def test_testzip_with_bad_crc_bzip2(self):
self.check_testzip_with_bad_crc(zipfile.ZIP_BZIP2)
def check_read_with_bad_crc(self, compression): def check_read_with_bad_crc(self, compression):
"""Tests that files with bad CRCs raise a BadZipFile exception when read.""" """Tests that files with bad CRCs raise a BadZipFile exception when read."""
zipdata = self.zips_with_bad_crc[compression] zipdata = self.zips_with_bad_crc[compression]
...@@ -1035,6 +1111,10 @@ class OtherTests(unittest.TestCase): ...@@ -1035,6 +1111,10 @@ class OtherTests(unittest.TestCase):
def test_read_with_bad_crc_deflated(self): def test_read_with_bad_crc_deflated(self):
self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED) self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED)
@requires_bz2
def test_read_with_bad_crc_bzip2(self):
self.check_read_with_bad_crc(zipfile.ZIP_BZIP2)
def check_read_return_size(self, compression): def check_read_return_size(self, compression):
# Issue #9837: ZipExtFile.read() shouldn't return more bytes # Issue #9837: ZipExtFile.read() shouldn't return more bytes
# than requested. # than requested.
...@@ -1055,6 +1135,10 @@ class OtherTests(unittest.TestCase): ...@@ -1055,6 +1135,10 @@ class OtherTests(unittest.TestCase):
def test_read_return_size_deflated(self): def test_read_return_size_deflated(self):
self.check_read_return_size(zipfile.ZIP_DEFLATED) self.check_read_return_size(zipfile.ZIP_DEFLATED)
@requires_bz2
def test_read_return_size_bzip2(self):
self.check_read_return_size(zipfile.ZIP_BZIP2)
def test_empty_zipfile(self): def test_empty_zipfile(self):
# Check that creating a file in 'w' or 'a' mode and closing without # Check that creating a file in 'w' or 'a' mode and closing without
# adding any files to the archives creates a valid empty ZIP file # adding any files to the archives creates a valid empty ZIP file
...@@ -1196,6 +1280,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -1196,6 +1280,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_DEFLATED) self.zip_test(f, zipfile.ZIP_DEFLATED)
@requires_bz2
def test_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_BZIP2)
def zip_open_test(self, f, compression): def zip_open_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
...@@ -1236,6 +1325,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -1236,6 +1325,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_open_test(f, zipfile.ZIP_DEFLATED) self.zip_open_test(f, zipfile.ZIP_DEFLATED)
@requires_bz2
def test_open_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_open_test(f, zipfile.ZIP_BZIP2)
def zip_random_open_test(self, f, compression): def zip_random_open_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
...@@ -1264,6 +1358,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -1264,6 +1358,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_random_open_test(f, zipfile.ZIP_DEFLATED) self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
@requires_bz2
def test_random_open_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_random_open_test(f, zipfile.ZIP_BZIP2)
@requires_zlib @requires_zlib
class TestsWithMultipleOpens(unittest.TestCase): class TestsWithMultipleOpens(unittest.TestCase):
...@@ -1483,6 +1582,31 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -1483,6 +1582,31 @@ class UniversalNewlineTests(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.iterlines_test(f, zipfile.ZIP_DEFLATED) self.iterlines_test(f, zipfile.ZIP_DEFLATED)
@requires_bz2
def test_read_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.read_test(f, zipfile.ZIP_BZIP2)
@requires_bz2
def test_readline_read_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readline_read_test(f, zipfile.ZIP_BZIP2)
@requires_bz2
def test_readline_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readline_test(f, zipfile.ZIP_BZIP2)
@requires_bz2
def test_readlines_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.readlines_test(f, zipfile.ZIP_BZIP2)
@requires_bz2
def test_iterlines_bzip2(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.iterlines_test(f, zipfile.ZIP_BZIP2)
def tearDown(self): def tearDown(self):
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
os.remove(fn) os.remove(fn)
......
This diff is collapsed.
...@@ -87,6 +87,9 @@ Core and Builtins ...@@ -87,6 +87,9 @@ Core and Builtins
Library Library
------- -------
- Issue #14371: Support bzip2 in zipfile module.
Patch by Serhiy Storchaka.
- Issue #13183: Fix pdb skipping frames after hitting a breakpoint and running - Issue #13183: Fix pdb skipping frames after hitting a breakpoint and running
step. Patch by Xavier de Gaye. step. Patch by Xavier de Gaye.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment