Kaydet (Commit) 1c6b1a2b authored tarafından Martin v. Löwis's avatar Martin v. Löwis

Importing test suite from bsddb3 3.4.0 (with modifications).

üst a406b586
This diff is collapsed.
This diff is collapsed.
"""
Test cases adapted from the test_bsddb.py module in Python's
regression test suite.
"""
import sys, os, string
from bsddb import hashopen, btopen, rnopen
import bsddb
import unittest
import tempfile
from test.test_support import verbose
class CompatibilityTestCase(unittest.TestCase):
def setUp(self):
self.filename = tempfile.mktemp()
def tearDown(self):
try:
os.remove(self.filename)
except os.error:
pass
def test01_btopen(self):
self.do_bthash_test(btopen, 'btopen')
def test02_hashopen(self):
self.do_bthash_test(hashopen, 'hashopen')
def test03_rnopen(self):
data = string.split("The quick brown fox jumped over the lazy dog.")
if verbose:
print "\nTesting: rnopen"
f = rnopen(self.filename, 'c')
for x in range(len(data)):
f[x+1] = data[x]
getTest = (f[1], f[2], f[3])
if verbose:
print '%s %s %s' % getTest
assert getTest[1] == 'quick', 'data mismatch!'
f[25] = 'twenty-five'
f.close()
del f
f = rnopen(self.filename, 'w')
f[20] = 'twenty'
def noRec(f):
rec = f[15]
self.assertRaises(KeyError, noRec, f)
def badKey(f):
rec = f['a string']
self.assertRaises(TypeError, badKey, f)
del f[3]
rec = f.first()
while rec:
if verbose:
print rec
try:
rec = f.next()
except KeyError:
break
f.close()
def test04_n_flag(self):
f = hashopen(self.filename, 'n')
f.close()
def do_bthash_test(self, factory, what):
if verbose:
print '\nTesting: ', what
f = factory(self.filename, 'c')
if verbose:
print 'creation...'
# truth test
if f:
if verbose: print "truth test: true"
else:
if verbose: print "truth test: false"
f['0'] = ''
f['a'] = 'Guido'
f['b'] = 'van'
f['c'] = 'Rossum'
f['d'] = 'invented'
f['f'] = 'Python'
if verbose:
print '%s %s %s' % (f['a'], f['b'], f['c'])
if verbose:
print 'key ordering...'
f.set_location(f.first()[0])
while 1:
try:
rec = f.next()
except KeyError:
assert rec == f.last(), 'Error, last <> last!'
f.previous()
break
if verbose:
print rec
assert f.has_key('f'), 'Error, missing key!'
f.sync()
f.close()
# truth test
try:
if f:
if verbose: print "truth test: true"
else:
if verbose: print "truth test: false"
except bsddb.error:
pass
else:
self.fail("Exception expected")
del f
if verbose:
print 'modification...'
f = factory(self.filename, 'w')
f['d'] = 'discovered'
if verbose:
print 'access...'
for key in f.keys():
word = f[key]
if verbose:
print word
def noRec(f):
rec = f['no such key']
self.assertRaises(KeyError, noRec, f)
def badKey(f):
rec = f[15]
self.assertRaises(TypeError, badKey, f)
f.close()
#----------------------------------------------------------------------
def suite():
return unittest.makeSuite(CompatibilityTestCase)
if __name__ == '__main__':
unittest.main( defaultTest='suite' )
import sys, os, string
import unittest
import glob
from bsddb import db, dbobj
#----------------------------------------------------------------------
class dbobjTestCase(unittest.TestCase):
"""Verify that dbobj.DB and dbobj.DBEnv work properly"""
db_home = 'db_home'
db_name = 'test-dbobj.db'
def setUp(self):
homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
self.homeDir = homeDir
try: os.mkdir(homeDir)
except os.error: pass
def tearDown(self):
if hasattr(self, 'db'):
del self.db
if hasattr(self, 'env'):
del self.env
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
def test01_both(self):
class TestDBEnv(dbobj.DBEnv): pass
class TestDB(dbobj.DB):
def put(self, key, *args, **kwargs):
key = string.upper(key)
# call our parent classes put method with an upper case key
return apply(dbobj.DB.put, (self, key) + args, kwargs)
self.env = TestDBEnv()
self.env.open(self.db_home, db.DB_CREATE | db.DB_INIT_MPOOL)
self.db = TestDB(self.env)
self.db.open(self.db_name, db.DB_HASH, db.DB_CREATE)
self.db.put('spam', 'eggs')
assert self.db.get('spam') == None, "overridden dbobj.DB.put() method failed [1]"
assert self.db.get('SPAM') == 'eggs', "overridden dbobj.DB.put() method failed [2]"
self.db.close()
self.env.close()
def test02_dbobj_dict_interface(self):
self.env = dbobj.DBEnv()
self.env.open(self.db_home, db.DB_CREATE | db.DB_INIT_MPOOL)
self.db = dbobj.DB(self.env)
self.db.open(self.db_name+'02', db.DB_HASH, db.DB_CREATE)
# __setitem__
self.db['spam'] = 'eggs'
# __len__
assert len(self.db) == 1
# __getitem__
assert self.db['spam'] == 'eggs'
# __del__
del self.db['spam']
assert self.db.get('spam') == None, "dbobj __del__ failed"
self.db.close()
self.env.close()
#----------------------------------------------------------------------
def suite():
return unittest.makeSuite(dbobjTestCase)
if __name__ == '__main__':
unittest.main( defaultTest='suite' )
"""
TestCases for checking dbShelve objects.
"""
import sys, os, string
import tempfile, random
from pprint import pprint
from types import *
import unittest
from bsddb import dbshelve, db
from test.test_support import verbose
#----------------------------------------------------------------------
# We want the objects to be comparable so we can test dbshelve.values
# later on.
class DataClass:
def __init__(self):
self.value = random.random()
def __cmp__(self, other):
return cmp(self.value, other)
class DBShelveTestCase(unittest.TestCase):
def setUp(self):
self.filename = tempfile.mktemp()
self.do_open()
def tearDown(self):
self.do_close()
try:
os.remove(self.filename)
except os.error:
pass
def populateDB(self, d):
for x in string.letters:
d['S' + x] = 10 * x # add a string
d['I' + x] = ord(x) # add an integer
d['L' + x] = [x] * 10 # add a list
inst = DataClass() # add an instance
inst.S = 10 * x
inst.I = ord(x)
inst.L = [x] * 10
d['O' + x] = inst
# overridable in derived classes to affect how the shelf is created/opened
def do_open(self):
self.d = dbshelve.open(self.filename)
# and closed...
def do_close(self):
self.d.close()
def test01_basics(self):
if verbose:
print '\n', '-=' * 30
print "Running %s.test01_basics..." % self.__class__.__name__
self.populateDB(self.d)
self.d.sync()
self.do_close()
self.do_open()
d = self.d
l = len(d)
k = d.keys()
s = d.stat()
f = d.fd()
if verbose:
print "length:", l
print "keys:", k
print "stats:", s
assert 0 == d.has_key('bad key')
assert 1 == d.has_key('IA')
assert 1 == d.has_key('OA')
d.delete('IA')
del d['OA']
assert 0 == d.has_key('IA')
assert 0 == d.has_key('OA')
assert len(d) == l-2
values = []
for key in d.keys():
value = d[key]
values.append(value)
if verbose:
print "%s: %s" % (key, value)
self.checkrec(key, value)
dbvalues = d.values()
assert len(dbvalues) == len(d.keys())
values.sort()
dbvalues.sort()
assert values == dbvalues
items = d.items()
assert len(items) == len(values)
for key, value in items:
self.checkrec(key, value)
assert d.get('bad key') == None
assert d.get('bad key', None) == None
assert d.get('bad key', 'a string') == 'a string'
assert d.get('bad key', [1, 2, 3]) == [1, 2, 3]
d.set_get_returns_none(0)
self.assertRaises(db.DBNotFoundError, d.get, 'bad key')
d.set_get_returns_none(1)
d.put('new key', 'new data')
assert d.get('new key') == 'new data'
assert d['new key'] == 'new data'
def test02_cursors(self):
if verbose:
print '\n', '-=' * 30
print "Running %s.test02_cursors..." % self.__class__.__name__
self.populateDB(self.d)
d = self.d
count = 0
c = d.cursor()
rec = c.first()
while rec is not None:
count = count + 1
if verbose:
print rec
key, value = rec
self.checkrec(key, value)
rec = c.next()
assert count == len(d)
count = 0
c = d.cursor()
rec = c.last()
while rec is not None:
count = count + 1
if verbose:
print rec
key, value = rec
self.checkrec(key, value)
rec = c.prev()
assert count == len(d)
c.set('SS')
key, value = c.current()
self.checkrec(key, value)
c.close()
def checkrec(self, key, value):
x = key[1]
if key[0] == 'S':
assert type(value) == StringType
assert value == 10 * x
elif key[0] == 'I':
assert type(value) == IntType
assert value == ord(x)
elif key[0] == 'L':
assert type(value) == ListType
assert value == [x] * 10
elif key[0] == 'O':
assert type(value) == InstanceType
assert value.S == 10 * x
assert value.I == ord(x)
assert value.L == [x] * 10
else:
raise AssertionError, 'Unknown key type, fix the test'
#----------------------------------------------------------------------
class BasicShelveTestCase(DBShelveTestCase):
def do_open(self):
self.d = dbshelve.DBShelf()
self.d.open(self.filename, self.dbtype, self.dbflags)
def do_close(self):
self.d.close()
class BTreeShelveTestCase(BasicShelveTestCase):
dbtype = db.DB_BTREE
dbflags = db.DB_CREATE
class HashShelveTestCase(BasicShelveTestCase):
dbtype = db.DB_BTREE
dbflags = db.DB_CREATE
class ThreadBTreeShelveTestCase(BasicShelveTestCase):
dbtype = db.DB_BTREE
dbflags = db.DB_CREATE | db.DB_THREAD
class ThreadHashShelveTestCase(BasicShelveTestCase):
dbtype = db.DB_BTREE
dbflags = db.DB_CREATE | db.DB_THREAD
#----------------------------------------------------------------------
class BasicEnvShelveTestCase(DBShelveTestCase):
def do_open(self):
self.homeDir = homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
try: os.mkdir(homeDir)
except os.error: pass
self.env = db.DBEnv()
self.env.open(homeDir, self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE)
self.filename = os.path.split(self.filename)[1]
self.d = dbshelve.DBShelf(self.env)
self.d.open(self.filename, self.dbtype, self.dbflags)
def do_close(self):
self.d.close()
self.env.close()
def tearDown(self):
self.do_close()
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
class EnvBTreeShelveTestCase(BasicEnvShelveTestCase):
envflags = 0
dbtype = db.DB_BTREE
dbflags = db.DB_CREATE
class EnvHashShelveTestCase(BasicEnvShelveTestCase):
envflags = 0
dbtype = db.DB_BTREE
dbflags = db.DB_CREATE
class EnvThreadBTreeShelveTestCase(BasicEnvShelveTestCase):
envflags = db.DB_THREAD
dbtype = db.DB_BTREE
dbflags = db.DB_CREATE | db.DB_THREAD
class EnvThreadHashShelveTestCase(BasicEnvShelveTestCase):
envflags = db.DB_THREAD
dbtype = db.DB_BTREE
dbflags = db.DB_CREATE | db.DB_THREAD
#----------------------------------------------------------------------
# TODO: Add test cases for a DBShelf in a RECNO DB.
#----------------------------------------------------------------------
def suite():
theSuite = unittest.TestSuite()
theSuite.addTest(unittest.makeSuite(DBShelveTestCase))
theSuite.addTest(unittest.makeSuite(BTreeShelveTestCase))
theSuite.addTest(unittest.makeSuite(HashShelveTestCase))
theSuite.addTest(unittest.makeSuite(ThreadBTreeShelveTestCase))
theSuite.addTest(unittest.makeSuite(ThreadHashShelveTestCase))
theSuite.addTest(unittest.makeSuite(EnvBTreeShelveTestCase))
theSuite.addTest(unittest.makeSuite(EnvHashShelveTestCase))
theSuite.addTest(unittest.makeSuite(EnvThreadBTreeShelveTestCase))
theSuite.addTest(unittest.makeSuite(EnvThreadHashShelveTestCase))
return theSuite
if __name__ == '__main__':
unittest.main( defaultTest='suite' )
This diff is collapsed.
"""
TestCases for checking that it does not segfault when a DBEnv object
is closed before its DB objects.
"""
import sys, os, string
from pprint import pprint
import tempfile
import glob
import unittest
from bsddb import db
from test.test_support import verbose
#----------------------------------------------------------------------
class DBEnvClosedEarlyCrash(unittest.TestCase):
def setUp(self):
self.homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
try: os.mkdir(self.homeDir)
except os.error: pass
tempfile.tempdir = self.homeDir
self.filename = os.path.split(tempfile.mktemp())[1]
tempfile.tempdir = None
def tearDown(self):
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
def test01_close_dbenv_before_db(self):
dbenv = db.DBEnv()
dbenv.open(self.homeDir,db.DB_INIT_CDB| db.DB_CREATE |db.DB_THREAD|db.DB_INIT_MPOOL, 0666)
d = db.DB(dbenv)
d.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0666)
try:
dbenv.close()
except db.DBError:
try:
d.close()
except db.DBError:
return
assert 0, "DB close did not raise an exception about its DBEnv being trashed"
assert 0, "dbenv did not raise an exception about its DB being open"
def test02_close_dbenv_delete_db_success(self):
dbenv = db.DBEnv()
dbenv.open(self.homeDir,db.DB_INIT_CDB| db.DB_CREATE |db.DB_THREAD|db.DB_INIT_MPOOL, 0666)
d = db.DB(dbenv)
d.open(self.filename, db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, 0666)
try:
dbenv.close()
except db.DBError:
pass # good, it should raise an exception
# this should not raise an exception, it should silently skip
# the db->close() call as it can't be done safely.
del d
try:
import gc
except ImportError:
gc = None
if gc:
# force d.__del__ [DB_dealloc] to be called
gc.collect()
#----------------------------------------------------------------------
def suite():
return unittest.makeSuite(DBEnvClosedEarlyCrash)
if __name__ == '__main__':
unittest.main( defaultTest='suite' )
"""
TestCases for checking set_get_returns_none.
"""
import sys, os, string
import tempfile
from pprint import pprint
import unittest
from bsddb import db
from test.test_support import verbose
#----------------------------------------------------------------------
class GetReturnsNoneTestCase(unittest.TestCase):
def setUp(self):
self.filename = tempfile.mktemp()
def tearDown(self):
try:
os.remove(self.filename)
except os.error:
pass
def test01_get_returns_none(self):
d = db.DB()
d.open(self.filename, db.DB_BTREE, db.DB_CREATE)
d.set_get_returns_none(1)
for x in string.letters:
d.put(x, x * 40)
data = d.get('bad key')
assert data == None
data = d.get('a')
assert data == 'a'*40
count = 0
c = d.cursor()
rec = c.first()
while rec:
count = count + 1
rec = c.next()
assert rec == None
assert count == 52
c.close()
d.close()
def test02_get_raises_exception(self):
d = db.DB()
d.open(self.filename, db.DB_BTREE, db.DB_CREATE)
d.set_get_returns_none(0)
for x in string.letters:
d.put(x, x * 40)
self.assertRaises(db.DBNotFoundError, d.get, 'bad key')
self.assertRaises(KeyError, d.get, 'bad key')
data = d.get('a')
assert data == 'a'*40
count = 0
exceptionHappened = 0
c = d.cursor()
rec = c.first()
while rec:
count = count + 1
try:
rec = c.next()
except db.DBNotFoundError: # end of the records
exceptionHappened = 1
break
assert rec != None
assert exceptionHappened
assert count == 52
c.close()
d.close()
#----------------------------------------------------------------------
def suite():
return unittest.makeSuite(GetReturnsNoneTestCase)
if __name__ == '__main__':
unittest.main( defaultTest='suite' )
"""
TestCases for using the DB.join and DBCursor.join_item methods.
"""
import sys, os, string
import tempfile
from pprint import pprint
import unittest
from bsddb import db
from test.test_support import verbose
"""
TestCases for testing the locking sub-system.
"""
import sys, os, string
import tempfile
import time
from pprint import pprint
from whrandom import random
try:
from threading import Thread, currentThread
have_threads = 1
except ImportError:
have_threads = 0
import unittest
from test.test_support import verbose
from bsddb import db
#----------------------------------------------------------------------
class LockingTestCase(unittest.TestCase):
def setUp(self):
homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
self.homeDir = homeDir
try: os.mkdir(homeDir)
except os.error: pass
self.env = db.DBEnv()
self.env.open(homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
db.DB_INIT_LOCK | db.DB_CREATE)
def tearDown(self):
self.env.close()
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
def test01_simple(self):
if verbose:
print '\n', '-=' * 30
print "Running %s.test01_simple..." % self.__class__.__name__
anID = self.env.lock_id()
if verbose:
print "locker ID: %s" % anID
lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
if verbose:
print "Aquired lock: %s" % lock
time.sleep(1)
self.env.lock_put(lock)
if verbose:
print "Released lock: %s" % lock
def test02_threaded(self):
if verbose:
print '\n', '-=' * 30
print "Running %s.test02_threaded..." % self.__class__.__name__
threads = []
threads.append(Thread(target = self.theThread, args=(5, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread, args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread, args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread, args=(1, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread, args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread, args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread, args=(1, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread, args=(1, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread, args=(1, db.DB_LOCK_WRITE)))
for t in threads:
t.start()
for t in threads:
t.join()
def theThread(self, sleepTime, lockType):
name = currentThread().getName()
if lockType == db.DB_LOCK_WRITE:
lt = "write"
else:
lt = "read"
anID = self.env.lock_id()
if verbose:
print "%s: locker ID: %s" % (name, anID)
lock = self.env.lock_get(anID, "some locked thing", lockType)
if verbose:
print "%s: Aquired %s lock: %s" % (name, lt, lock)
time.sleep(sleepTime)
self.env.lock_put(lock)
if verbose:
print "%s: Released %s lock: %s" % (name, lt, lock)
#----------------------------------------------------------------------
def suite():
theSuite = unittest.TestSuite()
if have_threads:
theSuite.addTest(unittest.makeSuite(LockingTestCase))
else:
theSuite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
return theSuite
if __name__ == '__main__':
unittest.main( defaultTest='suite' )
"""
Misc TestCases
"""
import sys, os, string
import tempfile
from pprint import pprint
import unittest
from bsddb import db
from bsddb import dbshelve
from test.test_support import verbose
#----------------------------------------------------------------------
class MiscTestCase(unittest.TestCase):
def setUp(self):
self.filename = self.__class__.__name__ + '.db'
homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
self.homeDir = homeDir
try: os.mkdir(homeDir)
except os.error: pass
def tearDown(self):
try: os.remove(self.filename)
except os.error: pass
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
def test01_badpointer(self):
dbs = dbshelve.open(self.filename)
dbs.close()
self.assertRaises(db.DBError, dbs.get, "foo")
def test02_db_home(self):
env = db.DBEnv()
# check for crash fixed when db_home is used before open()
assert env.db_home is None
env.open(self.homeDir, db.DB_CREATE)
assert self.homeDir == env.db_home
#----------------------------------------------------------------------
def suite():
return unittest.makeSuite(MiscTestCase)
if __name__ == '__main__':
unittest.main( defaultTest='suite' )
"""
TestCases for exercising a Queue DB.
"""
import sys, os, string
import tempfile
from pprint import pprint
import unittest
from bsddb import db
from test.test_support import verbose
#----------------------------------------------------------------------
class SimpleQueueTestCase(unittest.TestCase):
def setUp(self):
self.filename = tempfile.mktemp()
def tearDown(self):
try:
os.remove(self.filename)
except os.error:
pass
def test01_basic(self):
# Basic Queue tests using the deprecated DBCursor.consume method.
if verbose:
print '\n', '-=' * 30
print "Running %s.test01_basic..." % self.__class__.__name__
d = db.DB()
d.set_re_len(40) # Queues must be fixed length
d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)
if verbose:
print "before appends" + '-' * 30
pprint(d.stat())
for x in string.letters:
d.append(x * 40)
assert len(d) == 52
d.put(100, "some more data")
d.put(101, "and some more ")
d.put(75, "out of order")
d.put(1, "replacement data")
assert len(d) == 55
if verbose:
print "before close" + '-' * 30
pprint(d.stat())
d.close()
del d
d = db.DB()
d.open(self.filename)
if verbose:
print "after open" + '-' * 30
pprint(d.stat())
d.append("one more")
c = d.cursor()
if verbose:
print "after append" + '-' * 30
pprint(d.stat())
rec = c.consume()
while rec:
if verbose:
print rec
rec = c.consume()
c.close()
if verbose:
print "after consume loop" + '-' * 30
pprint(d.stat())
assert len(d) == 0, \
"if you see this message then you need to rebuild BerkeleyDB 3.1.17 "\
"with the patch in patches/qam_stat.diff"
d.close()
def test02_basicPost32(self):
# Basic Queue tests using the new DB.consume method in DB 3.2+
# (No cursor needed)
if verbose:
print '\n', '-=' * 30
print "Running %s.test02_basicPost32..." % self.__class__.__name__
if db.version() < (3, 2, 0):
if verbose:
print "Test not run, DB not new enough..."
return
d = db.DB()
d.set_re_len(40) # Queues must be fixed length
d.open(self.filename, db.DB_QUEUE, db.DB_CREATE)
if verbose:
print "before appends" + '-' * 30
pprint(d.stat())
for x in string.letters:
d.append(x * 40)
assert len(d) == 52
d.put(100, "some more data")
d.put(101, "and some more ")
d.put(75, "out of order")
d.put(1, "replacement data")
assert len(d) == 55
if verbose:
print "before close" + '-' * 30
pprint(d.stat())
d.close()
del d
d = db.DB()
d.open(self.filename)
#d.set_get_returns_none(true)
if verbose:
print "after open" + '-' * 30
pprint(d.stat())
d.append("one more")
if verbose:
print "after append" + '-' * 30
pprint(d.stat())
rec = d.consume()
while rec:
if verbose:
print rec
rec = d.consume()
if verbose:
print "after consume loop" + '-' * 30
pprint(d.stat())
d.close()
#----------------------------------------------------------------------
def suite():
return unittest.makeSuite(SimpleQueueTestCase)
if __name__ == '__main__':
unittest.main( defaultTest='suite' )
"""
TestCases for exercising a Recno DB.
"""
import sys, os, string
import tempfile
from pprint import pprint
import unittest
from bsddb import db
from test.test_support import verbose
#----------------------------------------------------------------------
class SimpleRecnoTestCase(unittest.TestCase):
def setUp(self):
self.filename = tempfile.mktemp()
def tearDown(self):
try:
os.remove(self.filename)
except os.error:
pass
def test01_basic(self):
d = db.DB()
d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
for x in string.letters:
recno = d.append(x * 60)
assert type(recno) == type(0)
assert recno >= 1
if verbose:
print recno,
if verbose: print
stat = d.stat()
if verbose:
pprint(stat)
for recno in range(1, len(d)+1):
data = d[recno]
if verbose:
print data
assert type(data) == type("")
assert data == d.get(recno)
try:
data = d[0] # This should raise a KeyError!?!?!
except db.DBInvalidArgError, val:
assert val[0] == db.EINVAL
if verbose: print val
else:
self.fail("expected exception")
try:
data = d[100]
except KeyError:
pass
else:
self.fail("expected exception")
data = d.get(100)
assert data == None
keys = d.keys()
if verbose:
print keys
assert type(keys) == type([])
assert type(keys[0]) == type(123)
assert len(keys) == len(d)
items = d.items()
if verbose:
pprint(items)
assert type(items) == type([])
assert type(items[0]) == type(())
assert len(items[0]) == 2
assert type(items[0][0]) == type(123)
assert type(items[0][1]) == type("")
assert len(items) == len(d)
assert d.has_key(25)
del d[25]
assert not d.has_key(25)
d.delete(13)
assert not d.has_key(13)
data = d.get_both(26, "z" * 60)
assert data == "z" * 60
if verbose:
print data
fd = d.fd()
if verbose:
print fd
c = d.cursor()
rec = c.first()
while rec:
if verbose:
print rec
rec = c.next()
c.set(50)
rec = c.current()
if verbose:
print rec
c.put(-1, "a replacement record", db.DB_CURRENT)
c.set(50)
rec = c.current()
assert rec == (50, "a replacement record")
if verbose:
print rec
rec = c.set_range(30)
if verbose:
print rec
c.close()
d.close()
d = db.DB()
d.open(self.filename)
c = d.cursor()
# put a record beyond the consecutive end of the recno's
d[100] = "way out there"
assert d[100] == "way out there"
try:
data = d[99]
except KeyError:
pass
else:
self.fail("expected exception")
try:
d.get(99)
except db.DBKeyEmptyError, val:
assert val[0] == db.DB_KEYEMPTY
if verbose: print val
else:
self.fail("expected exception")
rec = c.set(40)
while rec:
if verbose:
print rec
rec = c.next()
c.close()
d.close()
def test02_WithSource(self):
"""
A Recno file that is given a "backing source file" is essentially a simple ASCII
file. Normally each record is delimited by \n and so is just a line in the file,
but you can set a different record delimiter if needed.
"""
source = os.path.join(os.path.dirname(sys.argv[0]), 'db_home/test_recno.txt')
f = open(source, 'w') # create the file
f.close()
d = db.DB()
d.set_re_delim(0x0A) # This is the default value, just checking if both int
d.set_re_delim('\n') # and char can be used...
d.set_re_source(source)
d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
data = string.split("The quick brown fox jumped over the lazy dog")
for datum in data:
d.append(datum)
d.sync()
d.close()
# get the text from the backing source
text = open(source, 'r').read()
text = string.strip(text)
if verbose:
print text
print data
print string.split(text, '\n')
assert string.split(text, '\n') == data
# open as a DB again
d = db.DB()
d.set_re_source(source)
d.open(self.filename, db.DB_RECNO)
d[3] = 'reddish-brown'
d[8] = 'comatose'
d.sync()
d.close()
text = open(source, 'r').read()
text = string.strip(text)
if verbose:
print text
print string.split(text, '\n')
assert string.split(text, '\n') == string.split("The quick reddish-brown fox jumped over the comatose dog")
def test03_FixedLength(self):
d = db.DB()
d.set_re_len(40) # fixed length records, 40 bytes long
d.set_re_pad('-') # sets the pad character...
d.set_re_pad(45) # ...test both int and char
d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
for x in string.letters:
d.append(x * 35) # These will be padded
d.append('.' * 40) # this one will be exact
try: # this one will fail
d.append('bad' * 20)
except db.DBInvalidArgError, val:
assert val[0] == db.EINVAL
if verbose: print val
else:
self.fail("expected exception")
c = d.cursor()
rec = c.first()
while rec:
if verbose:
print rec
rec = c.next()
c.close()
d.close()
#----------------------------------------------------------------------
def suite():
return unittest.makeSuite(SimpleRecnoTestCase)
if __name__ == '__main__':
unittest.main( defaultTest='suite' )
This diff is collapsed.
......@@ -55,6 +55,9 @@ resources to test. Currently only the following are defined:
network - It is okay to run tests that use external network
resource, e.g. testing SSL support for sockets.
bsddb - It is okay to run the bsddb testsuite, which takes
a long time to complete.
"""
import sys
......@@ -78,7 +81,7 @@ if sys.maxint > 0x7fffffff:
from test import test_support
RESOURCE_NAMES = ('curses', 'largefile', 'network')
RESOURCE_NAMES = ('curses', 'largefile', 'network', 'bsddb')
def usage(code, msg=''):
......
# Test driver for bsddb package.
"""
Run all test cases.
"""
import sys
import unittest
from test.test_support import requires, verbose, run_suite
requires('bsddb')
verbose = 0
if 'verbose' in sys.argv:
verbose = 1
sys.argv.remove('verbose')
if 'silent' in sys.argv: # take care of old flag, just in case
verbose = 0
sys.argv.remove('silent')
def suite():
test_modules = [ 'test_compat',
'test_basics',
'test_misc',
'test_dbobj',
'test_recno',
'test_queue',
'test_get_none',
'test_dbshelve',
'test_dbtables',
'test_thread',
'test_lock',
'test_associate',
]
alltests = unittest.TestSuite()
for name in test_modules:
module = __import__("bsddb.test."+name, globals(), locals(), name)
print module,name
alltests.addTest(module.suite())
return alltests
# For invocation through regrtest
def test_main():
tests = suite()
run_suite(tests)
# For invocation as a script
if __name__ == '__main__':
from bsddb import db
print '-=' * 38
print db.DB_VERSION_STRING
print 'bsddb3.db.version(): %s' % (db.version(), )
print 'bsddb3.db.__version__: %s' % db.__version__
print 'bsddb3.db.cvsid: %s' % db.cvsid
print 'python version: %s' % sys.version
print '-=' * 38
unittest.main( defaultTest='suite' )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment