Kaydet (Commit) 569e61f3 authored tarafından Ezio Melotti's avatar Ezio Melotti

#5511: Added the ability to use ZipFile as a context manager. Patch by Brian Curtin.

üst eb74da8e
...@@ -105,28 +105,35 @@ ZipFile Objects ...@@ -105,28 +105,35 @@ ZipFile Objects
Open a ZIP file, where *file* can be either a path to a file (a string) or a Open a ZIP file, where *file* can be either a path to a file (a string) or a
file-like object. The *mode* parameter should be ``'r'`` to read an existing file-like object. The *mode* parameter should be ``'r'`` to read an existing
file, ``'w'`` to truncate and write a new file, or ``'a'`` to append to an file, ``'w'`` to truncate and write a new file, or ``'a'`` to append to an
existing file. If *mode* is ``'a'`` and *file* refers to an existing ZIP file, existing file. If *mode* is ``'a'`` and *file* refers to an existing ZIP
then additional files are added to it. If *file* does not refer to a ZIP file, file, then additional files are added to it. If *file* does not refer to a
then a new ZIP archive is appended to the file. This is meant for adding a ZIP ZIP file, then a new ZIP archive is appended to the file. This is meant for
archive to another file, such as :file:`python.exe`. Using :: adding a ZIP archive to another file (such as :file:`python.exe`).
cat myzip.zip >> python.exe
also works, and at least :program:`WinZip` can read such files. If *mode* is
``a`` and the file does not exist at all, it is created. *compression* is the
ZIP compression method to use when writing the archive, and should be
:const:`ZIP_STORED` or :const:`ZIP_DEFLATED`; unrecognized values will cause
:exc:`RuntimeError` to be raised. If :const:`ZIP_DEFLATED` is specified but the
:mod:`zlib` module is not available, :exc:`RuntimeError` is also raised. The
default is :const:`ZIP_STORED`. If *allowZip64* is ``True`` zipfile will create
ZIP files that use the ZIP64 extensions when the zipfile is larger than 2 GB. If
it is false (the default) :mod:`zipfile` will raise an exception when the ZIP
file would require ZIP64 extensions. ZIP64 extensions are disabled by default
because the default :program:`zip` and :program:`unzip` commands on Unix (the
InfoZIP utilities) don't support these extensions.
.. versionchanged:: 2.6 .. versionchanged:: 2.6
If the file does not exist, it is created if the mode is 'a'. If *mode* is ``a`` and the file does not exist at all, it is created.
*compression* is the ZIP compression method to use when writing the archive,
and should be :const:`ZIP_STORED` or :const:`ZIP_DEFLATED`; unrecognized
values will cause :exc:`RuntimeError` to be raised. If :const:`ZIP_DEFLATED`
is specified but the :mod:`zlib` module is not available, :exc:`RuntimeError`
is also raised. The default is :const:`ZIP_STORED`. If *allowZip64* is
``True`` zipfile will create ZIP files that use the ZIP64 extensions when
the zipfile is larger than 2 GB. If it is false (the default) :mod:`zipfile`
will raise an exception when the ZIP file would require ZIP64 extensions.
ZIP64 extensions are disabled by default because the default :program:`zip`
and :program:`unzip` commands on Unix (the InfoZIP utilities) don't support
these extensions.
ZipFile is also a context manager and therefore supports the
:keyword:`with` statement. In the example, *myzip* is closed after the
:keyword:`with` statement's suite is finished---even if an exception occurs::
with ZipFile('spam.zip', 'w') as myzip:
myzip.write('eggs.txt')
.. versionadded:: 2.7
Added the ability to use :class:`ZipFile` as a context manager.
.. method:: ZipFile.close() .. method:: ZipFile.close()
......
...@@ -41,69 +41,67 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -41,69 +41,67 @@ class TestsWithSourceFile(unittest.TestCase):
def make_test_archive(self, f, compression): def make_test_archive(self, f, compression):
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(f, "w", compression) with zipfile.ZipFile(f, "w", compression) as zipfp:
zipfp.write(TESTFN, "another"+os.extsep+"name") zipfp.write(TESTFN, "another"+os.extsep+"name")
zipfp.write(TESTFN, TESTFN) zipfp.write(TESTFN, TESTFN)
zipfp.writestr("strfile", self.data) zipfp.writestr("strfile", self.data)
zipfp.close()
def zip_test(self, f, compression): def zip_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
self.assertEqual(zipfp.read(TESTFN), self.data) self.assertEqual(zipfp.read(TESTFN), self.data)
self.assertEqual(zipfp.read("another"+os.extsep+"name"), self.data) self.assertEqual(zipfp.read("another"+os.extsep+"name"), self.data)
self.assertEqual(zipfp.read("strfile"), self.data) self.assertEqual(zipfp.read("strfile"), self.data)
# Print the ZIP directory # Print the ZIP directory
fp = StringIO() fp = StringIO()
stdout = sys.stdout stdout = sys.stdout
try: try:
sys.stdout = fp sys.stdout = fp
zipfp.printdir() zipfp.printdir()
finally: finally:
sys.stdout = stdout sys.stdout = stdout
directory = fp.getvalue() directory = fp.getvalue()
lines = directory.splitlines() lines = directory.splitlines()
self.assertEquals(len(lines), 4) # Number of files + header self.assertEquals(len(lines), 4) # Number of files + header
self.assertTrue('File Name' in lines[0]) self.assertTrue('File Name' in lines[0])
self.assertTrue('Modified' in lines[0]) self.assertTrue('Modified' in lines[0])
self.assertTrue('Size' in lines[0]) self.assertTrue('Size' in lines[0])
fn, date, time, size = lines[1].split() fn, date, time, size = lines[1].split()
self.assertEquals(fn, 'another.name') self.assertEquals(fn, 'another.name')
# XXX: timestamp is not tested # XXX: timestamp is not tested
self.assertEquals(size, str(len(self.data))) self.assertEquals(size, str(len(self.data)))
# Check the namelist # Check the namelist
names = zipfp.namelist() names = zipfp.namelist()
self.assertEquals(len(names), 3) self.assertEquals(len(names), 3)
self.assertTrue(TESTFN in names) self.assertTrue(TESTFN in names)
self.assertTrue("another"+os.extsep+"name" in names) self.assertTrue("another"+os.extsep+"name" in names)
self.assertTrue("strfile" in names) self.assertTrue("strfile" in names)
# Check infolist # Check infolist
infos = zipfp.infolist() infos = zipfp.infolist()
names = [ i.filename for i in infos ] names = [ i.filename for i in infos ]
self.assertEquals(len(names), 3) self.assertEquals(len(names), 3)
self.assertTrue(TESTFN in names) self.assertTrue(TESTFN in names)
self.assertTrue("another"+os.extsep+"name" in names) self.assertTrue("another"+os.extsep+"name" in names)
self.assertTrue("strfile" in names) self.assertTrue("strfile" in names)
for i in infos: for i in infos:
self.assertEquals(i.file_size, len(self.data)) self.assertEquals(i.file_size, len(self.data))
# check getinfo # check getinfo
for nm in (TESTFN, "another"+os.extsep+"name", "strfile"): for nm in (TESTFN, "another"+os.extsep+"name", "strfile"):
info = zipfp.getinfo(nm) info = zipfp.getinfo(nm)
self.assertEquals(info.filename, nm) self.assertEquals(info.filename, nm)
self.assertEquals(info.file_size, len(self.data)) self.assertEquals(info.file_size, len(self.data))
# Check that testzip doesn't raise an exception # Check that testzip doesn't raise an exception
zipfp.testzip() zipfp.testzip()
zipfp.close()
def test_stored(self): def test_stored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
...@@ -113,26 +111,25 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -113,26 +111,25 @@ class TestsWithSourceFile(unittest.TestCase):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = [] zipdata1 = []
zipopen1 = zipfp.open(TESTFN) zipopen1 = zipfp.open(TESTFN)
while 1: while True:
read_data = zipopen1.read(256) read_data = zipopen1.read(256)
if not read_data: if not read_data:
break break
zipdata1.append(read_data) zipdata1.append(read_data)
zipdata2 = [] zipdata2 = []
zipopen2 = zipfp.open("another"+os.extsep+"name") zipopen2 = zipfp.open("another"+os.extsep+"name")
while 1: while True:
read_data = zipopen2.read(256) read_data = zipopen2.read(256)
if not read_data: if not read_data:
break break
zipdata2.append(read_data) zipdata2.append(read_data)
self.assertEqual(''.join(zipdata1), self.data) self.assertEqual(''.join(zipdata1), self.data)
self.assertEqual(''.join(zipdata2), self.data) self.assertEqual(''.join(zipdata2), self.data)
zipfp.close()
def test_open_stored(self): def test_open_stored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
...@@ -140,38 +137,35 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -140,38 +137,35 @@ class TestsWithSourceFile(unittest.TestCase):
def test_open_via_zip_info(self): def test_open_via_zip_info(self):
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.writestr("name", "foo") zipfp.writestr("name", "foo")
zipfp.writestr("name", "bar") zipfp.writestr("name", "bar")
zipfp.close()
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
zipfp = zipfile.ZipFile(TESTFN2, "r") infos = zipfp.infolist()
infos = zipfp.infolist() data = ""
data = "" for info in infos:
for info in infos: data += zipfp.open(info).read()
data += zipfp.open(info).read() self.assertTrue(data == "foobar" or data == "barfoo")
self.assertTrue(data == "foobar" or data == "barfoo") data = ""
data = "" for info in infos:
for info in infos: data += zipfp.read(info)
data += zipfp.read(info) self.assertTrue(data == "foobar" or data == "barfoo")
self.assertTrue(data == "foobar" or data == "barfoo")
zipfp.close()
def zip_random_open_test(self, f, compression): def zip_random_open_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = [] zipdata1 = []
zipopen1 = zipfp.open(TESTFN) zipopen1 = zipfp.open(TESTFN)
while 1: while True:
read_data = zipopen1.read(randint(1, 1024)) read_data = zipopen1.read(randint(1, 1024))
if not read_data: if not read_data:
break break
zipdata1.append(read_data) zipdata1.append(read_data)
self.assertEqual(''.join(zipdata1), self.data) self.assertEqual(''.join(zipdata1), self.data)
zipfp.close()
def test_random_open_stored(self): def test_random_open_stored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
...@@ -181,34 +175,28 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -181,34 +175,28 @@ class TestsWithSourceFile(unittest.TestCase):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
zipopen = zipfp.open(TESTFN) zipopen = zipfp.open(TESTFN)
for line in self.line_gen: for line in self.line_gen:
linedata = zipopen.readline() linedata = zipopen.readline()
self.assertEqual(linedata, line + '\n') self.assertEqual(linedata, line + '\n')
zipfp.close()
def zip_readlines_test(self, f, compression): def zip_readlines_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
ziplines = zipfp.open(TESTFN).readlines() ziplines = zipfp.open(TESTFN).readlines()
for line, zipline in zip(self.line_gen, ziplines): for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + '\n') self.assertEqual(zipline, line + '\n')
zipfp.close()
def zip_iterlines_test(self, f, compression): def zip_iterlines_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)): for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
self.assertEqual(zipline, line + '\n') self.assertEqual(zipline, line + '\n')
zipfp.close()
def test_readline_stored(self): def test_readline_stored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
...@@ -256,34 +244,30 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -256,34 +244,30 @@ class TestsWithSourceFile(unittest.TestCase):
def test_low_compression(self): def test_low_compression(self):
# Checks for cases where compressed data is larger than original # Checks for cases where compressed data is larger than original
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
zipfp.writestr("strfile", '12') zipfp.writestr("strfile", '12')
zipfp.close()
# Get an open object for strfile # Get an open object for strfile
zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp:
openobj = zipfp.open("strfile") openobj = zipfp.open("strfile")
self.assertEqual(openobj.read(1), '1') self.assertEqual(openobj.read(1), '1')
self.assertEqual(openobj.read(1), '2') self.assertEqual(openobj.read(1), '2')
def test_absolute_arcnames(self): def test_absolute_arcnames(self):
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, "/absolute") zipfp.write(TESTFN, "/absolute")
zipfp.close()
zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
self.assertEqual(zipfp.namelist(), ["absolute"]) self.assertEqual(zipfp.namelist(), ["absolute"])
zipfp.close()
def test_append_to_zip_file(self): def test_append_to_zip_file(self):
# Test appending to an existing zipfile # Test appending to an existing zipfile
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, TESTFN) zipfp.write(TESTFN, TESTFN)
zipfp.close()
zipfp = zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
zipfp.writestr("strfile", self.data) zipfp.writestr("strfile", self.data)
self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
zipfp.close()
def test_append_to_non_zip_file(self): def test_append_to_non_zip_file(self):
# Test appending to an existing file that is not a zipfile # Test appending to an existing file that is not a zipfile
...@@ -293,94 +277,83 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -293,94 +277,83 @@ class TestsWithSourceFile(unittest.TestCase):
f = file(TESTFN2, 'wb') f = file(TESTFN2, 'wb')
f.write(d) f.write(d)
f.close() f.close()
zipfp = zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, TESTFN) zipfp.write(TESTFN, TESTFN)
zipfp.close()
f = file(TESTFN2, 'rb') f = file(TESTFN2, 'rb')
f.seek(len(d)) f.seek(len(d))
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
self.assertEqual(zipfp.namelist(), [TESTFN]) self.assertEqual(zipfp.namelist(), [TESTFN])
zipfp.close()
f.close() f.close()
def test_write_default_name(self): def test_write_default_name(self):
# Check that calling ZipFile.write without arcname specified produces the expected result # Check that calling ZipFile.write without arcname specified produces the expected result
zipfp = zipfile.ZipFile(TESTFN2, "w") with zipfile.ZipFile(TESTFN2, "w") as zipfp:
zipfp.write(TESTFN) zipfp.write(TESTFN)
self.assertEqual(zipfp.read(TESTFN), file(TESTFN).read()) self.assertEqual(zipfp.read(TESTFN), file(TESTFN).read())
zipfp.close()
@skipUnless(zlib, "requires zlib") @skipUnless(zlib, "requires zlib")
def test_per_file_compression(self): def test_per_file_compression(self):
# Check that files within a Zip archive can have different compression options # Check that files within a Zip archive can have different compression options
zipfp = zipfile.ZipFile(TESTFN2, "w") with zipfile.ZipFile(TESTFN2, "w") as zipfp:
zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
sinfo = zipfp.getinfo('storeme') sinfo = zipfp.getinfo('storeme')
dinfo = zipfp.getinfo('deflateme') dinfo = zipfp.getinfo('deflateme')
self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
zipfp.close()
def test_write_to_readonly(self): def test_write_to_readonly(self):
# Check that trying to call write() on a readonly ZipFile object # Check that trying to call write() on a readonly ZipFile object
# raises a RuntimeError # raises a RuntimeError
zipf = zipfile.ZipFile(TESTFN2, mode="w") with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
zipf.writestr("somefile.txt", "bogus") zipfp.writestr("somefile.txt", "bogus")
zipf.close()
zipf = zipfile.ZipFile(TESTFN2, mode="r")
self.assertRaises(RuntimeError, zipf.write, TESTFN)
zipf.close()
def test_extract(self): with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) self.assertRaises(RuntimeError, zipfp.write, TESTFN)
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
zipfp.close()
zipfp = zipfile.ZipFile(TESTFN2, "r") def test_extract(self):
for fpath, fdata in SMALL_TEST_DATA: with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
writtenfile = zipfp.extract(fpath) for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
# make sure it was written to the right place with zipfile.ZipFile(TESTFN2, "r") as zipfp:
if os.path.isabs(fpath): for fpath, fdata in SMALL_TEST_DATA:
correctfile = os.path.join(os.getcwd(), fpath[1:]) writtenfile = zipfp.extract(fpath)
else:
correctfile = os.path.join(os.getcwd(), fpath)
correctfile = os.path.normpath(correctfile)
self.assertEqual(writtenfile, correctfile) # make sure it was written to the right place
if os.path.isabs(fpath):
correctfile = os.path.join(os.getcwd(), fpath[1:])
else:
correctfile = os.path.join(os.getcwd(), fpath)
correctfile = os.path.normpath(correctfile)
# make sure correct data is in correct file self.assertEqual(writtenfile, correctfile)
self.assertEqual(fdata, file(writtenfile, "rb").read())
os.remove(writtenfile) # make sure correct data is in correct file
self.assertEqual(fdata, file(writtenfile, "rb").read())
zipfp.close() os.remove(writtenfile)
# remove the test file subdirectories # remove the test file subdirectories
shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
def test_extract_all(self): def test_extract_all(self):
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
for fpath, fdata in SMALL_TEST_DATA: for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata) zipfp.writestr(fpath, fdata)
zipfp.close()
zipfp = zipfile.ZipFile(TESTFN2, "r") with zipfile.ZipFile(TESTFN2, "r") as zipfp:
zipfp.extractall() zipfp.extractall()
for fpath, fdata in SMALL_TEST_DATA: for fpath, fdata in SMALL_TEST_DATA:
if os.path.isabs(fpath): if os.path.isabs(fpath):
outfile = os.path.join(os.getcwd(), fpath[1:]) outfile = os.path.join(os.getcwd(), fpath[1:])
else: else:
outfile = os.path.join(os.getcwd(), fpath) outfile = os.path.join(os.getcwd(), fpath)
self.assertEqual(fdata, file(outfile, "rb").read()) self.assertEqual(fdata, file(outfile, "rb").read())
os.remove(outfile) os.remove(outfile)
zipfp.close()
# remove the test file subdirectories # remove the test file subdirectories
shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
...@@ -390,14 +363,39 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -390,14 +363,39 @@ class TestsWithSourceFile(unittest.TestCase):
# when it is passed a name rather than a ZipInfo instance. # when it is passed a name rather than a ZipInfo instance.
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
zinfo = zipfp.getinfo('strfile') zinfo = zipfp.getinfo('strfile')
self.assertEqual(zinfo.external_attr, 0600 << 16) self.assertEqual(zinfo.external_attr, 0600 << 16)
def test_writestr_permissions(self): def test_writestr_permissions(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
def test_close(self):
"""Check that the zipfile is closed after the 'with' block."""
with zipfile.ZipFile(TESTFN2, "w") as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
self.assertTrue(zipfp.fp is None, 'zipfp is not closed')
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
self.assertTrue(zipfp.fp is None, 'zipfp is not closed')
def test_close_on_exception(self):
"""Check that the zipfile is closed if an exception is raised in the
'with' block."""
with zipfile.ZipFile(TESTFN2, "w") as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
try:
with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
raise zipfile.BadZipfile()
except zipfile.BadZipfile:
self.assertTrue(zipfp2.fp is None, 'zipfp is not closed')
def tearDown(self): def tearDown(self):
unlink(TESTFN) unlink(TESTFN)
unlink(TESTFN2) unlink(TESTFN2)
...@@ -420,16 +418,14 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -420,16 +418,14 @@ class TestZip64InSmallFiles(unittest.TestCase):
fp.close() fp.close()
def large_file_exception_test(self, f, compression): def large_file_exception_test(self, f, compression):
zipfp = zipfile.ZipFile(f, "w", compression) with zipfile.ZipFile(f, "w", compression) as zipfp:
self.assertRaises(zipfile.LargeZipFile, self.assertRaises(zipfile.LargeZipFile,
zipfp.write, TESTFN, "another"+os.extsep+"name") zipfp.write, TESTFN, "another"+os.extsep+"name")
zipfp.close()
def large_file_exception_test2(self, f, compression): def large_file_exception_test2(self, f, compression):
zipfp = zipfile.ZipFile(f, "w", compression) with zipfile.ZipFile(f, "w", compression) as zipfp:
self.assertRaises(zipfile.LargeZipFile, self.assertRaises(zipfile.LargeZipFile,
zipfp.writestr, "another"+os.extsep+"name", self.data) zipfp.writestr, "another"+os.extsep+"name", self.data)
zipfp.close()
def test_large_file_exception(self): def test_large_file_exception(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
...@@ -438,68 +434,64 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -438,68 +434,64 @@ class TestZip64InSmallFiles(unittest.TestCase):
def zip_test(self, f, compression): def zip_test(self, f, compression):
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(f, "w", compression, allowZip64=True) with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
zipfp.write(TESTFN, "another"+os.extsep+"name") zipfp.write(TESTFN, "another"+os.extsep+"name")
zipfp.write(TESTFN, TESTFN) zipfp.write(TESTFN, TESTFN)
zipfp.writestr("strfile", self.data) zipfp.writestr("strfile", self.data)
zipfp.close()
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
self.assertEqual(zipfp.read(TESTFN), self.data) self.assertEqual(zipfp.read(TESTFN), self.data)
self.assertEqual(zipfp.read("another"+os.extsep+"name"), self.data) self.assertEqual(zipfp.read("another"+os.extsep+"name"), self.data)
self.assertEqual(zipfp.read("strfile"), self.data) self.assertEqual(zipfp.read("strfile"), self.data)
# Print the ZIP directory # Print the ZIP directory
fp = StringIO() fp = StringIO()
stdout = sys.stdout stdout = sys.stdout
try: try:
sys.stdout = fp sys.stdout = fp
zipfp.printdir()
zipfp.printdir() finally:
finally: sys.stdout = stdout
sys.stdout = stdout
directory = fp.getvalue()
directory = fp.getvalue() lines = directory.splitlines()
lines = directory.splitlines() self.assertEquals(len(lines), 4) # Number of files + header
self.assertEquals(len(lines), 4) # Number of files + header
self.assertTrue('File Name' in lines[0])
self.assertTrue('File Name' in lines[0]) self.assertTrue('Modified' in lines[0])
self.assertTrue('Modified' in lines[0]) self.assertTrue('Size' in lines[0])
self.assertTrue('Size' in lines[0])
fn, date, time, size = lines[1].split()
fn, date, time, size = lines[1].split() self.assertEquals(fn, 'another.name')
self.assertEquals(fn, 'another.name') # XXX: timestamp is not tested
# XXX: timestamp is not tested self.assertEquals(size, str(len(self.data)))
self.assertEquals(size, str(len(self.data)))
# Check the namelist
# Check the namelist names = zipfp.namelist()
names = zipfp.namelist() self.assertEquals(len(names), 3)
self.assertEquals(len(names), 3) self.assertTrue(TESTFN in names)
self.assertTrue(TESTFN in names) self.assertTrue("another"+os.extsep+"name" in names)
self.assertTrue("another"+os.extsep+"name" in names) self.assertTrue("strfile" in names)
self.assertTrue("strfile" in names)
# Check infolist
# Check infolist infos = zipfp.infolist()
infos = zipfp.infolist() names = [ i.filename for i in infos ]
names = [ i.filename for i in infos ] self.assertEquals(len(names), 3)
self.assertEquals(len(names), 3) self.assertTrue(TESTFN in names)
self.assertTrue(TESTFN in names) self.assertTrue("another"+os.extsep+"name" in names)
self.assertTrue("another"+os.extsep+"name" in names) self.assertTrue("strfile" in names)
self.assertTrue("strfile" in names) for i in infos:
for i in infos: self.assertEquals(i.file_size, len(self.data))
self.assertEquals(i.file_size, len(self.data))
# check getinfo
# check getinfo for nm in (TESTFN, "another"+os.extsep+"name", "strfile"):
for nm in (TESTFN, "another"+os.extsep+"name", "strfile"): info = zipfp.getinfo(nm)
info = zipfp.getinfo(nm) self.assertEquals(info.filename, nm)
self.assertEquals(info.filename, nm) self.assertEquals(info.file_size, len(self.data))
self.assertEquals(info.file_size, len(self.data))
# Check that testzip doesn't raise an exception
# Check that testzip doesn't raise an exception zipfp.testzip()
zipfp.testzip()
zipfp.close()
def test_stored(self): def test_stored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
...@@ -511,13 +503,12 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -511,13 +503,12 @@ class TestZip64InSmallFiles(unittest.TestCase):
self.zip_test(f, zipfile.ZIP_DEFLATED) self.zip_test(f, zipfile.ZIP_DEFLATED)
def test_absolute_arcnames(self): def test_absolute_arcnames(self):
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, allowZip64=True) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
zipfp.write(TESTFN, "/absolute") allowZip64=True) as zipfp:
zipfp.close() zipfp.write(TESTFN, "/absolute")
zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
self.assertEqual(zipfp.namelist(), ["absolute"]) self.assertEqual(zipfp.namelist(), ["absolute"])
zipfp.close()
def tearDown(self): def tearDown(self):
zipfile.ZIP64_LIMIT = self._limit zipfile.ZIP64_LIMIT = self._limit
...@@ -527,41 +518,43 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -527,41 +518,43 @@ class TestZip64InSmallFiles(unittest.TestCase):
class PyZipFileTests(unittest.TestCase): class PyZipFileTests(unittest.TestCase):
def test_write_pyfile(self): def test_write_pyfile(self):
zipfp = zipfile.PyZipFile(TemporaryFile(), "w") with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
fn = __file__ fn = __file__
if fn.endswith('.pyc') or fn.endswith('.pyo'): if fn.endswith('.pyc') or fn.endswith('.pyo'):
fn = fn[:-1] fn = fn[:-1]
zipfp.writepy(fn) zipfp.writepy(fn)
bn = os.path.basename(fn) bn = os.path.basename(fn)
self.assertTrue(bn not in zipfp.namelist()) self.assertTrue(bn not in zipfp.namelist())
self.assertTrue(bn + 'o' in zipfp.namelist() or bn + 'c' in zipfp.namelist()) self.assertTrue(bn + 'o' in zipfp.namelist() or
zipfp.close() bn + 'c' in zipfp.namelist())
zipfp = zipfile.PyZipFile(TemporaryFile(), "w") with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
fn = __file__ fn = __file__
if fn.endswith('.pyc') or fn.endswith('.pyo'): if fn.endswith('.pyc') or fn.endswith('.pyo'):
fn = fn[:-1] fn = fn[:-1]
zipfp.writepy(fn, "testpackage") zipfp.writepy(fn, "testpackage")
bn = "%s/%s"%("testpackage", os.path.basename(fn)) bn = "%s/%s"%("testpackage", os.path.basename(fn))
self.assertTrue(bn not in zipfp.namelist()) self.assertTrue(bn not in zipfp.namelist())
self.assertTrue(bn + 'o' in zipfp.namelist() or bn + 'c' in zipfp.namelist()) self.assertTrue(bn + 'o' in zipfp.namelist() or
zipfp.close() bn + 'c' in zipfp.namelist())
def test_write_python_package(self): def test_write_python_package(self):
import email import email
packagedir = os.path.dirname(email.__file__) packagedir = os.path.dirname(email.__file__)
zipfp = zipfile.PyZipFile(TemporaryFile(), "w") with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
zipfp.writepy(packagedir) zipfp.writepy(packagedir)
# Check for a couple of modules at different levels of the hieararchy # Check for a couple of modules at different levels of the hieararchy
names = zipfp.namelist() names = zipfp.namelist()
self.assertTrue('email/__init__.pyo' in names or 'email/__init__.pyc' in names) self.assertTrue('email/__init__.pyo' in names or
self.assertTrue('email/mime/text.pyo' in names or 'email/mime/text.pyc' in names) 'email/__init__.pyc' in names)
self.assertTrue('email/mime/text.pyo' in names or
'email/mime/text.pyc' in names)
def test_write_python_directory(self): def test_write_python_directory(self):
os.mkdir(TESTFN2) os.mkdir(TESTFN2)
...@@ -590,23 +583,22 @@ class PyZipFileTests(unittest.TestCase): ...@@ -590,23 +583,22 @@ class PyZipFileTests(unittest.TestCase):
shutil.rmtree(TESTFN2) shutil.rmtree(TESTFN2)
def test_write_non_pyfile(self): def test_write_non_pyfile(self):
zipfp = zipfile.PyZipFile(TemporaryFile(), "w") with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
file(TESTFN, 'w').write('most definitely not a python file') file(TESTFN, 'w').write('most definitely not a python file')
self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
os.remove(TESTFN) os.remove(TESTFN)
class OtherTests(unittest.TestCase): class OtherTests(unittest.TestCase):
def test_unicode_filenames(self): def test_unicode_filenames(self):
zf = zipfile.ZipFile(TESTFN, "w") with zipfile.ZipFile(TESTFN, "w") as zf:
zf.writestr(u"foo.txt", "Test for unicode filename") zf.writestr(u"foo.txt", "Test for unicode filename")
zf.writestr(u"\xf6.txt", "Test for unicode filename") zf.writestr(u"\xf6.txt", "Test for unicode filename")
self.assertTrue(isinstance(zf.infolist()[0].filename, unicode)) self.assertTrue(isinstance(zf.infolist()[0].filename, unicode))
zf.close()
zf = zipfile.ZipFile(TESTFN, "r") with zipfile.ZipFile(TESTFN, "r") as zf:
self.assertEqual(zf.filelist[0].filename, "foo.txt") self.assertEqual(zf.filelist[0].filename, "foo.txt")
self.assertEqual(zf.filelist[1].filename, u"\xf6.txt") self.assertEqual(zf.filelist[1].filename, u"\xf6.txt")
zf.close()
def test_create_non_existent_file_for_append(self): def test_create_non_existent_file_for_append(self):
if os.path.exists(TESTFN): if os.path.exists(TESTFN):
...@@ -616,17 +608,15 @@ class OtherTests(unittest.TestCase): ...@@ -616,17 +608,15 @@ class OtherTests(unittest.TestCase):
content = 'hello, world. this is some content.' content = 'hello, world. this is some content.'
try: try:
zf = zipfile.ZipFile(TESTFN, 'a') with zipfile.ZipFile(TESTFN, 'a') as zf:
zf.writestr(filename, content) zf.writestr(filename, content)
zf.close()
except IOError: except IOError:
self.fail('Could not append data to a non-existent zip file.') self.fail('Could not append data to a non-existent zip file.')
self.assertTrue(os.path.exists(TESTFN)) self.assertTrue(os.path.exists(TESTFN))
zf = zipfile.ZipFile(TESTFN, 'r') with zipfile.ZipFile(TESTFN, 'r') as zf:
self.assertEqual(zf.read(filename), content) self.assertEqual(zf.read(filename), content)
zf.close()
def test_close_erroneous_file(self): def test_close_erroneous_file(self):
# This test checks that the ZipFile constructor closes the file object # This test checks that the ZipFile constructor closes the file object
...@@ -670,9 +660,8 @@ class OtherTests(unittest.TestCase): ...@@ -670,9 +660,8 @@ class OtherTests(unittest.TestCase):
# a file that is a zip file # a file that is a zip file
# - passing a filename # - passing a filename
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close()
chk = zipfile.is_zipfile(TESTFN) chk = zipfile.is_zipfile(TESTFN)
self.assertTrue(chk) self.assertTrue(chk)
# - passing a file object # - passing a file object
...@@ -717,9 +706,8 @@ class OtherTests(unittest.TestCase): ...@@ -717,9 +706,8 @@ class OtherTests(unittest.TestCase):
def test_closed_zip_raises_RuntimeError(self): def test_closed_zip_raises_RuntimeError(self):
# Verify that testzip() doesn't swallow inappropriate exceptions. # Verify that testzip() doesn't swallow inappropriate exceptions.
data = StringIO() data = StringIO()
zipf = zipfile.ZipFile(data, mode="w") with zipfile.ZipFile(data, mode="w") as zipf:
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close()
# This is correct; calling .read on a closed ZipFile should throw # This is correct; calling .read on a closed ZipFile should throw
# a RuntimeError, and so should calling .testzip. An earlier # a RuntimeError, and so should calling .testzip. An earlier
...@@ -738,33 +726,31 @@ class OtherTests(unittest.TestCase): ...@@ -738,33 +726,31 @@ class OtherTests(unittest.TestCase):
def test_bad_open_mode(self): def test_bad_open_mode(self):
# Check that bad modes passed to ZipFile.open are caught # Check that bad modes passed to ZipFile.open are caught
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close()
zipf = zipfile.ZipFile(TESTFN, mode="r") with zipfile.ZipFile(TESTFN, mode="r") as zipf:
# read the data to make sure the file is there # read the data to make sure the file is there
zipf.read("foo.txt") zipf.read("foo.txt")
self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q") self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q")
zipf.close()
def test_read0(self): def test_read0(self):
# Check that calling read(0) on a ZipExtFile object returns an empty # Check that calling read(0) on a ZipExtFile object returns an empty
# string and doesn't advance file pointer # string and doesn't advance file pointer
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
# read the data to make sure the file is there # read the data to make sure the file is there
f = zipf.open("foo.txt") f = zipf.open("foo.txt")
for i in xrange(FIXEDTEST_SIZE): for i in xrange(FIXEDTEST_SIZE):
self.assertEqual(f.read(0), '') self.assertEqual(f.read(0), '')
self.assertEqual(f.read(), "O, for a Muse of Fire!") self.assertEqual(f.read(), "O, for a Muse of Fire!")
zipf.close()
def test_open_non_existent_item(self): def test_open_non_existent_item(self):
# Check that attempting to call open() for an item that doesn't # Check that attempting to call open() for an item that doesn't
# exist in the archive raises a RuntimeError # exist in the archive raises a RuntimeError
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
self.assertRaises(KeyError, zipf.open, "foo.txt", "r") self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
def test_bad_compression_mode(self): def test_bad_compression_mode(self):
# Check that bad compression methods passed to ZipFile.open are caught # Check that bad compression methods passed to ZipFile.open are caught
...@@ -772,9 +758,9 @@ class OtherTests(unittest.TestCase): ...@@ -772,9 +758,9 @@ class OtherTests(unittest.TestCase):
def test_null_byte_in_filename(self): def test_null_byte_in_filename(self):
# Check that a filename containing a null byte is properly terminated # Check that a filename containing a null byte is properly terminated
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!") zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!")
self.assertEqual(zipf.namelist(), ['foo.txt']) self.assertEqual(zipf.namelist(), ['foo.txt'])
def test_struct_sizes(self): def test_struct_sizes(self):
# check that ZIP internal structure sizes are calculated correctly # check that ZIP internal structure sizes are calculated correctly
...@@ -787,42 +773,36 @@ class OtherTests(unittest.TestCase): ...@@ -787,42 +773,36 @@ class OtherTests(unittest.TestCase):
# This test checks that comments on the archive are handled properly # This test checks that comments on the archive are handled properly
# check default comment is empty # check default comment is empty
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
self.assertEqual(zipf.comment, '') self.assertEqual(zipf.comment, '')
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close()
zipfr = zipfile.ZipFile(TESTFN, mode="r") with zipfile.ZipFile(TESTFN, mode="r") as zipf:
self.assertEqual(zipfr.comment, '') self.assertEqual(zipf.comment, '')
zipfr.close()
# check a simple short comment # check a simple short comment
comment = 'Bravely taking to his feet, he beat a very brave retreat.' comment = 'Bravely taking to his feet, he beat a very brave retreat.'
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.comment = comment zipf.comment = comment
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close() with zipfile.ZipFile(TESTFN, mode="r") as zipf:
zipfr = zipfile.ZipFile(TESTFN, mode="r") self.assertEqual(zipf.comment, comment)
self.assertEqual(zipfr.comment, comment)
zipfr.close()
# check a comment of max length # check a comment of max length
comment2 = ''.join(['%d' % (i**3 % 10) for i in xrange((1 << 16)-1)]) comment2 = ''.join(['%d' % (i**3 % 10) for i in xrange((1 << 16)-1)])
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.comment = comment2 zipf.comment = comment2
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close()
zipfr = zipfile.ZipFile(TESTFN, mode="r") with zipfile.ZipFile(TESTFN, mode="r") as zipf:
self.assertEqual(zipfr.comment, comment2) self.assertEqual(zipf.comment, comment2)
zipfr.close()
# check a comment that is too long is truncated # check a comment that is too long is truncated
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.comment = comment2 + 'oops' zipf.comment = comment2 + 'oops'
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close() with zipfile.ZipFile(TESTFN, mode="r") as zipf:
zipfr = zipfile.ZipFile(TESTFN, mode="r") self.assertEqual(zipf.comment, comment2)
self.assertEqual(zipfr.comment, comment2)
zipfr.close()
def tearDown(self): def tearDown(self):
unlink(TESTFN) unlink(TESTFN)
...@@ -907,21 +887,19 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -907,21 +887,19 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
def make_test_archive(self, f, compression): def make_test_archive(self, f, compression):
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(f, "w", compression) with zipfile.ZipFile(f, "w", compression) as zipfp:
zipfp.write(TESTFN, "another"+os.extsep+"name") zipfp.write(TESTFN, "another"+os.extsep+"name")
zipfp.write(TESTFN, TESTFN) zipfp.write(TESTFN, TESTFN)
zipfp.close()
def zip_test(self, f, compression): def zip_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
testdata = zipfp.read(TESTFN) testdata = zipfp.read(TESTFN)
self.assertEqual(len(testdata), len(self.data)) self.assertEqual(len(testdata), len(self.data))
self.assertEqual(testdata, self.data) self.assertEqual(testdata, self.data)
self.assertEqual(zipfp.read("another"+os.extsep+"name"), self.data) self.assertEqual(zipfp.read("another"+os.extsep+"name"), self.data)
zipfp.close()
def test_stored(self): def test_stored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
...@@ -931,31 +909,30 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -931,31 +909,30 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = [] zipdata1 = []
zipopen1 = zipfp.open(TESTFN) zipopen1 = zipfp.open(TESTFN)
while 1: while True:
read_data = zipopen1.read(256) read_data = zipopen1.read(256)
if not read_data: if not read_data:
break break
zipdata1.append(read_data) zipdata1.append(read_data)
zipdata2 = [] zipdata2 = []
zipopen2 = zipfp.open("another"+os.extsep+"name") zipopen2 = zipfp.open("another"+os.extsep+"name")
while 1: while True:
read_data = zipopen2.read(256) read_data = zipopen2.read(256)
if not read_data: if not read_data:
break break
zipdata2.append(read_data) zipdata2.append(read_data)
testdata1 = ''.join(zipdata1) testdata1 = ''.join(zipdata1)
self.assertEqual(len(testdata1), len(self.data)) self.assertEqual(len(testdata1), len(self.data))
self.assertEqual(testdata1, self.data) self.assertEqual(testdata1, self.data)
testdata2 = ''.join(zipdata2) testdata2 = ''.join(zipdata2)
self.assertEqual(len(testdata1), len(self.data)) self.assertEqual(len(testdata1), len(self.data))
self.assertEqual(testdata1, self.data) self.assertEqual(testdata1, self.data)
zipfp.close()
def test_open_stored(self): def test_open_stored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
...@@ -965,19 +942,18 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -965,19 +942,18 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = [] zipdata1 = []
zipopen1 = zipfp.open(TESTFN) zipopen1 = zipfp.open(TESTFN)
while 1: while True:
read_data = zipopen1.read(randint(1, 1024)) read_data = zipopen1.read(randint(1, 1024))
if not read_data: if not read_data:
break break
zipdata1.append(read_data) zipdata1.append(read_data)
testdata = ''.join(zipdata1) testdata = ''.join(zipdata1)
self.assertEqual(len(testdata), len(self.data)) self.assertEqual(len(testdata), len(self.data))
self.assertEqual(testdata, self.data) self.assertEqual(testdata, self.data)
zipfp.close()
def test_random_open_stored(self): def test_random_open_stored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
...@@ -988,51 +964,47 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -988,51 +964,47 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
class TestsWithMultipleOpens(unittest.TestCase): class TestsWithMultipleOpens(unittest.TestCase):
def setUp(self): def setUp(self):
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
zipfp.writestr('ones', '1'*FIXEDTEST_SIZE) zipfp.writestr('ones', '1'*FIXEDTEST_SIZE)
zipfp.writestr('twos', '2'*FIXEDTEST_SIZE) zipfp.writestr('twos', '2'*FIXEDTEST_SIZE)
zipfp.close()
def test_same_file(self): def test_same_file(self):
# Verify that (when the ZipFile is in control of creating file objects) # Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other. # multiple open() calls can be made without interfering with each other.
zipf = zipfile.ZipFile(TESTFN2, mode="r") with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
zopen1 = zipf.open('ones') zopen1 = zipf.open('ones')
zopen2 = zipf.open('ones') zopen2 = zipf.open('ones')
data1 = zopen1.read(500) data1 = zopen1.read(500)
data2 = zopen2.read(500) data2 = zopen2.read(500)
data1 += zopen1.read(500) data1 += zopen1.read(500)
data2 += zopen2.read(500) data2 += zopen2.read(500)
self.assertEqual(data1, data2) self.assertEqual(data1, data2)
zipf.close()
def test_different_file(self): def test_different_file(self):
# Verify that (when the ZipFile is in control of creating file objects) # Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other. # multiple open() calls can be made without interfering with each other.
zipf = zipfile.ZipFile(TESTFN2, mode="r") with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
zopen1 = zipf.open('ones') zopen1 = zipf.open('ones')
zopen2 = zipf.open('twos') zopen2 = zipf.open('twos')
data1 = zopen1.read(500) data1 = zopen1.read(500)
data2 = zopen2.read(500) data2 = zopen2.read(500)
data1 += zopen1.read(500) data1 += zopen1.read(500)
data2 += zopen2.read(500) data2 += zopen2.read(500)
self.assertEqual(data1, '1'*FIXEDTEST_SIZE) self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
self.assertEqual(data2, '2'*FIXEDTEST_SIZE) self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
zipf.close()
def test_interleaved(self): def test_interleaved(self):
# Verify that (when the ZipFile is in control of creating file objects) # Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other. # multiple open() calls can be made without interfering with each other.
zipf = zipfile.ZipFile(TESTFN2, mode="r") with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
zopen1 = zipf.open('ones') zopen1 = zipf.open('ones')
data1 = zopen1.read(500) data1 = zopen1.read(500)
zopen2 = zipf.open('twos') zopen2 = zipf.open('twos')
data2 = zopen2.read(500) data2 = zopen2.read(500)
data1 += zopen1.read(500) data1 += zopen1.read(500)
data2 += zopen2.read(500) data2 += zopen2.read(500)
self.assertEqual(data1, '1'*FIXEDTEST_SIZE) self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
self.assertEqual(data2, '2'*FIXEDTEST_SIZE) self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
zipf.close()
def tearDown(self): def tearDown(self):
unlink(TESTFN2) unlink(TESTFN2)
...@@ -1043,8 +1015,8 @@ class TestWithDirectory(unittest.TestCase): ...@@ -1043,8 +1015,8 @@ class TestWithDirectory(unittest.TestCase):
os.mkdir(TESTFN2) os.mkdir(TESTFN2)
def test_extract_dir(self): def test_extract_dir(self):
zipf = zipfile.ZipFile(findfile("zipdir.zip")) with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
zipf.extractall(TESTFN2) zipf.extractall(TESTFN2)
self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c"))) self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))
...@@ -1078,57 +1050,48 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -1078,57 +1050,48 @@ class UniversalNewlineTests(unittest.TestCase):
def make_test_archive(self, f, compression): def make_test_archive(self, f, compression):
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(f, "w", compression) with zipfile.ZipFile(f, "w", compression) as zipfp:
for fn in self.arcfiles.values(): for fn in self.arcfiles.values():
zipfp.write(fn, fn) zipfp.write(fn, fn)
zipfp.close()
def read_test(self, f, compression): def read_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
zipdata = zipfp.open(fn, "rU").read() zipdata = zipfp.open(fn, "rU").read()
self.assertEqual(self.arcdata[sep], zipdata) self.assertEqual(self.arcdata[sep], zipdata)
zipfp.close()
def readline_test(self, f, compression): def readline_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
zipopen = zipfp.open(fn, "rU") zipopen = zipfp.open(fn, "rU")
for line in self.line_gen: for line in self.line_gen:
linedata = zipopen.readline() linedata = zipopen.readline()
self.assertEqual(linedata, line + '\n') self.assertEqual(linedata, line + '\n')
zipfp.close()
def readlines_test(self, f, compression): def readlines_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
ziplines = zipfp.open(fn, "rU").readlines() ziplines = zipfp.open(fn, "rU").readlines()
for line, zipline in zip(self.line_gen, ziplines): for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + '\n') self.assertEqual(zipline, line + '\n')
zipfp.close()
def iterlines_test(self, f, compression): def iterlines_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")): for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
self.assertEqual(zipline, line + '\n') self.assertEqual(zipline, line + '\n')
zipfp.close()
def test_read_stored(self): def test_read_stored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
......
...@@ -721,6 +721,12 @@ class ZipFile: ...@@ -721,6 +721,12 @@ class ZipFile:
self.fp = None self.fp = None
raise RuntimeError, 'Mode must be "r", "w" or "a"' raise RuntimeError, 'Mode must be "r", "w" or "a"'
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def _GetContents(self): def _GetContents(self):
"""Read the directory, making sure we close the file if the format """Read the directory, making sure we close the file if the format
is bad.""" is bad."""
......
...@@ -44,8 +44,11 @@ Core and Builtins ...@@ -44,8 +44,11 @@ Core and Builtins
Library Library
------- -------
- Issue #5511: now zipfile.ZipFile can be used as a context manager.
Initial patch by Brian Curtin.
- Distutils now correctly identifies the build architecture as "x86_64" - Distutils now correctly identifies the build architecture as "x86_64"
when building on OSX 10.6 without "-arch" flags. when building on OSX 10.6 without "-arch" flags.
- Issue #7556: Distutils' msvc9compiler now opens the MSVC Manifest - Issue #7556: Distutils' msvc9compiler now opens the MSVC Manifest
file in text mode. file in text mode.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment