test_recno.py 6.69 KB
Newer Older
1
"""TestCases for exercising a Recno DB.
2 3
"""

4 5
import os
import sys
6
import errno
7 8 9 10
import tempfile
from pprint import pprint
import unittest

11
from test_all import verbose
12

13
try:
14 15 16
    # For Pythons w/distutils pybsddb
    from bsddb3 import db
except ImportError:
17 18 19
    # For Python 2.3
    from bsddb import db

20 21 22
letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'


23 24 25 26 27 28 29 30 31
#----------------------------------------------------------------------

class SimpleRecnoTestCase(unittest.TestCase):
    def setUp(self):
        self.filename = tempfile.mktemp()

    def tearDown(self):
        try:
            os.remove(self.filename)
32 33
        except OSError, e:
            if e.errno <> errno.EEXIST: raise
34 35 36 37 38

    def test01_basic(self):
        d = db.DB()
        d.open(self.filename, db.DB_RECNO, db.DB_CREATE)

39
        for x in letters:
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
            recno = d.append(x * 60)
            assert type(recno) == type(0)
            assert recno >= 1
            if verbose:
                print recno,

        if verbose: print

        stat = d.stat()
        if verbose:
            pprint(stat)

        for recno in range(1, len(d)+1):
            data = d[recno]
            if verbose:
                print data

            assert type(data) == type("")
            assert data == d.get(recno)

        try:
            data = d[0]  # This should raise a KeyError!?!?!
        except db.DBInvalidArgError, val:
            assert val[0] == db.EINVAL
            if verbose: print val
        else:
            self.fail("expected exception")

        try:
            data = d[100]
        except KeyError:
            pass
        else:
            self.fail("expected exception")

        data = d.get(100)
        assert data == None

        keys = d.keys()
        if verbose:
            print keys
        assert type(keys) == type([])
        assert type(keys[0]) == type(123)
        assert len(keys) == len(d)

        items = d.items()
        if verbose:
            pprint(items)
        assert type(items) == type([])
        assert type(items[0]) == type(())
        assert len(items[0]) == 2
        assert type(items[0][0]) == type(123)
        assert type(items[0][1]) == type("")
        assert len(items) == len(d)

        assert d.has_key(25)

        del d[25]
        assert not d.has_key(25)

        d.delete(13)
        assert not d.has_key(13)

        data = d.get_both(26, "z" * 60)
        assert data == "z" * 60
        if verbose:
            print data

        fd = d.fd()
        if verbose:
            print fd

        c = d.cursor()
        rec = c.first()
        while rec:
            if verbose:
                print rec
            rec = c.next()

        c.set(50)
        rec = c.current()
        if verbose:
            print rec

        c.put(-1, "a replacement record", db.DB_CURRENT)

        c.set(50)
        rec = c.current()
        assert rec == (50, "a replacement record")
        if verbose:
            print rec

        rec = c.set_range(30)
        if verbose:
            print rec

136 137 138 139 140 141 142
        # test that non-existant key lookups work (and that
        # DBC_set_range doesn't have a memleak under valgrind)
        rec = c.set_range(999999)
        assert rec == None
        if verbose:
            print rec

143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
        c.close()
        d.close()

        d = db.DB()
        d.open(self.filename)
        c = d.cursor()

        # put a record beyond the consecutive end of the recno's
        d[100] = "way out there"
        assert d[100] == "way out there"

        try:
            data = d[99]
        except KeyError:
            pass
        else:
            self.fail("expected exception")

        try:
            d.get(99)
        except db.DBKeyEmptyError, val:
            assert val[0] == db.DB_KEYEMPTY
            if verbose: print val
        else:
            self.fail("expected exception")

        rec = c.set(40)
        while rec:
            if verbose:
                print rec
            rec = c.next()

        c.close()
        d.close()

    def test02_WithSource(self):
        """
180 181 182 183
        A Recno file that is given a "backing source file" is essentially a
        simple ASCII file.  Normally each record is delimited by \n and so is
        just a line in the file, but you can set a different record delimiter
        if needed.
184
        """
185 186
        source = os.path.join(os.path.dirname(sys.argv[0]),
                              'db_home/test_recno.txt')
187 188
        if not os.path.isdir('db_home'):
            os.mkdir('db_home')
189 190 191 192
        f = open(source, 'w') # create the file
        f.close()

        d = db.DB()
193 194
        # This is the default value, just checking if both int
        d.set_re_delim(0x0A)
195 196 197 198
        d.set_re_delim('\n')  # and char can be used...
        d.set_re_source(source)
        d.open(self.filename, db.DB_RECNO, db.DB_CREATE)

199
        data = "The quick brown fox jumped over the lazy dog".split()
200 201 202 203 204 205 206
        for datum in data:
            d.append(datum)
        d.sync()
        d.close()

        # get the text from the backing source
        text = open(source, 'r').read()
207
        text = text.strip()
208 209 210
        if verbose:
            print text
            print data
211
            print text.split('\n')
212

213
        assert text.split('\n') == data
214 215 216 217 218 219 220 221 222 223 224 225 226

        # open as a DB again
        d = db.DB()
        d.set_re_source(source)
        d.open(self.filename, db.DB_RECNO)

        d[3] = 'reddish-brown'
        d[8] = 'comatose'

        d.sync()
        d.close()

        text = open(source, 'r').read()
227
        text = text.strip()
228 229
        if verbose:
            print text
230
            print text.split('\n')
231

232 233
        assert text.split('\n') == \
             "The quick reddish-brown fox jumped over the comatose dog".split()
234 235 236 237 238 239 240 241

    def test03_FixedLength(self):
        d = db.DB()
        d.set_re_len(40)  # fixed length records, 40 bytes long
        d.set_re_pad('-') # sets the pad character...
        d.set_re_pad(45)  # ...test both int and char
        d.open(self.filename, db.DB_RECNO, db.DB_CREATE)

242
        for x in letters:
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
            d.append(x * 35)    # These will be padded

        d.append('.' * 40)      # this one will be exact

        try:                    # this one will fail
            d.append('bad' * 20)
        except db.DBInvalidArgError, val:
            assert val[0] == db.EINVAL
            if verbose: print val
        else:
            self.fail("expected exception")

        c = d.cursor()
        rec = c.first()
        while rec:
            if verbose:
                print rec
            rec = c.next()

        c.close()
        d.close()

265

266 267 268
#----------------------------------------------------------------------


269
def test_suite():
270 271 272 273
    return unittest.makeSuite(SimpleRecnoTestCase)


if __name__ == '__main__':
274
    unittest.main(defaultTest='test_suite')