Kaydet (Commit) 07f24c50 authored tarafından Ezio Melotti's avatar Ezio Melotti

#7944: close files explicitly in test_tarfile (backport d560eece0857).

üst 8861b291
...@@ -60,9 +60,10 @@ class UstarReadTest(ReadTest): ...@@ -60,9 +60,10 @@ class UstarReadTest(ReadTest):
self.tar.extract("ustar/regtype", TEMPDIR) self.tar.extract("ustar/regtype", TEMPDIR)
tarinfo = self.tar.getmember("ustar/regtype") tarinfo = self.tar.getmember("ustar/regtype")
fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU")
with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
lines1 = fobj1.readlines()
fobj2 = self.tar.extractfile(tarinfo) fobj2 = self.tar.extractfile(tarinfo)
lines1 = fobj1.readlines()
lines2 = fobj2.readlines() lines2 = fobj2.readlines()
self.assertTrue(lines1 == lines2, self.assertTrue(lines1 == lines2,
"fileobj.readlines() failed") "fileobj.readlines() failed")
...@@ -75,18 +76,17 @@ class UstarReadTest(ReadTest): ...@@ -75,18 +76,17 @@ class UstarReadTest(ReadTest):
def test_fileobj_iter(self): def test_fileobj_iter(self):
self.tar.extract("ustar/regtype", TEMPDIR) self.tar.extract("ustar/regtype", TEMPDIR)
tarinfo = self.tar.getmember("ustar/regtype") tarinfo = self.tar.getmember("ustar/regtype")
fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
lines1 = fobj1.readlines()
fobj2 = self.tar.extractfile(tarinfo) fobj2 = self.tar.extractfile(tarinfo)
lines1 = fobj1.readlines()
lines2 = [line for line in fobj2] lines2 = [line for line in fobj2]
self.assertTrue(lines1 == lines2, self.assertTrue(lines1 == lines2,
"fileobj.__iter__() failed") "fileobj.__iter__() failed")
def test_fileobj_seek(self): def test_fileobj_seek(self):
self.tar.extract("ustar/regtype", TEMPDIR) self.tar.extract("ustar/regtype", TEMPDIR)
fobj = open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
data = fobj.read() data = fobj.read()
fobj.close()
tarinfo = self.tar.getmember("ustar/regtype") tarinfo = self.tar.getmember("ustar/regtype")
fobj = self.tar.extractfile(tarinfo) fobj = self.tar.extractfile(tarinfo)
...@@ -238,19 +238,24 @@ class CommonReadTest(ReadTest): ...@@ -238,19 +238,24 @@ class CommonReadTest(ReadTest):
# This test checks if tarfile.open() is able to open an empty tar # This test checks if tarfile.open() is able to open an empty tar
# archive successfully. Note that an empty tar archive is not the # archive successfully. Note that an empty tar archive is not the
# same as an empty file! # same as an empty file!
tarfile.open(tmpname, self.mode.replace("r", "w")).close() with tarfile.open(tmpname, self.mode.replace("r", "w")):
pass
try: try:
tar = tarfile.open(tmpname, self.mode) tar = tarfile.open(tmpname, self.mode)
tar.getnames() tar.getnames()
except tarfile.ReadError: except tarfile.ReadError:
self.fail("tarfile.open() failed on empty archive") self.fail("tarfile.open() failed on empty archive")
self.assertListEqual(tar.getmembers(), []) else:
self.assertListEqual(tar.getmembers(), [])
finally:
tar.close()
def test_null_tarfile(self): def test_null_tarfile(self):
# Test for issue6123: Allow opening empty archives. # Test for issue6123: Allow opening empty archives.
# This test guarantees that tarfile.open() does not treat an empty # This test guarantees that tarfile.open() does not treat an empty
# file as an empty tar archive. # file as an empty tar archive.
open(tmpname, "wb").close() with open(tmpname, "wb"):
pass
self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
...@@ -274,15 +279,16 @@ class CommonReadTest(ReadTest): ...@@ -274,15 +279,16 @@ class CommonReadTest(ReadTest):
for char in ('\0', 'a'): for char in ('\0', 'a'):
# Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
# are ignored correctly. # are ignored correctly.
fobj = _open(tmpname, "wb") with _open(tmpname, "wb") as fobj:
fobj.write(char * 1024) fobj.write(char * 1024)
fobj.write(tarfile.TarInfo("foo").tobuf()) fobj.write(tarfile.TarInfo("foo").tobuf())
fobj.close()
tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
self.assertListEqual(tar.getnames(), ["foo"], try:
self.assertListEqual(tar.getnames(), ["foo"],
"ignore_zeros=True should have skipped the %r-blocks" % char) "ignore_zeros=True should have skipped the %r-blocks" % char)
tar.close() finally:
tar.close()
def test_premature_end_of_archive(self): def test_premature_end_of_archive(self):
for size in (512, 600, 1024, 1200): for size in (512, 600, 1024, 1200):
...@@ -313,19 +319,21 @@ class MiscReadTest(CommonReadTest): ...@@ -313,19 +319,21 @@ class MiscReadTest(CommonReadTest):
taropen = tarfile.TarFile.taropen taropen = tarfile.TarFile.taropen
def test_no_name_argument(self): def test_no_name_argument(self):
fobj = open(self.tarname, "rb") with open(self.tarname, "rb") as fobj:
tar = tarfile.open(fileobj=fobj, mode=self.mode) tar = tarfile.open(fileobj=fobj, mode=self.mode)
self.assertEqual(tar.name, os.path.abspath(fobj.name)) self.assertEqual(tar.name, os.path.abspath(fobj.name))
def test_no_name_attribute(self): def test_no_name_attribute(self):
data = open(self.tarname, "rb").read() with open(self.tarname, "rb") as fobj:
data = fobj.read()
fobj = StringIO.StringIO(data) fobj = StringIO.StringIO(data)
self.assertRaises(AttributeError, getattr, fobj, "name") self.assertRaises(AttributeError, getattr, fobj, "name")
tar = tarfile.open(fileobj=fobj, mode=self.mode) tar = tarfile.open(fileobj=fobj, mode=self.mode)
self.assertEqual(tar.name, None) self.assertEqual(tar.name, None)
def test_empty_name_attribute(self): def test_empty_name_attribute(self):
data = open(self.tarname, "rb").read() with open(self.tarname, "rb") as fobj:
data = fobj.read()
fobj = StringIO.StringIO(data) fobj = StringIO.StringIO(data)
fobj.name = "" fobj.name = ""
tar = tarfile.open(fileobj=fobj, mode=self.mode) tar = tarfile.open(fileobj=fobj, mode=self.mode)
...@@ -346,12 +354,14 @@ class MiscReadTest(CommonReadTest): ...@@ -346,12 +354,14 @@ class MiscReadTest(CommonReadTest):
# Skip the first member and store values from the second member # Skip the first member and store values from the second member
# of the testtar. # of the testtar.
tar = tarfile.open(self.tarname, mode=self.mode) tar = tarfile.open(self.tarname, mode=self.mode)
tar.next() try:
t = tar.next() tar.next()
name = t.name t = tar.next()
offset = t.offset name = t.name
data = tar.extractfile(t).read() offset = t.offset
tar.close() data = tar.extractfile(t).read()
finally:
tar.close()
# Open the testtar and seek to the offset of the second member. # Open the testtar and seek to the offset of the second member.
if self.mode.endswith(":gz"): if self.mode.endswith(":gz"):
...@@ -361,26 +371,30 @@ class MiscReadTest(CommonReadTest): ...@@ -361,26 +371,30 @@ class MiscReadTest(CommonReadTest):
else: else:
_open = open _open = open
fobj = _open(self.tarname, "rb") fobj = _open(self.tarname, "rb")
fobj.seek(offset) try:
fobj.seek(offset)
# Test if the tarfile starts with the second member.
tar = tar.open(self.tarname, mode="r:", fileobj=fobj) # Test if the tarfile starts with the second member.
t = tar.next() tar = tar.open(self.tarname, mode="r:", fileobj=fobj)
self.assertEqual(t.name, name) t = tar.next()
# Read to the end of fileobj and test if seeking back to the self.assertEqual(t.name, name)
# beginning works. # Read to the end of fileobj and test if seeking back to the
tar.getmembers() # beginning works.
self.assertEqual(tar.extractfile(t).read(), data, tar.getmembers()
"seek back did not work") self.assertEqual(tar.extractfile(t).read(), data,
tar.close() "seek back did not work")
tar.close()
finally:
fobj.close()
def test_fail_comp(self): def test_fail_comp(self):
# For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
if self.mode == "r:": if self.mode == "r:":
self.skipTest('needs a gz or bz2 mode') self.skipTest('needs a gz or bz2 mode')
self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
fobj = open(tarname, "rb") with open(tarname, "rb") as fobj:
self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode) self.assertRaises(tarfile.ReadError, tarfile.open,
fileobj=fobj, mode=self.mode)
def test_v7_dirtype(self): def test_v7_dirtype(self):
# Test old style dirtype member (bug #1336623): # Test old style dirtype member (bug #1336623):
...@@ -434,22 +448,25 @@ class MiscReadTest(CommonReadTest): ...@@ -434,22 +448,25 @@ class MiscReadTest(CommonReadTest):
# Test if extractall() correctly restores directory permissions # Test if extractall() correctly restores directory permissions
# and times (see issue1735). # and times (see issue1735).
tar = tarfile.open(tarname, encoding="iso8859-1") tar = tarfile.open(tarname, encoding="iso8859-1")
directories = [t for t in tar if t.isdir()] try:
tar.extractall(TEMPDIR, directories) directories = [t for t in tar if t.isdir()]
for tarinfo in directories: tar.extractall(TEMPDIR, directories)
path = os.path.join(TEMPDIR, tarinfo.name) for tarinfo in directories:
if sys.platform != "win32": path = os.path.join(TEMPDIR, tarinfo.name)
# Win32 has no support for fine grained permissions. if sys.platform != "win32":
self.assertEqual(tarinfo.mode & 0777, os.stat(path).st_mode & 0777) # Win32 has no support for fine grained permissions.
self.assertEqual(tarinfo.mtime, os.path.getmtime(path)) self.assertEqual(tarinfo.mode & 0777, os.stat(path).st_mode & 0777)
tar.close() self.assertEqual(tarinfo.mtime, os.path.getmtime(path))
finally:
tar.close()
def test_init_close_fobj(self): def test_init_close_fobj(self):
# Issue #7341: Close the internal file object in the TarFile # Issue #7341: Close the internal file object in the TarFile
# constructor in case of an error. For the test we rely on # constructor in case of an error. For the test we rely on
# the fact that opening an empty file raises a ReadError. # the fact that opening an empty file raises a ReadError.
empty = os.path.join(TEMPDIR, "empty") empty = os.path.join(TEMPDIR, "empty")
open(empty, "wb").write("") with open(empty, "wb") as fobj:
fobj.write("")
try: try:
tar = object.__new__(tarfile.TarFile) tar = object.__new__(tarfile.TarFile)
...@@ -460,7 +477,7 @@ class MiscReadTest(CommonReadTest): ...@@ -460,7 +477,7 @@ class MiscReadTest(CommonReadTest):
else: else:
self.fail("ReadError not raised") self.fail("ReadError not raised")
finally: finally:
os.remove(empty) support.unlink(empty)
def test_parallel_iteration(self): def test_parallel_iteration(self):
# Issue #16601: Restarting iteration over tarfile continued # Issue #16601: Restarting iteration over tarfile continued
...@@ -489,42 +506,47 @@ class StreamReadTest(CommonReadTest): ...@@ -489,42 +506,47 @@ class StreamReadTest(CommonReadTest):
def test_compare_members(self): def test_compare_members(self):
tar1 = tarfile.open(tarname, encoding="iso8859-1") tar1 = tarfile.open(tarname, encoding="iso8859-1")
tar2 = self.tar try:
tar2 = self.tar
while True:
t1 = tar1.next() while True:
t2 = tar2.next() t1 = tar1.next()
if t1 is None: t2 = tar2.next()
break if t1 is None:
self.assertTrue(t2 is not None, "stream.next() failed.") break
self.assertTrue(t2 is not None, "stream.next() failed.")
if t2.islnk() or t2.issym():
self.assertRaises(tarfile.StreamError, tar2.extractfile, t2) if t2.islnk() or t2.issym():
continue self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
continue
v1 = tar1.extractfile(t1)
v2 = tar2.extractfile(t2) v1 = tar1.extractfile(t1)
if v1 is None: v2 = tar2.extractfile(t2)
continue if v1 is None:
self.assertTrue(v2 is not None, "stream.extractfile() failed") continue
self.assertTrue(v1.read() == v2.read(), "stream extraction failed") self.assertTrue(v2 is not None, "stream.extractfile() failed")
self.assertTrue(v1.read() == v2.read(), "stream extraction failed")
tar1.close() finally:
tar1.close()
class DetectReadTest(unittest.TestCase): class DetectReadTest(unittest.TestCase):
def _testfunc_file(self, name, mode): def _testfunc_file(self, name, mode):
try: try:
tarfile.open(name, mode) tar = tarfile.open(name, mode)
except tarfile.ReadError: except tarfile.ReadError:
self.fail() self.fail()
else:
tar.close()
def _testfunc_fileobj(self, name, mode): def _testfunc_fileobj(self, name, mode):
try: try:
tarfile.open(name, mode, fileobj=open(name, "rb")) tar = tarfile.open(name, mode, fileobj=open(name, "rb"))
except tarfile.ReadError: except tarfile.ReadError:
self.fail() self.fail()
else:
tar.close()
def _test_modes(self, testfunc): def _test_modes(self, testfunc):
testfunc(tarname, "r") testfunc(tarname, "r")
...@@ -715,33 +737,39 @@ class PaxReadTest(LongnameTest): ...@@ -715,33 +737,39 @@ class PaxReadTest(LongnameTest):
def test_pax_global_headers(self): def test_pax_global_headers(self):
tar = tarfile.open(tarname, encoding="iso8859-1") tar = tarfile.open(tarname, encoding="iso8859-1")
try:
tarinfo = tar.getmember("pax/regtype1") tarinfo = tar.getmember("pax/regtype1")
self.assertEqual(tarinfo.uname, "foo") self.assertEqual(tarinfo.uname, "foo")
self.assertEqual(tarinfo.gname, "bar") self.assertEqual(tarinfo.gname, "bar")
self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf") self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
tarinfo = tar.getmember("pax/regtype2") tarinfo = tar.getmember("pax/regtype2")
self.assertEqual(tarinfo.uname, "") self.assertEqual(tarinfo.uname, "")
self.assertEqual(tarinfo.gname, "bar") self.assertEqual(tarinfo.gname, "bar")
self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf") self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
tarinfo = tar.getmember("pax/regtype3") tarinfo = tar.getmember("pax/regtype3")
self.assertEqual(tarinfo.uname, "tarfile") self.assertEqual(tarinfo.uname, "tarfile")
self.assertEqual(tarinfo.gname, "tarfile") self.assertEqual(tarinfo.gname, "tarfile")
self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf") self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
finally:
tar.close()
def test_pax_number_fields(self): def test_pax_number_fields(self):
# All following number fields are read from the pax header. # All following number fields are read from the pax header.
tar = tarfile.open(tarname, encoding="iso8859-1") tar = tarfile.open(tarname, encoding="iso8859-1")
tarinfo = tar.getmember("pax/regtype4") try:
self.assertEqual(tarinfo.size, 7011) tarinfo = tar.getmember("pax/regtype4")
self.assertEqual(tarinfo.uid, 123) self.assertEqual(tarinfo.size, 7011)
self.assertEqual(tarinfo.gid, 123) self.assertEqual(tarinfo.uid, 123)
self.assertEqual(tarinfo.mtime, 1041808783.0) self.assertEqual(tarinfo.gid, 123)
self.assertEqual(type(tarinfo.mtime), float) self.assertEqual(tarinfo.mtime, 1041808783.0)
self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) self.assertEqual(type(tarinfo.mtime), float)
self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
finally:
tar.close()
class WriteTestBase(unittest.TestCase): class WriteTestBase(unittest.TestCase):
...@@ -773,52 +801,60 @@ class WriteTest(WriteTestBase): ...@@ -773,52 +801,60 @@ class WriteTest(WriteTestBase):
# a trailing '\0'. # a trailing '\0'.
name = "0123456789" * 10 name = "0123456789" * 10
tar = tarfile.open(tmpname, self.mode) tar = tarfile.open(tmpname, self.mode)
t = tarfile.TarInfo(name) try:
tar.addfile(t) t = tarfile.TarInfo(name)
tar.close() tar.addfile(t)
finally:
tar.close()
tar = tarfile.open(tmpname) tar = tarfile.open(tmpname)
self.assertTrue(tar.getnames()[0] == name, try:
"failed to store 100 char filename") self.assertTrue(tar.getnames()[0] == name,
tar.close() "failed to store 100 char filename")
finally:
tar.close()
def test_tar_size(self): def test_tar_size(self):
# Test for bug #1013882. # Test for bug #1013882.
tar = tarfile.open(tmpname, self.mode) tar = tarfile.open(tmpname, self.mode)
path = os.path.join(TEMPDIR, "file") try:
fobj = open(path, "wb") path = os.path.join(TEMPDIR, "file")
fobj.write("aaa") with open(path, "wb") as fobj:
fobj.close() fobj.write("aaa")
tar.add(path) tar.add(path)
tar.close() finally:
tar.close()
self.assertTrue(os.path.getsize(tmpname) > 0, self.assertTrue(os.path.getsize(tmpname) > 0,
"tarfile is empty") "tarfile is empty")
# The test_*_size tests test for bug #1167128. # The test_*_size tests test for bug #1167128.
def test_file_size(self): def test_file_size(self):
tar = tarfile.open(tmpname, self.mode) tar = tarfile.open(tmpname, self.mode)
try:
path = os.path.join(TEMPDIR, "file") path = os.path.join(TEMPDIR, "file")
fobj = open(path, "wb") with open(path, "wb"):
fobj.close() pass
tarinfo = tar.gettarinfo(path) tarinfo = tar.gettarinfo(path)
self.assertEqual(tarinfo.size, 0) self.assertEqual(tarinfo.size, 0)
fobj = open(path, "wb")
fobj.write("aaa")
fobj.close()
tarinfo = tar.gettarinfo(path)
self.assertEqual(tarinfo.size, 3)
tar.close() with open(path, "wb") as fobj:
fobj.write("aaa")
tarinfo = tar.gettarinfo(path)
self.assertEqual(tarinfo.size, 3)
finally:
tar.close()
def test_directory_size(self): def test_directory_size(self):
path = os.path.join(TEMPDIR, "directory") path = os.path.join(TEMPDIR, "directory")
os.mkdir(path) os.mkdir(path)
try: try:
tar = tarfile.open(tmpname, self.mode) tar = tarfile.open(tmpname, self.mode)
tarinfo = tar.gettarinfo(path) try:
self.assertEqual(tarinfo.size, 0) tarinfo = tar.gettarinfo(path)
self.assertEqual(tarinfo.size, 0)
finally:
tar.close()
finally: finally:
os.rmdir(path) os.rmdir(path)
...@@ -826,16 +862,18 @@ class WriteTest(WriteTestBase): ...@@ -826,16 +862,18 @@ class WriteTest(WriteTestBase):
if hasattr(os, "link"): if hasattr(os, "link"):
link = os.path.join(TEMPDIR, "link") link = os.path.join(TEMPDIR, "link")
target = os.path.join(TEMPDIR, "link_target") target = os.path.join(TEMPDIR, "link_target")
fobj = open(target, "wb") with open(target, "wb") as fobj:
fobj.write("aaa") fobj.write("aaa")
fobj.close()
os.link(target, link) os.link(target, link)
try: try:
tar = tarfile.open(tmpname, self.mode) tar = tarfile.open(tmpname, self.mode)
# Record the link target in the inodes list. try:
tar.gettarinfo(target) # Record the link target in the inodes list.
tarinfo = tar.gettarinfo(link) tar.gettarinfo(target)
self.assertEqual(tarinfo.size, 0) tarinfo = tar.gettarinfo(link)
self.assertEqual(tarinfo.size, 0)
finally:
tar.close()
finally: finally:
os.remove(target) os.remove(target)
os.remove(link) os.remove(link)
...@@ -846,26 +884,30 @@ class WriteTest(WriteTestBase): ...@@ -846,26 +884,30 @@ class WriteTest(WriteTestBase):
os.symlink("link_target", path) os.symlink("link_target", path)
try: try:
tar = tarfile.open(tmpname, self.mode) tar = tarfile.open(tmpname, self.mode)
tarinfo = tar.gettarinfo(path) try:
self.assertEqual(tarinfo.size, 0) tarinfo = tar.gettarinfo(path)
self.assertEqual(tarinfo.size, 0)
finally:
tar.close()
finally: finally:
os.remove(path) os.remove(path)
def test_add_self(self): def test_add_self(self):
# Test for #1257255. # Test for #1257255.
dstname = os.path.abspath(tmpname) dstname = os.path.abspath(tmpname)
tar = tarfile.open(tmpname, self.mode) tar = tarfile.open(tmpname, self.mode)
self.assertTrue(tar.name == dstname, "archive name must be absolute") try:
self.assertTrue(tar.name == dstname, "archive name must be absolute")
tar.add(dstname) tar.add(dstname)
self.assertTrue(tar.getnames() == [], "added the archive to itself") self.assertTrue(tar.getnames() == [], "added the archive to itself")
cwd = os.getcwd() cwd = os.getcwd()
os.chdir(TEMPDIR) os.chdir(TEMPDIR)
tar.add(dstname) tar.add(dstname)
os.chdir(cwd) os.chdir(cwd)
self.assertTrue(tar.getnames() == [], "added the archive to itself") self.assertTrue(tar.getnames() == [], "added the archive to itself")
finally:
tar.close()
def test_exclude(self): def test_exclude(self):
tempdir = os.path.join(TEMPDIR, "exclude") tempdir = os.path.join(TEMPDIR, "exclude")
...@@ -878,14 +920,19 @@ class WriteTest(WriteTestBase): ...@@ -878,14 +920,19 @@ class WriteTest(WriteTestBase):
exclude = os.path.isfile exclude = os.path.isfile
tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
with test_support.check_warnings(("use the filter argument", try:
DeprecationWarning)): with test_support.check_warnings(("use the filter argument",
tar.add(tempdir, arcname="empty_dir", exclude=exclude) DeprecationWarning)):
tar.close() tar.add(tempdir, arcname="empty_dir", exclude=exclude)
finally:
tar.close()
tar = tarfile.open(tmpname, "r") tar = tarfile.open(tmpname, "r")
self.assertEqual(len(tar.getmembers()), 1) try:
self.assertEqual(tar.getnames()[0], "empty_dir") self.assertEqual(len(tar.getmembers()), 1)
self.assertEqual(tar.getnames()[0], "empty_dir")
finally:
tar.close()
finally: finally:
shutil.rmtree(tempdir) shutil.rmtree(tempdir)
...@@ -905,15 +952,19 @@ class WriteTest(WriteTestBase): ...@@ -905,15 +952,19 @@ class WriteTest(WriteTestBase):
return tarinfo return tarinfo
tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
tar.add(tempdir, arcname="empty_dir", filter=filter) try:
tar.close() tar.add(tempdir, arcname="empty_dir", filter=filter)
finally:
tar.close()
tar = tarfile.open(tmpname, "r") tar = tarfile.open(tmpname, "r")
for tarinfo in tar: try:
self.assertEqual(tarinfo.uid, 123) for tarinfo in tar:
self.assertEqual(tarinfo.uname, "foo") self.assertEqual(tarinfo.uid, 123)
self.assertEqual(len(tar.getmembers()), 3) self.assertEqual(tarinfo.uname, "foo")
tar.close() self.assertEqual(len(tar.getmembers()), 3)
finally:
tar.close()
finally: finally:
shutil.rmtree(tempdir) shutil.rmtree(tempdir)
...@@ -931,12 +982,16 @@ class WriteTest(WriteTestBase): ...@@ -931,12 +982,16 @@ class WriteTest(WriteTestBase):
os.mkdir(foo) os.mkdir(foo)
tar = tarfile.open(tmpname, self.mode) tar = tarfile.open(tmpname, self.mode)
tar.add(foo, arcname=path) try:
tar.close() tar.add(foo, arcname=path)
finally:
tar.close()
tar = tarfile.open(tmpname, "r") tar = tarfile.open(tmpname, "r")
t = tar.next() try:
tar.close() t = tar.next()
finally:
tar.close()
if not dir: if not dir:
os.remove(foo) os.remove(foo)
...@@ -972,16 +1027,18 @@ class WriteTest(WriteTestBase): ...@@ -972,16 +1027,18 @@ class WriteTest(WriteTestBase):
def test_cwd(self): def test_cwd(self):
# Test adding the current working directory. # Test adding the current working directory.
with support.change_cwd(TEMPDIR): with support.change_cwd(TEMPDIR):
open("foo", "w").close()
tar = tarfile.open(tmpname, self.mode) tar = tarfile.open(tmpname, self.mode)
tar.add(".") try:
tar.close() tar.add(".")
finally:
tar.close()
tar = tarfile.open(tmpname, "r") tar = tarfile.open(tmpname, "r")
for t in tar: try:
self.assertTrue(t.name == "." or t.name.startswith("./")) for t in tar:
tar.close() self.assertTrue(t.name == "." or t.name.startswith("./"))
finally:
tar.close()
@unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink") @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
def test_extractall_symlinks(self): def test_extractall_symlinks(self):
...@@ -1098,19 +1155,18 @@ class StreamWriteTest(WriteTestBase): ...@@ -1098,19 +1155,18 @@ class StreamWriteTest(WriteTestBase):
tar.close() tar.close()
if self.mode.endswith("gz"): if self.mode.endswith("gz"):
fobj = gzip.GzipFile(tmpname) with gzip.GzipFile(tmpname) as fobj:
data = fobj.read() data = fobj.read()
fobj.close()
elif self.mode.endswith("bz2"): elif self.mode.endswith("bz2"):
dec = bz2.BZ2Decompressor() dec = bz2.BZ2Decompressor()
data = open(tmpname, "rb").read() with open(tmpname, "rb") as fobj:
data = fobj.read()
data = dec.decompress(data) data = dec.decompress(data)
self.assertTrue(len(dec.unused_data) == 0, self.assertTrue(len(dec.unused_data) == 0,
"found trailing data") "found trailing data")
else: else:
fobj = open(tmpname, "rb") with open(tmpname, "rb") as fobj:
data = fobj.read() data = fobj.read()
fobj.close()
self.assertTrue(data.count("\0") == tarfile.RECORDSIZE, self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
"incorrect zero padding") "incorrect zero padding")
...@@ -1171,23 +1227,27 @@ class GNUWriteTest(unittest.TestCase): ...@@ -1171,23 +1227,27 @@ class GNUWriteTest(unittest.TestCase):
tarinfo.type = tarfile.LNKTYPE tarinfo.type = tarfile.LNKTYPE
tar = tarfile.open(tmpname, "w") tar = tarfile.open(tmpname, "w")
tar.format = tarfile.GNU_FORMAT try:
tar.addfile(tarinfo) tar.format = tarfile.GNU_FORMAT
tar.addfile(tarinfo)
v1 = self._calc_size(name, link)
v2 = tar.offset
self.assertTrue(v1 == v2, "GNU longname/longlink creation failed")
tar.close() v1 = self._calc_size(name, link)
v2 = tar.offset
self.assertTrue(v1 == v2, "GNU longname/longlink creation failed")
finally:
tar.close()
tar = tarfile.open(tmpname) tar = tarfile.open(tmpname)
member = tar.next() try:
self.assertIsNotNone(member, member = tar.next()
"unable to read longname member") self.assertIsNotNone(member,
self.assertEqual(tarinfo.name, member.name, "unable to read longname member")
"unable to read longname member") self.assertEqual(tarinfo.name, member.name,
self.assertEqual(tarinfo.linkname, member.linkname, "unable to read longname member")
"unable to read longname member") self.assertEqual(tarinfo.linkname, member.linkname,
"unable to read longname member")
finally:
tar.close()
def test_longname_1023(self): def test_longname_1023(self):
self._test(("longnam/" * 127) + "longnam") self._test(("longnam/" * 127) + "longnam")
...@@ -1227,9 +1287,8 @@ class HardlinkTest(unittest.TestCase): ...@@ -1227,9 +1287,8 @@ class HardlinkTest(unittest.TestCase):
self.foo = os.path.join(TEMPDIR, "foo") self.foo = os.path.join(TEMPDIR, "foo")
self.bar = os.path.join(TEMPDIR, "bar") self.bar = os.path.join(TEMPDIR, "bar")
fobj = open(self.foo, "wb") with open(self.foo, "wb") as fobj:
fobj.write("foo") fobj.write("foo")
fobj.close()
os.link(self.foo, self.bar) os.link(self.foo, self.bar)
...@@ -1238,8 +1297,8 @@ class HardlinkTest(unittest.TestCase): ...@@ -1238,8 +1297,8 @@ class HardlinkTest(unittest.TestCase):
def tearDown(self): def tearDown(self):
self.tar.close() self.tar.close()
os.remove(self.foo) support.unlink(self.foo)
os.remove(self.bar) support.unlink(self.bar)
def test_add_twice(self): def test_add_twice(self):
# The same name will be added as a REGTYPE every # The same name will be added as a REGTYPE every
...@@ -1270,16 +1329,21 @@ class PaxWriteTest(GNUWriteTest): ...@@ -1270,16 +1329,21 @@ class PaxWriteTest(GNUWriteTest):
tarinfo.type = tarfile.LNKTYPE tarinfo.type = tarfile.LNKTYPE
tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
tar.addfile(tarinfo) try:
tar.close() tar.addfile(tarinfo)
finally:
tar.close()
tar = tarfile.open(tmpname) tar = tarfile.open(tmpname)
if link: try:
l = tar.getmembers()[0].linkname if link:
self.assertTrue(link == l, "PAX longlink creation failed") l = tar.getmembers()[0].linkname
else: self.assertTrue(link == l, "PAX longlink creation failed")
n = tar.getmembers()[0].name else:
self.assertTrue(name == n, "PAX longname creation failed") n = tar.getmembers()[0].name
self.assertTrue(name == n, "PAX longname creation failed")
finally:
tar.close()
def test_pax_global_header(self): def test_pax_global_header(self):
pax_headers = { pax_headers = {
...@@ -1291,23 +1355,28 @@ class PaxWriteTest(GNUWriteTest): ...@@ -1291,23 +1355,28 @@ class PaxWriteTest(GNUWriteTest):
tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
pax_headers=pax_headers) pax_headers=pax_headers)
tar.addfile(tarfile.TarInfo("test")) try:
tar.close() tar.addfile(tarfile.TarInfo("test"))
finally:
tar.close()
# Test if the global header was written correctly. # Test if the global header was written correctly.
tar = tarfile.open(tmpname, encoding="iso8859-1") tar = tarfile.open(tmpname, encoding="iso8859-1")
self.assertEqual(tar.pax_headers, pax_headers) try:
self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) self.assertEqual(tar.pax_headers, pax_headers)
self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
# Test if all the fields are unicode.
for key, val in tar.pax_headers.iteritems(): # Test if all the fields are unicode.
self.assertTrue(type(key) is unicode) for key, val in tar.pax_headers.iteritems():
self.assertTrue(type(val) is unicode) self.assertTrue(type(key) is unicode)
if key in tarfile.PAX_NUMBER_FIELDS: self.assertTrue(type(val) is unicode)
try: if key in tarfile.PAX_NUMBER_FIELDS:
tarfile.PAX_NUMBER_FIELDS[key](val) try:
except (TypeError, ValueError): tarfile.PAX_NUMBER_FIELDS[key](val)
self.fail("unable to convert pax header field") except (TypeError, ValueError):
self.fail("unable to convert pax header field")
finally:
tar.close()
def test_pax_extended_header(self): def test_pax_extended_header(self):
# The fields from the pax header have priority over the # The fields from the pax header have priority over the
...@@ -1315,18 +1384,23 @@ class PaxWriteTest(GNUWriteTest): ...@@ -1315,18 +1384,23 @@ class PaxWriteTest(GNUWriteTest):
pax_headers = {u"path": u"foo", u"uid": u"123"} pax_headers = {u"path": u"foo", u"uid": u"123"}
tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
t = tarfile.TarInfo() try:
t.name = u"\xe4\xf6\xfc" # non-ASCII t = tarfile.TarInfo()
t.uid = 8**8 # too large t.name = u"\xe4\xf6\xfc" # non-ASCII
t.pax_headers = pax_headers t.uid = 8**8 # too large
tar.addfile(t) t.pax_headers = pax_headers
tar.close() tar.addfile(t)
finally:
tar.close()
tar = tarfile.open(tmpname, encoding="iso8859-1") tar = tarfile.open(tmpname, encoding="iso8859-1")
t = tar.getmembers()[0] try:
self.assertEqual(t.pax_headers, pax_headers) t = tar.getmembers()[0]
self.assertEqual(t.name, "foo") self.assertEqual(t.pax_headers, pax_headers)
self.assertEqual(t.uid, 123) self.assertEqual(t.name, "foo")
self.assertEqual(t.uid, 123)
finally:
tar.close()
class UstarUnicodeTest(unittest.TestCase): class UstarUnicodeTest(unittest.TestCase):
...@@ -1345,40 +1419,49 @@ class UstarUnicodeTest(unittest.TestCase): ...@@ -1345,40 +1419,49 @@ class UstarUnicodeTest(unittest.TestCase):
def _test_unicode_filename(self, encoding): def _test_unicode_filename(self, encoding):
tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict")
name = u"\xe4\xf6\xfc" try:
tar.addfile(tarfile.TarInfo(name)) name = u"\xe4\xf6\xfc"
tar.close() tar.addfile(tarfile.TarInfo(name))
finally:
tar.close()
tar = tarfile.open(tmpname, encoding=encoding) tar = tarfile.open(tmpname, encoding=encoding)
self.assertTrue(type(tar.getnames()[0]) is not unicode) try:
self.assertEqual(tar.getmembers()[0].name, name.encode(encoding)) self.assertTrue(type(tar.getnames()[0]) is not unicode)
tar.close() self.assertEqual(tar.getmembers()[0].name, name.encode(encoding))
finally:
tar.close()
def test_unicode_filename_error(self): def test_unicode_filename_error(self):
tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
tarinfo = tarfile.TarInfo() try:
tarinfo = tarfile.TarInfo()
tarinfo.name = "\xe4\xf6\xfc" tarinfo.name = "\xe4\xf6\xfc"
if self.format == tarfile.PAX_FORMAT: if self.format == tarfile.PAX_FORMAT:
self.assertRaises(UnicodeError, tar.addfile, tarinfo) self.assertRaises(UnicodeError, tar.addfile, tarinfo)
else: else:
tar.addfile(tarinfo) tar.addfile(tarinfo)
tarinfo.name = u"\xe4\xf6\xfc" tarinfo.name = u"\xe4\xf6\xfc"
self.assertRaises(UnicodeError, tar.addfile, tarinfo) self.assertRaises(UnicodeError, tar.addfile, tarinfo)
tarinfo.name = "foo" tarinfo.name = "foo"
tarinfo.uname = u"\xe4\xf6\xfc" tarinfo.uname = u"\xe4\xf6\xfc"
self.assertRaises(UnicodeError, tar.addfile, tarinfo) self.assertRaises(UnicodeError, tar.addfile, tarinfo)
finally:
tar.close()
def test_unicode_argument(self): def test_unicode_argument(self):
tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
for t in tar: try:
self.assertTrue(type(t.name) is str) for t in tar:
self.assertTrue(type(t.linkname) is str) self.assertTrue(type(t.name) is str)
self.assertTrue(type(t.uname) is str) self.assertTrue(type(t.linkname) is str)
self.assertTrue(type(t.gname) is str) self.assertTrue(type(t.uname) is str)
tar.close() self.assertTrue(type(t.gname) is str)
finally:
tar.close()
def test_uname_unicode(self): def test_uname_unicode(self):
for name in (u"\xe4\xf6\xfc", "\xe4\xf6\xfc"): for name in (u"\xe4\xf6\xfc", "\xe4\xf6\xfc"):
...@@ -1388,8 +1471,10 @@ class UstarUnicodeTest(unittest.TestCase): ...@@ -1388,8 +1471,10 @@ class UstarUnicodeTest(unittest.TestCase):
fobj = StringIO.StringIO() fobj = StringIO.StringIO()
tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1") tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1")
tar.addfile(t) try:
tar.close() tar.addfile(t)
finally:
tar.close()
fobj.seek(0) fobj.seek(0)
tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1") tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1")
...@@ -1447,22 +1532,20 @@ class AppendTest(unittest.TestCase): ...@@ -1447,22 +1532,20 @@ class AppendTest(unittest.TestCase):
os.remove(self.tarname) os.remove(self.tarname)
def _add_testfile(self, fileobj=None): def _add_testfile(self, fileobj=None):
tar = tarfile.open(self.tarname, "a", fileobj=fileobj) with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
tar.addfile(tarfile.TarInfo("bar")) tar.addfile(tarfile.TarInfo("bar"))
tar.close()
def _create_testtar(self, mode="w:"): def _create_testtar(self, mode="w:"):
src = tarfile.open(tarname, encoding="iso8859-1") with tarfile.open(tarname, encoding="iso8859-1") as src:
t = src.getmember("ustar/regtype") t = src.getmember("ustar/regtype")
t.name = "foo" t.name = "foo"
f = src.extractfile(t) f = src.extractfile(t)
tar = tarfile.open(self.tarname, mode) with tarfile.open(self.tarname, mode) as tar:
tar.addfile(t, f) tar.addfile(t, f)
tar.close()
def _test(self, names=["bar"], fileobj=None): def _test(self, names=["bar"], fileobj=None):
tar = tarfile.open(self.tarname, fileobj=fileobj) with tarfile.open(self.tarname, fileobj=fileobj) as tar:
self.assertEqual(tar.getnames(), names) self.assertEqual(tar.getnames(), names)
def test_non_existing(self): def test_non_existing(self):
self._add_testfile() self._add_testfile()
...@@ -1481,7 +1564,8 @@ class AppendTest(unittest.TestCase): ...@@ -1481,7 +1564,8 @@ class AppendTest(unittest.TestCase):
def test_fileobj(self): def test_fileobj(self):
self._create_testtar() self._create_testtar()
data = open(self.tarname).read() with open(self.tarname) as fobj:
data = fobj.read()
fobj = StringIO.StringIO(data) fobj = StringIO.StringIO(data)
self._add_testfile(fobj) self._add_testfile(fobj)
fobj.seek(0) fobj.seek(0)
...@@ -1505,7 +1589,8 @@ class AppendTest(unittest.TestCase): ...@@ -1505,7 +1589,8 @@ class AppendTest(unittest.TestCase):
# Append mode is supposed to fail if the tarfile to append to # Append mode is supposed to fail if the tarfile to append to
# does not end with a zero block. # does not end with a zero block.
def _test_error(self, data): def _test_error(self, data):
open(self.tarname, "wb").write(data) with open(self.tarname, "wb") as fobj:
fobj.write(data)
self.assertRaises(tarfile.ReadError, self._add_testfile) self.assertRaises(tarfile.ReadError, self._add_testfile)
def test_null(self): def test_null(self):
...@@ -1641,15 +1726,14 @@ class ContextManagerTest(unittest.TestCase): ...@@ -1641,15 +1726,14 @@ class ContextManagerTest(unittest.TestCase):
def test_fileobj(self): def test_fileobj(self):
# Test that __exit__() did not close the external file # Test that __exit__() did not close the external file
# object. # object.
fobj = open(tmpname, "wb") with open(tmpname, "wb") as fobj:
try: try:
with tarfile.open(fileobj=fobj, mode="w") as tar: with tarfile.open(fileobj=fobj, mode="w") as tar:
raise Exception raise Exception
except: except:
pass pass
self.assertFalse(fobj.closed, "external file object was closed") self.assertFalse(fobj.closed, "external file object was closed")
self.assertTrue(tar.closed, "context manager failed") self.assertTrue(tar.closed, "context manager failed")
fobj.close()
class LinkEmulationTest(ReadTest): class LinkEmulationTest(ReadTest):
...@@ -1737,6 +1821,7 @@ class Bz2PartialReadTest(unittest.TestCase): ...@@ -1737,6 +1821,7 @@ class Bz2PartialReadTest(unittest.TestCase):
def test_main(): def test_main():
support.unlink(TEMPDIR)
os.makedirs(TEMPDIR) os.makedirs(TEMPDIR)
tests = [ tests = [
...@@ -1766,15 +1851,14 @@ def test_main(): ...@@ -1766,15 +1851,14 @@ def test_main():
else: else:
tests.append(LinkEmulationTest) tests.append(LinkEmulationTest)
fobj = open(tarname, "rb") with open(tarname, "rb") as fobj:
data = fobj.read() data = fobj.read()
fobj.close()
if gzip: if gzip:
# Create testtar.tar.gz and add gzip-specific tests. # Create testtar.tar.gz and add gzip-specific tests.
tar = gzip.open(gzipname, "wb") support.unlink(gzipname)
tar.write(data) with gzip.open(gzipname, "wb") as tar:
tar.close() tar.write(data)
tests += [ tests += [
GzipMiscReadTest, GzipMiscReadTest,
...@@ -1787,9 +1871,12 @@ def test_main(): ...@@ -1787,9 +1871,12 @@ def test_main():
if bz2: if bz2:
# Create testtar.tar.bz2 and add bz2-specific tests. # Create testtar.tar.bz2 and add bz2-specific tests.
support.unlink(bz2name)
tar = bz2.BZ2File(bz2name, "wb") tar = bz2.BZ2File(bz2name, "wb")
tar.write(data) try:
tar.close() tar.write(data)
finally:
tar.close()
tests += [ tests += [
Bz2MiscReadTest, Bz2MiscReadTest,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment