Skip to content
Projeler
Gruplar
Parçacıklar
Yardım
Yükleniyor...
Oturum aç / Kaydol
Gezinmeyi değiştir
C
cpython
Proje
Proje
Ayrıntılar
Etkinlik
Cycle Analytics
Depo (repository)
Depo (repository)
Dosyalar
Kayıtlar (commit)
Dallar (branch)
Etiketler
Katkıda bulunanlar
Grafik
Karşılaştır
Grafikler
Konular (issue)
0
Konular (issue)
0
Liste
Pano
Etiketler
Kilometre Taşları
Birleştirme (merge) Talepleri
0
Birleştirme (merge) Talepleri
0
CI / CD
CI / CD
İş akışları (pipeline)
İşler
Zamanlamalar
Grafikler
Paketler
Paketler
Wiki
Wiki
Parçacıklar
Parçacıklar
Üyeler
Üyeler
Collapse sidebar
Close sidebar
Etkinlik
Grafik
Grafikler
Yeni bir konu (issue) oluştur
İşler
Kayıtlar (commit)
Konu (issue) Panoları
Kenar çubuğunu aç
Batuhan Osman TASKAYA
cpython
Commits
18eb1fa2
Kaydet (Commit)
18eb1fa2
authored
May 13, 2008
tarafından
Jesus Cea
Dosyalara gözat
Seçenekler
Dosyalara Gözat
İndir
Eposta Yamaları
Sade Fark
Testsuite for bsddb module, version 4.6.4
üst
cb33aeaf
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
25 changed files
with
931 additions
and
411 deletions
+931
-411
__init__.py
Lib/bsddb/__init__.py
+6
-6
dbshelve.py
Lib/bsddb/dbshelve.py
+14
-5
test_1413192.py
Lib/bsddb/test/test_1413192.py
+0
-48
test_all.py
Lib/bsddb/test/test_all.py
+55
-1
test_associate.py
Lib/bsddb/test/test_associate.py
+21
-39
test_basics.py
Lib/bsddb/test/test_basics.py
+0
-0
test_compare.py
Lib/bsddb/test/test_compare.py
+8
-13
test_compat.py
Lib/bsddb/test/test_compat.py
+6
-7
test_cursor_pget_bug.py
Lib/bsddb/test/test_cursor_pget_bug.py
+3
-7
test_dbobj.py
Lib/bsddb/test/test_dbobj.py
+10
-14
test_dbshelve.py
Lib/bsddb/test/test_dbshelve.py
+38
-36
test_dbtables.py
Lib/bsddb/test/test_dbtables.py
+4
-10
test_distributed_transactions.py
Lib/bsddb/test/test_distributed_transactions.py
+170
-0
test_early_close.py
Lib/bsddb/test/test_early_close.py
+207
-0
test_env_close.py
Lib/bsddb/test/test_env_close.py
+0
-107
test_get_none.py
Lib/bsddb/test/test_get_none.py
+10
-11
test_join.py
Lib/bsddb/test/test_join.py
+6
-13
test_lock.py
Lib/bsddb/test/test_lock.py
+66
-28
test_misc.py
Lib/bsddb/test/test_misc.py
+6
-10
test_pickle.py
Lib/bsddb/test/test_pickle.py
+4
-7
test_queue.py
Lib/bsddb/test/test_queue.py
+8
-9
test_recno.py
Lib/bsddb/test/test_recno.py
+35
-31
test_replication.py
Lib/bsddb/test/test_replication.py
+199
-0
test_sequence.py
Lib/bsddb/test/test_sequence.py
+55
-9
test_thread.py
Lib/bsddb/test/test_thread.py
+0
-0
No files found.
Lib/bsddb/__init__.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -33,10 +33,10 @@
...
@@ -33,10 +33,10 @@
#----------------------------------------------------------------------
#----------------------------------------------------------------------
"""Support for Berkeley
DB 3.3 through 4.4
with a simple interface.
"""Support for Berkeley
DB 3.3 through 4.6
with a simple interface.
For the full featured object oriented interface use the bsddb.db module
For the full featured object oriented interface use the bsddb.db module
instead. It mirrors the
Sleepycat Berkeley
DB C API.
instead. It mirrors the
Oracle Berkeley
DB C API.
"""
"""
try
:
try
:
...
@@ -188,7 +188,7 @@ class _DBWithCursor(_iter_mixin):
...
@@ -188,7 +188,7 @@ class _DBWithCursor(_iter_mixin):
self
.
saved_dbc_key
=
None
self
.
saved_dbc_key
=
None
# This method is needed for all non-cursor DB calls to avoid
# This method is needed for all non-cursor DB calls to avoid
# BerkeleyDB deadlocks (due to being opened with DB_INIT_LOCK
# Berkeley
DB deadlocks (due to being opened with DB_INIT_LOCK
# and DB_THREAD to be thread safe) when intermixing database
# and DB_THREAD to be thread safe) when intermixing database
# operations that use the cursor internally with those that don't.
# operations that use the cursor internally with those that don't.
def
_closeCursors
(
self
,
save
=
1
):
def
_closeCursors
(
self
,
save
=
1
):
...
@@ -372,7 +372,7 @@ def _checkflag(flag, file):
...
@@ -372,7 +372,7 @@ def _checkflag(flag, file):
elif
flag
==
'n'
:
elif
flag
==
'n'
:
flags
=
db
.
DB_CREATE
flags
=
db
.
DB_CREATE
#flags = db.DB_CREATE | db.DB_TRUNCATE
#flags = db.DB_CREATE | db.DB_TRUNCATE
# we used db.DB_TRUNCATE flag for this before but BerkeleyDB
# we used db.DB_TRUNCATE flag for this before but Berkeley
DB
# 4.2.52 changed to disallowed truncate with txn environments.
# 4.2.52 changed to disallowed truncate with txn environments.
if
file
is
not
None
and
os
.
path
.
isfile
(
file
):
if
file
is
not
None
and
os
.
path
.
isfile
(
file
):
os
.
unlink
(
file
)
os
.
unlink
(
file
)
...
@@ -385,10 +385,10 @@ def _checkflag(flag, file):
...
@@ -385,10 +385,10 @@ def _checkflag(flag, file):
# This is a silly little hack that allows apps to continue to use the
# This is a silly little hack that allows apps to continue to use the
# DB_THREAD flag even on systems without threads without freaking out
# DB_THREAD flag even on systems without threads without freaking out
# BerkeleyDB.
# Berkeley
DB.
#
#
# This assumes that if Python was built with thread support then
# This assumes that if Python was built with thread support then
# BerkeleyDB was too.
# Berkeley
DB was too.
try
:
try
:
import
thread
import
thread
...
...
Lib/bsddb/dbshelve.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -37,9 +37,18 @@ import sys
...
@@ -37,9 +37,18 @@ import sys
#DictMixin was added
#DictMixin was added
if
sys
.
version_info
[:
3
]
>=
(
2
,
3
,
0
):
if
sys
.
version_info
[:
3
]
>=
(
2
,
3
,
0
):
HIGHEST_PROTOCOL
=
cPickle
.
HIGHEST_PROTOCOL
HIGHEST_PROTOCOL
=
cPickle
.
HIGHEST_PROTOCOL
def
_dumps
(
object
,
protocol
):
# In python 2.3.*, "cPickle.dumps" accepts no
return
cPickle
.
dumps
(
object
,
protocol
=
protocol
)
# named parameters. "pickle.dumps" accepts them,
# so this seems a bug.
if
sys
.
version_info
[:
3
]
<
(
2
,
4
,
0
):
def
_dumps
(
object
,
protocol
):
return
cPickle
.
dumps
(
object
,
protocol
)
else
:
def
_dumps
(
object
,
protocol
):
return
cPickle
.
dumps
(
object
,
protocol
=
protocol
)
from
UserDict
import
DictMixin
from
UserDict
import
DictMixin
else
:
else
:
HIGHEST_PROTOCOL
=
None
HIGHEST_PROTOCOL
=
None
def
_dumps
(
object
,
protocol
):
def
_dumps
(
object
,
protocol
):
...
@@ -133,7 +142,7 @@ class DBShelf(DictMixin):
...
@@ -133,7 +142,7 @@ class DBShelf(DictMixin):
def
keys
(
self
,
txn
=
None
):
def
keys
(
self
,
txn
=
None
):
if
txn
is
not
None
:
if
txn
!=
None
:
return
self
.
db
.
keys
(
txn
)
return
self
.
db
.
keys
(
txn
)
else
:
else
:
return
self
.
db
.
keys
()
return
self
.
db
.
keys
()
...
@@ -157,7 +166,7 @@ class DBShelf(DictMixin):
...
@@ -157,7 +166,7 @@ class DBShelf(DictMixin):
def
items
(
self
,
txn
=
None
):
def
items
(
self
,
txn
=
None
):
if
txn
is
not
None
:
if
txn
!=
None
:
items
=
self
.
db
.
items
(
txn
)
items
=
self
.
db
.
items
(
txn
)
else
:
else
:
items
=
self
.
db
.
items
()
items
=
self
.
db
.
items
()
...
@@ -168,7 +177,7 @@ class DBShelf(DictMixin):
...
@@ -168,7 +177,7 @@ class DBShelf(DictMixin):
return
newitems
return
newitems
def
values
(
self
,
txn
=
None
):
def
values
(
self
,
txn
=
None
):
if
txn
is
not
None
:
if
txn
!=
None
:
values
=
self
.
db
.
values
(
txn
)
values
=
self
.
db
.
values
(
txn
)
else
:
else
:
values
=
self
.
db
.
values
()
values
=
self
.
db
.
values
()
...
...
Lib/bsddb/test/test_1413192.py
deleted
100644 → 0
Dosyayı görüntüle @
cb33aeaf
# http://bugs.python.org/issue1413192
#
# See the bug report for details.
# The problem was that the env was deallocated prior to the txn.
import
shutil
import
tempfile
from
test.test_support
import
catch_warning
import
warnings
try
:
# For Pythons w/distutils and add-on pybsddb
from
bsddb3
import
db
except
ImportError
:
# For Python >= 2.3 builtin bsddb distribution
from
bsddb
import
db
env_name
=
tempfile
.
mkdtemp
()
# Wrap test operation in a class so we can control destruction rather than
# waiting for the controlling Python executable to exit
class
Context
:
def
__init__
(
self
):
self
.
env
=
db
.
DBEnv
()
self
.
env
.
open
(
env_name
,
db
.
DB_CREATE
|
db
.
DB_INIT_TXN
|
db
.
DB_INIT_MPOOL
)
self
.
the_txn
=
self
.
env
.
txn_begin
()
self
.
map
=
db
.
DB
(
self
.
env
)
self
.
map
.
open
(
'xxx.db'
,
"p"
,
db
.
DB_HASH
,
db
.
DB_CREATE
,
0666
,
txn
=
self
.
the_txn
)
del
self
.
env
del
self
.
the_txn
with
catch_warning
():
warnings
.
filterwarnings
(
'ignore'
,
'DBTxn aborted in destructor'
)
context
=
Context
()
del
context
# try not to leave a turd
try
:
shutil
.
rmtree
(
env_name
)
except
EnvironmentError
:
pass
Lib/bsddb/test/test_all.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -11,6 +11,11 @@ except ImportError:
...
@@ -11,6 +11,11 @@ except ImportError:
# For Python 2.3
# For Python 2.3
from
bsddb
import
db
from
bsddb
import
db
try
:
from
bsddb3
import
test_support
except
ImportError
:
from
test
import
test_support
verbose
=
0
verbose
=
0
if
'verbose'
in
sys
.
argv
:
if
'verbose'
in
sys
.
argv
:
verbose
=
1
verbose
=
1
...
@@ -33,6 +38,53 @@ def print_versions():
...
@@ -33,6 +38,53 @@ def print_versions():
print
'-='
*
38
print
'-='
*
38
def
get_new_path
(
name
)
:
get_new_path
.
mutex
.
acquire
()
try
:
import
os
path
=
os
.
path
.
join
(
get_new_path
.
prefix
,
name
+
"_"
+
str
(
os
.
getpid
())
+
"_"
+
str
(
get_new_path
.
num
))
get_new_path
.
num
+=
1
finally
:
get_new_path
.
mutex
.
release
()
return
path
def
get_new_environment_path
()
:
path
=
get_new_path
(
"environment"
)
import
os
try
:
os
.
makedirs
(
path
,
mode
=
0700
)
except
os
.
error
:
test_support
.
rmtree
(
path
)
os
.
makedirs
(
path
)
return
path
def
get_new_database_path
()
:
path
=
get_new_path
(
"database"
)
import
os
if
os
.
path
.
exists
(
path
)
:
os
.
remove
(
path
)
return
path
get_new_path
.
prefix
=
"/tmp/z-Berkeley_DB"
get_new_path
.
num
=
0
try
:
import
threading
get_new_path
.
mutex
=
threading
.
Lock
()
del
threading
except
ImportError
:
class
Lock
(
object
)
:
def
acquire
(
self
)
:
pass
def
release
(
self
)
:
pass
get_new_path
.
mutex
=
Lock
()
del
Lock
class
PrintInfoFakeTest
(
unittest
.
TestCase
):
class
PrintInfoFakeTest
(
unittest
.
TestCase
):
def
testPrintVersions
(
self
):
def
testPrintVersions
(
self
):
print_versions
()
print_versions
()
...
@@ -60,7 +112,9 @@ def suite():
...
@@ -60,7 +112,9 @@ def suite():
'test_dbobj'
,
'test_dbobj'
,
'test_dbshelve'
,
'test_dbshelve'
,
'test_dbtables'
,
'test_dbtables'
,
'test_env_close'
,
'test_early_close'
,
'test_distributed_transactions'
,
'test_replication'
,
'test_get_none'
,
'test_get_none'
,
'test_join'
,
'test_join'
,
'test_lock'
,
'test_lock'
,
...
...
Lib/bsddb/test/test_associate.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -3,7 +3,6 @@ TestCases for DB.associate.
...
@@ -3,7 +3,6 @@ TestCases for DB.associate.
"""
"""
import
sys
,
os
,
string
import
sys
,
os
,
string
import
tempfile
import
time
import
time
from
pprint
import
pprint
from
pprint
import
pprint
...
@@ -14,7 +13,7 @@ except ImportError:
...
@@ -14,7 +13,7 @@ except ImportError:
have_threads
=
0
have_threads
=
0
import
unittest
import
unittest
from
test_all
import
verbose
from
test_all
import
verbose
,
get_new_environment_path
try
:
try
:
# For Pythons w/distutils pybsddb
# For Pythons w/distutils pybsddb
...
@@ -96,17 +95,9 @@ musicdata = {
...
@@ -96,17 +95,9 @@ musicdata = {
class
AssociateErrorTestCase
(
unittest
.
TestCase
):
class
AssociateErrorTestCase
(
unittest
.
TestCase
):
def
setUp
(
self
):
def
setUp
(
self
):
self
.
filename
=
self
.
__class__
.
__name__
+
'.db'
self
.
filename
=
self
.
__class__
.
__name__
+
'.db'
homeDir
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
'db_home
%
d'
%
os
.
getpid
())
self
.
homeDir
=
get_new_environment_path
()
self
.
homeDir
=
homeDir
try
:
os
.
mkdir
(
homeDir
)
except
os
.
error
:
import
glob
files
=
glob
.
glob
(
os
.
path
.
join
(
self
.
homeDir
,
'*'
))
for
file
in
files
:
os
.
remove
(
file
)
self
.
env
=
db
.
DBEnv
()
self
.
env
=
db
.
DBEnv
()
self
.
env
.
open
(
homeDir
,
db
.
DB_CREATE
|
db
.
DB_INIT_MPOOL
)
self
.
env
.
open
(
self
.
homeDir
,
db
.
DB_CREATE
|
db
.
DB_INIT_MPOOL
)
def
tearDown
(
self
):
def
tearDown
(
self
):
self
.
env
.
close
()
self
.
env
.
close
()
...
@@ -127,7 +118,7 @@ class AssociateErrorTestCase(unittest.TestCase):
...
@@ -127,7 +118,7 @@ class AssociateErrorTestCase(unittest.TestCase):
secDB
.
open
(
self
.
filename
,
"secondary"
,
db
.
DB_BTREE
,
db
.
DB_CREATE
)
secDB
.
open
(
self
.
filename
,
"secondary"
,
db
.
DB_BTREE
,
db
.
DB_CREATE
)
# dupDB has been configured to allow duplicates, it can't
# dupDB has been configured to allow duplicates, it can't
# associate with a secondary. BerkeleyDB will return an error.
# associate with a secondary. Berkeley
DB will return an error.
try
:
try
:
def
f
(
a
,
b
):
return
a
+
b
def
f
(
a
,
b
):
return
a
+
b
dupDB
.
associate
(
secDB
,
f
)
dupDB
.
associate
(
secDB
,
f
)
...
@@ -152,27 +143,16 @@ class AssociateTestCase(unittest.TestCase):
...
@@ -152,27 +143,16 @@ class AssociateTestCase(unittest.TestCase):
def
setUp
(
self
):
def
setUp
(
self
):
self
.
filename
=
self
.
__class__
.
__name__
+
'.db'
self
.
filename
=
self
.
__class__
.
__name__
+
'.db'
homeDir
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
'db_home
%
d'
%
os
.
getpid
())
self
.
homeDir
=
get_new_environment_path
()
self
.
homeDir
=
homeDir
try
:
os
.
mkdir
(
homeDir
)
except
os
.
error
:
import
glob
files
=
glob
.
glob
(
os
.
path
.
join
(
self
.
homeDir
,
'*'
))
for
file
in
files
:
os
.
remove
(
file
)
self
.
env
=
db
.
DBEnv
()
self
.
env
=
db
.
DBEnv
()
self
.
env
.
open
(
homeDir
,
db
.
DB_CREATE
|
db
.
DB_INIT_MPOOL
|
self
.
env
.
open
(
self
.
homeDir
,
db
.
DB_CREATE
|
db
.
DB_INIT_MPOOL
|
db
.
DB_INIT_LOCK
|
db
.
DB_THREAD
|
self
.
envFlags
)
db
.
DB_INIT_LOCK
|
db
.
DB_THREAD
|
self
.
envFlags
)
def
tearDown
(
self
):
def
tearDown
(
self
):
self
.
closeDB
()
self
.
closeDB
()
self
.
env
.
close
()
self
.
env
.
close
()
self
.
env
=
None
self
.
env
=
None
import
glob
test_support
.
rmtree
(
self
.
homeDir
)
files
=
glob
.
glob
(
os
.
path
.
join
(
self
.
homeDir
,
'*'
))
for
file
in
files
:
os
.
remove
(
file
)
def
addDataToDB
(
self
,
d
,
txn
=
None
):
def
addDataToDB
(
self
,
d
,
txn
=
None
):
for
key
,
value
in
musicdata
.
items
():
for
key
,
value
in
musicdata
.
items
():
...
@@ -249,10 +229,10 @@ class AssociateTestCase(unittest.TestCase):
...
@@ -249,10 +229,10 @@ class AssociateTestCase(unittest.TestCase):
def
finish_test
(
self
,
secDB
,
txn
=
None
):
def
finish_test
(
self
,
secDB
,
txn
=
None
):
# 'Blues' should not be in the secondary database
# 'Blues' should not be in the secondary database
vals
=
secDB
.
pget
(
'Blues'
,
txn
=
txn
)
vals
=
secDB
.
pget
(
'Blues'
,
txn
=
txn
)
assert
vals
==
None
,
vals
self
.
assertEqual
(
vals
,
None
,
vals
)
vals
=
secDB
.
pget
(
'Unknown'
,
txn
=
txn
)
vals
=
secDB
.
pget
(
'Unknown'
,
txn
=
txn
)
assert
vals
[
0
]
==
99
or
vals
[
0
]
==
'99'
,
vals
self
.
assert_
(
vals
[
0
]
==
99
or
vals
[
0
]
==
'99'
,
vals
)
vals
[
1
]
.
index
(
'Unknown'
)
vals
[
1
]
.
index
(
'Unknown'
)
vals
[
1
]
.
index
(
'Unnamed'
)
vals
[
1
]
.
index
(
'Unnamed'
)
vals
[
1
]
.
index
(
'unknown'
)
vals
[
1
]
.
index
(
'unknown'
)
...
@@ -264,14 +244,14 @@ class AssociateTestCase(unittest.TestCase):
...
@@ -264,14 +244,14 @@ class AssociateTestCase(unittest.TestCase):
rec
=
self
.
cur
.
first
()
rec
=
self
.
cur
.
first
()
while
rec
is
not
None
:
while
rec
is
not
None
:
if
type
(
self
.
keytype
)
==
type
(
''
):
if
type
(
self
.
keytype
)
==
type
(
''
):
assert
string
.
atoi
(
rec
[
0
]
)
# for primary db, key is a number
self
.
assert_
(
string
.
atoi
(
rec
[
0
])
)
# for primary db, key is a number
else
:
else
:
assert
rec
[
0
]
and
type
(
rec
[
0
])
==
type
(
0
)
self
.
assert_
(
rec
[
0
]
and
type
(
rec
[
0
])
==
type
(
0
)
)
count
=
count
+
1
count
=
count
+
1
if
verbose
:
if
verbose
:
print
rec
print
rec
rec
=
self
.
cur
.
next
()
rec
=
self
.
cur
.
next
()
assert
count
==
len
(
musicdata
)
# all items accounted for
self
.
assertEqual
(
count
,
len
(
musicdata
))
# all items accounted for
if
verbose
:
if
verbose
:
...
@@ -281,29 +261,29 @@ class AssociateTestCase(unittest.TestCase):
...
@@ -281,29 +261,29 @@ class AssociateTestCase(unittest.TestCase):
# test cursor pget
# test cursor pget
vals
=
self
.
cur
.
pget
(
'Unknown'
,
flags
=
db
.
DB_LAST
)
vals
=
self
.
cur
.
pget
(
'Unknown'
,
flags
=
db
.
DB_LAST
)
assert
vals
[
1
]
==
99
or
vals
[
1
]
==
'99'
,
vals
self
.
assert_
(
vals
[
1
]
==
99
or
vals
[
1
]
==
'99'
,
vals
)
assert
vals
[
0
]
==
'Unknown'
self
.
assertEqual
(
vals
[
0
],
'Unknown'
)
vals
[
2
]
.
index
(
'Unknown'
)
vals
[
2
]
.
index
(
'Unknown'
)
vals
[
2
]
.
index
(
'Unnamed'
)
vals
[
2
]
.
index
(
'Unnamed'
)
vals
[
2
]
.
index
(
'unknown'
)
vals
[
2
]
.
index
(
'unknown'
)
vals
=
self
.
cur
.
pget
(
'Unknown'
,
data
=
'wrong value'
,
flags
=
db
.
DB_GET_BOTH
)
vals
=
self
.
cur
.
pget
(
'Unknown'
,
data
=
'wrong value'
,
flags
=
db
.
DB_GET_BOTH
)
assert
vals
==
None
,
vals
self
.
assertEqual
(
vals
,
None
,
vals
)
rec
=
self
.
cur
.
first
()
rec
=
self
.
cur
.
first
()
assert
rec
[
0
]
==
"Jazz"
self
.
assertEqual
(
rec
[
0
],
"Jazz"
)
while
rec
is
not
None
:
while
rec
is
not
None
:
count
=
count
+
1
count
=
count
+
1
if
verbose
:
if
verbose
:
print
rec
print
rec
rec
=
self
.
cur
.
next
()
rec
=
self
.
cur
.
next
()
# all items accounted for EXCEPT for 1 with "Blues" genre
# all items accounted for EXCEPT for 1 with "Blues" genre
assert
count
==
len
(
musicdata
)
-
1
self
.
assertEqual
(
count
,
len
(
musicdata
)
-
1
)
self
.
cur
=
None
self
.
cur
=
None
def
getGenre
(
self
,
priKey
,
priData
):
def
getGenre
(
self
,
priKey
,
priData
):
assert
type
(
priData
)
==
type
(
""
)
self
.
assertEqual
(
type
(
priData
),
type
(
""
)
)
if
verbose
:
if
verbose
:
print
'getGenre key:
%
r data:
%
r'
%
(
priKey
,
priData
)
print
'getGenre key:
%
r data:
%
r'
%
(
priKey
,
priData
)
genre
=
string
.
split
(
priData
,
'|'
)[
2
]
genre
=
string
.
split
(
priData
,
'|'
)[
2
]
...
@@ -388,7 +368,7 @@ class ShelveAssociateTestCase(AssociateTestCase):
...
@@ -388,7 +368,7 @@ class ShelveAssociateTestCase(AssociateTestCase):
def
getGenre
(
self
,
priKey
,
priData
):
def
getGenre
(
self
,
priKey
,
priData
):
assert
type
(
priData
)
==
type
((
))
self
.
assertEqual
(
type
(
priData
),
type
(()
))
if
verbose
:
if
verbose
:
print
'getGenre key:
%
r data:
%
r'
%
(
priKey
,
priData
)
print
'getGenre key:
%
r data:
%
r'
%
(
priKey
,
priData
)
genre
=
priData
[
2
]
genre
=
priData
[
2
]
...
@@ -419,6 +399,8 @@ class ThreadedAssociateTestCase(AssociateTestCase):
...
@@ -419,6 +399,8 @@ class ThreadedAssociateTestCase(AssociateTestCase):
t2
=
Thread
(
target
=
self
.
writer2
,
t2
=
Thread
(
target
=
self
.
writer2
,
args
=
(
d
,
))
args
=
(
d
,
))
t1
.
setDaemon
(
True
)
t2
.
setDaemon
(
True
)
t1
.
start
()
t1
.
start
()
t2
.
start
()
t2
.
start
()
t1
.
join
()
t1
.
join
()
...
...
Lib/bsddb/test/test_basics.py
Dosyayı görüntüle @
18eb1fa2
This diff is collapsed.
Click to expand it.
Lib/bsddb/test/test_compare.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -4,7 +4,6 @@ TestCases for python DB Btree key comparison function.
...
@@ -4,7 +4,6 @@ TestCases for python DB Btree key comparison function.
import
sys
,
os
,
re
import
sys
,
os
,
re
import
test_all
import
test_all
import
tempfile
from
cStringIO
import
StringIO
from
cStringIO
import
StringIO
import
unittest
import
unittest
...
@@ -15,6 +14,8 @@ except ImportError:
...
@@ -15,6 +14,8 @@ except ImportError:
# For Python 2.3
# For Python 2.3
from
bsddb
import
db
,
dbshelve
from
bsddb
import
db
,
dbshelve
from
test_all
import
get_new_environment_path
,
get_new_database_path
try
:
try
:
from
bsddb3
import
test_support
from
bsddb3
import
test_support
except
ImportError
:
except
ImportError
:
...
@@ -57,23 +58,17 @@ class AbstractBtreeKeyCompareTestCase (unittest.TestCase):
...
@@ -57,23 +58,17 @@ class AbstractBtreeKeyCompareTestCase (unittest.TestCase):
def
setUp
(
self
):
def
setUp
(
self
):
self
.
filename
=
self
.
__class__
.
__name__
+
'.db'
self
.
filename
=
self
.
__class__
.
__name__
+
'.db'
homeDir
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
'db_home
%
d'
%
os
.
getpid
())
self
.
homeDir
=
get_new_environment_path
()
self
.
homeDir
=
homeDir
env
=
db
.
DBEnv
()
try
:
env
.
open
(
self
.
homeDir
,
os
.
mkdir
(
homeDir
)
except
os
.
error
:
pass
env
=
db
.
DBEnv
()
env
.
open
(
homeDir
,
db
.
DB_CREATE
|
db
.
DB_INIT_MPOOL
db
.
DB_CREATE
|
db
.
DB_INIT_MPOOL
|
db
.
DB_INIT_LOCK
|
db
.
DB_THREAD
)
|
db
.
DB_INIT_LOCK
|
db
.
DB_THREAD
)
self
.
env
=
env
self
.
env
=
env
def
tearDown
(
self
):
def
tearDown
(
self
):
self
.
closeDB
()
self
.
closeDB
()
if
self
.
env
is
not
None
:
if
self
.
env
is
not
None
:
self
.
env
.
close
()
self
.
env
.
close
()
self
.
env
=
None
self
.
env
=
None
test_support
.
rmtree
(
self
.
homeDir
)
test_support
.
rmtree
(
self
.
homeDir
)
...
@@ -236,7 +231,7 @@ class BtreeExceptionsTestCase (AbstractBtreeKeyCompareTestCase):
...
@@ -236,7 +231,7 @@ class BtreeExceptionsTestCase (AbstractBtreeKeyCompareTestCase):
self
.
createDB
(
my_compare
)
self
.
createDB
(
my_compare
)
try
:
try
:
self
.
db
.
set_bt_compare
(
my_compare
)
self
.
db
.
set_bt_compare
(
my_compare
)
assert
False
,
"this set should fail"
self
.
assert_
(
0
,
"this set should fail"
)
except
RuntimeError
,
msg
:
except
RuntimeError
,
msg
:
pass
pass
...
...
Lib/bsddb/test/test_compat.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -5,9 +5,9 @@ regression test suite.
...
@@ -5,9 +5,9 @@ regression test suite.
import
os
,
string
import
os
,
string
import
unittest
import
unittest
import
tempfile
from
test_all
import
verbose
from
test_all
import
verbose
,
get_new_database_path
try
:
try
:
# For Pythons w/distutils pybsddb
# For Pythons w/distutils pybsddb
...
@@ -16,10 +16,9 @@ except ImportError:
...
@@ -16,10 +16,9 @@ except ImportError:
# For Python 2.3
# For Python 2.3
from
bsddb
import
db
,
hashopen
,
btopen
,
rnopen
from
bsddb
import
db
,
hashopen
,
btopen
,
rnopen
class
CompatibilityTestCase
(
unittest
.
TestCase
):
class
CompatibilityTestCase
(
unittest
.
TestCase
):
def
setUp
(
self
):
def
setUp
(
self
):
self
.
filename
=
tempfile
.
mktemp
()
self
.
filename
=
get_new_database_path
()
def
tearDown
(
self
):
def
tearDown
(
self
):
try
:
try
:
...
@@ -47,7 +46,7 @@ class CompatibilityTestCase(unittest.TestCase):
...
@@ -47,7 +46,7 @@ class CompatibilityTestCase(unittest.TestCase):
if
verbose
:
if
verbose
:
print
'
%
s
%
s
%
s'
%
getTest
print
'
%
s
%
s
%
s'
%
getTest
assert
getTest
[
1
]
==
'quick'
,
'data mismatch!'
self
.
assertEqual
(
getTest
[
1
],
'quick'
,
'data mismatch!'
)
rv
=
f
.
set_location
(
3
)
rv
=
f
.
set_location
(
3
)
if
rv
!=
(
3
,
'brown'
):
if
rv
!=
(
3
,
'brown'
):
...
@@ -120,13 +119,13 @@ class CompatibilityTestCase(unittest.TestCase):
...
@@ -120,13 +119,13 @@ class CompatibilityTestCase(unittest.TestCase):
try
:
try
:
rec
=
f
.
next
()
rec
=
f
.
next
()
except
KeyError
:
except
KeyError
:
assert
rec
==
f
.
last
(),
'Error, last <> last!'
self
.
assertEqual
(
rec
,
f
.
last
(),
'Error, last <> last!'
)
f
.
previous
()
f
.
previous
()
break
break
if
verbose
:
if
verbose
:
print
rec
print
rec
assert
f
.
has_key
(
'f'
),
'Error, missing key!'
self
.
assert_
(
f
.
has_key
(
'f'
),
'Error, missing key!'
)
# test that set_location() returns the next nearest key, value
# test that set_location() returns the next nearest key, value
# on btree databases and raises KeyError on others.
# on btree databases and raises KeyError on others.
...
...
Lib/bsddb/test/test_cursor_pget_bug.py
Dosyayı görüntüle @
18eb1fa2
import
unittest
import
unittest
import
tempfile
import
os
,
glob
import
os
,
glob
try
:
try
:
...
@@ -9,12 +8,13 @@ except ImportError:
...
@@ -9,12 +8,13 @@ except ImportError:
# For Python 2.3
# For Python 2.3
from
bsddb
import
db
from
bsddb
import
db
from
test_all
import
get_new_environment_path
,
get_new_database_path
try
:
try
:
from
bsddb3
import
test_support
from
bsddb3
import
test_support
except
ImportError
:
except
ImportError
:
from
test
import
test_support
from
test
import
test_support
#----------------------------------------------------------------------
#----------------------------------------------------------------------
class
pget_bugTestCase
(
unittest
.
TestCase
):
class
pget_bugTestCase
(
unittest
.
TestCase
):
...
@@ -22,11 +22,7 @@ class pget_bugTestCase(unittest.TestCase):
...
@@ -22,11 +22,7 @@ class pget_bugTestCase(unittest.TestCase):
db_name
=
'test-cursor_pget.db'
db_name
=
'test-cursor_pget.db'
def
setUp
(
self
):
def
setUp
(
self
):
self
.
homeDir
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
'db_home
%
d'
%
os
.
getpid
())
self
.
homeDir
=
get_new_environment_path
()
try
:
os
.
mkdir
(
self
.
homeDir
)
except
os
.
error
:
pass
self
.
env
=
db
.
DBEnv
()
self
.
env
=
db
.
DBEnv
()
self
.
env
.
open
(
self
.
homeDir
,
db
.
DB_CREATE
|
db
.
DB_INIT_MPOOL
)
self
.
env
.
open
(
self
.
homeDir
,
db
.
DB_CREATE
|
db
.
DB_INIT_MPOOL
)
self
.
primary_db
=
db
.
DB
(
self
.
env
)
self
.
primary_db
=
db
.
DB
(
self
.
env
)
...
...
Lib/bsddb/test/test_dbobj.py
Dosyayı görüntüle @
18eb1fa2
import
os
,
string
import
os
,
string
import
unittest
import
unittest
import
tempfile
try
:
try
:
# For Pythons w/distutils pybsddb
# For Pythons w/distutils pybsddb
...
@@ -10,24 +9,21 @@ except ImportError:
...
@@ -10,24 +9,21 @@ except ImportError:
# For Python 2.3
# For Python 2.3
from
bsddb
import
db
,
dbobj
from
bsddb
import
db
,
dbobj
from
test_all
import
get_new_environment_path
,
get_new_database_path
try
:
try
:
from
bsddb3
import
test_support
from
bsddb3
import
test_support
except
ImportError
:
except
ImportError
:
from
test
import
test_support
from
test
import
test_support
#----------------------------------------------------------------------
#----------------------------------------------------------------------
class
dbobjTestCase
(
unittest
.
TestCase
):
class
dbobjTestCase
(
unittest
.
TestCase
):
"""Verify that dbobj.DB and dbobj.DBEnv work properly"""
"""Verify that dbobj.DB and dbobj.DBEnv work properly"""
db_home
=
'db_home'
db_name
=
'test-dbobj.db'
db_name
=
'test-dbobj.db'
def
setUp
(
self
):
def
setUp
(
self
):
homeDir
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
'db_home
%
d'
%
os
.
getpid
())
self
.
homeDir
=
get_new_environment_path
()
self
.
homeDir
=
homeDir
try
:
os
.
mkdir
(
homeDir
)
except
os
.
error
:
pass
def
tearDown
(
self
):
def
tearDown
(
self
):
if
hasattr
(
self
,
'db'
):
if
hasattr
(
self
,
'db'
):
...
@@ -48,10 +44,10 @@ class dbobjTestCase(unittest.TestCase):
...
@@ -48,10 +44,10 @@ class dbobjTestCase(unittest.TestCase):
self
.
db
=
TestDB
(
self
.
env
)
self
.
db
=
TestDB
(
self
.
env
)
self
.
db
.
open
(
self
.
db_name
,
db
.
DB_HASH
,
db
.
DB_CREATE
)
self
.
db
.
open
(
self
.
db_name
,
db
.
DB_HASH
,
db
.
DB_CREATE
)
self
.
db
.
put
(
'spam'
,
'eggs'
)
self
.
db
.
put
(
'spam'
,
'eggs'
)
assert
self
.
db
.
get
(
'spam'
)
==
None
,
\
self
.
assertEqual
(
self
.
db
.
get
(
'spam'
),
None
,
"overridden dbobj.DB.put() method failed [1]"
"overridden dbobj.DB.put() method failed [1]"
)
assert
self
.
db
.
get
(
'SPAM'
)
==
'eggs'
,
\
self
.
assertEqual
(
self
.
db
.
get
(
'SPAM'
),
'eggs'
,
"overridden dbobj.DB.put() method failed [2]"
"overridden dbobj.DB.put() method failed [2]"
)
self
.
db
.
close
()
self
.
db
.
close
()
self
.
env
.
close
()
self
.
env
.
close
()
...
@@ -63,12 +59,12 @@ class dbobjTestCase(unittest.TestCase):
...
@@ -63,12 +59,12 @@ class dbobjTestCase(unittest.TestCase):
# __setitem__
# __setitem__
self
.
db
[
'spam'
]
=
'eggs'
self
.
db
[
'spam'
]
=
'eggs'
# __len__
# __len__
assert
len
(
self
.
db
)
==
1
self
.
assertEqual
(
len
(
self
.
db
),
1
)
# __getitem__
# __getitem__
assert
self
.
db
[
'spam'
]
==
'eggs'
self
.
assertEqual
(
self
.
db
[
'spam'
],
'eggs'
)
# __del__
# __del__
del
self
.
db
[
'spam'
]
del
self
.
db
[
'spam'
]
assert
self
.
db
.
get
(
'spam'
)
==
None
,
"dbobj __del__ failed"
self
.
assertEqual
(
self
.
db
.
get
(
'spam'
),
None
,
"dbobj __del__ failed"
)
self
.
db
.
close
()
self
.
db
.
close
()
self
.
env
.
close
()
self
.
env
.
close
()
...
...
Lib/bsddb/test/test_dbshelve.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -3,7 +3,7 @@ TestCases for checking dbShelve objects.
...
@@ -3,7 +3,7 @@ TestCases for checking dbShelve objects.
"""
"""
import
os
,
string
import
os
,
string
import
tempfile
,
random
import
random
from
types
import
*
from
types
import
*
import
unittest
import
unittest
...
@@ -19,7 +19,8 @@ try:
...
@@ -19,7 +19,8 @@ try:
except
ImportError
:
except
ImportError
:
from
test
import
test_support
from
test
import
test_support
from
test_all
import
verbose
from
test_all
import
verbose
,
get_new_environment_path
,
get_new_database_path
#----------------------------------------------------------------------
#----------------------------------------------------------------------
...
@@ -35,7 +36,7 @@ class DataClass:
...
@@ -35,7 +36,7 @@ class DataClass:
class
DBShelveTestCase
(
unittest
.
TestCase
):
class
DBShelveTestCase
(
unittest
.
TestCase
):
def
setUp
(
self
):
def
setUp
(
self
):
self
.
filename
=
tempfile
.
mktemp
()
self
.
filename
=
get_new_database_path
()
self
.
do_open
()
self
.
do_open
()
def
tearDown
(
self
):
def
tearDown
(
self
):
...
@@ -91,15 +92,15 @@ class DBShelveTestCase(unittest.TestCase):
...
@@ -91,15 +92,15 @@ class DBShelveTestCase(unittest.TestCase):
print
"keys:"
,
k
print
"keys:"
,
k
print
"stats:"
,
s
print
"stats:"
,
s
assert
0
==
d
.
has_key
(
self
.
mk
(
'bad key'
))
self
.
assertEqual
(
0
,
d
.
has_key
(
self
.
mk
(
'bad key'
)
))
assert
1
==
d
.
has_key
(
self
.
mk
(
'IA'
))
self
.
assertEqual
(
1
,
d
.
has_key
(
self
.
mk
(
'IA'
)
))
assert
1
==
d
.
has_key
(
self
.
mk
(
'OA'
))
self
.
assertEqual
(
1
,
d
.
has_key
(
self
.
mk
(
'OA'
)
))
d
.
delete
(
self
.
mk
(
'IA'
))
d
.
delete
(
self
.
mk
(
'IA'
))
del
d
[
self
.
mk
(
'OA'
)]
del
d
[
self
.
mk
(
'OA'
)]
assert
0
==
d
.
has_key
(
self
.
mk
(
'IA'
))
self
.
assertEqual
(
0
,
d
.
has_key
(
self
.
mk
(
'IA'
)
))
assert
0
==
d
.
has_key
(
self
.
mk
(
'OA'
))
self
.
assertEqual
(
0
,
d
.
has_key
(
self
.
mk
(
'OA'
)
))
assert
len
(
d
)
==
l
-
2
self
.
assertEqual
(
len
(
d
),
l
-
2
)
values
=
[]
values
=
[]
for
key
in
d
.
keys
():
for
key
in
d
.
keys
():
...
@@ -110,29 +111,29 @@ class DBShelveTestCase(unittest.TestCase):
...
@@ -110,29 +111,29 @@ class DBShelveTestCase(unittest.TestCase):
self
.
checkrec
(
key
,
value
)
self
.
checkrec
(
key
,
value
)
dbvalues
=
d
.
values
()
dbvalues
=
d
.
values
()
assert
len
(
dbvalues
)
==
len
(
d
.
keys
(
))
self
.
assertEqual
(
len
(
dbvalues
),
len
(
d
.
keys
()
))
values
.
sort
()
values
.
sort
()
dbvalues
.
sort
()
dbvalues
.
sort
()
assert
values
==
dbvalues
self
.
assertEqual
(
values
,
dbvalues
)
items
=
d
.
items
()
items
=
d
.
items
()
assert
len
(
items
)
==
len
(
values
)
self
.
assertEqual
(
len
(
items
),
len
(
values
)
)
for
key
,
value
in
items
:
for
key
,
value
in
items
:
self
.
checkrec
(
key
,
value
)
self
.
checkrec
(
key
,
value
)
assert
d
.
get
(
self
.
mk
(
'bad key'
))
==
None
self
.
assertEqual
(
d
.
get
(
self
.
mk
(
'bad key'
)),
None
)
assert
d
.
get
(
self
.
mk
(
'bad key'
),
None
)
==
None
self
.
assertEqual
(
d
.
get
(
self
.
mk
(
'bad key'
),
None
),
None
)
assert
d
.
get
(
self
.
mk
(
'bad key'
),
'a string'
)
==
'a string'
self
.
assertEqual
(
d
.
get
(
self
.
mk
(
'bad key'
),
'a string'
),
'a string'
)
assert
d
.
get
(
self
.
mk
(
'bad key'
),
[
1
,
2
,
3
])
==
[
1
,
2
,
3
]
self
.
assertEqual
(
d
.
get
(
self
.
mk
(
'bad key'
),
[
1
,
2
,
3
]),
[
1
,
2
,
3
])
d
.
set_get_returns_none
(
0
)
d
.
set_get_returns_none
(
0
)
self
.
assertRaises
(
db
.
DBNotFoundError
,
d
.
get
,
self
.
mk
(
'bad key'
))
self
.
assertRaises
(
db
.
DBNotFoundError
,
d
.
get
,
self
.
mk
(
'bad key'
))
d
.
set_get_returns_none
(
1
)
d
.
set_get_returns_none
(
1
)
d
.
put
(
self
.
mk
(
'new key'
),
'new data'
)
d
.
put
(
self
.
mk
(
'new key'
),
'new data'
)
assert
d
.
get
(
self
.
mk
(
'new key'
))
==
'new data'
self
.
assertEqual
(
d
.
get
(
self
.
mk
(
'new key'
)),
'new data'
)
assert
d
[
self
.
mk
(
'new key'
)]
==
'new data'
self
.
assertEqual
(
d
[
self
.
mk
(
'new key'
)],
'new data'
)
...
@@ -156,7 +157,7 @@ class DBShelveTestCase(unittest.TestCase):
...
@@ -156,7 +157,7 @@ class DBShelveTestCase(unittest.TestCase):
rec
=
c
.
next
()
rec
=
c
.
next
()
del
c
del
c
assert
count
==
len
(
d
)
self
.
assertEqual
(
count
,
len
(
d
)
)
count
=
0
count
=
0
c
=
d
.
cursor
()
c
=
d
.
cursor
()
...
@@ -169,7 +170,7 @@ class DBShelveTestCase(unittest.TestCase):
...
@@ -169,7 +170,7 @@ class DBShelveTestCase(unittest.TestCase):
self
.
checkrec
(
key
,
value
)
self
.
checkrec
(
key
,
value
)
rec
=
c
.
prev
()
rec
=
c
.
prev
()
assert
count
==
len
(
d
)
self
.
assertEqual
(
count
,
len
(
d
)
)
c
.
set
(
self
.
mk
(
'SS'
))
c
.
set
(
self
.
mk
(
'SS'
))
key
,
value
=
c
.
current
()
key
,
value
=
c
.
current
()
...
@@ -191,25 +192,25 @@ class DBShelveTestCase(unittest.TestCase):
...
@@ -191,25 +192,25 @@ class DBShelveTestCase(unittest.TestCase):
# override this in a subclass if the key type is different
# override this in a subclass if the key type is different
x
=
key
[
1
]
x
=
key
[
1
]
if
key
[
0
]
==
'S'
:
if
key
[
0
]
==
'S'
:
assert
type
(
value
)
==
StringType
self
.
assertEqual
(
type
(
value
),
StringType
)
assert
value
==
10
*
x
self
.
assertEqual
(
value
,
10
*
x
)
elif
key
[
0
]
==
'I'
:
elif
key
[
0
]
==
'I'
:
assert
type
(
value
)
==
IntType
self
.
assertEqual
(
type
(
value
),
IntType
)
assert
value
==
ord
(
x
)
self
.
assertEqual
(
value
,
ord
(
x
)
)
elif
key
[
0
]
==
'L'
:
elif
key
[
0
]
==
'L'
:
assert
type
(
value
)
==
ListType
self
.
assertEqual
(
type
(
value
),
ListType
)
assert
value
==
[
x
]
*
10
self
.
assertEqual
(
value
,
[
x
]
*
10
)
elif
key
[
0
]
==
'O'
:
elif
key
[
0
]
==
'O'
:
assert
type
(
value
)
==
InstanceType
self
.
assertEqual
(
type
(
value
),
InstanceType
)
assert
value
.
S
==
10
*
x
self
.
assertEqual
(
value
.
S
,
10
*
x
)
assert
value
.
I
==
ord
(
x
)
self
.
assertEqual
(
value
.
I
,
ord
(
x
)
)
assert
value
.
L
==
[
x
]
*
10
self
.
assertEqual
(
value
.
L
,
[
x
]
*
10
)
else
:
else
:
raise
AssertionError
,
'Unknown key type, fix the test'
self
.
assert_
(
0
,
'Unknown key type, fix the test'
)
#----------------------------------------------------------------------
#----------------------------------------------------------------------
...
@@ -246,12 +247,9 @@ class ThreadHashShelveTestCase(BasicShelveTestCase):
...
@@ -246,12 +247,9 @@ class ThreadHashShelveTestCase(BasicShelveTestCase):
class
BasicEnvShelveTestCase
(
DBShelveTestCase
):
class
BasicEnvShelveTestCase
(
DBShelveTestCase
):
def
do_open
(
self
):
def
do_open
(
self
):
self
.
homeDir
=
homeDir
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
'db_home
%
d'
%
os
.
getpid
())
try
:
os
.
mkdir
(
homeDir
)
except
os
.
error
:
pass
self
.
env
=
db
.
DBEnv
()
self
.
env
=
db
.
DBEnv
()
self
.
env
.
open
(
homeDir
,
self
.
envflags
|
db
.
DB_INIT_MPOOL
|
db
.
DB_CREATE
)
self
.
env
.
open
(
self
.
homeDir
,
self
.
envflags
|
db
.
DB_INIT_MPOOL
|
db
.
DB_CREATE
)
self
.
filename
=
os
.
path
.
split
(
self
.
filename
)[
1
]
self
.
filename
=
os
.
path
.
split
(
self
.
filename
)[
1
]
self
.
d
=
dbshelve
.
DBShelf
(
self
.
env
)
self
.
d
=
dbshelve
.
DBShelf
(
self
.
env
)
...
@@ -263,6 +261,10 @@ class BasicEnvShelveTestCase(DBShelveTestCase):
...
@@ -263,6 +261,10 @@ class BasicEnvShelveTestCase(DBShelveTestCase):
self
.
env
.
close
()
self
.
env
.
close
()
def
setUp
(
self
)
:
self
.
homeDir
=
get_new_environment_path
()
DBShelveTestCase
.
setUp
(
self
)
def
tearDown
(
self
):
def
tearDown
(
self
):
self
.
do_close
()
self
.
do_close
()
test_support
.
rmtree
(
self
.
homeDir
)
test_support
.
rmtree
(
self
.
homeDir
)
...
...
Lib/bsddb/test/test_dbtables.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -21,16 +21,14 @@
...
@@ -21,16 +21,14 @@
# $Id$
# $Id$
import
os
,
re
import
os
,
re
import
tempfile
try
:
try
:
import
cPickle
import
cPickle
pickle
=
cPickle
pickle
=
cPickle
except
ImportError
:
except
ImportError
:
import
pickle
import
pickle
import
tempfile
import
unittest
import
unittest
from
test_all
import
verbose
from
test_all
import
verbose
,
get_new_environment_path
,
get_new_database_path
try
:
try
:
# For Pythons w/distutils pybsddb
# For Pythons w/distutils pybsddb
...
@@ -48,16 +46,12 @@ except ImportError:
...
@@ -48,16 +46,12 @@ except ImportError:
#----------------------------------------------------------------------
#----------------------------------------------------------------------
class
TableDBTestCase
(
unittest
.
TestCase
):
class
TableDBTestCase
(
unittest
.
TestCase
):
db_home
=
'db_home'
db_name
=
'test-table.db'
db_name
=
'test-table.db'
def
setUp
(
self
):
def
setUp
(
self
):
homeDir
=
tempfile
.
mkdtemp
()
self
.
testHomeDir
=
get_new_environment_path
()
self
.
testHomeDir
=
homeDir
try
:
os
.
mkdir
(
homeDir
)
except
os
.
error
:
pass
self
.
tdb
=
dbtables
.
bsdTableDB
(
self
.
tdb
=
dbtables
.
bsdTableDB
(
filename
=
'tabletest.db'
,
dbhome
=
h
omeDir
,
create
=
1
)
filename
=
'tabletest.db'
,
dbhome
=
self
.
testH
omeDir
,
create
=
1
)
def
tearDown
(
self
):
def
tearDown
(
self
):
self
.
tdb
.
close
()
self
.
tdb
.
close
()
...
@@ -323,7 +317,7 @@ class TableDBTestCase(unittest.TestCase):
...
@@ -323,7 +317,7 @@ class TableDBTestCase(unittest.TestCase):
self
.
tdb
.
Insert
(
tabname
,
{
'Type'
:
'Unknown'
,
'Access'
:
'0'
})
self
.
tdb
.
Insert
(
tabname
,
{
'Type'
:
'Unknown'
,
'Access'
:
'0'
})
def
set_type
(
type
):
def
set_type
(
type
):
if
type
is
None
:
if
type
==
None
:
return
'MP3'
return
'MP3'
return
type
return
type
...
...
Lib/bsddb/test/test_distributed_transactions.py
0 → 100644
Dosyayı görüntüle @
18eb1fa2
"""TestCases for distributed transactions.
"""
import
os
import
unittest
try
:
# For Pythons w/distutils pybsddb
from
bsddb3
import
db
except
ImportError
:
# For Python 2.3
from
bsddb
import
db
from
test_all
import
get_new_environment_path
,
get_new_database_path
try
:
from
bsddb3
import
test_support
except
ImportError
:
from
test
import
test_support
try
:
a
=
set
()
except
:
# Python 2.3
from
sets
import
Set
as
set
else
:
del
a
from
test_all
import
verbose
#----------------------------------------------------------------------
class
DBTxn_distributed
(
unittest
.
TestCase
):
num_txns
=
1234
nosync
=
True
must_open_db
=
False
def
_create_env
(
self
,
must_open_db
)
:
self
.
dbenv
=
db
.
DBEnv
()
self
.
dbenv
.
set_tx_max
(
self
.
num_txns
)
self
.
dbenv
.
set_lk_max_lockers
(
self
.
num_txns
*
2
)
self
.
dbenv
.
set_lk_max_locks
(
self
.
num_txns
*
2
)
self
.
dbenv
.
set_lk_max_objects
(
self
.
num_txns
*
2
)
if
self
.
nosync
:
self
.
dbenv
.
set_flags
(
db
.
DB_TXN_NOSYNC
,
True
)
self
.
dbenv
.
open
(
self
.
homeDir
,
db
.
DB_CREATE
|
db
.
DB_THREAD
|
db
.
DB_RECOVER
|
db
.
DB_INIT_TXN
|
db
.
DB_INIT_LOG
|
db
.
DB_INIT_MPOOL
|
db
.
DB_INIT_LOCK
,
0666
)
self
.
db
=
db
.
DB
(
self
.
dbenv
)
self
.
db
.
set_re_len
(
db
.
DB_XIDDATASIZE
)
if
must_open_db
:
if
db
.
version
()
>
(
4
,
1
)
:
txn
=
self
.
dbenv
.
txn_begin
()
self
.
db
.
open
(
self
.
filename
,
db
.
DB_QUEUE
,
db
.
DB_CREATE
|
db
.
DB_THREAD
,
0666
,
txn
=
txn
)
txn
.
commit
()
else
:
self
.
db
.
open
(
self
.
filename
,
db
.
DB_QUEUE
,
db
.
DB_CREATE
|
db
.
DB_THREAD
,
0666
)
def
setUp
(
self
)
:
self
.
homeDir
=
get_new_environment_path
()
self
.
filename
=
"test"
return
self
.
_create_env
(
must_open_db
=
True
)
def
_destroy_env
(
self
):
if
self
.
nosync
or
(
db
.
version
()[:
2
]
==
(
4
,
6
)):
# Known bug
self
.
dbenv
.
log_flush
()
self
.
db
.
close
()
self
.
dbenv
.
close
()
def
tearDown
(
self
):
self
.
_destroy_env
()
test_support
.
rmtree
(
self
.
homeDir
)
def
_recreate_env
(
self
,
must_open_db
)
:
self
.
_destroy_env
()
self
.
_create_env
(
must_open_db
)
def
test01_distributed_transactions
(
self
)
:
txns
=
set
()
# Create transactions, "prepare" them, and
# let them be garbage collected.
for
i
in
xrange
(
self
.
num_txns
)
:
txn
=
self
.
dbenv
.
txn_begin
()
gid
=
"
%%%
dd"
%
db
.
DB_XIDDATASIZE
gid
=
gid
%
i
self
.
db
.
put
(
i
,
gid
,
txn
=
txn
,
flags
=
db
.
DB_APPEND
)
txns
.
add
(
gid
)
txn
.
prepare
(
gid
)
del
txn
self
.
_recreate_env
(
self
.
must_open_db
)
# Get "to be recovered" transactions but
# let them be garbage collected.
recovered_txns
=
self
.
dbenv
.
txn_recover
()
self
.
assertEquals
(
self
.
num_txns
,
len
(
recovered_txns
))
for
gid
,
txn
in
recovered_txns
:
self
.
assert_
(
gid
in
txns
)
del
txn
del
recovered_txns
self
.
_recreate_env
(
self
.
must_open_db
)
# Get "to be recovered" transactions. Commit, abort and
# discard them.
recovered_txns
=
self
.
dbenv
.
txn_recover
()
self
.
assertEquals
(
self
.
num_txns
,
len
(
recovered_txns
))
discard_txns
=
set
()
committed_txns
=
set
()
state
=
0
for
gid
,
txn
in
recovered_txns
:
if
state
==
0
or
state
==
1
:
committed_txns
.
add
(
gid
)
txn
.
commit
()
elif
state
==
2
:
txn
.
abort
()
elif
state
==
3
:
txn
.
discard
()
discard_txns
.
add
(
gid
)
state
=-
1
state
+=
1
del
txn
del
recovered_txns
self
.
_recreate_env
(
self
.
must_open_db
)
# Verify the discarded transactions are still
# around, and dispose them.
recovered_txns
=
self
.
dbenv
.
txn_recover
()
self
.
assertEquals
(
len
(
discard_txns
),
len
(
recovered_txns
))
for
gid
,
txn
in
recovered_txns
:
txn
.
abort
()
del
txn
del
recovered_txns
self
.
_recreate_env
(
must_open_db
=
True
)
# Be sure there are not pending transactions.
# Check also database size.
recovered_txns
=
self
.
dbenv
.
txn_recover
()
self
.
assert_
(
len
(
recovered_txns
)
==
0
)
self
.
assertEquals
(
len
(
committed_txns
),
self
.
db
.
stat
()[
"nkeys"
])
class
DBTxn_distributedSYNC
(
DBTxn_distributed
):
nosync
=
False
class
DBTxn_distributed_must_open_db
(
DBTxn_distributed
):
must_open_db
=
True
class
DBTxn_distributedSYNC_must_open_db
(
DBTxn_distributed
):
nosync
=
False
must_open_db
=
True
#----------------------------------------------------------------------
def
test_suite
():
suite
=
unittest
.
TestSuite
()
if
db
.
version
()
>=
(
4
,
5
)
:
suite
.
addTest
(
unittest
.
makeSuite
(
DBTxn_distributed
))
suite
.
addTest
(
unittest
.
makeSuite
(
DBTxn_distributedSYNC
))
if
db
.
version
()
>=
(
4
,
6
)
:
suite
.
addTest
(
unittest
.
makeSuite
(
DBTxn_distributed_must_open_db
))
suite
.
addTest
(
unittest
.
makeSuite
(
DBTxn_distributedSYNC_must_open_db
))
return
suite
if
__name__
==
'__main__'
:
unittest
.
main
(
defaultTest
=
'test_suite'
)
Lib/bsddb/test/test_early_close.py
0 → 100644
Dosyayı görüntüle @
18eb1fa2
"""TestCases for checking that it does not segfault when a DBEnv object
is closed before its DB objects.
"""
import
os
import
unittest
try
:
# For Pythons w/distutils pybsddb
from
bsddb3
import
db
except
ImportError
:
# For Python 2.3
from
bsddb
import
db
try
:
from
bsddb3
import
test_support
except
ImportError
:
from
test
import
test_support
from
test_all
import
verbose
,
get_new_environment_path
,
get_new_database_path
# We're going to get warnings in this module about trying to close the db when
# its env is already closed. Let's just ignore those.
try
:
import
warnings
except
ImportError
:
pass
else
:
warnings
.
filterwarnings
(
'ignore'
,
message
=
'DB could not be closed in'
,
category
=
RuntimeWarning
)
#----------------------------------------------------------------------
class
DBEnvClosedEarlyCrash
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
homeDir
=
get_new_environment_path
()
self
.
filename
=
"test"
def
tearDown
(
self
):
test_support
.
rmtree
(
self
.
homeDir
)
def
test01_close_dbenv_before_db
(
self
):
dbenv
=
db
.
DBEnv
()
dbenv
.
open
(
self
.
homeDir
,
db
.
DB_INIT_CDB
|
db
.
DB_CREATE
|
db
.
DB_THREAD
|
db
.
DB_INIT_MPOOL
,
0666
)
d
=
db
.
DB
(
dbenv
)
d2
=
db
.
DB
(
dbenv
)
d
.
open
(
self
.
filename
,
db
.
DB_BTREE
,
db
.
DB_CREATE
|
db
.
DB_THREAD
,
0666
)
self
.
assertRaises
(
db
.
DBNoSuchFileError
,
d2
.
open
,
self
.
filename
+
"2"
,
db
.
DB_BTREE
,
db
.
DB_THREAD
,
0666
)
d
.
put
(
"test"
,
"this is a test"
)
self
.
assertEqual
(
d
.
get
(
"test"
),
"this is a test"
,
"put!=get"
)
dbenv
.
close
()
# This "close" should close the child db handle also
self
.
assertRaises
(
db
.
DBError
,
d
.
get
,
"test"
)
def
test02_close_dbenv_before_dbcursor
(
self
):
dbenv
=
db
.
DBEnv
()
dbenv
.
open
(
self
.
homeDir
,
db
.
DB_INIT_CDB
|
db
.
DB_CREATE
|
db
.
DB_THREAD
|
db
.
DB_INIT_MPOOL
,
0666
)
d
=
db
.
DB
(
dbenv
)
d
.
open
(
self
.
filename
,
db
.
DB_BTREE
,
db
.
DB_CREATE
|
db
.
DB_THREAD
,
0666
)
d
.
put
(
"test"
,
"this is a test"
)
d
.
put
(
"test2"
,
"another test"
)
d
.
put
(
"test3"
,
"another one"
)
self
.
assertEqual
(
d
.
get
(
"test"
),
"this is a test"
,
"put!=get"
)
c
=
d
.
cursor
()
c
.
first
()
c
.
next
()
d
.
close
()
# This "close" should close the child db handle also
# db.close should close the child cursor
self
.
assertRaises
(
db
.
DBError
,
c
.
next
)
d
=
db
.
DB
(
dbenv
)
d
.
open
(
self
.
filename
,
db
.
DB_BTREE
,
db
.
DB_CREATE
|
db
.
DB_THREAD
,
0666
)
c
=
d
.
cursor
()
c
.
first
()
c
.
next
()
dbenv
.
close
()
# The "close" should close the child db handle also, with cursors
self
.
assertRaises
(
db
.
DBError
,
c
.
next
)
def
test03_close_db_before_dbcursor_without_env
(
self
):
import
os.path
path
=
os
.
path
.
join
(
self
.
homeDir
,
self
.
filename
)
d
=
db
.
DB
()
d
.
open
(
path
,
db
.
DB_BTREE
,
db
.
DB_CREATE
|
db
.
DB_THREAD
,
0666
)
d
.
put
(
"test"
,
"this is a test"
)
d
.
put
(
"test2"
,
"another test"
)
d
.
put
(
"test3"
,
"another one"
)
self
.
assertEqual
(
d
.
get
(
"test"
),
"this is a test"
,
"put!=get"
)
c
=
d
.
cursor
()
c
.
first
()
c
.
next
()
d
.
close
()
# The "close" should close the child db handle also
self
.
assertRaises
(
db
.
DBError
,
c
.
next
)
def
test04_close_massive
(
self
):
dbenv
=
db
.
DBEnv
()
dbenv
.
open
(
self
.
homeDir
,
db
.
DB_INIT_CDB
|
db
.
DB_CREATE
|
db
.
DB_THREAD
|
db
.
DB_INIT_MPOOL
,
0666
)
dbs
=
[
db
.
DB
(
dbenv
)
for
i
in
xrange
(
16
)]
cursors
=
[]
for
i
in
dbs
:
i
.
open
(
self
.
filename
,
db
.
DB_BTREE
,
db
.
DB_CREATE
|
db
.
DB_THREAD
,
0666
)
dbs
[
10
]
.
put
(
"test"
,
"this is a test"
)
dbs
[
10
]
.
put
(
"test2"
,
"another test"
)
dbs
[
10
]
.
put
(
"test3"
,
"another one"
)
self
.
assertEqual
(
dbs
[
4
]
.
get
(
"test"
),
"this is a test"
,
"put!=get"
)
for
i
in
dbs
:
cursors
.
extend
([
i
.
cursor
()
for
j
in
xrange
(
32
)])
for
i
in
dbs
[::
3
]
:
i
.
close
()
for
i
in
cursors
[::
3
]
:
i
.
close
()
# Check for missing exception in DB! (after DB close)
self
.
assertRaises
(
db
.
DBError
,
dbs
[
9
]
.
get
,
"test"
)
# Check for missing exception in DBCursor! (after DB close)
self
.
assertRaises
(
db
.
DBError
,
cursors
[
101
]
.
first
)
cursors
[
80
]
.
first
()
cursors
[
80
]
.
next
()
dbenv
.
close
()
# This "close" should close the child db handle also
# Check for missing exception! (after DBEnv close)
self
.
assertRaises
(
db
.
DBError
,
cursors
[
80
]
.
next
)
def
test05_close_dbenv_delete_db_success
(
self
):
dbenv
=
db
.
DBEnv
()
dbenv
.
open
(
self
.
homeDir
,
db
.
DB_INIT_CDB
|
db
.
DB_CREATE
|
db
.
DB_THREAD
|
db
.
DB_INIT_MPOOL
,
0666
)
d
=
db
.
DB
(
dbenv
)
d
.
open
(
self
.
filename
,
db
.
DB_BTREE
,
db
.
DB_CREATE
|
db
.
DB_THREAD
,
0666
)
dbenv
.
close
()
# This "close" should close the child db handle also
del
d
try
:
import
gc
except
ImportError
:
gc
=
None
if
gc
:
# force d.__del__ [DB_dealloc] to be called
gc
.
collect
()
def
test06_close_txn_before_dup_cursor
(
self
)
:
dbenv
=
db
.
DBEnv
()
dbenv
.
open
(
self
.
homeDir
,
db
.
DB_INIT_TXN
|
db
.
DB_INIT_MPOOL
|
db
.
DB_INIT_LOG
|
db
.
DB_CREATE
)
d
=
db
.
DB
(
dbenv
)
txn
=
dbenv
.
txn_begin
()
if
db
.
version
()
<
(
4
,
1
)
:
d
.
open
(
self
.
filename
,
dbtype
=
db
.
DB_HASH
,
flags
=
db
.
DB_CREATE
)
else
:
d
.
open
(
self
.
filename
,
dbtype
=
db
.
DB_HASH
,
flags
=
db
.
DB_CREATE
,
txn
=
txn
)
d
.
put
(
"XXX"
,
"yyy"
,
txn
=
txn
)
txn
.
commit
()
txn
=
dbenv
.
txn_begin
()
c1
=
d
.
cursor
(
txn
)
c2
=
c1
.
dup
()
self
.
assertEquals
((
"XXX"
,
"yyy"
),
c1
.
first
())
import
warnings
# Not interested in warnings about implicit close.
warnings
.
simplefilter
(
"ignore"
)
txn
.
commit
()
warnings
.
resetwarnings
()
self
.
assertRaises
(
db
.
DBCursorClosedError
,
c2
.
first
)
if
db
.
version
()
>
(
4
,
3
,
0
)
:
def
test07_close_db_before_sequence
(
self
):
import
os.path
path
=
os
.
path
.
join
(
self
.
homeDir
,
self
.
filename
)
d
=
db
.
DB
()
d
.
open
(
path
,
db
.
DB_BTREE
,
db
.
DB_CREATE
|
db
.
DB_THREAD
,
0666
)
dbs
=
db
.
DBSequence
(
d
)
d
.
close
()
# This "close" should close the child DBSequence also
dbs
.
close
()
# If not closed, core dump (in Berkeley DB 4.6.*)
#----------------------------------------------------------------------
def
test_suite
():
suite
=
unittest
.
TestSuite
()
suite
.
addTest
(
unittest
.
makeSuite
(
DBEnvClosedEarlyCrash
))
return
suite
if
__name__
==
'__main__'
:
unittest
.
main
(
defaultTest
=
'test_suite'
)
Lib/bsddb/test/test_env_close.py
deleted
100644 → 0
Dosyayı görüntüle @
cb33aeaf
"""TestCases for checking that it does not segfault when a DBEnv object
is closed before its DB objects.
"""
import
os
import
tempfile
import
unittest
try
:
# For Pythons w/distutils pybsddb
from
bsddb3
import
db
except
ImportError
:
# For Python 2.3
from
bsddb
import
db
try
:
from
bsddb3
import
test_support
except
ImportError
:
from
test
import
test_support
from
test_all
import
verbose
# We're going to get warnings in this module about trying to close the db when
# its env is already closed. Let's just ignore those.
try
:
import
warnings
except
ImportError
:
pass
else
:
warnings
.
filterwarnings
(
'ignore'
,
message
=
'DB could not be closed in'
,
category
=
RuntimeWarning
)
#----------------------------------------------------------------------
class
DBEnvClosedEarlyCrash
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
homeDir
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
'db_home
%
d'
%
os
.
getpid
())
try
:
os
.
mkdir
(
self
.
homeDir
)
except
os
.
error
:
pass
tempfile
.
tempdir
=
self
.
homeDir
self
.
filename
=
os
.
path
.
split
(
tempfile
.
mktemp
())[
1
]
tempfile
.
tempdir
=
None
def
tearDown
(
self
):
test_support
.
rmtree
(
self
.
homeDir
)
def
test01_close_dbenv_before_db
(
self
):
dbenv
=
db
.
DBEnv
()
dbenv
.
open
(
self
.
homeDir
,
db
.
DB_INIT_CDB
|
db
.
DB_CREATE
|
db
.
DB_THREAD
|
db
.
DB_INIT_MPOOL
,
0666
)
d
=
db
.
DB
(
dbenv
)
d
.
open
(
self
.
filename
,
db
.
DB_BTREE
,
db
.
DB_CREATE
|
db
.
DB_THREAD
,
0666
)
try
:
dbenv
.
close
()
except
db
.
DBError
:
try
:
d
.
close
()
except
db
.
DBError
:
return
assert
0
,
\
"DB close did not raise an exception about its "
\
"DBEnv being trashed"
# XXX This may fail when using older versions of BerkeleyDB.
# E.g. 3.2.9 never raised the exception.
assert
0
,
"dbenv did not raise an exception about its DB being open"
def
test02_close_dbenv_delete_db_success
(
self
):
dbenv
=
db
.
DBEnv
()
dbenv
.
open
(
self
.
homeDir
,
db
.
DB_INIT_CDB
|
db
.
DB_CREATE
|
db
.
DB_THREAD
|
db
.
DB_INIT_MPOOL
,
0666
)
d
=
db
.
DB
(
dbenv
)
d
.
open
(
self
.
filename
,
db
.
DB_BTREE
,
db
.
DB_CREATE
|
db
.
DB_THREAD
,
0666
)
try
:
dbenv
.
close
()
except
db
.
DBError
:
pass
# good, it should raise an exception
del
d
try
:
import
gc
except
ImportError
:
gc
=
None
if
gc
:
# force d.__del__ [DB_dealloc] to be called
gc
.
collect
()
#----------------------------------------------------------------------
def
test_suite
():
suite
=
unittest
.
TestSuite
()
suite
.
addTest
(
unittest
.
makeSuite
(
DBEnvClosedEarlyCrash
))
return
suite
if
__name__
==
'__main__'
:
unittest
.
main
(
defaultTest
=
'test_suite'
)
Lib/bsddb/test/test_get_none.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -3,7 +3,6 @@ TestCases for checking set_get_returns_none.
...
@@ -3,7 +3,6 @@ TestCases for checking set_get_returns_none.
"""
"""
import
os
,
string
import
os
,
string
import
tempfile
import
unittest
import
unittest
try
:
try
:
...
@@ -13,14 +12,14 @@ except ImportError:
...
@@ -13,14 +12,14 @@ except ImportError:
# For Python 2.3
# For Python 2.3
from
bsddb
import
db
from
bsddb
import
db
from
test_all
import
verbose
from
test_all
import
verbose
,
get_new_database_path
#----------------------------------------------------------------------
#----------------------------------------------------------------------
class
GetReturnsNoneTestCase
(
unittest
.
TestCase
):
class
GetReturnsNoneTestCase
(
unittest
.
TestCase
):
def
setUp
(
self
):
def
setUp
(
self
):
self
.
filename
=
tempfile
.
mktemp
()
self
.
filename
=
get_new_database_path
()
def
tearDown
(
self
):
def
tearDown
(
self
):
try
:
try
:
...
@@ -38,10 +37,10 @@ class GetReturnsNoneTestCase(unittest.TestCase):
...
@@ -38,10 +37,10 @@ class GetReturnsNoneTestCase(unittest.TestCase):
d
.
put
(
x
,
x
*
40
)
d
.
put
(
x
,
x
*
40
)
data
=
d
.
get
(
'bad key'
)
data
=
d
.
get
(
'bad key'
)
assert
data
==
None
self
.
assertEqual
(
data
,
None
)
data
=
d
.
get
(
'a'
)
data
=
d
.
get
(
'a'
)
assert
data
==
'a'
*
40
self
.
assertEqual
(
data
,
'a'
*
40
)
count
=
0
count
=
0
c
=
d
.
cursor
()
c
=
d
.
cursor
()
...
@@ -50,8 +49,8 @@ class GetReturnsNoneTestCase(unittest.TestCase):
...
@@ -50,8 +49,8 @@ class GetReturnsNoneTestCase(unittest.TestCase):
count
=
count
+
1
count
=
count
+
1
rec
=
c
.
next
()
rec
=
c
.
next
()
assert
rec
==
None
self
.
assertEqual
(
rec
,
None
)
assert
count
==
52
self
.
assertEqual
(
count
,
52
)
c
.
close
()
c
.
close
()
d
.
close
()
d
.
close
()
...
@@ -69,7 +68,7 @@ class GetReturnsNoneTestCase(unittest.TestCase):
...
@@ -69,7 +68,7 @@ class GetReturnsNoneTestCase(unittest.TestCase):
self
.
assertRaises
(
KeyError
,
d
.
get
,
'bad key'
)
self
.
assertRaises
(
KeyError
,
d
.
get
,
'bad key'
)
data
=
d
.
get
(
'a'
)
data
=
d
.
get
(
'a'
)
assert
data
==
'a'
*
40
self
.
assertEqual
(
data
,
'a'
*
40
)
count
=
0
count
=
0
exceptionHappened
=
0
exceptionHappened
=
0
...
@@ -83,9 +82,9 @@ class GetReturnsNoneTestCase(unittest.TestCase):
...
@@ -83,9 +82,9 @@ class GetReturnsNoneTestCase(unittest.TestCase):
exceptionHappened
=
1
exceptionHappened
=
1
break
break
assert
rec
!=
None
self
.
assertNotEqual
(
rec
,
None
)
assert
exceptionHappened
self
.
assert_
(
exceptionHappened
)
assert
count
==
52
self
.
assertEqual
(
count
,
52
)
c
.
close
()
c
.
close
()
d
.
close
()
d
.
close
()
...
...
Lib/bsddb/test/test_join.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -2,13 +2,6 @@
...
@@ -2,13 +2,6 @@
"""
"""
import
os
import
os
import
tempfile
try
:
from
threading
import
Thread
,
currentThread
have_threads
=
1
except
ImportError
:
have_threads
=
0
import
unittest
import
unittest
from
test_all
import
verbose
from
test_all
import
verbose
...
@@ -20,11 +13,14 @@ except ImportError:
...
@@ -20,11 +13,14 @@ except ImportError:
# For Python 2.3
# For Python 2.3
from
bsddb
import
db
,
dbshelve
from
bsddb
import
db
,
dbshelve
from
test_all
import
get_new_environment_path
,
get_new_database_path
try
:
try
:
from
bsddb3
import
test_support
from
bsddb3
import
test_support
except
ImportError
:
except
ImportError
:
from
test
import
test_support
from
test
import
test_support
#----------------------------------------------------------------------
#----------------------------------------------------------------------
ProductIndex
=
[
ProductIndex
=
[
...
@@ -51,12 +47,9 @@ class JoinTestCase(unittest.TestCase):
...
@@ -51,12 +47,9 @@ class JoinTestCase(unittest.TestCase):
def
setUp
(
self
):
def
setUp
(
self
):
self
.
filename
=
self
.
__class__
.
__name__
+
'.db'
self
.
filename
=
self
.
__class__
.
__name__
+
'.db'
homeDir
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
'db_home
%
d'
%
os
.
getpid
())
self
.
homeDir
=
get_new_environment_path
()
self
.
homeDir
=
homeDir
try
:
os
.
mkdir
(
homeDir
)
except
os
.
error
:
pass
self
.
env
=
db
.
DBEnv
()
self
.
env
=
db
.
DBEnv
()
self
.
env
.
open
(
homeDir
,
db
.
DB_CREATE
|
db
.
DB_INIT_MPOOL
|
db
.
DB_INIT_LOCK
)
self
.
env
.
open
(
self
.
homeDir
,
db
.
DB_CREATE
|
db
.
DB_INIT_MPOOL
|
db
.
DB_INIT_LOCK
)
def
tearDown
(
self
):
def
tearDown
(
self
):
self
.
env
.
close
()
self
.
env
.
close
()
...
@@ -87,7 +80,7 @@ class JoinTestCase(unittest.TestCase):
...
@@ -87,7 +80,7 @@ class JoinTestCase(unittest.TestCase):
# Don't do the .set() in an assert, or you can get a bogus failure
# Don't do the .set() in an assert, or you can get a bogus failure
# when running python -O
# when running python -O
tmp
=
sCursor
.
set
(
'red'
)
tmp
=
sCursor
.
set
(
'red'
)
assert
tmp
self
.
assert_
(
tmp
)
# FIXME: jCursor doesn't properly hold a reference to its
# FIXME: jCursor doesn't properly hold a reference to its
# cursors, if they are closed before jcursor is used it
# cursors, if they are closed before jcursor is used it
...
...
Lib/bsddb/test/test_lock.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -2,7 +2,6 @@
...
@@ -2,7 +2,6 @@
TestCases for testing the locking sub-system.
TestCases for testing the locking sub-system.
"""
"""
import
tempfile
import
time
import
time
try
:
try
:
...
@@ -13,7 +12,7 @@ except ImportError:
...
@@ -13,7 +12,7 @@ except ImportError:
import
unittest
import
unittest
from
test_all
import
verbose
from
test_all
import
verbose
,
get_new_environment_path
,
get_new_database_path
try
:
try
:
# For Pythons w/distutils pybsddb
# For Pythons w/distutils pybsddb
...
@@ -31,9 +30,14 @@ except ImportError:
...
@@ -31,9 +30,14 @@ except ImportError:
#----------------------------------------------------------------------
#----------------------------------------------------------------------
class
LockingTestCase
(
unittest
.
TestCase
):
class
LockingTestCase
(
unittest
.
TestCase
):
import
sys
if
sys
.
version_info
[:
3
]
<
(
2
,
4
,
0
):
def
assertTrue
(
self
,
expr
,
msg
=
None
):
self
.
failUnless
(
expr
,
msg
=
msg
)
def
setUp
(
self
):
def
setUp
(
self
):
self
.
homeDir
=
tempfile
.
mkdtemp
(
'.test_lock'
)
self
.
homeDir
=
get_new_environment_path
(
)
self
.
env
=
db
.
DBEnv
()
self
.
env
=
db
.
DBEnv
()
self
.
env
.
open
(
self
.
homeDir
,
db
.
DB_THREAD
|
db
.
DB_INIT_MPOOL
|
self
.
env
.
open
(
self
.
homeDir
,
db
.
DB_THREAD
|
db
.
DB_INIT_MPOOL
|
db
.
DB_INIT_LOCK
|
db
.
DB_CREATE
)
db
.
DB_INIT_LOCK
|
db
.
DB_CREATE
)
...
@@ -55,7 +59,6 @@ class LockingTestCase(unittest.TestCase):
...
@@ -55,7 +59,6 @@ class LockingTestCase(unittest.TestCase):
lock
=
self
.
env
.
lock_get
(
anID
,
"some locked thing"
,
db
.
DB_LOCK_WRITE
)
lock
=
self
.
env
.
lock_get
(
anID
,
"some locked thing"
,
db
.
DB_LOCK_WRITE
)
if
verbose
:
if
verbose
:
print
"Aquired lock:
%
s"
%
lock
print
"Aquired lock:
%
s"
%
lock
time
.
sleep
(
1
)
self
.
env
.
lock_put
(
lock
)
self
.
env
.
lock_put
(
lock
)
if
verbose
:
if
verbose
:
print
"Released lock:
%
s"
%
lock
print
"Released lock:
%
s"
%
lock
...
@@ -70,38 +73,73 @@ class LockingTestCase(unittest.TestCase):
...
@@ -70,38 +73,73 @@ class LockingTestCase(unittest.TestCase):
threads
=
[]
threads
=
[]
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
args
=
(
5
,
db
.
DB_LOCK_WRITE
)))
args
=
(
db
.
DB_LOCK_WRITE
,
)))
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
args
=
(
1
,
db
.
DB_LOCK_READ
)))
args
=
(
db
.
DB_LOCK_READ
,
)))
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
args
=
(
1
,
db
.
DB_LOCK_READ
)))
args
=
(
db
.
DB_LOCK_READ
,
)))
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
args
=
(
1
,
db
.
DB_LOCK_WRITE
)))
args
=
(
db
.
DB_LOCK_WRITE
,
)))
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
args
=
(
1
,
db
.
DB_LOCK_READ
)))
args
=
(
db
.
DB_LOCK_READ
,
)))
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
args
=
(
1
,
db
.
DB_LOCK_READ
)))
args
=
(
db
.
DB_LOCK_READ
,
)))
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
args
=
(
1
,
db
.
DB_LOCK_WRITE
)))
args
=
(
db
.
DB_LOCK_WRITE
,
)))
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
args
=
(
1
,
db
.
DB_LOCK_WRITE
)))
args
=
(
db
.
DB_LOCK_WRITE
,
)))
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
threads
.
append
(
Thread
(
target
=
self
.
theThread
,
args
=
(
1
,
db
.
DB_LOCK_WRITE
)))
args
=
(
db
.
DB_LOCK_WRITE
,
)))
for
t
in
threads
:
for
t
in
threads
:
t
.
setDaemon
(
True
)
t
.
start
()
t
.
start
()
for
t
in
threads
:
for
t
in
threads
:
t
.
join
()
t
.
join
()
def
test03_set_timeout
(
self
):
def
test03_lock_timeout
(
self
):
# test that the set_timeout call works
self
.
env
.
set_timeout
(
0
,
db
.
DB_SET_LOCK_TIMEOUT
)
if
hasattr
(
self
.
env
,
'set_timeout'
):
self
.
env
.
set_timeout
(
0
,
db
.
DB_SET_TXN_TIMEOUT
)
self
.
env
.
set_timeout
(
0
,
db
.
DB_SET_LOCK_TIMEOUT
)
self
.
env
.
set_timeout
(
123456
,
db
.
DB_SET_LOCK_TIMEOUT
)
self
.
env
.
set_timeout
(
0
,
db
.
DB_SET_TXN_TIMEOUT
)
self
.
env
.
set_timeout
(
7890123
,
db
.
DB_SET_TXN_TIMEOUT
)
self
.
env
.
set_timeout
(
123456
,
db
.
DB_SET_LOCK_TIMEOUT
)
self
.
env
.
set_timeout
(
7890123
,
db
.
DB_SET_TXN_TIMEOUT
)
def
deadlock_detection
()
:
while
not
deadlock_detection
.
end
:
deadlock_detection
.
count
=
\
self
.
env
.
lock_detect
(
db
.
DB_LOCK_EXPIRE
)
if
deadlock_detection
.
count
:
while
not
deadlock_detection
.
end
:
pass
break
time
.
sleep
(
0.01
)
deadlock_detection
.
end
=
False
deadlock_detection
.
count
=
0
t
=
Thread
(
target
=
deadlock_detection
)
t
.
setDaemon
(
True
)
t
.
start
()
self
.
env
.
set_timeout
(
100000
,
db
.
DB_SET_LOCK_TIMEOUT
)
anID
=
self
.
env
.
lock_id
()
anID2
=
self
.
env
.
lock_id
()
self
.
assertNotEqual
(
anID
,
anID2
)
lock
=
self
.
env
.
lock_get
(
anID
,
"shared lock"
,
db
.
DB_LOCK_WRITE
)
start_time
=
time
.
time
()
self
.
assertRaises
(
db
.
DBLockNotGrantedError
,
self
.
env
.
lock_get
,
anID2
,
"shared lock"
,
db
.
DB_LOCK_READ
)
end_time
=
time
.
time
()
deadlock_detection
.
end
=
True
self
.
assertTrue
((
end_time
-
start_time
)
>=
0.1
)
self
.
env
.
lock_put
(
lock
)
t
.
join
()
if
db
.
version
()
>=
(
4
,
0
):
self
.
env
.
lock_id_free
(
anID
)
self
.
env
.
lock_id_free
(
anID2
)
if
db
.
version
()
>=
(
4
,
6
):
self
.
assertTrue
(
deadlock_detection
.
count
>
0
)
def
theThread
(
self
,
sleepTime
,
lockType
):
def
theThread
(
self
,
lockType
):
name
=
currentThread
()
.
getName
()
name
=
currentThread
()
.
getName
()
if
lockType
==
db
.
DB_LOCK_WRITE
:
if
lockType
==
db
.
DB_LOCK_WRITE
:
lt
=
"write"
lt
=
"write"
...
@@ -112,15 +150,15 @@ class LockingTestCase(unittest.TestCase):
...
@@ -112,15 +150,15 @@ class LockingTestCase(unittest.TestCase):
if
verbose
:
if
verbose
:
print
"
%
s: locker ID:
%
s"
%
(
name
,
anID
)
print
"
%
s: locker ID:
%
s"
%
(
name
,
anID
)
lock
=
self
.
env
.
lock_get
(
anID
,
"some locked thing"
,
lockType
)
for
i
in
xrange
(
1000
)
:
if
verbose
:
lock
=
self
.
env
.
lock_get
(
anID
,
"some locked thing"
,
lockType
)
print
"
%
s: Aquired
%
s lock:
%
s"
%
(
name
,
lt
,
lock
)
if
verbose
:
print
"
%
s: Aquired
%
s lock:
%
s"
%
(
name
,
lt
,
lock
)
time
.
sleep
(
sleepTime
)
self
.
env
.
lock_put
(
lock
)
if
verbose
:
print
"
%
s: Released
%
s lock:
%
s"
%
(
name
,
lt
,
lock
)
self
.
env
.
lock_put
(
lock
)
if
verbose
:
print
"
%
s: Released
%
s lock:
%
s"
%
(
name
,
lt
,
lock
)
if
db
.
version
()
>=
(
4
,
0
):
if
db
.
version
()
>=
(
4
,
0
):
self
.
env
.
lock_id_free
(
anID
)
self
.
env
.
lock_id_free
(
anID
)
...
...
Lib/bsddb/test/test_misc.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -3,7 +3,6 @@
...
@@ -3,7 +3,6 @@
import
os
import
os
import
unittest
import
unittest
import
tempfile
try
:
try
:
# For Pythons w/distutils pybsddb
# For Pythons w/distutils pybsddb
...
@@ -12,6 +11,8 @@ except ImportError:
...
@@ -12,6 +11,8 @@ except ImportError:
# For Python 2.3
# For Python 2.3
from
bsddb
import
db
,
dbshelve
,
hashopen
from
bsddb
import
db
,
dbshelve
,
hashopen
from
test_all
import
get_new_environment_path
,
get_new_database_path
try
:
try
:
from
bsddb3
import
test_support
from
bsddb3
import
test_support
except
ImportError
:
except
ImportError
:
...
@@ -22,12 +23,7 @@ except ImportError:
...
@@ -22,12 +23,7 @@ except ImportError:
class
MiscTestCase
(
unittest
.
TestCase
):
class
MiscTestCase
(
unittest
.
TestCase
):
def
setUp
(
self
):
def
setUp
(
self
):
self
.
filename
=
self
.
__class__
.
__name__
+
'.db'
self
.
filename
=
self
.
__class__
.
__name__
+
'.db'
homeDir
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
'db_home
%
d'
%
os
.
getpid
())
self
.
homeDir
=
get_new_environment_path
()
self
.
homeDir
=
homeDir
try
:
os
.
mkdir
(
homeDir
)
except
OSError
:
pass
def
tearDown
(
self
):
def
tearDown
(
self
):
test_support
.
unlink
(
self
.
filename
)
test_support
.
unlink
(
self
.
filename
)
...
@@ -41,9 +37,9 @@ class MiscTestCase(unittest.TestCase):
...
@@ -41,9 +37,9 @@ class MiscTestCase(unittest.TestCase):
def
test02_db_home
(
self
):
def
test02_db_home
(
self
):
env
=
db
.
DBEnv
()
env
=
db
.
DBEnv
()
# check for crash fixed when db_home is used before open()
# check for crash fixed when db_home is used before open()
assert
env
.
db_home
is
None
self
.
assert_
(
env
.
db_home
is
None
)
env
.
open
(
self
.
homeDir
,
db
.
DB_CREATE
)
env
.
open
(
self
.
homeDir
,
db
.
DB_CREATE
)
assert
self
.
homeDir
==
env
.
db_home
self
.
assertEqual
(
self
.
homeDir
,
env
.
db_home
)
def
test03_repr_closed_db
(
self
):
def
test03_repr_closed_db
(
self
):
db
=
hashopen
(
self
.
filename
)
db
=
hashopen
(
self
.
filename
)
...
@@ -93,7 +89,7 @@ class MiscTestCase(unittest.TestCase):
...
@@ -93,7 +89,7 @@ class MiscTestCase(unittest.TestCase):
def
test_DB_set_flags_persists
(
self
):
def
test_DB_set_flags_persists
(
self
):
if
db
.
version
()
<
(
4
,
2
):
if
db
.
version
()
<
(
4
,
2
):
# The get_flags API required for this to work is only available
# The get_flags API required for this to work is only available
# in BerkeleyDB >= 4.2
# in Berkeley
DB >= 4.2
return
return
try
:
try
:
db1
=
db
.
DB
()
db1
=
db
.
DB
()
...
...
Lib/bsddb/test/test_pickle.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -6,7 +6,6 @@ try:
...
@@ -6,7 +6,6 @@ try:
except
ImportError
:
except
ImportError
:
cPickle
=
None
cPickle
=
None
import
unittest
import
unittest
import
tempfile
try
:
try
:
# For Pythons w/distutils pybsddb
# For Pythons w/distutils pybsddb
...
@@ -15,6 +14,8 @@ except ImportError, e:
...
@@ -15,6 +14,8 @@ except ImportError, e:
# For Python 2.3
# For Python 2.3
from
bsddb
import
db
from
bsddb
import
db
from
test_all
import
get_new_environment_path
,
get_new_database_path
try
:
try
:
from
bsddb3
import
test_support
from
bsddb3
import
test_support
except
ImportError
:
except
ImportError
:
...
@@ -25,14 +26,10 @@ except ImportError:
...
@@ -25,14 +26,10 @@ except ImportError:
class
pickleTestCase
(
unittest
.
TestCase
):
class
pickleTestCase
(
unittest
.
TestCase
):
"""Verify that DBError can be pickled and unpickled"""
"""Verify that DBError can be pickled and unpickled"""
db_home
=
'db_home'
db_name
=
'test-dbobj.db'
db_name
=
'test-dbobj.db'
def
setUp
(
self
):
def
setUp
(
self
):
homeDir
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
'db_home
%
d'
%
os
.
getpid
())
self
.
homeDir
=
get_new_environment_path
()
self
.
homeDir
=
homeDir
try
:
os
.
mkdir
(
homeDir
)
except
os
.
error
:
pass
def
tearDown
(
self
):
def
tearDown
(
self
):
if
hasattr
(
self
,
'db'
):
if
hasattr
(
self
,
'db'
):
...
@@ -47,7 +44,7 @@ class pickleTestCase(unittest.TestCase):
...
@@ -47,7 +44,7 @@ class pickleTestCase(unittest.TestCase):
self
.
db
=
db
.
DB
(
self
.
env
)
self
.
db
=
db
.
DB
(
self
.
env
)
self
.
db
.
open
(
self
.
db_name
,
db
.
DB_HASH
,
db
.
DB_CREATE
)
self
.
db
.
open
(
self
.
db_name
,
db
.
DB_HASH
,
db
.
DB_CREATE
)
self
.
db
.
put
(
'spam'
,
'eggs'
)
self
.
db
.
put
(
'spam'
,
'eggs'
)
assert
self
.
db
[
'spam'
]
==
'eggs'
self
.
assertEqual
(
self
.
db
[
'spam'
],
'eggs'
)
try
:
try
:
self
.
db
.
put
(
'spam'
,
'ham'
,
flags
=
db
.
DB_NOOVERWRITE
)
self
.
db
.
put
(
'spam'
,
'ham'
,
flags
=
db
.
DB_NOOVERWRITE
)
except
db
.
DBError
,
egg
:
except
db
.
DBError
,
egg
:
...
...
Lib/bsddb/test/test_queue.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -3,7 +3,6 @@ TestCases for exercising a Queue DB.
...
@@ -3,7 +3,6 @@ TestCases for exercising a Queue DB.
"""
"""
import
os
,
string
import
os
,
string
import
tempfile
from
pprint
import
pprint
from
pprint
import
pprint
import
unittest
import
unittest
...
@@ -14,14 +13,14 @@ except ImportError:
...
@@ -14,14 +13,14 @@ except ImportError:
# For Python 2.3
# For Python 2.3
from
bsddb
import
db
from
bsddb
import
db
from
test_all
import
verbose
from
test_all
import
verbose
,
get_new_database_path
#----------------------------------------------------------------------
#----------------------------------------------------------------------
class
SimpleQueueTestCase
(
unittest
.
TestCase
):
class
SimpleQueueTestCase
(
unittest
.
TestCase
):
def
setUp
(
self
):
def
setUp
(
self
):
self
.
filename
=
tempfile
.
mktemp
()
self
.
filename
=
get_new_database_path
()
def
tearDown
(
self
):
def
tearDown
(
self
):
try
:
try
:
...
@@ -48,14 +47,14 @@ class SimpleQueueTestCase(unittest.TestCase):
...
@@ -48,14 +47,14 @@ class SimpleQueueTestCase(unittest.TestCase):
for
x
in
string
.
letters
:
for
x
in
string
.
letters
:
d
.
append
(
x
*
40
)
d
.
append
(
x
*
40
)
assert
len
(
d
)
==
52
self
.
assertEqual
(
len
(
d
),
52
)
d
.
put
(
100
,
"some more data"
)
d
.
put
(
100
,
"some more data"
)
d
.
put
(
101
,
"and some more "
)
d
.
put
(
101
,
"and some more "
)
d
.
put
(
75
,
"out of order"
)
d
.
put
(
75
,
"out of order"
)
d
.
put
(
1
,
"replacement data"
)
d
.
put
(
1
,
"replacement data"
)
assert
len
(
d
)
==
55
self
.
assertEqual
(
len
(
d
),
55
)
if
verbose
:
if
verbose
:
print
"before close"
+
'-'
*
30
print
"before close"
+
'-'
*
30
...
@@ -88,9 +87,9 @@ class SimpleQueueTestCase(unittest.TestCase):
...
@@ -88,9 +87,9 @@ class SimpleQueueTestCase(unittest.TestCase):
print
"after consume loop"
+
'-'
*
30
print
"after consume loop"
+
'-'
*
30
pprint
(
d
.
stat
())
pprint
(
d
.
stat
())
assert
len
(
d
)
==
0
,
\
self
.
assertEqual
(
len
(
d
),
0
,
\
"if you see this message then you need to rebuild "
\
"if you see this message then you need to rebuild "
\
"Berkeley
DB 3.1.17 with the patch in patches/qam_stat.diff"
"Berkeley
DB 3.1.17 with the patch in patches/qam_stat.diff"
)
d
.
close
()
d
.
close
()
...
@@ -120,14 +119,14 @@ class SimpleQueueTestCase(unittest.TestCase):
...
@@ -120,14 +119,14 @@ class SimpleQueueTestCase(unittest.TestCase):
for
x
in
string
.
letters
:
for
x
in
string
.
letters
:
d
.
append
(
x
*
40
)
d
.
append
(
x
*
40
)
assert
len
(
d
)
==
52
self
.
assertEqual
(
len
(
d
),
52
)
d
.
put
(
100
,
"some more data"
)
d
.
put
(
100
,
"some more data"
)
d
.
put
(
101
,
"and some more "
)
d
.
put
(
101
,
"and some more "
)
d
.
put
(
75
,
"out of order"
)
d
.
put
(
75
,
"out of order"
)
d
.
put
(
1
,
"replacement data"
)
d
.
put
(
1
,
"replacement data"
)
assert
len
(
d
)
==
55
self
.
assertEqual
(
len
(
d
),
55
)
if
verbose
:
if
verbose
:
print
"before close"
+
'-'
*
30
print
"before close"
+
'-'
*
30
...
...
Lib/bsddb/test/test_recno.py
Dosyayı görüntüle @
18eb1fa2
...
@@ -3,11 +3,10 @@
...
@@ -3,11 +3,10 @@
import
os
import
os
import
errno
import
errno
import
tempfile
from
pprint
import
pprint
from
pprint
import
pprint
import
unittest
import
unittest
from
test_all
import
verbose
from
test_all
import
verbose
,
get_new_environment_path
,
get_new_database_path
try
:
try
:
# For Pythons w/distutils pybsddb
# For Pythons w/distutils pybsddb
...
@@ -27,8 +26,13 @@ letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
...
@@ -27,8 +26,13 @@ letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
#----------------------------------------------------------------------
#----------------------------------------------------------------------
class
SimpleRecnoTestCase
(
unittest
.
TestCase
):
class
SimpleRecnoTestCase
(
unittest
.
TestCase
):
import
sys
if
sys
.
version_info
[:
3
]
<
(
2
,
4
,
0
):
def
assertFalse
(
self
,
expr
,
msg
=
None
):
self
.
failIf
(
expr
,
msg
=
msg
)
def
setUp
(
self
):
def
setUp
(
self
):
self
.
filename
=
tempfile
.
mktemp
()
self
.
filename
=
get_new_database_path
()
self
.
homeDir
=
None
self
.
homeDir
=
None
def
tearDown
(
self
):
def
tearDown
(
self
):
...
@@ -46,8 +50,8 @@ class SimpleRecnoTestCase(unittest.TestCase):
...
@@ -46,8 +50,8 @@ class SimpleRecnoTestCase(unittest.TestCase):
for
x
in
letters
:
for
x
in
letters
:
recno
=
d
.
append
(
x
*
60
)
recno
=
d
.
append
(
x
*
60
)
assert
type
(
recno
)
==
type
(
0
)
self
.
assertEqual
(
type
(
recno
),
type
(
0
)
)
assert
recno
>=
1
self
.
assert_
(
recno
>=
1
)
if
verbose
:
if
verbose
:
print
recno
,
print
recno
,
...
@@ -62,13 +66,13 @@ class SimpleRecnoTestCase(unittest.TestCase):
...
@@ -62,13 +66,13 @@ class SimpleRecnoTestCase(unittest.TestCase):
if
verbose
:
if
verbose
:
print
data
print
data
assert
type
(
data
)
==
type
(
""
)
self
.
assertEqual
(
type
(
data
),
type
(
""
)
)
assert
data
==
d
.
get
(
recno
)
self
.
assertEqual
(
data
,
d
.
get
(
recno
)
)
try
:
try
:
data
=
d
[
0
]
# This should raise a KeyError!?!?!
data
=
d
[
0
]
# This should raise a KeyError!?!?!
except
db
.
DBInvalidArgError
,
val
:
except
db
.
DBInvalidArgError
,
val
:
assert
val
[
0
]
==
db
.
EINVAL
self
.
assertEqual
(
val
[
0
],
db
.
EINVAL
)
if
verbose
:
print
val
if
verbose
:
print
val
else
:
else
:
self
.
fail
(
"expected exception"
)
self
.
fail
(
"expected exception"
)
...
@@ -94,35 +98,35 @@ class SimpleRecnoTestCase(unittest.TestCase):
...
@@ -94,35 +98,35 @@ class SimpleRecnoTestCase(unittest.TestCase):
if
get_returns_none
:
if
get_returns_none
:
self
.
fail
(
"unexpected exception"
)
self
.
fail
(
"unexpected exception"
)
else
:
else
:
assert
data
==
None
self
.
assertEqual
(
data
,
None
)
keys
=
d
.
keys
()
keys
=
d
.
keys
()
if
verbose
:
if
verbose
:
print
keys
print
keys
assert
type
(
keys
)
==
type
([]
)
self
.
assertEqual
(
type
(
keys
),
type
([])
)
assert
type
(
keys
[
0
])
==
type
(
123
)
self
.
assertEqual
(
type
(
keys
[
0
]),
type
(
123
)
)
assert
len
(
keys
)
==
len
(
d
)
self
.
assertEqual
(
len
(
keys
),
len
(
d
)
)
items
=
d
.
items
()
items
=
d
.
items
()
if
verbose
:
if
verbose
:
pprint
(
items
)
pprint
(
items
)
assert
type
(
items
)
==
type
([]
)
self
.
assertEqual
(
type
(
items
),
type
([])
)
assert
type
(
items
[
0
])
==
type
((
))
self
.
assertEqual
(
type
(
items
[
0
]),
type
(()
))
assert
len
(
items
[
0
])
==
2
self
.
assertEqual
(
len
(
items
[
0
]),
2
)
assert
type
(
items
[
0
][
0
])
==
type
(
123
)
self
.
assertEqual
(
type
(
items
[
0
][
0
]),
type
(
123
)
)
assert
type
(
items
[
0
][
1
])
==
type
(
""
)
self
.
assertEqual
(
type
(
items
[
0
][
1
]),
type
(
""
)
)
assert
len
(
items
)
==
len
(
d
)
self
.
assertEqual
(
len
(
items
),
len
(
d
)
)
assert
d
.
has_key
(
25
)
self
.
assert_
(
d
.
has_key
(
25
)
)
del
d
[
25
]
del
d
[
25
]
assert
not
d
.
has_key
(
25
)
self
.
assertFalse
(
d
.
has_key
(
25
)
)
d
.
delete
(
13
)
d
.
delete
(
13
)
assert
not
d
.
has_key
(
13
)
self
.
assertFalse
(
d
.
has_key
(
13
)
)
data
=
d
.
get_both
(
26
,
"z"
*
60
)
data
=
d
.
get_both
(
26
,
"z"
*
60
)
assert
data
==
"z"
*
60
,
'was
%
r'
%
data
self
.
assertEqual
(
data
,
"z"
*
60
,
'was
%
r'
%
data
)
if
verbose
:
if
verbose
:
print
data
print
data
...
@@ -146,7 +150,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
...
@@ -146,7 +150,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
c
.
set
(
50
)
c
.
set
(
50
)
rec
=
c
.
current
()
rec
=
c
.
current
()
assert
rec
==
(
50
,
"a replacement record"
)
self
.
assertEqual
(
rec
,
(
50
,
"a replacement record"
)
)
if
verbose
:
if
verbose
:
print
rec
print
rec
...
@@ -157,7 +161,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
...
@@ -157,7 +161,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
# test that non-existant key lookups work (and that
# test that non-existant key lookups work (and that
# DBC_set_range doesn't have a memleak under valgrind)
# DBC_set_range doesn't have a memleak under valgrind)
rec
=
c
.
set_range
(
999999
)
rec
=
c
.
set_range
(
999999
)
assert
rec
==
None
self
.
assertEqual
(
rec
,
None
)
if
verbose
:
if
verbose
:
print
rec
print
rec
...
@@ -170,7 +174,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
...
@@ -170,7 +174,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
# put a record beyond the consecutive end of the recno's
# put a record beyond the consecutive end of the recno's
d
[
100
]
=
"way out there"
d
[
100
]
=
"way out there"
assert
d
[
100
]
==
"way out there"
self
.
assertEqual
(
d
[
100
],
"way out there"
)
try
:
try
:
data
=
d
[
99
]
data
=
d
[
99
]
...
@@ -185,7 +189,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
...
@@ -185,7 +189,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
if
get_returns_none
:
if
get_returns_none
:
self
.
fail
(
"unexpected DBKeyEmptyError exception"
)
self
.
fail
(
"unexpected DBKeyEmptyError exception"
)
else
:
else
:
assert
val
[
0
]
==
db
.
DB_KEYEMPTY
self
.
assertEqual
(
val
[
0
],
db
.
DB_KEYEMPTY
)
if
verbose
:
print
val
if
verbose
:
print
val
else
:
else
:
if
not
get_returns_none
:
if
not
get_returns_none
:
...
@@ -207,7 +211,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
...
@@ -207,7 +211,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
just a line in the file, but you can set a different record delimiter
just a line in the file, but you can set a different record delimiter
if needed.
if needed.
"""
"""
homeDir
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
'db_home
%
d'
%
os
.
getpid
()
)
homeDir
=
get_new_environment_path
(
)
self
.
homeDir
=
homeDir
self
.
homeDir
=
homeDir
source
=
os
.
path
.
join
(
homeDir
,
'test_recno.txt'
)
source
=
os
.
path
.
join
(
homeDir
,
'test_recno.txt'
)
if
not
os
.
path
.
isdir
(
homeDir
):
if
not
os
.
path
.
isdir
(
homeDir
):
...
@@ -236,7 +240,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
...
@@ -236,7 +240,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
print
data
print
data
print
text
.
split
(
'
\n
'
)
print
text
.
split
(
'
\n
'
)
assert
text
.
split
(
'
\n
'
)
==
data
self
.
assertEqual
(
text
.
split
(
'
\n
'
),
data
)
# open as a DB again
# open as a DB again
d
=
db
.
DB
()
d
=
db
.
DB
()
...
@@ -255,8 +259,8 @@ class SimpleRecnoTestCase(unittest.TestCase):
...
@@ -255,8 +259,8 @@ class SimpleRecnoTestCase(unittest.TestCase):
print
text
print
text
print
text
.
split
(
'
\n
'
)
print
text
.
split
(
'
\n
'
)
assert
text
.
split
(
'
\n
'
)
==
\
self
.
assertEqual
(
text
.
split
(
'
\n
'
),
"The quick reddish-brown fox jumped over the comatose dog"
.
split
(
)
"The quick reddish-brown fox jumped over the comatose dog"
.
split
()
)
def
test03_FixedLength
(
self
):
def
test03_FixedLength
(
self
):
d
=
db
.
DB
()
d
=
db
.
DB
()
...
@@ -273,7 +277,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
...
@@ -273,7 +277,7 @@ class SimpleRecnoTestCase(unittest.TestCase):
try
:
# this one will fail
try
:
# this one will fail
d
.
append
(
'bad'
*
20
)
d
.
append
(
'bad'
*
20
)
except
db
.
DBInvalidArgError
,
val
:
except
db
.
DBInvalidArgError
,
val
:
assert
val
[
0
]
==
db
.
EINVAL
self
.
assertEqual
(
val
[
0
],
db
.
EINVAL
)
if
verbose
:
print
val
if
verbose
:
print
val
else
:
else
:
self
.
fail
(
"expected exception"
)
self
.
fail
(
"expected exception"
)
...
...
Lib/bsddb/test/test_replication.py
0 → 100644
Dosyayı görüntüle @
18eb1fa2
"""TestCases for distributed transactions.
"""
import
os
import
unittest
try
:
# For Pythons w/distutils pybsddb
from
bsddb3
import
db
except
ImportError
:
# For Python 2.3
from
bsddb
import
db
from
test_all
import
get_new_environment_path
,
get_new_database_path
try
:
from
bsddb3
import
test_support
except
ImportError
:
from
test
import
test_support
from
test_all
import
verbose
#----------------------------------------------------------------------
class
DBReplicationManager
(
unittest
.
TestCase
):
import
sys
if
sys
.
version_info
[:
3
]
<
(
2
,
4
,
0
):
def
assertTrue
(
self
,
expr
,
msg
=
None
):
self
.
failUnless
(
expr
,
msg
=
msg
)
def
setUp
(
self
)
:
self
.
homeDirMaster
=
get_new_environment_path
()
self
.
homeDirClient
=
get_new_environment_path
()
self
.
dbenvMaster
=
db
.
DBEnv
()
self
.
dbenvClient
=
db
.
DBEnv
()
# Must use "DB_THREAD" because the Replication Manager will
# be executed in other threads but will use the same environment.
# http://forums.oracle.com/forums/thread.jspa?threadID=645788&tstart=0
self
.
dbenvMaster
.
open
(
self
.
homeDirMaster
,
db
.
DB_CREATE
|
db
.
DB_INIT_TXN
|
db
.
DB_INIT_LOG
|
db
.
DB_INIT_MPOOL
|
db
.
DB_INIT_LOCK
|
db
.
DB_INIT_REP
|
db
.
DB_RECOVER
|
db
.
DB_THREAD
,
0666
)
self
.
dbenvClient
.
open
(
self
.
homeDirClient
,
db
.
DB_CREATE
|
db
.
DB_INIT_TXN
|
db
.
DB_INIT_LOG
|
db
.
DB_INIT_MPOOL
|
db
.
DB_INIT_LOCK
|
db
.
DB_INIT_REP
|
db
.
DB_RECOVER
|
db
.
DB_THREAD
,
0666
)
self
.
confirmed_master
=
self
.
client_startupdone
=
False
def
confirmed_master
(
a
,
b
,
c
)
:
if
b
==
db
.
DB_EVENT_REP_MASTER
:
self
.
confirmed_master
=
True
def
client_startupdone
(
a
,
b
,
c
)
:
if
b
==
db
.
DB_EVENT_REP_STARTUPDONE
:
self
.
client_startupdone
=
True
self
.
dbenvMaster
.
set_event_notify
(
confirmed_master
)
self
.
dbenvClient
.
set_event_notify
(
client_startupdone
)
self
.
dbenvMaster
.
repmgr_set_local_site
(
"127.0.0.1"
,
46117
)
self
.
dbenvClient
.
repmgr_set_local_site
(
"127.0.0.1"
,
46118
)
self
.
dbenvMaster
.
repmgr_add_remote_site
(
"127.0.0.1"
,
46118
)
self
.
dbenvClient
.
repmgr_add_remote_site
(
"127.0.0.1"
,
46117
)
self
.
dbenvMaster
.
rep_set_nsites
(
2
)
self
.
dbenvClient
.
rep_set_nsites
(
2
)
self
.
dbenvMaster
.
rep_set_priority
(
10
)
self
.
dbenvClient
.
rep_set_priority
(
0
)
self
.
dbenvMaster
.
repmgr_set_ack_policy
(
db
.
DB_REPMGR_ACKS_ALL
)
self
.
dbenvClient
.
repmgr_set_ack_policy
(
db
.
DB_REPMGR_ACKS_ALL
)
self
.
dbenvMaster
.
repmgr_start
(
1
,
db
.
DB_REP_MASTER
);
self
.
dbenvClient
.
repmgr_start
(
1
,
db
.
DB_REP_CLIENT
);
self
.
assertEquals
(
self
.
dbenvMaster
.
rep_get_nsites
(),
2
)
self
.
assertEquals
(
self
.
dbenvClient
.
rep_get_nsites
(),
2
)
self
.
assertEquals
(
self
.
dbenvMaster
.
rep_get_priority
(),
10
)
self
.
assertEquals
(
self
.
dbenvClient
.
rep_get_priority
(),
0
)
self
.
assertEquals
(
self
.
dbenvMaster
.
repmgr_get_ack_policy
(),
db
.
DB_REPMGR_ACKS_ALL
)
self
.
assertEquals
(
self
.
dbenvClient
.
repmgr_get_ack_policy
(),
db
.
DB_REPMGR_ACKS_ALL
)
#self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
#self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
#self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
#self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
self
.
dbMaster
=
self
.
dbClient
=
None
# The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
# is not generated if the master has no new transactions.
# This is solved in BDB 4.6 (#15542).
import
time
timeout
=
time
.
time
()
+
2
while
(
time
.
time
()
<
timeout
)
and
not
(
self
.
confirmed_master
and
self
.
client_startupdone
)
:
time
.
sleep
(
0.001
)
if
db
.
version
()
>=
(
4
,
6
)
:
self
.
assertTrue
(
time
.
time
()
<
timeout
)
else
:
self
.
assertTrue
(
time
.
time
()
>=
timeout
)
d
=
self
.
dbenvMaster
.
repmgr_site_list
()
self
.
assertEquals
(
len
(
d
),
1
)
self
.
assertEquals
(
d
[
0
][
0
],
"127.0.0.1"
)
self
.
assertEquals
(
d
[
0
][
1
],
46118
)
self
.
assertTrue
((
d
[
0
][
2
]
==
db
.
DB_REPMGR_CONNECTED
)
or
\
(
d
[
0
][
2
]
==
db
.
DB_REPMGR_DISCONNECTED
))
d
=
self
.
dbenvClient
.
repmgr_site_list
()
self
.
assertEquals
(
len
(
d
),
1
)
self
.
assertEquals
(
d
[
0
][
0
],
"127.0.0.1"
)
self
.
assertEquals
(
d
[
0
][
1
],
46117
)
self
.
assertTrue
((
d
[
0
][
2
]
==
db
.
DB_REPMGR_CONNECTED
)
or
\
(
d
[
0
][
2
]
==
db
.
DB_REPMGR_DISCONNECTED
))
if
db
.
version
()
>=
(
4
,
6
)
:
d
=
self
.
dbenvMaster
.
repmgr_stat
(
flags
=
db
.
DB_STAT_CLEAR
);
self
.
assertTrue
(
"msgs_queued"
in
d
)
def
tearDown
(
self
):
if
self
.
dbClient
:
self
.
dbClient
.
close
()
if
self
.
dbMaster
:
self
.
dbMaster
.
close
()
self
.
dbenvClient
.
close
()
self
.
dbenvMaster
.
close
()
test_support
.
rmtree
(
self
.
homeDirClient
)
test_support
.
rmtree
(
self
.
homeDirMaster
)
def
test01_basic_replication
(
self
)
:
self
.
dbMaster
=
db
.
DB
(
self
.
dbenvMaster
)
txn
=
self
.
dbenvMaster
.
txn_begin
()
self
.
dbMaster
.
open
(
"test"
,
db
.
DB_HASH
,
db
.
DB_CREATE
,
0666
,
txn
=
txn
)
txn
.
commit
()
import
time
,
os
.
path
timeout
=
time
.
time
()
+
10
while
(
time
.
time
()
<
timeout
)
and
\
not
(
os
.
path
.
exists
(
os
.
path
.
join
(
self
.
homeDirClient
,
"test"
)))
:
time
.
sleep
(
0.01
)
self
.
dbClient
=
db
.
DB
(
self
.
dbenvClient
)
while
True
:
txn
=
self
.
dbenvClient
.
txn_begin
()
try
:
self
.
dbClient
.
open
(
"test"
,
db
.
DB_HASH
,
flags
=
db
.
DB_RDONLY
,
mode
=
0666
,
txn
=
txn
)
except
db
.
DBRepHandleDeadError
:
txn
.
abort
()
self
.
dbClient
.
close
()
self
.
dbClient
=
db
.
DB
(
self
.
dbenvClient
)
continue
txn
.
commit
()
break
txn
=
self
.
dbenvMaster
.
txn_begin
()
self
.
dbMaster
.
put
(
"ABC"
,
"123"
,
txn
=
txn
)
txn
.
commit
()
import
time
timeout
=
time
.
time
()
+
1
v
=
None
while
(
time
.
time
()
<
timeout
)
and
(
v
==
None
)
:
txn
=
self
.
dbenvClient
.
txn_begin
()
v
=
self
.
dbClient
.
get
(
"ABC"
,
txn
=
txn
)
txn
.
commit
()
self
.
assertEquals
(
"123"
,
v
)
txn
=
self
.
dbenvMaster
.
txn_begin
()
self
.
dbMaster
.
delete
(
"ABC"
,
txn
=
txn
)
txn
.
commit
()
timeout
=
time
.
time
()
+
1
while
(
time
.
time
()
<
timeout
)
and
(
v
!=
None
)
:
txn
=
self
.
dbenvClient
.
txn_begin
()
v
=
self
.
dbClient
.
get
(
"ABC"
,
txn
=
txn
)
txn
.
commit
()
self
.
assertEquals
(
None
,
v
)
#----------------------------------------------------------------------
def
test_suite
():
suite
=
unittest
.
TestSuite
()
if
db
.
version
()
>=
(
4
,
5
)
:
dbenv
=
db
.
DBEnv
()
try
:
dbenv
.
repmgr_get_ack_policy
()
ReplicationManager_available
=
True
except
:
ReplicationManager_available
=
False
dbenv
.
close
()
del
dbenv
if
ReplicationManager_available
:
suite
.
addTest
(
unittest
.
makeSuite
(
DBReplicationManager
))
return
suite
if
__name__
==
'__main__'
:
unittest
.
main
(
defaultTest
=
'test_suite'
)
Lib/bsddb/test/test_sequence.py
Dosyayı görüntüle @
18eb1fa2
import
unittest
import
unittest
import
os
import
os
import
tempfile
try
:
try
:
# For Pythons w/distutils pybsddb
# For Pythons w/distutils pybsddb
...
@@ -13,18 +12,19 @@ try:
...
@@ -13,18 +12,19 @@ try:
except
ImportError
:
except
ImportError
:
from
test
import
test_support
from
test
import
test_support
from
test_all
import
get_new_environment_path
,
get_new_database_path
class
DBSequenceTest
(
unittest
.
TestCase
):
class
DBSequenceTest
(
unittest
.
TestCase
):
import
sys
if
sys
.
version_info
[:
3
]
<
(
2
,
4
,
0
):
def
assertTrue
(
self
,
expr
,
msg
=
None
):
self
.
failUnless
(
expr
,
msg
=
msg
)
def
setUp
(
self
):
def
setUp
(
self
):
self
.
int_32_max
=
0x100000000
self
.
int_32_max
=
0x100000000
self
.
homeDir
=
os
.
path
.
join
(
tempfile
.
gettempdir
(),
'db_home
%
d'
%
os
.
getpid
())
self
.
homeDir
=
get_new_environment_path
()
try
:
self
.
filename
=
"test"
os
.
mkdir
(
self
.
homeDir
)
except
os
.
error
:
pass
tempfile
.
tempdir
=
self
.
homeDir
self
.
filename
=
os
.
path
.
split
(
tempfile
.
mktemp
())[
1
]
tempfile
.
tempdir
=
None
self
.
dbenv
=
db
.
DBEnv
()
self
.
dbenv
=
db
.
DBEnv
()
self
.
dbenv
.
open
(
self
.
homeDir
,
db
.
DB_CREATE
|
db
.
DB_INIT_MPOOL
,
0666
)
self
.
dbenv
.
open
(
self
.
homeDir
,
db
.
DB_CREATE
|
db
.
DB_INIT_MPOOL
,
0666
)
...
@@ -100,6 +100,52 @@ class DBSequenceTest(unittest.TestCase):
...
@@ -100,6 +100,52 @@ class DBSequenceTest(unittest.TestCase):
'flags'
,
'cache_size'
,
'last_value'
,
'wait'
):
'flags'
,
'cache_size'
,
'last_value'
,
'wait'
):
self
.
assertTrue
(
param
in
stat
,
"parameter
%
s isn't in stat info"
%
param
)
self
.
assertTrue
(
param
in
stat
,
"parameter
%
s isn't in stat info"
%
param
)
if
db
.
version
()
>=
(
4
,
7
)
:
# This code checks a crash solved in Berkeley DB 4.7
def
test_stat_crash
(
self
)
:
d
=
db
.
DB
()
d
.
open
(
None
,
dbtype
=
db
.
DB_HASH
,
flags
=
db
.
DB_CREATE
)
# In RAM
seq
=
db
.
DBSequence
(
d
,
flags
=
0
)
self
.
assertRaises
(
db
.
DBNotFoundError
,
seq
.
open
,
key
=
'id'
,
txn
=
None
,
flags
=
0
)
self
.
assertRaises
(
db
.
DBNotFoundError
,
seq
.
stat
)
d
.
close
()
def
test_64bits
(
self
)
:
value_plus
=
(
1L
<<
63
)
-
1
self
.
assertEquals
(
9223372036854775807L
,
value_plus
)
value_minus
=-
1L
<<
63
# Two complement
self
.
assertEquals
(
-
9223372036854775808L
,
value_minus
)
if
db
.
version
()
<
(
4
,
4
):
# We don't use both extremes because it is
# problematic in Berkeley DB 4.3.
value_plus
-=
1
value_minus
+=
1
self
.
seq
=
db
.
DBSequence
(
self
.
d
,
flags
=
0
)
self
.
assertEquals
(
None
,
self
.
seq
.
init_value
(
value_plus
-
1
))
self
.
assertEquals
(
None
,
self
.
seq
.
open
(
key
=
'id'
,
txn
=
None
,
flags
=
db
.
DB_CREATE
))
self
.
assertEquals
(
value_plus
-
1
,
self
.
seq
.
get
(
1
))
self
.
assertEquals
(
value_plus
,
self
.
seq
.
get
(
1
))
self
.
seq
.
remove
(
txn
=
None
,
flags
=
0
)
self
.
seq
=
db
.
DBSequence
(
self
.
d
,
flags
=
0
)
self
.
assertEquals
(
None
,
self
.
seq
.
init_value
(
value_minus
))
self
.
assertEquals
(
None
,
self
.
seq
.
open
(
key
=
'id'
,
txn
=
None
,
flags
=
db
.
DB_CREATE
))
self
.
assertEquals
(
value_minus
,
self
.
seq
.
get
(
1
))
self
.
assertEquals
(
value_minus
+
1
,
self
.
seq
.
get
(
1
))
def
test_multiple_close
(
self
):
self
.
seq
=
db
.
DBSequence
(
self
.
d
)
self
.
seq
.
close
()
# You can close a Sequence multiple times
self
.
seq
.
close
()
self
.
seq
.
close
()
def
test_suite
():
def
test_suite
():
suite
=
unittest
.
TestSuite
()
suite
=
unittest
.
TestSuite
()
if
db
.
version
()
>=
(
4
,
3
):
if
db
.
version
()
>=
(
4
,
3
):
...
...
Lib/bsddb/test/test_thread.py
Dosyayı görüntüle @
18eb1fa2
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment