Kaydet (Commit) 0d654335 authored tarafından Brian Curtin's avatar Brian Curtin

Fix #8886. Use context managers throughout zipfile tests.

This was fixed in py3k SVN. Consider this a backport.
üst c98556e7
...@@ -115,20 +115,20 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -115,20 +115,20 @@ class TestsWithSourceFile(unittest.TestCase):
# Read the ZIP archive # Read the ZIP archive
with zipfile.ZipFile(f, "r", compression) as zipfp: with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = [] zipdata1 = []
zipopen1 = zipfp.open(TESTFN) with zipfp.open(TESTFN) as zipopen1:
while True: while True:
read_data = zipopen1.read(256) read_data = zipopen1.read(256)
if not read_data: if not read_data:
break break
zipdata1.append(read_data) zipdata1.append(read_data)
zipdata2 = [] zipdata2 = []
zipopen2 = zipfp.open("another.name") with zipfp.open("another.name") as zipopen2:
while True: while True:
read_data = zipopen2.read(256) read_data = zipopen2.read(256)
if not read_data: if not read_data:
break break
zipdata2.append(read_data) zipdata2.append(read_data)
self.assertEqual(''.join(zipdata1), self.data) self.assertEqual(''.join(zipdata1), self.data)
self.assertEqual(''.join(zipdata2), self.data) self.assertEqual(''.join(zipdata2), self.data)
...@@ -147,7 +147,8 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -147,7 +147,8 @@ class TestsWithSourceFile(unittest.TestCase):
infos = zipfp.infolist() infos = zipfp.infolist()
data = "" data = ""
for info in infos: for info in infos:
data += zipfp.open(info).read() with zipfp.open(info) as f:
data += f.read()
self.assertTrue(data == "foobar" or data == "barfoo") self.assertTrue(data == "foobar" or data == "barfoo")
data = "" data = ""
for info in infos: for info in infos:
...@@ -160,12 +161,12 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -160,12 +161,12 @@ class TestsWithSourceFile(unittest.TestCase):
# Read the ZIP archive # Read the ZIP archive
with zipfile.ZipFile(f, "r", compression) as zipfp: with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = [] zipdata1 = []
zipopen1 = zipfp.open(TESTFN) with zipfp.open(TESTFN) as zipopen1:
while True: while True:
read_data = zipopen1.read(randint(1, 1024)) read_data = zipopen1.read(randint(1, 1024))
if not read_data: if not read_data:
break break
zipdata1.append(read_data) zipdata1.append(read_data)
self.assertEqual(''.join(zipdata1), self.data) self.assertEqual(''.join(zipdata1), self.data)
...@@ -177,16 +178,14 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -177,16 +178,14 @@ class TestsWithSourceFile(unittest.TestCase):
f = StringIO() f = StringIO()
data = 'a\r\n' * 16 * 1024 data = 'a\r\n' * 16 * 1024
zipfp = zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) with zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) as zipfp:
zipfp.writestr(TESTFN, data) zipfp.writestr(TESTFN, data)
zipfp.close()
data2 = '' data2 = ''
zipfp = zipfile.ZipFile(f, 'r') with zipfile.ZipFile(f, 'r') as zipfp:
zipopen = zipfp.open(TESTFN, 'rU') with zipfp.open(TESTFN, 'rU') as zipopen:
for line in zipopen: for line in zipopen:
data2 += line data2 += line
zipfp.close()
self.assertEqual(data, data2.replace('\n', '\r\n')) self.assertEqual(data, data2.replace('\n', '\r\n'))
...@@ -194,42 +193,41 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -194,42 +193,41 @@ class TestsWithSourceFile(unittest.TestCase):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
zipopen = zipfp.open(TESTFN) with zipfp.open(TESTFN) as zipopen:
data = ''
data = '' while True:
while True: read = zipopen.readline()
read = zipopen.readline() if not read:
if not read: break
break data += read
data += read
read = zipopen.read(100)
read = zipopen.read(100) if not read:
if not read: break
break data += read
data += read
self.assertEqual(data, self.data) self.assertEqual(data, self.data)
zipfp.close()
def zip_readline_test(self, f, compression): def zip_readline_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp: with zipfile.ZipFile(f, "r") as zipfp:
zipopen = zipfp.open(TESTFN) with zipfp.open(TESTFN) as zipopen:
for line in self.line_gen: for line in self.line_gen:
linedata = zipopen.readline() linedata = zipopen.readline()
self.assertEqual(linedata, line + '\n') self.assertEqual(linedata, line + '\n')
def zip_readlines_test(self, f, compression): def zip_readlines_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp: with zipfile.ZipFile(f, "r") as zipfp:
ziplines = zipfp.open(TESTFN).readlines() with zipfp.open(TESTFN) as zo:
for line, zipline in zip(self.line_gen, ziplines): ziplines = zo.readlines()
self.assertEqual(zipline, line + '\n') for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + '\n')
def zip_iterlines_test(self, f, compression): def zip_iterlines_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
...@@ -301,9 +299,9 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -301,9 +299,9 @@ class TestsWithSourceFile(unittest.TestCase):
# Get an open object for strfile # Get an open object for strfile
with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp: with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp:
openobj = zipfp.open("strfile") with zipfp.open("strfile") as openobj:
self.assertEqual(openobj.read(1), '1') self.assertEqual(openobj.read(1), '1')
self.assertEqual(openobj.read(1), '2') self.assertEqual(openobj.read(1), '2')
def test_absolute_arcnames(self): def test_absolute_arcnames(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
...@@ -775,8 +773,8 @@ class OtherTests(unittest.TestCase): ...@@ -775,8 +773,8 @@ class OtherTests(unittest.TestCase):
self.assertRaises(IOError, zipfile.ZipFile, TESTFN) self.assertRaises(IOError, zipfile.ZipFile, TESTFN)
def test_empty_file_raises_BadZipFile(self): def test_empty_file_raises_BadZipFile(self):
f = open(TESTFN, 'w') with open(TESTFN, 'w') as f:
f.close() pass
self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN) self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)
with open(TESTFN, 'w') as fp: with open(TESTFN, 'w') as fp:
...@@ -820,11 +818,11 @@ class OtherTests(unittest.TestCase): ...@@ -820,11 +818,11 @@ class OtherTests(unittest.TestCase):
with zipfile.ZipFile(TESTFN, mode="w") as zipf: with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
# read the data to make sure the file is there # read the data to make sure the file is there
f = zipf.open("foo.txt") with zipf.open("foo.txt") as f:
for i in xrange(FIXEDTEST_SIZE): for i in xrange(FIXEDTEST_SIZE):
self.assertEqual(f.read(0), '') self.assertEqual(f.read(0), '')
self.assertEqual(f.read(), "O, for a Muse of Fire!") self.assertEqual(f.read(), "O, for a Muse of Fire!")
def test_open_non_existent_item(self): def test_open_non_existent_item(self):
"""Check that attempting to call open() for an item that doesn't """Check that attempting to call open() for an item that doesn't
...@@ -952,15 +950,15 @@ class OtherTests(unittest.TestCase): ...@@ -952,15 +950,15 @@ class OtherTests(unittest.TestCase):
def test_empty_zipfile(self): def test_empty_zipfile(self):
# Check that creating a file in 'w' or 'a' mode and closing without # Check that creating a file in 'w' or 'a' mode and closing without
# adding any files to the archives creates a valid empty ZIP file # adding any files to the archives creates a valid empty ZIP file
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.close() pass
try: try:
zipf = zipfile.ZipFile(TESTFN, mode="r") zipf = zipfile.ZipFile(TESTFN, mode="r")
except zipfile.BadZipfile: except zipfile.BadZipfile:
self.fail("Unable to create empty ZIP file in 'w' mode") self.fail("Unable to create empty ZIP file in 'w' mode")
zipf = zipfile.ZipFile(TESTFN, mode="a") with zipfile.ZipFile(TESTFN, mode="a") as zipf:
zipf.close() pass
try: try:
zipf = zipfile.ZipFile(TESTFN, mode="r") zipf = zipfile.ZipFile(TESTFN, mode="r")
except: except:
...@@ -970,8 +968,8 @@ class OtherTests(unittest.TestCase): ...@@ -970,8 +968,8 @@ class OtherTests(unittest.TestCase):
# Issue 1710703: Check that opening a file with less than 22 bytes # Issue 1710703: Check that opening a file with less than 22 bytes
# raises a BadZipfile exception (rather than the previously unhelpful # raises a BadZipfile exception (rather than the previously unhelpful
# IOError) # IOError)
f = open(TESTFN, 'w') with open(TESTFN, 'w') as f:
f.close() pass
self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN, 'r') self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN, 'r')
def tearDown(self): def tearDown(self):
...@@ -1084,20 +1082,20 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -1084,20 +1082,20 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
# Read the ZIP archive # Read the ZIP archive
with zipfile.ZipFile(f, "r", compression) as zipfp: with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = [] zipdata1 = []
zipopen1 = zipfp.open(TESTFN) with zipfp.open(TESTFN) as zipopen1:
while True: while True:
read_data = zipopen1.read(256) read_data = zipopen1.read(256)
if not read_data: if not read_data:
break break
zipdata1.append(read_data) zipdata1.append(read_data)
zipdata2 = [] zipdata2 = []
zipopen2 = zipfp.open("another.name") with zipfp.open("another.name") as zipopen2:
while True: while True:
read_data = zipopen2.read(256) read_data = zipopen2.read(256)
if not read_data: if not read_data:
break break
zipdata2.append(read_data) zipdata2.append(read_data)
testdata1 = ''.join(zipdata1) testdata1 = ''.join(zipdata1)
self.assertEqual(len(testdata1), len(self.data)) self.assertEqual(len(testdata1), len(self.data))
...@@ -1122,12 +1120,12 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -1122,12 +1120,12 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
# Read the ZIP archive # Read the ZIP archive
with zipfile.ZipFile(f, "r", compression) as zipfp: with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = [] zipdata1 = []
zipopen1 = zipfp.open(TESTFN) with zipfp.open(TESTFN) as zipopen1:
while True: while True:
read_data = zipopen1.read(randint(1, 1024)) read_data = zipopen1.read(randint(1, 1024))
if not read_data: if not read_data:
break break
zipdata1.append(read_data) zipdata1.append(read_data)
testdata = ''.join(zipdata1) testdata = ''.join(zipdata1)
self.assertEqual(len(testdata), len(self.data)) self.assertEqual(len(testdata), len(self.data))
...@@ -1167,12 +1165,11 @@ class TestsWithMultipleOpens(unittest.TestCase): ...@@ -1167,12 +1165,11 @@ class TestsWithMultipleOpens(unittest.TestCase):
# Verify that (when the ZipFile is in control of creating file objects) # Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other. # multiple open() calls can be made without interfering with each other.
with zipfile.ZipFile(TESTFN2, mode="r") as zipf: with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
zopen1 = zipf.open('ones') with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
zopen2 = zipf.open('twos') data1 = zopen1.read(500)
data1 = zopen1.read(500) data2 = zopen2.read(500)
data2 = zopen2.read(500) data1 += zopen1.read(500)
data1 += zopen1.read(500) data2 += zopen2.read(500)
data2 += zopen2.read(500)
self.assertEqual(data1, '1'*FIXEDTEST_SIZE) self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
self.assertEqual(data2, '2'*FIXEDTEST_SIZE) self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
...@@ -1180,12 +1177,11 @@ class TestsWithMultipleOpens(unittest.TestCase): ...@@ -1180,12 +1177,11 @@ class TestsWithMultipleOpens(unittest.TestCase):
# Verify that (when the ZipFile is in control of creating file objects) # Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other. # multiple open() calls can be made without interfering with each other.
with zipfile.ZipFile(TESTFN2, mode="r") as zipf: with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
zopen1 = zipf.open('ones') with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
data1 = zopen1.read(500) data1 = zopen1.read(500)
zopen2 = zipf.open('twos') data2 = zopen2.read(500)
data2 = zopen2.read(500) data1 += zopen1.read(500)
data1 += zopen1.read(500) data2 += zopen2.read(500)
data2 += zopen2.read(500)
self.assertEqual(data1, '1'*FIXEDTEST_SIZE) self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
self.assertEqual(data2, '2'*FIXEDTEST_SIZE) self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
...@@ -1244,7 +1240,8 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -1244,7 +1240,8 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive # Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp: with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
zipdata = zipfp.open(fn, "rU").read() with zipfp.open(fn, "rU") as fp:
zipdata = fp.read()
self.assertEqual(self.arcdata[sep], zipdata) self.assertEqual(self.arcdata[sep], zipdata)
def readline_read_test(self, f, compression): def readline_read_test(self, f, compression):
...@@ -1253,18 +1250,18 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -1253,18 +1250,18 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") zipfp = zipfile.ZipFile(f, "r")
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
zipopen = zipfp.open(fn, "rU") with zipfp.open(fn, "rU") as zipopen:
data = '' data = ''
while True: while True:
read = zipopen.readline() read = zipopen.readline()
if not read: if not read:
break break
data += read data += read
read = zipopen.read(5) read = zipopen.read(5)
if not read: if not read:
break break
data += read data += read
self.assertEqual(data, self.arcdata['\n']) self.assertEqual(data, self.arcdata['\n'])
...@@ -1276,10 +1273,10 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -1276,10 +1273,10 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive # Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp: with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
zipopen = zipfp.open(fn, "rU") with zipfp.open(fn, "rU") as zipopen:
for line in self.line_gen: for line in self.line_gen:
linedata = zipopen.readline() linedata = zipopen.readline()
self.assertEqual(linedata, line + '\n') self.assertEqual(linedata, line + '\n')
def readlines_test(self, f, compression): def readlines_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
...@@ -1287,7 +1284,8 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -1287,7 +1284,8 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive # Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp: with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
ziplines = zipfp.open(fn, "rU").readlines() with zipfp.open(fn, "rU") as fp:
ziplines = fp.readlines()
for line, zipline in zip(self.line_gen, ziplines): for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + '\n') self.assertEqual(zipline, line + '\n')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment