test_largefile.py 6.4 KB
Newer Older
Christian Heimes's avatar
Christian Heimes committed
1 2
"""Test largefile support on system where this makes sense.
"""
3

Christian Heimes's avatar
Christian Heimes committed
4 5 6 7
import os
import stat
import sys
import unittest
8
from test.support import TESTFN, requires, unlink
9 10
import io  # C implementation of io
import _pyio as pyio # Python implementation of io
11

12
# size of file to create (>2GB; 2GB == 2147483648 bytes)
13
size = 2500000000
14

15 16 17 18
class LargeFileTest:
    """Test that each file function works as expected for large
    (i.e. > 2GB) files.
    """
19

20 21 22 23 24
    def setUp(self):
        if os.path.exists(TESTFN):
            mode = 'r+b'
        else:
            mode = 'w+b'
25

26 27 28 29
        with self.open(TESTFN, mode) as f:
            current_size = os.fstat(f.fileno())[stat.ST_SIZE]
            if current_size == size+1:
                return
30

31 32
            if current_size == 0:
                f.write(b'z')
Christian Heimes's avatar
Christian Heimes committed
33 34 35

            f.seek(0)
            f.seek(size)
36
            f.write(b'a')
Christian Heimes's avatar
Christian Heimes committed
37 38 39
            f.flush()
            self.assertEqual(os.fstat(f.fileno())[stat.ST_SIZE], size+1)

40 41 42 43 44 45 46 47
    @classmethod
    def tearDownClass(cls):
        with cls.open(TESTFN, 'wb'):
            pass
        if not os.stat(TESTFN)[stat.ST_SIZE] == 0:
            raise cls.failureException('File was not truncated by opening '
                                       'with mode "wb"')

Christian Heimes's avatar
Christian Heimes committed
48 49 50 51
    def test_osstat(self):
        self.assertEqual(os.stat(TESTFN)[stat.ST_SIZE], size+1)

    def test_seek_read(self):
52
        with self.open(TESTFN, 'rb') as f:
Christian Heimes's avatar
Christian Heimes committed
53
            self.assertEqual(f.tell(), 0)
54
            self.assertEqual(f.read(1), b'z')
Christian Heimes's avatar
Christian Heimes committed
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
            self.assertEqual(f.tell(), 1)
            f.seek(0)
            self.assertEqual(f.tell(), 0)
            f.seek(0, 0)
            self.assertEqual(f.tell(), 0)
            f.seek(42)
            self.assertEqual(f.tell(), 42)
            f.seek(42, 0)
            self.assertEqual(f.tell(), 42)
            f.seek(42, 1)
            self.assertEqual(f.tell(), 84)
            f.seek(0, 1)
            self.assertEqual(f.tell(), 84)
            f.seek(0, 2)  # seek from the end
            self.assertEqual(f.tell(), size + 1 + 0)
            f.seek(-10, 2)
            self.assertEqual(f.tell(), size + 1 - 10)
            f.seek(-size-1, 2)
            self.assertEqual(f.tell(), 0)
            f.seek(size)
            self.assertEqual(f.tell(), size)
            # the 'a' that was written at the end of file above
77
            self.assertEqual(f.read(1), b'a')
Christian Heimes's avatar
Christian Heimes committed
78
            f.seek(-size-1, 1)
79
            self.assertEqual(f.read(1), b'z')
Christian Heimes's avatar
Christian Heimes committed
80 81 82
            self.assertEqual(f.tell(), 1)

    def test_lseek(self):
83
        with self.open(TESTFN, 'rb') as f:
Christian Heimes's avatar
Christian Heimes committed
84 85 86 87 88 89 90 91 92
            self.assertEqual(os.lseek(f.fileno(), 0, 0), 0)
            self.assertEqual(os.lseek(f.fileno(), 42, 0), 42)
            self.assertEqual(os.lseek(f.fileno(), 42, 1), 84)
            self.assertEqual(os.lseek(f.fileno(), 0, 1), 84)
            self.assertEqual(os.lseek(f.fileno(), 0, 2), size+1+0)
            self.assertEqual(os.lseek(f.fileno(), -10, 2), size+1-10)
            self.assertEqual(os.lseek(f.fileno(), -size-1, 2), 0)
            self.assertEqual(os.lseek(f.fileno(), size, 0), size)
            # the 'a' that was written at the end of file above
