test_dbm_dumb.py 9.66 KB
Newer Older
1 2 3 4
"""Test script for the dumbdbm module
   Original by Roger E. Masse
"""

5
import io
6
import operator
7
import os
8
import stat
9
import unittest
10
import dbm.dumb as dumbdbm
11
from test import support
12
from functools import partial
13

14
_fname = support.TESTFN
15 16 17 18 19 20 21 22

def _delete_files():
    for ext in [".dir", ".dat", ".bak"]:
        try:
            os.unlink(_fname + ext)
        except OSError:
            pass

23
class DumbDBMTestCase(unittest.TestCase):
24 25 26 27 28 29 30 31
    _dict = {b'0': b'',
             b'a': b'Python:',
             b'b': b'Programming',
             b'c': b'the',
             b'd': b'way',
             b'f': b'Guido',
             b'g': b'intended',
             '\u00fc'.encode('utf-8') : b'!',
32 33 34
             }

    def test_dumbdbm_creation(self):
35
        f = dumbdbm.open(_fname, 'c')
36
        self.assertEqual(list(f.keys()), [])
37
        for key in self._dict:
38
            f[key] = self._dict[key]
39 40 41
        self.read_helper(f)
        f.close()

42 43
    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    @unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()')
44 45
    def test_dumbdbm_creation_mode(self):
        try:
46 47
            old_umask = os.umask(0o002)
            f = dumbdbm.open(_fname, 'c', 0o637)
48 49 50
            f.close()
        finally:
            os.umask(old_umask)
51

52
        expected_mode = 0o635
53 54 55
        if os.name != 'posix':
            # Windows only supports setting the read-only attribute.
            # This shouldn't fail, but doesn't work like Unix either.
56
            expected_mode = 0o666
57

58 59
        import stat
        st = os.stat(_fname + '.dat')
60
        self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
61
        st = os.stat(_fname + '.dir')
62
        self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
63

64 65
    def test_close_twice(self):
        f = dumbdbm.open(_fname)
66 67
        f[b'a'] = b'b'
        self.assertEqual(f[b'a'], b'b')
68 69 70
        f.close()
        f.close()

71
    def test_dumbdbm_modification(self):
72
        self.init_db()
73
        f = dumbdbm.open(_fname, 'w')
74
        self._dict[b'g'] = f[b'g'] = b"indented"
75 76 77 78
        self.read_helper(f)
        f.close()

    def test_dumbdbm_read(self):
79
        self.init_db()
80
        f = dumbdbm.open(_fname, 'r')
81
        self.read_helper(f)
82 83 84 85 86 87
        with self.assertWarnsRegex(DeprecationWarning,
                                   'The database is opened for reading only'):
            f[b'g'] = b'x'
        with self.assertWarnsRegex(DeprecationWarning,
                                   'The database is opened for reading only'):
            del f[b'a']
88 89 90
        f.close()

    def test_dumbdbm_keys(self):
91
        self.init_db()
92
        f = dumbdbm.open(_fname)
93 94 95
        keys = self.keys_helper(f)
        f.close()

96 97 98
    def test_write_contains(self):
        f = dumbdbm.open(_fname)
        f[b'1'] = b'hello'
99
        self.assertIn(b'1', f)
100 101
        f.close()

102 103 104
    def test_write_write_read(self):
        # test for bug #482460
        f = dumbdbm.open(_fname)
105 106
        f[b'1'] = b'hello'
        f[b'1'] = b'hello2'
107 108
        f.close()
        f = dumbdbm.open(_fname)
109
        self.assertEqual(f[b'1'], b'hello2')
110 111
        f.close()

112 113 114 115 116 117 118 119 120
    def test_str_read(self):
        self.init_db()
        f = dumbdbm.open(_fname, 'r')
        self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])

    def test_str_write_contains(self):
        self.init_db()
        f = dumbdbm.open(_fname)
        f['\u00fc'] = b'!'
121
        f['1'] = 'a'
122 123
        f.close()
        f = dumbdbm.open(_fname, 'r')
124
        self.assertIn('\u00fc', f)
125 126
        self.assertEqual(f['\u00fc'.encode('utf-8')],
                         self._dict['\u00fc'.encode('utf-8')])
127
        self.assertEqual(f[b'1'], b'a')
128

129 130 131 132
    def test_line_endings(self):
        # test for bug #1172763: dumbdbm would die if the line endings
        # weren't what was expected.
        f = dumbdbm.open(_fname)
133 134
        f[b'1'] = b'hello'
        f[b'2'] = b'hello2'
135 136
        f.close()

Christian Heimes's avatar
Christian Heimes committed
137
        # Mangle the file by changing the line separator to Windows or Unix
138 139
        with io.open(_fname + '.dir', 'rb') as file:
            data = file.read()
140
        if os.linesep == '\n':
Christian Heimes's avatar
Christian Heimes committed
141 142 143
            data = data.replace(b'\n', b'\r\n')
        else:
            data = data.replace(b'\r\n', b'\n')
144 145
        with io.open(_fname + '.dir', 'wb') as file:
            file.write(data)
146

147
        f = dumbdbm.open(_fname)
148 149
        self.assertEqual(f[b'1'], b'hello')
        self.assertEqual(f[b'2'], b'hello2')
150 151


152 153 154
    def read_helper(self, f):
        keys = self.keys_helper(f)
        for key in self._dict:
155
            self.assertEqual(self._dict[key], f[key])
Tim Peters's avatar
Tim Peters committed
156

157
    def init_db(self):
158
        f = dumbdbm.open(_fname, 'n')
159
        for k in self._dict:
160
            f[k] = self._dict[k]
161 162
        f.close()

163
    def keys_helper(self, f):
164
        keys = sorted(f.keys())
165
        dkeys = sorted(self._dict.keys())
166
        self.assertEqual(keys, dkeys)
167 168
        return keys

169 170 171 172 173 174 175 176 177 178 179 180
    # Perform randomized operations.  This doesn't make assumptions about
    # what *might* fail.
    def test_random(self):
        import random
        d = {}  # mirror the database
        for dummy in range(5):
            f = dumbdbm.open(_fname)
            for dummy in range(100):
                k = random.choice('abcdefghijklm')
                if random.random() < 0.2:
                    if k in d:
                        del d[k]
181
                        del f[k]
182
                else:
183
                    v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
184
                    d[k] = v
185 186
                    f[k] = v
                    self.assertEqual(f[k], v)
187 188 189
            f.close()

            f = dumbdbm.open(_fname)
190
            expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
191
            got = sorted(f.items())
192 193 194
            self.assertEqual(expected, got)
            f.close()

195 196 197 198 199 200 201
    def test_context_manager(self):
        with dumbdbm.open(_fname, 'c') as db:
            db["dumbdbm context manager"] = "context manager"

        with dumbdbm.open(_fname, 'r') as db:
            self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])

202
        with self.assertRaises(dumbdbm.error):
203 204
            db.keys()

205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
    def test_check_closed(self):
        f = dumbdbm.open(_fname, 'c')
        f.close()

        for meth in (partial(operator.delitem, f),
                     partial(operator.setitem, f, 'b'),
                     partial(operator.getitem, f),
                     partial(operator.contains, f)):
            with self.assertRaises(dumbdbm.error) as cm:
                meth('test')
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

        for meth in (operator.methodcaller('keys'),
                     operator.methodcaller('iterkeys'),
                     operator.methodcaller('items'),
                     len):
            with self.assertRaises(dumbdbm.error) as cm:
                meth(f)
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

227 228 229 230 231 232 233 234
    def test_create_new(self):
        with dumbdbm.open(_fname, 'n') as f:
            for k in self._dict:
                f[k] = self._dict[k]

        with dumbdbm.open(_fname, 'n') as f:
            self.assertEqual(f.keys(), [])

235 236 237 238 239 240 241 242 243
    def test_eval(self):
        with open(_fname + '.dir', 'w') as stream:
            stream.write("str(print('Hacked!')), 0\n")
        with support.captured_stdout() as stdout:
            with self.assertRaises(ValueError):
                with dumbdbm.open(_fname) as f:
                    pass
            self.assertEqual(stdout.getvalue(), '')

244 245 246 247 248 249 250 251 252 253
    def test_warn_on_ignored_flags(self):
        for value in ('r', 'w'):
            _delete_files()
            with self.assertWarnsRegex(DeprecationWarning,
                                       "The database file is missing, the "
                                       "semantics of the 'c' flag will "
                                       "be used."):
                f = dumbdbm.open(_fname, value)
            f.close()

254 255 256 257 258 259 260 261 262 263 264 265 266 267
    def test_missing_index(self):
        with dumbdbm.open(_fname, 'n') as f:
            pass
        os.unlink(_fname + '.dir')
        for value in ('r', 'w'):
            with self.assertWarnsRegex(DeprecationWarning,
                                       "The index file is missing, the "
                                       "semantics of the 'c' flag will "
                                       "be used."):
                f = dumbdbm.open(_fname, value)
            f.close()
            self.assertEqual(os.path.exists(_fname + '.dir'), value == 'w')
            self.assertFalse(os.path.exists(_fname + '.bak'))

268 269 270 271 272 273 274 275
    def test_invalid_flag(self):
        for flag in ('x', 'rf', None):
            with self.assertWarnsRegex(DeprecationWarning,
                                       "Flag must be one of "
                                       "'r', 'w', 'c', or 'n'"):
                f = dumbdbm.open(_fname, flag)
            f.close()

276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
    @unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()')
    def test_readonly_files(self):
        with support.temp_dir() as dir:
            fname = os.path.join(dir, 'db')
            with dumbdbm.open(fname, 'n') as f:
                self.assertEqual(list(f.keys()), [])
                for key in self._dict:
                    f[key] = self._dict[key]
            os.chmod(fname + ".dir", stat.S_IRUSR)
            os.chmod(fname + ".dat", stat.S_IRUSR)
            os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR)
            with dumbdbm.open(fname, 'r') as f:
                self.assertEqual(sorted(f.keys()), sorted(self._dict))
                f.close()  # don't write

291 292 293 294 295 296
    def tearDown(self):
        _delete_files()

    def setUp(self):
        _delete_files()

297 298

if __name__ == "__main__":
299
    unittest.main()