93
            self.assertEqual(f.read(1), b'a')
Christian Heimes's avatar
Christian Heimes committed
94 95

    def test_truncate(self):
96
        with self.open(TESTFN, 'r+b') as f:
97
            if not hasattr(f, 'truncate'):
98 99
                raise unittest.SkipTest("open().truncate() not available "
                                        "on this system")
Christian Heimes's avatar
Christian Heimes committed
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
            f.seek(0, 2)
            # else we've lost track of the true size
            self.assertEqual(f.tell(), size+1)
            # Cut it back via seek + truncate with no argument.
            newsize = size - 10
            f.seek(newsize)
            f.truncate()
            self.assertEqual(f.tell(), newsize)  # else pointer moved
            f.seek(0, 2)
            self.assertEqual(f.tell(), newsize)  # else wasn't truncated
            # Ensure that truncate(smaller than true size) shrinks
            # the file.
            newsize -= 1
            f.seek(42)
            f.truncate(newsize)
115
            self.assertEqual(f.tell(), 42)
116 117
            f.seek(0, 2)
            self.assertEqual(f.tell(), newsize)
Christian Heimes's avatar
Christian Heimes committed
118 119 120 121
            # XXX truncate(larger than true size) is ill-defined
            # across platform; cut it waaaaay back
            f.seek(0)
            f.truncate(1)
122
            self.assertEqual(f.tell(), 0)       # else pointer moved
123
            f.seek(0)
Christian Heimes's avatar
Christian Heimes committed
124 125
            self.assertEqual(len(f.read()), 1)  # else wasn't truncated

126 127 128 129 130 131
    def test_seekable(self):
        # Issue #5016; seekable() can return False when the current position
        # is negative when truncated to an int.
        for pos in (2**31-1, 2**31, 2**31+1):
            with self.open(TESTFN, 'rb') as f:
                f.seek(pos)
132
                self.assertTrue(f.seekable())
133

134 135 136 137 138 139 140 141 142
def setUpModule():
    try:
        import signal
        # The default handler for SIGXFSZ is to abort the process.
        # By ignoring it, system calls exceeding the file size resource
        # limit will raise OSError instead of crashing the interpreter.
        signal.signal(signal.SIGXFSZ, signal.SIG_IGN)
    except (ImportError, AttributeError):
        pass
143

Christian Heimes's avatar
Christian Heimes committed
144 145 146 147 148 149 150
    # On Windows and Mac OSX this test comsumes large resources; It
    # takes a long time to build the >2GB file and takes >2GB of disk
    # space therefore the resource must be enabled to run this test.
    # If not, nothing after this line stanza will be executed.
    if sys.platform[:3] == 'win' or sys.platform == 'darwin':
        requires('largefile',
                 'test requires %s bytes and a long time to run' % str(size))
151
    else:
Christian Heimes's avatar
Christian Heimes committed
152 153 154
        # Only run if the current filesystem supports large files.
        # (Skip this test on Windows, since we now always support
        # large files.)
155
        f = open(TESTFN, 'wb', buffering=0)
Christian Heimes's avatar
Christian Heimes committed
156 157 158
        try:
            # 2**31 == 2147483648
            f.seek(2147483649)
159
            # Seeking is not enough of a test: you must write and flush, too!
160
            f.write(b'x')
Christian Heimes's avatar
Christian Heimes committed
161
            f.flush()
162
        except (OSError, OverflowError):
163 164 165
            raise unittest.SkipTest("filesystem does not have "
                                    "largefile support")
        finally:
Christian Heimes's avatar
Christian Heimes committed
166 167
            f.close()
            unlink(TESTFN)
168 169 170 171 172 173 174 175 176 177


class CLargeFileTest(LargeFileTest, unittest.TestCase):
    open = staticmethod(io.open)

class PyLargeFileTest(LargeFileTest, unittest.TestCase):
    open = staticmethod(pyio.open)

def tearDownModule():
    unlink(TESTFN)
Christian Heimes's avatar
Christian Heimes committed
178 179

if __name__ == '__main__':
180
    unittest.main()