Skip to content
Projeler
Gruplar
Parçacıklar
Yardım
Yükleniyor...
Oturum aç / Kaydol
Gezinmeyi değiştir
C
cpython
Proje
Proje
Ayrıntılar
Etkinlik
Cycle Analytics
Depo (repository)
Depo (repository)
Dosyalar
Kayıtlar (commit)
Dallar (branch)
Etiketler
Katkıda bulunanlar
Grafik
Karşılaştır
Grafikler
Konular (issue)
0
Konular (issue)
0
Liste
Pano
Etiketler
Kilometre Taşları
Birleştirme (merge) Talepleri
0
Birleştirme (merge) Talepleri
0
CI / CD
CI / CD
İş akışları (pipeline)
İşler
Zamanlamalar
Grafikler
Paketler
Paketler
Wiki
Wiki
Parçacıklar
Parçacıklar
Üyeler
Üyeler
Collapse sidebar
Close sidebar
Etkinlik
Grafik
Grafikler
Yeni bir konu (issue) oluştur
İşler
Kayıtlar (commit)
Konu (issue) Panoları
Kenar çubuğunu aç
Batuhan Osman TASKAYA
cpython
Commits
3844fe5e
Kaydet (Commit)
3844fe5e
authored
Eyl 26, 2015
tarafından
Victor Stinner
Dosyalara gözat
Seçenekler
Dosyalara Gözat
İndir
Eposta Yamaları
Sade Fark
Issue #25220: Move most regrtest.py code to libregrtest
üst
c22eee6b
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
1318 additions
and
1272 deletions
+1318
-1272
__init__.py
Lib/test/libregrtest/__init__.py
+1
-0
cmdline.py
Lib/test/libregrtest/cmdline.py
+1
-1
main.py
Lib/test/libregrtest/main.py
+547
-0
refleak.py
Lib/test/libregrtest/refleak.py
+165
-0
runtest.py
Lib/test/libregrtest/runtest.py
+271
-0
save_env.py
Lib/test/libregrtest/save_env.py
+284
-0
regrtest.py
Lib/test/regrtest.py
+1
-1224
test_regrtest.py
Lib/test/test_regrtest.py
+48
-47
No files found.
Lib/test/libregrtest/__init__.py
Dosyayı görüntüle @
3844fe5e
from
test.libregrtest.cmdline
import
_parse_args
,
RESOURCE_NAMES
from
test.libregrtest.cmdline
import
_parse_args
,
RESOURCE_NAMES
from
test.libregrtest.main
import
main_in_temp_cwd
Lib/test/libregrtest/cmdline.py
Dosyayı görüntüle @
3844fe5e
import
argparse
import
argparse
import
faulthandler
import
faulthandler
import
os
import
os
from
test
import
support
from
test
import
support
USAGE
=
"""
\
USAGE
=
"""
\
python -m test [options] [test_name1 [test_name2 ...]]
python -m test [options] [test_name1 [test_name2 ...]]
python path/to/Lib/test/regrtest.py [options] [test_name1 [test_name2 ...]]
python path/to/Lib/test/regrtest.py [options] [test_name1 [test_name2 ...]]
...
...
Lib/test/libregrtest/main.py
0 → 100644
Dosyayı görüntüle @
3844fe5e
import
faulthandler
import
json
import
os
import
re
import
sys
import
tempfile
import
sysconfig
import
signal
import
random
import
platform
import
traceback
import
unittest
from
test.libregrtest.runtest
import
(
findtests
,
runtest
,
run_test_in_subprocess
,
STDTESTS
,
NOTTESTS
,
PASSED
,
FAILED
,
ENV_CHANGED
,
SKIPPED
,
RESOURCE_DENIED
,
INTERRUPTED
,
CHILD_ERROR
)
from
test.libregrtest.refleak
import
warm_caches
from
test.libregrtest.cmdline
import
_parse_args
from
test
import
support
try
:
import
threading
except
ImportError
:
threading
=
None
# Some times __path__ and __file__ are not absolute (e.g. while running from
# Lib/) and, if we change the CWD to run the tests in a temporary dir, some
# imports might fail. This affects only the modules imported before os.chdir().
# These modules are searched first in sys.path[0] (so '' -- the CWD) and if
# they are found in the CWD their __file__ and __path__ will be relative (this
# happens before the chdir). All the modules imported after the chdir, are
# not found in the CWD, and since the other paths in sys.path[1:] are absolute
# (site.py absolutize them), the __file__ and __path__ will be absolute too.
# Therefore it is necessary to absolutize manually the __file__ and __path__ of
# the packages to prevent later imports to fail when the CWD is different.
for
module
in
sys
.
modules
.
values
():
if
hasattr
(
module
,
'__path__'
):
module
.
__path__
=
[
os
.
path
.
abspath
(
path
)
for
path
in
module
.
__path__
]
if
hasattr
(
module
,
'__file__'
):
module
.
__file__
=
os
.
path
.
abspath
(
module
.
__file__
)
# MacOSX (a.k.a. Darwin) has a default stack size that is too small
# for deeply recursive regular expressions. We see this as crashes in
# the Python test suite when running test_re.py and test_sre.py. The
# fix is to set the stack limit to 2048.
# This approach may also be useful for other Unixy platforms that
# suffer from small default stack limits.
if
sys
.
platform
==
'darwin'
:
try
:
import
resource
except
ImportError
:
pass
else
:
soft
,
hard
=
resource
.
getrlimit
(
resource
.
RLIMIT_STACK
)
newsoft
=
min
(
hard
,
max
(
soft
,
1024
*
2048
))
resource
.
setrlimit
(
resource
.
RLIMIT_STACK
,
(
newsoft
,
hard
))
# When tests are run from the Python build directory, it is best practice
# to keep the test files in a subfolder. This eases the cleanup of leftover
# files using the "make distclean" command.
if
sysconfig
.
is_python_build
():
TEMPDIR
=
os
.
path
.
join
(
sysconfig
.
get_config_var
(
'srcdir'
),
'build'
)
else
:
TEMPDIR
=
tempfile
.
gettempdir
()
TEMPDIR
=
os
.
path
.
abspath
(
TEMPDIR
)
def
main
(
tests
=
None
,
**
kwargs
):
"""Execute a test suite.
This also parses command-line options and modifies its behavior
accordingly.
tests -- a list of strings containing test names (optional)
testdir -- the directory in which to look for tests (optional)
Users other than the Python test suite will certainly want to
specify testdir; if it's omitted, the directory containing the
Python test suite is searched for.
If the tests argument is omitted, the tests listed on the
command-line will be used. If that's empty, too, then all *.py
files beginning with test_ will be used.
The other default arguments (verbose, quiet, exclude,
single, randomize, findleaks, use_resources, trace, coverdir,
print_slow, and random_seed) allow programmers calling main()
directly to set the values that would normally be set by flags
on the command line.
"""
# Display the Python traceback on fatal errors (e.g. segfault)
faulthandler
.
enable
(
all_threads
=
True
)
# Display the Python traceback on SIGALRM or SIGUSR1 signal
signals
=
[]
if
hasattr
(
signal
,
'SIGALRM'
):
signals
.
append
(
signal
.
SIGALRM
)
if
hasattr
(
signal
,
'SIGUSR1'
):
signals
.
append
(
signal
.
SIGUSR1
)
for
signum
in
signals
:
faulthandler
.
register
(
signum
,
chain
=
True
)
replace_stdout
()
support
.
record_original_stdout
(
sys
.
stdout
)
ns
=
_parse_args
(
sys
.
argv
[
1
:],
**
kwargs
)
if
ns
.
huntrleaks
:
# Avoid false positives due to various caches
# filling slowly with random data:
warm_caches
()
if
ns
.
memlimit
is
not
None
:
support
.
set_memlimit
(
ns
.
memlimit
)
if
ns
.
threshold
is
not
None
:
import
gc
gc
.
set_threshold
(
ns
.
threshold
)
if
ns
.
nowindows
:
import
msvcrt
msvcrt
.
SetErrorMode
(
msvcrt
.
SEM_FAILCRITICALERRORS
|
msvcrt
.
SEM_NOALIGNMENTFAULTEXCEPT
|
msvcrt
.
SEM_NOGPFAULTERRORBOX
|
msvcrt
.
SEM_NOOPENFILEERRORBOX
)
try
:
msvcrt
.
CrtSetReportMode
except
AttributeError
:
# release build
pass
else
:
for
m
in
[
msvcrt
.
CRT_WARN
,
msvcrt
.
CRT_ERROR
,
msvcrt
.
CRT_ASSERT
]:
msvcrt
.
CrtSetReportMode
(
m
,
msvcrt
.
CRTDBG_MODE_FILE
)
msvcrt
.
CrtSetReportFile
(
m
,
msvcrt
.
CRTDBG_FILE_STDERR
)
if
ns
.
wait
:
input
(
"Press any key to continue..."
)
if
ns
.
slaveargs
is
not
None
:
args
,
kwargs
=
json
.
loads
(
ns
.
slaveargs
)
if
kwargs
.
get
(
'huntrleaks'
):
unittest
.
BaseTestSuite
.
_cleanup
=
False
try
:
result
=
runtest
(
*
args
,
**
kwargs
)
except
KeyboardInterrupt
:
result
=
INTERRUPTED
,
''
except
BaseException
as
e
:
traceback
.
print_exc
()
result
=
CHILD_ERROR
,
str
(
e
)
sys
.
stdout
.
flush
()
print
()
# Force a newline (just in case)
print
(
json
.
dumps
(
result
))
sys
.
exit
(
0
)
good
=
[]
bad
=
[]
skipped
=
[]
resource_denieds
=
[]
environment_changed
=
[]
interrupted
=
False
if
ns
.
findleaks
:
try
:
import
gc
except
ImportError
:
print
(
'No GC available, disabling findleaks.'
)
ns
.
findleaks
=
False
else
:
# Uncomment the line below to report garbage that is not
# freeable by reference counting alone. By default only
# garbage that is not collectable by the GC is reported.
#gc.set_debug(gc.DEBUG_SAVEALL)
found_garbage
=
[]
if
ns
.
huntrleaks
:
unittest
.
BaseTestSuite
.
_cleanup
=
False
if
ns
.
single
:
filename
=
os
.
path
.
join
(
TEMPDIR
,
'pynexttest'
)
try
:
with
open
(
filename
,
'r'
)
as
fp
:
next_test
=
fp
.
read
()
.
strip
()
tests
=
[
next_test
]
except
OSError
:
pass
if
ns
.
fromfile
:
tests
=
[]
with
open
(
os
.
path
.
join
(
support
.
SAVEDCWD
,
ns
.
fromfile
))
as
fp
:
count_pat
=
re
.
compile
(
r'\[\s*\d+/\s*\d+\]'
)
for
line
in
fp
:
line
=
count_pat
.
sub
(
''
,
line
)
guts
=
line
.
split
()
# assuming no test has whitespace in its name
if
guts
and
not
guts
[
0
]
.
startswith
(
'#'
):
tests
.
extend
(
guts
)
# Strip .py extensions.
removepy
(
ns
.
args
)
removepy
(
tests
)
stdtests
=
STDTESTS
[:]
nottests
=
NOTTESTS
.
copy
()
if
ns
.
exclude
:
for
arg
in
ns
.
args
:
if
arg
in
stdtests
:
stdtests
.
remove
(
arg
)
nottests
.
add
(
arg
)
ns
.
args
=
[]
# For a partial run, we do not need to clutter the output.
if
ns
.
verbose
or
ns
.
header
or
not
(
ns
.
quiet
or
ns
.
single
or
tests
or
ns
.
args
):
# Print basic platform information
print
(
"=="
,
platform
.
python_implementation
(),
*
sys
.
version
.
split
())
print
(
"== "
,
platform
.
platform
(
aliased
=
True
),
"
%
s-endian"
%
sys
.
byteorder
)
print
(
"== "
,
"hash algorithm:"
,
sys
.
hash_info
.
algorithm
,
"64bit"
if
sys
.
maxsize
>
2
**
32
else
"32bit"
)
print
(
"== "
,
os
.
getcwd
())
print
(
"Testing with flags:"
,
sys
.
flags
)
# if testdir is set, then we are not running the python tests suite, so
# don't add default tests to be executed or skipped (pass empty values)
if
ns
.
testdir
:
alltests
=
findtests
(
ns
.
testdir
,
list
(),
set
())
else
:
alltests
=
findtests
(
ns
.
testdir
,
stdtests
,
nottests
)
selected
=
tests
or
ns
.
args
or
alltests
if
ns
.
single
:
selected
=
selected
[:
1
]
try
:
next_single_test
=
alltests
[
alltests
.
index
(
selected
[
0
])
+
1
]
except
IndexError
:
next_single_test
=
None
# Remove all the selected tests that precede start if it's set.
if
ns
.
start
:
try
:
del
selected
[:
selected
.
index
(
ns
.
start
)]
except
ValueError
:
print
(
"Couldn't find starting test (
%
s), using all tests"
%
ns
.
start
)
if
ns
.
randomize
:
if
ns
.
random_seed
is
None
:
ns
.
random_seed
=
random
.
randrange
(
10000000
)
random
.
seed
(
ns
.
random_seed
)
print
(
"Using random seed"
,
ns
.
random_seed
)
random
.
shuffle
(
selected
)
if
ns
.
trace
:
import
trace
,
tempfile
tracer
=
trace
.
Trace
(
ignoredirs
=
[
sys
.
base_prefix
,
sys
.
base_exec_prefix
,
tempfile
.
gettempdir
()],
trace
=
False
,
count
=
True
)
test_times
=
[]
support
.
verbose
=
ns
.
verbose
# Tell tests to be moderately quiet
support
.
use_resources
=
ns
.
use_resources
save_modules
=
sys
.
modules
.
keys
()
def
accumulate_result
(
test
,
result
):
ok
,
test_time
=
result
test_times
.
append
((
test_time
,
test
))
if
ok
==
PASSED
:
good
.
append
(
test
)
elif
ok
==
FAILED
:
bad
.
append
(
test
)
elif
ok
==
ENV_CHANGED
:
environment_changed
.
append
(
test
)
elif
ok
==
SKIPPED
:
skipped
.
append
(
test
)
elif
ok
==
RESOURCE_DENIED
:
skipped
.
append
(
test
)
resource_denieds
.
append
(
test
)
if
ns
.
forever
:
def
test_forever
(
tests
=
list
(
selected
)):
while
True
:
for
test
in
tests
:
yield
test
if
bad
:
return
tests
=
test_forever
()
test_count
=
''
test_count_width
=
3
else
:
tests
=
iter
(
selected
)
test_count
=
'/{}'
.
format
(
len
(
selected
))
test_count_width
=
len
(
test_count
)
-
1
if
ns
.
use_mp
:
try
:
from
threading
import
Thread
except
ImportError
:
print
(
"Multiprocess option requires thread support"
)
sys
.
exit
(
2
)
from
queue
import
Queue
debug_output_pat
=
re
.
compile
(
r"\[\d+ refs, \d+ blocks\]$"
)
output
=
Queue
()
pending
=
MultiprocessTests
(
tests
)
def
work
():
# A worker thread.
try
:
while
True
:
try
:
test
=
next
(
pending
)
except
StopIteration
:
output
.
put
((
None
,
None
,
None
,
None
))
return
retcode
,
stdout
,
stderr
=
run_test_in_subprocess
(
test
,
ns
)
# Strip last refcount output line if it exists, since it
# comes from the shutdown of the interpreter in the subcommand.
stderr
=
debug_output_pat
.
sub
(
""
,
stderr
)
stdout
,
_
,
result
=
stdout
.
strip
()
.
rpartition
(
"
\n
"
)
if
retcode
!=
0
:
result
=
(
CHILD_ERROR
,
"Exit code
%
s"
%
retcode
)
output
.
put
((
test
,
stdout
.
rstrip
(),
stderr
.
rstrip
(),
result
))
return
if
not
result
:
output
.
put
((
None
,
None
,
None
,
None
))
return
result
=
json
.
loads
(
result
)
output
.
put
((
test
,
stdout
.
rstrip
(),
stderr
.
rstrip
(),
result
))
except
BaseException
:
output
.
put
((
None
,
None
,
None
,
None
))
raise
workers
=
[
Thread
(
target
=
work
)
for
i
in
range
(
ns
.
use_mp
)]
for
worker
in
workers
:
worker
.
start
()
finished
=
0
test_index
=
1
try
:
while
finished
<
ns
.
use_mp
:
test
,
stdout
,
stderr
,
result
=
output
.
get
()
if
test
is
None
:
finished
+=
1
continue
accumulate_result
(
test
,
result
)
if
not
ns
.
quiet
:
fmt
=
"[{1:{0}}{2}/{3}] {4}"
if
bad
else
"[{1:{0}}{2}] {4}"
print
(
fmt
.
format
(
test_count_width
,
test_index
,
test_count
,
len
(
bad
),
test
))
if
stdout
:
print
(
stdout
)
if
stderr
:
print
(
stderr
,
file
=
sys
.
stderr
)
sys
.
stdout
.
flush
()
sys
.
stderr
.
flush
()
if
result
[
0
]
==
INTERRUPTED
:
raise
KeyboardInterrupt
if
result
[
0
]
==
CHILD_ERROR
:
raise
Exception
(
"Child error on {}: {}"
.
format
(
test
,
result
[
1
]))
test_index
+=
1
except
KeyboardInterrupt
:
interrupted
=
True
pending
.
interrupted
=
True
for
worker
in
workers
:
worker
.
join
()
else
:
for
test_index
,
test
in
enumerate
(
tests
,
1
):
if
not
ns
.
quiet
:
fmt
=
"[{1:{0}}{2}/{3}] {4}"
if
bad
else
"[{1:{0}}{2}] {4}"
print
(
fmt
.
format
(
test_count_width
,
test_index
,
test_count
,
len
(
bad
),
test
))
sys
.
stdout
.
flush
()
if
ns
.
trace
:
# If we're tracing code coverage, then we don't exit with status
# if on a false return value from main.
tracer
.
runctx
(
'runtest(test, ns.verbose, ns.quiet, timeout=ns.timeout)'
,
globals
=
globals
(),
locals
=
vars
())
else
:
try
:
result
=
runtest
(
test
,
ns
.
verbose
,
ns
.
quiet
,
ns
.
huntrleaks
,
output_on_failure
=
ns
.
verbose3
,
timeout
=
ns
.
timeout
,
failfast
=
ns
.
failfast
,
match_tests
=
ns
.
match_tests
)
accumulate_result
(
test
,
result
)
except
KeyboardInterrupt
:
interrupted
=
True
break
if
ns
.
findleaks
:
gc
.
collect
()
if
gc
.
garbage
:
print
(
"Warning: test created"
,
len
(
gc
.
garbage
),
end
=
' '
)
print
(
"uncollectable object(s)."
)
# move the uncollectable objects somewhere so we don't see
# them again
found_garbage
.
extend
(
gc
.
garbage
)
del
gc
.
garbage
[:]
# Unload the newly imported modules (best effort finalization)
for
module
in
sys
.
modules
.
keys
():
if
module
not
in
save_modules
and
module
.
startswith
(
"test."
):
support
.
unload
(
module
)
if
interrupted
:
# print a newline after ^C
print
()
print
(
"Test suite interrupted by signal SIGINT."
)
omitted
=
set
(
selected
)
-
set
(
good
)
-
set
(
bad
)
-
set
(
skipped
)
print
(
count
(
len
(
omitted
),
"test"
),
"omitted:"
)
printlist
(
omitted
)
if
good
and
not
ns
.
quiet
:
if
not
bad
and
not
skipped
and
not
interrupted
and
len
(
good
)
>
1
:
print
(
"All"
,
end
=
' '
)
print
(
count
(
len
(
good
),
"test"
),
"OK."
)
if
ns
.
print_slow
:
test_times
.
sort
(
reverse
=
True
)
print
(
"10 slowest tests:"
)
for
time
,
test
in
test_times
[:
10
]:
print
(
"
%
s:
%.1
fs"
%
(
test
,
time
))
if
bad
:
print
(
count
(
len
(
bad
),
"test"
),
"failed:"
)
printlist
(
bad
)
if
environment_changed
:
print
(
"{} altered the execution environment:"
.
format
(
count
(
len
(
environment_changed
),
"test"
)))
printlist
(
environment_changed
)
if
skipped
and
not
ns
.
quiet
:
print
(
count
(
len
(
skipped
),
"test"
),
"skipped:"
)
printlist
(
skipped
)
if
ns
.
verbose2
and
bad
:
print
(
"Re-running failed tests in verbose mode"
)
for
test
in
bad
[:]:
print
(
"Re-running test
%
r in verbose mode"
%
test
)
sys
.
stdout
.
flush
()
try
:
ns
.
verbose
=
True
ok
=
runtest
(
test
,
True
,
ns
.
quiet
,
ns
.
huntrleaks
,
timeout
=
ns
.
timeout
)
except
KeyboardInterrupt
:
# print a newline separate from the ^C
print
()
break
else
:
if
ok
[
0
]
in
{
PASSED
,
ENV_CHANGED
,
SKIPPED
,
RESOURCE_DENIED
}:
bad
.
remove
(
test
)
else
:
if
bad
:
print
(
count
(
len
(
bad
),
'test'
),
"failed again:"
)
printlist
(
bad
)
if
ns
.
single
:
if
next_single_test
:
with
open
(
filename
,
'w'
)
as
fp
:
fp
.
write
(
next_single_test
+
'
\n
'
)
else
:
os
.
unlink
(
filename
)
if
ns
.
trace
:
r
=
tracer
.
results
()
r
.
write_results
(
show_missing
=
True
,
summary
=
True
,
coverdir
=
ns
.
coverdir
)
if
ns
.
runleaks
:
os
.
system
(
"leaks
%
d"
%
os
.
getpid
())
sys
.
exit
(
len
(
bad
)
>
0
or
interrupted
)
# We do not use a generator so multiple threads can call next().
class
MultiprocessTests
(
object
):
"""A thread-safe iterator over tests for multiprocess mode."""
def
__init__
(
self
,
tests
):
self
.
interrupted
=
False
self
.
lock
=
threading
.
Lock
()
self
.
tests
=
tests
def
__iter__
(
self
):
return
self
def
__next__
(
self
):
with
self
.
lock
:
if
self
.
interrupted
:
raise
StopIteration
(
'tests interrupted'
)
return
next
(
self
.
tests
)
def
replace_stdout
():
"""Set stdout encoder error handler to backslashreplace (as stderr error
handler) to avoid UnicodeEncodeError when printing a traceback"""
import
atexit
stdout
=
sys
.
stdout
sys
.
stdout
=
open
(
stdout
.
fileno
(),
'w'
,
encoding
=
stdout
.
encoding
,
errors
=
"backslashreplace"
,
closefd
=
False
,
newline
=
'
\n
'
)
def
restore_stdout
():
sys
.
stdout
.
close
()
sys
.
stdout
=
stdout
atexit
.
register
(
restore_stdout
)
def
removepy
(
names
):
if
not
names
:
return
for
idx
,
name
in
enumerate
(
names
):
basename
,
ext
=
os
.
path
.
splitext
(
name
)
if
ext
==
'.py'
:
names
[
idx
]
=
basename
def
count
(
n
,
word
):
if
n
==
1
:
return
"
%
d
%
s"
%
(
n
,
word
)
else
:
return
"
%
d
%
ss"
%
(
n
,
word
)
def
printlist
(
x
,
width
=
70
,
indent
=
4
):
"""Print the elements of iterable x to stdout.
Optional arg width (default 70) is the maximum line length.
Optional arg indent (default 4) is the number of blanks with which to
begin each line.
"""
from
textwrap
import
fill
blanks
=
' '
*
indent
# Print the sorted list: 'x' may be a '--random' list or a set()
print
(
fill
(
' '
.
join
(
str
(
elt
)
for
elt
in
sorted
(
x
)),
width
,
initial_indent
=
blanks
,
subsequent_indent
=
blanks
))
def
main_in_temp_cwd
():
"""Run main() in a temporary working directory."""
if
sysconfig
.
is_python_build
():
try
:
os
.
mkdir
(
TEMPDIR
)
except
FileExistsError
:
pass
# Define a writable temp dir that will be used as cwd while running
# the tests. The name of the dir includes the pid to allow parallel
# testing (see the -j option).
test_cwd
=
'test_python_{}'
.
format
(
os
.
getpid
())
test_cwd
=
os
.
path
.
join
(
TEMPDIR
,
test_cwd
)
# Run the tests in a context manager that temporarily changes the CWD to a
# temporary and writable directory. If it's not possible to create or
# change the CWD, the original CWD will be used. The original CWD is
# available from support.SAVEDCWD.
with
support
.
temp_cwd
(
test_cwd
,
quiet
=
True
):
main
()
Lib/test/libregrtest/refleak.py
0 → 100644
Dosyayı görüntüle @
3844fe5e
import
os
import
re
import
sys
import
warnings
from
inspect
import
isabstract
from
test
import
support
def
dash_R
(
the_module
,
test
,
indirect_test
,
huntrleaks
):
"""Run a test multiple times, looking for reference leaks.
Returns:
False if the test didn't leak references; True if we detected refleaks.
"""
# This code is hackish and inelegant, but it seems to do the job.
import
copyreg
import
collections.abc
if
not
hasattr
(
sys
,
'gettotalrefcount'
):
raise
Exception
(
"Tracking reference leaks requires a debug build "
"of Python"
)
# Save current values for dash_R_cleanup() to restore.
fs
=
warnings
.
filters
[:]
ps
=
copyreg
.
dispatch_table
.
copy
()
pic
=
sys
.
path_importer_cache
.
copy
()
try
:
import
zipimport
except
ImportError
:
zdc
=
None
# Run unmodified on platforms without zipimport support
else
:
zdc
=
zipimport
.
_zip_directory_cache
.
copy
()
abcs
=
{}
for
abc
in
[
getattr
(
collections
.
abc
,
a
)
for
a
in
collections
.
abc
.
__all__
]:
if
not
isabstract
(
abc
):
continue
for
obj
in
abc
.
__subclasses__
()
+
[
abc
]:
abcs
[
obj
]
=
obj
.
_abc_registry
.
copy
()
nwarmup
,
ntracked
,
fname
=
huntrleaks
fname
=
os
.
path
.
join
(
support
.
SAVEDCWD
,
fname
)
repcount
=
nwarmup
+
ntracked
rc_deltas
=
[
0
]
*
repcount
alloc_deltas
=
[
0
]
*
repcount
print
(
"beginning"
,
repcount
,
"repetitions"
,
file
=
sys
.
stderr
)
print
((
"1234567890"
*
(
repcount
//
10
+
1
))[:
repcount
],
file
=
sys
.
stderr
)
sys
.
stderr
.
flush
()
for
i
in
range
(
repcount
):
indirect_test
()
alloc_after
,
rc_after
=
dash_R_cleanup
(
fs
,
ps
,
pic
,
zdc
,
abcs
)
sys
.
stderr
.
write
(
'.'
)
sys
.
stderr
.
flush
()
if
i
>=
nwarmup
:
rc_deltas
[
i
]
=
rc_after
-
rc_before
alloc_deltas
[
i
]
=
alloc_after
-
alloc_before
alloc_before
,
rc_before
=
alloc_after
,
rc_after
print
(
file
=
sys
.
stderr
)
# These checkers return False on success, True on failure
def
check_rc_deltas
(
deltas
):
return
any
(
deltas
)
def
check_alloc_deltas
(
deltas
):
# At least 1/3rd of 0s
if
3
*
deltas
.
count
(
0
)
<
len
(
deltas
):
return
True
# Nothing else than 1s, 0s and -1s
if
not
set
(
deltas
)
<=
{
1
,
0
,
-
1
}:
return
True
return
False
failed
=
False
for
deltas
,
item_name
,
checker
in
[
(
rc_deltas
,
'references'
,
check_rc_deltas
),
(
alloc_deltas
,
'memory blocks'
,
check_alloc_deltas
)]:
if
checker
(
deltas
):
msg
=
'
%
s leaked
%
s
%
s, sum=
%
s'
%
(
test
,
deltas
[
nwarmup
:],
item_name
,
sum
(
deltas
))
print
(
msg
,
file
=
sys
.
stderr
)
sys
.
stderr
.
flush
()
with
open
(
fname
,
"a"
)
as
refrep
:
print
(
msg
,
file
=
refrep
)
refrep
.
flush
()
failed
=
True
return
failed
def
dash_R_cleanup
(
fs
,
ps
,
pic
,
zdc
,
abcs
):
import
gc
,
copyreg
import
_strptime
,
linecache
import
urllib.parse
,
urllib
.
request
,
mimetypes
,
doctest
import
struct
,
filecmp
,
collections
.
abc
from
distutils.dir_util
import
_path_created
from
weakref
import
WeakSet
# Clear the warnings registry, so they can be displayed again
for
mod
in
sys
.
modules
.
values
():
if
hasattr
(
mod
,
'__warningregistry__'
):
del
mod
.
__warningregistry__
# Restore some original values.
warnings
.
filters
[:]
=
fs
copyreg
.
dispatch_table
.
clear
()
copyreg
.
dispatch_table
.
update
(
ps
)
sys
.
path_importer_cache
.
clear
()
sys
.
path_importer_cache
.
update
(
pic
)
try
:
import
zipimport
except
ImportError
:
pass
# Run unmodified on platforms without zipimport support
else
:
zipimport
.
_zip_directory_cache
.
clear
()
zipimport
.
_zip_directory_cache
.
update
(
zdc
)
# clear type cache
sys
.
_clear_type_cache
()
# Clear ABC registries, restoring previously saved ABC registries.
for
abc
in
[
getattr
(
collections
.
abc
,
a
)
for
a
in
collections
.
abc
.
__all__
]:
if
not
isabstract
(
abc
):
continue
for
obj
in
abc
.
__subclasses__
()
+
[
abc
]:
obj
.
_abc_registry
=
abcs
.
get
(
obj
,
WeakSet
())
.
copy
()
obj
.
_abc_cache
.
clear
()
obj
.
_abc_negative_cache
.
clear
()
# Flush standard output, so that buffered data is sent to the OS and
# associated Python objects are reclaimed.
for
stream
in
(
sys
.
stdout
,
sys
.
stderr
,
sys
.
__stdout__
,
sys
.
__stderr__
):
if
stream
is
not
None
:
stream
.
flush
()
# Clear assorted module caches.
_path_created
.
clear
()
re
.
purge
()
_strptime
.
_regex_cache
.
clear
()
urllib
.
parse
.
clear_cache
()
urllib
.
request
.
urlcleanup
()
linecache
.
clearcache
()
mimetypes
.
_default_mime_types
()
filecmp
.
_cache
.
clear
()
struct
.
_clearcache
()
doctest
.
master
=
None
try
:
import
ctypes
except
ImportError
:
# Don't worry about resetting the cache if ctypes is not supported
pass
else
:
ctypes
.
_reset_cache
()
# Collect cyclic trash and read memory statistics immediately after.
func1
=
sys
.
getallocatedblocks
func2
=
sys
.
gettotalrefcount
gc
.
collect
()
return
func1
(),
func2
()
def
warm_caches
():
# char cache
s
=
bytes
(
range
(
256
))
for
i
in
range
(
256
):
s
[
i
:
i
+
1
]
# unicode cache
x
=
[
chr
(
i
)
for
i
in
range
(
256
)]
# int cache
x
=
list
(
range
(
-
5
,
257
))
Lib/test/libregrtest/runtest.py
0 → 100644
Dosyayı görüntüle @
3844fe5e
import
faulthandler
import
importlib
import
io
import
json
import
os
import
sys
import
time
import
traceback
import
unittest
from
test
import
support
from
test.libregrtest.refleak
import
dash_R
from
test.libregrtest.save_env
import
saved_test_environment
# Test result constants.
PASSED
=
1
FAILED
=
0
ENV_CHANGED
=
-
1
SKIPPED
=
-
2
RESOURCE_DENIED
=
-
3
INTERRUPTED
=
-
4
CHILD_ERROR
=
-
5
# error in a child process
def
run_test_in_subprocess
(
testname
,
ns
):
"""Run the given test in a subprocess with --slaveargs.
ns is the option Namespace parsed from command-line arguments. regrtest
is invoked in a subprocess with the --slaveargs argument; when the
subprocess exits, its return code, stdout and stderr are returned as a
3-tuple.
"""
from
subprocess
import
Popen
,
PIPE
base_cmd
=
([
sys
.
executable
]
+
support
.
args_from_interpreter_flags
()
+
[
'-X'
,
'faulthandler'
,
'-m'
,
'test.regrtest'
])
slaveargs
=
(
(
testname
,
ns
.
verbose
,
ns
.
quiet
),
dict
(
huntrleaks
=
ns
.
huntrleaks
,
use_resources
=
ns
.
use_resources
,
output_on_failure
=
ns
.
verbose3
,
timeout
=
ns
.
timeout
,
failfast
=
ns
.
failfast
,
match_tests
=
ns
.
match_tests
))
# Running the child from the same working directory as regrtest's original
# invocation ensures that TEMPDIR for the child is the same when
# sysconfig.is_python_build() is true. See issue 15300.
popen
=
Popen
(
base_cmd
+
[
'--slaveargs'
,
json
.
dumps
(
slaveargs
)],
stdout
=
PIPE
,
stderr
=
PIPE
,
universal_newlines
=
True
,
close_fds
=
(
os
.
name
!=
'nt'
),
cwd
=
support
.
SAVEDCWD
)
stdout
,
stderr
=
popen
.
communicate
()
retcode
=
popen
.
wait
()
return
retcode
,
stdout
,
stderr
# small set of tests to determine if we have a basically functioning interpreter
# (i.e. if any of these fail, then anything else is likely to follow)
STDTESTS
=
[
'test_grammar'
,
'test_opcodes'
,
'test_dict'
,
'test_builtin'
,
'test_exceptions'
,
'test_types'
,
'test_unittest'
,
'test_doctest'
,
'test_doctest2'
,
'test_support'
]
# set of tests that we don't want to be executed when using regrtest
NOTTESTS
=
set
()
def
findtests
(
testdir
=
None
,
stdtests
=
STDTESTS
,
nottests
=
NOTTESTS
):
"""Return a list of all applicable test modules."""
testdir
=
findtestdir
(
testdir
)
names
=
os
.
listdir
(
testdir
)
tests
=
[]
others
=
set
(
stdtests
)
|
nottests
for
name
in
names
:
mod
,
ext
=
os
.
path
.
splitext
(
name
)
if
mod
[:
5
]
==
"test_"
and
ext
in
(
".py"
,
""
)
and
mod
not
in
others
:
tests
.
append
(
mod
)
return
stdtests
+
sorted
(
tests
)
def
runtest
(
test
,
verbose
,
quiet
,
huntrleaks
=
False
,
use_resources
=
None
,
output_on_failure
=
False
,
failfast
=
False
,
match_tests
=
None
,
timeout
=
None
):
"""Run a single test.
test -- the name of the test
verbose -- if true, print more messages
quiet -- if true, don't print 'skipped' messages (probably redundant)
huntrleaks -- run multiple times to test for leaks; requires a debug
build; a triple corresponding to -R's three arguments
use_resources -- list of extra resources to use
output_on_failure -- if true, display test output on failure
timeout -- dump the traceback and exit if a test takes more than
timeout seconds
failfast, match_tests -- See regrtest command-line flags for these.
Returns the tuple result, test_time, where result is one of the constants:
INTERRUPTED KeyboardInterrupt when run under -j
RESOURCE_DENIED test skipped because resource denied
SKIPPED test skipped for some other reason
ENV_CHANGED test failed because it changed the execution environment
FAILED test failed
PASSED test passed
"""
if
use_resources
is
not
None
:
support
.
use_resources
=
use_resources
use_timeout
=
(
timeout
is
not
None
)
if
use_timeout
:
faulthandler
.
dump_traceback_later
(
timeout
,
exit
=
True
)
try
:
support
.
match_tests
=
match_tests
if
failfast
:
support
.
failfast
=
True
if
output_on_failure
:
support
.
verbose
=
True
# Reuse the same instance to all calls to runtest(). Some
# tests keep a reference to sys.stdout or sys.stderr
# (eg. test_argparse).
if
runtest
.
stringio
is
None
:
stream
=
io
.
StringIO
()
runtest
.
stringio
=
stream
else
:
stream
=
runtest
.
stringio
stream
.
seek
(
0
)
stream
.
truncate
()
orig_stdout
=
sys
.
stdout
orig_stderr
=
sys
.
stderr
try
:
sys
.
stdout
=
stream
sys
.
stderr
=
stream
result
=
runtest_inner
(
test
,
verbose
,
quiet
,
huntrleaks
,
display_failure
=
False
)
if
result
[
0
]
==
FAILED
:
output
=
stream
.
getvalue
()
orig_stderr
.
write
(
output
)
orig_stderr
.
flush
()
finally
:
sys
.
stdout
=
orig_stdout
sys
.
stderr
=
orig_stderr
else
:
support
.
verbose
=
verbose
# Tell tests to be moderately quiet
result
=
runtest_inner
(
test
,
verbose
,
quiet
,
huntrleaks
,
display_failure
=
not
verbose
)
return
result
finally
:
if
use_timeout
:
faulthandler
.
cancel_dump_traceback_later
()
cleanup_test_droppings
(
test
,
verbose
)
runtest
.
stringio
=
None
def
runtest_inner
(
test
,
verbose
,
quiet
,
huntrleaks
=
False
,
display_failure
=
True
):
support
.
unload
(
test
)
test_time
=
0.0
refleak
=
False
# True if the test leaked references.
try
:
if
test
.
startswith
(
'test.'
):
abstest
=
test
else
:
# Always import it from the test package
abstest
=
'test.'
+
test
with
saved_test_environment
(
test
,
verbose
,
quiet
)
as
environment
:
start_time
=
time
.
time
()
the_module
=
importlib
.
import_module
(
abstest
)
# If the test has a test_main, that will run the appropriate
# tests. If not, use normal unittest test loading.
test_runner
=
getattr
(
the_module
,
"test_main"
,
None
)
if
test_runner
is
None
:
def
test_runner
():
loader
=
unittest
.
TestLoader
()
tests
=
loader
.
loadTestsFromModule
(
the_module
)
for
error
in
loader
.
errors
:
print
(
error
,
file
=
sys
.
stderr
)
if
loader
.
errors
:
raise
Exception
(
"errors while loading tests"
)
support
.
run_unittest
(
tests
)
test_runner
()
if
huntrleaks
:
refleak
=
dash_R
(
the_module
,
test
,
test_runner
,
huntrleaks
)
test_time
=
time
.
time
()
-
start_time
except
support
.
ResourceDenied
as
msg
:
if
not
quiet
:
print
(
test
,
"skipped --"
,
msg
)
sys
.
stdout
.
flush
()
return
RESOURCE_DENIED
,
test_time
except
unittest
.
SkipTest
as
msg
:
if
not
quiet
:
print
(
test
,
"skipped --"
,
msg
)
sys
.
stdout
.
flush
()
return
SKIPPED
,
test_time
except
KeyboardInterrupt
:
raise
except
support
.
TestFailed
as
msg
:
if
display_failure
:
print
(
"test"
,
test
,
"failed --"
,
msg
,
file
=
sys
.
stderr
)
else
:
print
(
"test"
,
test
,
"failed"
,
file
=
sys
.
stderr
)
sys
.
stderr
.
flush
()
return
FAILED
,
test_time
except
:
msg
=
traceback
.
format_exc
()
print
(
"test"
,
test
,
"crashed --"
,
msg
,
file
=
sys
.
stderr
)
sys
.
stderr
.
flush
()
return
FAILED
,
test_time
else
:
if
refleak
:
return
FAILED
,
test_time
if
environment
.
changed
:
return
ENV_CHANGED
,
test_time
return
PASSED
,
test_time
def
cleanup_test_droppings
(
testname
,
verbose
):
import
shutil
import
stat
import
gc
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
# triggered during the following test run, and possibly produce failures.
gc
.
collect
()
# Try to clean up junk commonly left behind. While tests shouldn't leave
# any files or directories behind, when a test fails that can be tedious
# for it to arrange. The consequences can be especially nasty on Windows,
# since if a test leaves a file open, it cannot be deleted by name (while
# there's nothing we can do about that here either, we can display the
# name of the offending test, which is a real help).
for
name
in
(
support
.
TESTFN
,
"db_home"
,
):
if
not
os
.
path
.
exists
(
name
):
continue
if
os
.
path
.
isdir
(
name
):
kind
,
nuker
=
"directory"
,
shutil
.
rmtree
elif
os
.
path
.
isfile
(
name
):
kind
,
nuker
=
"file"
,
os
.
unlink
else
:
raise
SystemError
(
"os.path says
%
r exists but is neither "
"directory nor file"
%
name
)
if
verbose
:
print
(
"
%
r left behind
%
s
%
r"
%
(
testname
,
kind
,
name
))
try
:
# if we have chmod, fix possible permissions problems
# that might prevent cleanup
if
(
hasattr
(
os
,
'chmod'
)):
os
.
chmod
(
name
,
stat
.
S_IRWXU
|
stat
.
S_IRWXG
|
stat
.
S_IRWXO
)
nuker
(
name
)
except
Exception
as
msg
:
print
((
"
%
r left behind
%
s
%
r and it couldn't be "
"removed:
%
s"
%
(
testname
,
kind
,
name
,
msg
)),
file
=
sys
.
stderr
)
def
findtestdir
(
path
=
None
):
return
path
or
os
.
path
.
dirname
(
os
.
path
.
dirname
(
__file__
))
or
os
.
curdir
Lib/test/libregrtest/save_env.py
0 → 100644
Dosyayı görüntüle @
3844fe5e
import
builtins
import
locale
import
logging
import
os
import
shutil
import
sys
import
sysconfig
import
warnings
from
test
import
support
try
:
import
threading
except
ImportError
:
threading
=
None
try
:
import
_multiprocessing
,
multiprocessing
.
process
except
ImportError
:
multiprocessing
=
None
# Unit tests are supposed to leave the execution environment unchanged
# once they complete. But sometimes tests have bugs, especially when
# tests fail, and the changes to environment go on to mess up other
# tests. This can cause issues with buildbot stability, since tests
# are run in random order and so problems may appear to come and go.
# There are a few things we can save and restore to mitigate this, and
# the following context manager handles this task.
class
saved_test_environment
:
"""Save bits of the test environment and restore them at block exit.
with saved_test_environment(testname, verbose, quiet):
#stuff
Unless quiet is True, a warning is printed to stderr if any of
the saved items was changed by the test. The attribute 'changed'
is initially False, but is set to True if a change is detected.
If verbose is more than 1, the before and after state of changed
items is also printed.
"""
changed
=
False
def
__init__
(
self
,
testname
,
verbose
=
0
,
quiet
=
False
):
self
.
testname
=
testname
self
.
verbose
=
verbose
self
.
quiet
=
quiet
# To add things to save and restore, add a name XXX to the resources list
# and add corresponding get_XXX/restore_XXX functions. get_XXX should
# return the value to be saved and compared against a second call to the
# get function when test execution completes. restore_XXX should accept
# the saved value and restore the resource using it. It will be called if
# and only if a change in the value is detected.
#
# Note: XXX will have any '.' replaced with '_' characters when determining
# the corresponding method names.
resources
=
(
'sys.argv'
,
'cwd'
,
'sys.stdin'
,
'sys.stdout'
,
'sys.stderr'
,
'os.environ'
,
'sys.path'
,
'sys.path_hooks'
,
'__import__'
,
'warnings.filters'
,
'asyncore.socket_map'
,
'logging._handlers'
,
'logging._handlerList'
,
'sys.gettrace'
,
'sys.warnoptions'
,
# multiprocessing.process._cleanup() may release ref
# to a thread, so check processes first.
'multiprocessing.process._dangling'
,
'threading._dangling'
,
'sysconfig._CONFIG_VARS'
,
'sysconfig._INSTALL_SCHEMES'
,
'files'
,
'locale'
,
'warnings.showwarning'
,
)
def
get_sys_argv
(
self
):
return
id
(
sys
.
argv
),
sys
.
argv
,
sys
.
argv
[:]
def
restore_sys_argv
(
self
,
saved_argv
):
sys
.
argv
=
saved_argv
[
1
]
sys
.
argv
[:]
=
saved_argv
[
2
]
def
get_cwd
(
self
):
return
os
.
getcwd
()
def
restore_cwd
(
self
,
saved_cwd
):
os
.
chdir
(
saved_cwd
)
def
get_sys_stdout
(
self
):
return
sys
.
stdout
def
restore_sys_stdout
(
self
,
saved_stdout
):
sys
.
stdout
=
saved_stdout
def
get_sys_stderr
(
self
):
return
sys
.
stderr
def
restore_sys_stderr
(
self
,
saved_stderr
):
sys
.
stderr
=
saved_stderr
def
get_sys_stdin
(
self
):
return
sys
.
stdin
def
restore_sys_stdin
(
self
,
saved_stdin
):
sys
.
stdin
=
saved_stdin
def
get_os_environ
(
self
):
return
id
(
os
.
environ
),
os
.
environ
,
dict
(
os
.
environ
)
def
restore_os_environ
(
self
,
saved_environ
):
os
.
environ
=
saved_environ
[
1
]
os
.
environ
.
clear
()
os
.
environ
.
update
(
saved_environ
[
2
])
def
get_sys_path
(
self
):
return
id
(
sys
.
path
),
sys
.
path
,
sys
.
path
[:]
def
restore_sys_path
(
self
,
saved_path
):
sys
.
path
=
saved_path
[
1
]
sys
.
path
[:]
=
saved_path
[
2
]
def
get_sys_path_hooks
(
self
):
return
id
(
sys
.
path_hooks
),
sys
.
path_hooks
,
sys
.
path_hooks
[:]
def
restore_sys_path_hooks
(
self
,
saved_hooks
):
sys
.
path_hooks
=
saved_hooks
[
1
]
sys
.
path_hooks
[:]
=
saved_hooks
[
2
]
def
get_sys_gettrace
(
self
):
return
sys
.
gettrace
()
def
restore_sys_gettrace
(
self
,
trace_fxn
):
sys
.
settrace
(
trace_fxn
)
def
get___import__
(
self
):
return
builtins
.
__import__
def
restore___import__
(
self
,
import_
):
builtins
.
__import__
=
import_
def
get_warnings_filters
(
self
):
return
id
(
warnings
.
filters
),
warnings
.
filters
,
warnings
.
filters
[:]
def
restore_warnings_filters
(
self
,
saved_filters
):
warnings
.
filters
=
saved_filters
[
1
]
warnings
.
filters
[:]
=
saved_filters
[
2
]
def
get_asyncore_socket_map
(
self
):
asyncore
=
sys
.
modules
.
get
(
'asyncore'
)
# XXX Making a copy keeps objects alive until __exit__ gets called.
return
asyncore
and
asyncore
.
socket_map
.
copy
()
or
{}
def
restore_asyncore_socket_map
(
self
,
saved_map
):
asyncore
=
sys
.
modules
.
get
(
'asyncore'
)
if
asyncore
is
not
None
:
asyncore
.
close_all
(
ignore_all
=
True
)
asyncore
.
socket_map
.
update
(
saved_map
)
def
get_shutil_archive_formats
(
self
):
# we could call get_archives_formats() but that only returns the
# registry keys; we want to check the values too (the functions that
# are registered)
return
shutil
.
_ARCHIVE_FORMATS
,
shutil
.
_ARCHIVE_FORMATS
.
copy
()
def
restore_shutil_archive_formats
(
self
,
saved
):
shutil
.
_ARCHIVE_FORMATS
=
saved
[
0
]
shutil
.
_ARCHIVE_FORMATS
.
clear
()
shutil
.
_ARCHIVE_FORMATS
.
update
(
saved
[
1
])
def
get_shutil_unpack_formats
(
self
):
return
shutil
.
_UNPACK_FORMATS
,
shutil
.
_UNPACK_FORMATS
.
copy
()
def
restore_shutil_unpack_formats
(
self
,
saved
):
shutil
.
_UNPACK_FORMATS
=
saved
[
0
]
shutil
.
_UNPACK_FORMATS
.
clear
()
shutil
.
_UNPACK_FORMATS
.
update
(
saved
[
1
])
def
get_logging__handlers
(
self
):
# _handlers is a WeakValueDictionary
return
id
(
logging
.
_handlers
),
logging
.
_handlers
,
logging
.
_handlers
.
copy
()
def
restore_logging__handlers
(
self
,
saved_handlers
):
# Can't easily revert the logging state
pass
def
get_logging__handlerList
(
self
):
# _handlerList is a list of weakrefs to handlers
return
id
(
logging
.
_handlerList
),
logging
.
_handlerList
,
logging
.
_handlerList
[:]
def
restore_logging__handlerList
(
self
,
saved_handlerList
):
# Can't easily revert the logging state
pass
def
get_sys_warnoptions
(
self
):
return
id
(
sys
.
warnoptions
),
sys
.
warnoptions
,
sys
.
warnoptions
[:]
def
restore_sys_warnoptions
(
self
,
saved_options
):
sys
.
warnoptions
=
saved_options
[
1
]
sys
.
warnoptions
[:]
=
saved_options
[
2
]
# Controlling dangling references to Thread objects can make it easier
# to track reference leaks.
def
get_threading__dangling
(
self
):
if
not
threading
:
return
None
# This copies the weakrefs without making any strong reference
return
threading
.
_dangling
.
copy
()
def
restore_threading__dangling
(
self
,
saved
):
if
not
threading
:
return
threading
.
_dangling
.
clear
()
threading
.
_dangling
.
update
(
saved
)
# Same for Process objects
def
get_multiprocessing_process__dangling
(
self
):
if
not
multiprocessing
:
return
None
# Unjoined process objects can survive after process exits
multiprocessing
.
process
.
_cleanup
()
# This copies the weakrefs without making any strong reference
return
multiprocessing
.
process
.
_dangling
.
copy
()
def
restore_multiprocessing_process__dangling
(
self
,
saved
):
if
not
multiprocessing
:
return
multiprocessing
.
process
.
_dangling
.
clear
()
multiprocessing
.
process
.
_dangling
.
update
(
saved
)
def
get_sysconfig__CONFIG_VARS
(
self
):
# make sure the dict is initialized
sysconfig
.
get_config_var
(
'prefix'
)
return
(
id
(
sysconfig
.
_CONFIG_VARS
),
sysconfig
.
_CONFIG_VARS
,
dict
(
sysconfig
.
_CONFIG_VARS
))
def
restore_sysconfig__CONFIG_VARS
(
self
,
saved
):
sysconfig
.
_CONFIG_VARS
=
saved
[
1
]
sysconfig
.
_CONFIG_VARS
.
clear
()
sysconfig
.
_CONFIG_VARS
.
update
(
saved
[
2
])
def
get_sysconfig__INSTALL_SCHEMES
(
self
):
return
(
id
(
sysconfig
.
_INSTALL_SCHEMES
),
sysconfig
.
_INSTALL_SCHEMES
,
sysconfig
.
_INSTALL_SCHEMES
.
copy
())
def
restore_sysconfig__INSTALL_SCHEMES
(
self
,
saved
):
sysconfig
.
_INSTALL_SCHEMES
=
saved
[
1
]
sysconfig
.
_INSTALL_SCHEMES
.
clear
()
sysconfig
.
_INSTALL_SCHEMES
.
update
(
saved
[
2
])
def
get_files
(
self
):
return
sorted
(
fn
+
(
'/'
if
os
.
path
.
isdir
(
fn
)
else
''
)
for
fn
in
os
.
listdir
())
def
restore_files
(
self
,
saved_value
):
fn
=
support
.
TESTFN
if
fn
not
in
saved_value
and
(
fn
+
'/'
)
not
in
saved_value
:
if
os
.
path
.
isfile
(
fn
):
support
.
unlink
(
fn
)
elif
os
.
path
.
isdir
(
fn
):
support
.
rmtree
(
fn
)
_lc
=
[
getattr
(
locale
,
lc
)
for
lc
in
dir
(
locale
)
if
lc
.
startswith
(
'LC_'
)]
def
get_locale
(
self
):
pairings
=
[]
for
lc
in
self
.
_lc
:
try
:
pairings
.
append
((
lc
,
locale
.
setlocale
(
lc
,
None
)))
except
(
TypeError
,
ValueError
):
continue
return
pairings
def
restore_locale
(
self
,
saved
):
for
lc
,
setting
in
saved
:
locale
.
setlocale
(
lc
,
setting
)
def
get_warnings_showwarning
(
self
):
return
warnings
.
showwarning
def
restore_warnings_showwarning
(
self
,
fxn
):
warnings
.
showwarning
=
fxn
def
resource_info
(
self
):
for
name
in
self
.
resources
:
method_suffix
=
name
.
replace
(
'.'
,
'_'
)
get_name
=
'get_'
+
method_suffix
restore_name
=
'restore_'
+
method_suffix
yield
name
,
getattr
(
self
,
get_name
),
getattr
(
self
,
restore_name
)
def
__enter__
(
self
):
self
.
saved_values
=
dict
((
name
,
get
())
for
name
,
get
,
restore
in
self
.
resource_info
())
return
self
def
__exit__
(
self
,
exc_type
,
exc_val
,
exc_tb
):
saved_values
=
self
.
saved_values
del
self
.
saved_values
for
name
,
get
,
restore
in
self
.
resource_info
():
current
=
get
()
original
=
saved_values
.
pop
(
name
)
# Check for changes to the resource's value
if
current
!=
original
:
self
.
changed
=
True
restore
(
original
)
if
not
self
.
quiet
:
print
(
"Warning -- {} was modified by {}"
.
format
(
name
,
self
.
testname
),
file
=
sys
.
stderr
)
if
self
.
verbose
>
1
:
print
(
" Before: {}
\n
After: {} "
.
format
(
original
,
current
),
file
=
sys
.
stderr
)
return
False
Lib/test/regrtest.py
Dosyayı görüntüle @
3844fe5e
...
@@ -9,1232 +9,9 @@ Run this script with -h or --help for documentation.
...
@@ -9,1232 +9,9 @@ Run this script with -h or --help for documentation.
# We import importlib *ASAP* in order to test #15386
# We import importlib *ASAP* in order to test #15386
import
importlib
import
importlib
import
argparse
import
builtins
import
faulthandler
import
io
import
json
import
locale
import
logging
import
os
import
os
import
platform
import
random
import
re
import
shutil
import
signal
import
sys
import
sys
import
sysconfig
from
test.libregrtest
import
main_in_temp_cwd
import
tempfile
import
time
import
traceback
import
unittest
import
warnings
from
inspect
import
isabstract
try
:
import
threading
except
ImportError
:
threading
=
None
try
:
import
_multiprocessing
,
multiprocessing
.
process
except
ImportError
:
multiprocessing
=
None
from
test.libregrtest
import
_parse_args
# Some times __path__ and __file__ are not absolute (e.g. while running from
# Lib/) and, if we change the CWD to run the tests in a temporary dir, some
# imports might fail. This affects only the modules imported before os.chdir().
# These modules are searched first in sys.path[0] (so '' -- the CWD) and if
# they are found in the CWD their __file__ and __path__ will be relative (this
# happens before the chdir). All the modules imported after the chdir, are
# not found in the CWD, and since the other paths in sys.path[1:] are absolute
# (site.py absolutize them), the __file__ and __path__ will be absolute too.
# Therefore it is necessary to absolutize manually the __file__ and __path__ of
# the packages to prevent later imports to fail when the CWD is different.
for
module
in
sys
.
modules
.
values
():
if
hasattr
(
module
,
'__path__'
):
module
.
__path__
=
[
os
.
path
.
abspath
(
path
)
for
path
in
module
.
__path__
]
if
hasattr
(
module
,
'__file__'
):
module
.
__file__
=
os
.
path
.
abspath
(
module
.
__file__
)
# MacOSX (a.k.a. Darwin) has a default stack size that is too small
# for deeply recursive regular expressions. We see this as crashes in
# the Python test suite when running test_re.py and test_sre.py. The
# fix is to set the stack limit to 2048.
# This approach may also be useful for other Unixy platforms that
# suffer from small default stack limits.
if
sys
.
platform
==
'darwin'
:
try
:
import
resource
except
ImportError
:
pass
else
:
soft
,
hard
=
resource
.
getrlimit
(
resource
.
RLIMIT_STACK
)
newsoft
=
min
(
hard
,
max
(
soft
,
1024
*
2048
))
resource
.
setrlimit
(
resource
.
RLIMIT_STACK
,
(
newsoft
,
hard
))
# Test result constants.
PASSED
=
1
FAILED
=
0
ENV_CHANGED
=
-
1
SKIPPED
=
-
2
RESOURCE_DENIED
=
-
3
INTERRUPTED
=
-
4
CHILD_ERROR
=
-
5
# error in a child process
from
test
import
support
# When tests are run from the Python build directory, it is best practice
# to keep the test files in a subfolder. This eases the cleanup of leftover
# files using the "make distclean" command.
if
sysconfig
.
is_python_build
():
TEMPDIR
=
os
.
path
.
join
(
sysconfig
.
get_config_var
(
'srcdir'
),
'build'
)
else
:
TEMPDIR
=
tempfile
.
gettempdir
()
TEMPDIR
=
os
.
path
.
abspath
(
TEMPDIR
)
def
run_test_in_subprocess
(
testname
,
ns
):
"""Run the given test in a subprocess with --slaveargs.
ns is the option Namespace parsed from command-line arguments. regrtest
is invoked in a subprocess with the --slaveargs argument; when the
subprocess exits, its return code, stdout and stderr are returned as a
3-tuple.
"""
from
subprocess
import
Popen
,
PIPE
base_cmd
=
([
sys
.
executable
]
+
support
.
args_from_interpreter_flags
()
+
[
'-X'
,
'faulthandler'
,
'-m'
,
'test.regrtest'
])
slaveargs
=
(
(
testname
,
ns
.
verbose
,
ns
.
quiet
),
dict
(
huntrleaks
=
ns
.
huntrleaks
,
use_resources
=
ns
.
use_resources
,
output_on_failure
=
ns
.
verbose3
,
timeout
=
ns
.
timeout
,
failfast
=
ns
.
failfast
,
match_tests
=
ns
.
match_tests
))
# Running the child from the same working directory as regrtest's original
# invocation ensures that TEMPDIR for the child is the same when
# sysconfig.is_python_build() is true. See issue 15300.
popen
=
Popen
(
base_cmd
+
[
'--slaveargs'
,
json
.
dumps
(
slaveargs
)],
stdout
=
PIPE
,
stderr
=
PIPE
,
universal_newlines
=
True
,
close_fds
=
(
os
.
name
!=
'nt'
),
cwd
=
support
.
SAVEDCWD
)
stdout
,
stderr
=
popen
.
communicate
()
retcode
=
popen
.
wait
()
return
retcode
,
stdout
,
stderr
def
main
(
tests
=
None
,
**
kwargs
):
"""Execute a test suite.
This also parses command-line options and modifies its behavior
accordingly.
tests -- a list of strings containing test names (optional)
testdir -- the directory in which to look for tests (optional)
Users other than the Python test suite will certainly want to
specify testdir; if it's omitted, the directory containing the
Python test suite is searched for.
If the tests argument is omitted, the tests listed on the
command-line will be used. If that's empty, too, then all *.py
files beginning with test_ will be used.
The other default arguments (verbose, quiet, exclude,
single, randomize, findleaks, use_resources, trace, coverdir,
print_slow, and random_seed) allow programmers calling main()
directly to set the values that would normally be set by flags
on the command line.
"""
# Display the Python traceback on fatal errors (e.g. segfault)
faulthandler
.
enable
(
all_threads
=
True
)
# Display the Python traceback on SIGALRM or SIGUSR1 signal
signals
=
[]
if
hasattr
(
signal
,
'SIGALRM'
):
signals
.
append
(
signal
.
SIGALRM
)
if
hasattr
(
signal
,
'SIGUSR1'
):
signals
.
append
(
signal
.
SIGUSR1
)
for
signum
in
signals
:
faulthandler
.
register
(
signum
,
chain
=
True
)
replace_stdout
()
support
.
record_original_stdout
(
sys
.
stdout
)
ns
=
_parse_args
(
sys
.
argv
[
1
:],
**
kwargs
)
if
ns
.
huntrleaks
:
# Avoid false positives due to various caches
# filling slowly with random data:
warm_caches
()
if
ns
.
memlimit
is
not
None
:
support
.
set_memlimit
(
ns
.
memlimit
)
if
ns
.
threshold
is
not
None
:
import
gc
gc
.
set_threshold
(
ns
.
threshold
)
if
ns
.
nowindows
:
import
msvcrt
msvcrt
.
SetErrorMode
(
msvcrt
.
SEM_FAILCRITICALERRORS
|
msvcrt
.
SEM_NOALIGNMENTFAULTEXCEPT
|
msvcrt
.
SEM_NOGPFAULTERRORBOX
|
msvcrt
.
SEM_NOOPENFILEERRORBOX
)
try
:
msvcrt
.
CrtSetReportMode
except
AttributeError
:
# release build
pass
else
:
for
m
in
[
msvcrt
.
CRT_WARN
,
msvcrt
.
CRT_ERROR
,
msvcrt
.
CRT_ASSERT
]:
msvcrt
.
CrtSetReportMode
(
m
,
msvcrt
.
CRTDBG_MODE_FILE
)
msvcrt
.
CrtSetReportFile
(
m
,
msvcrt
.
CRTDBG_FILE_STDERR
)
if
ns
.
wait
:
input
(
"Press any key to continue..."
)
if
ns
.
slaveargs
is
not
None
:
args
,
kwargs
=
json
.
loads
(
ns
.
slaveargs
)
if
kwargs
.
get
(
'huntrleaks'
):
unittest
.
BaseTestSuite
.
_cleanup
=
False
try
:
result
=
runtest
(
*
args
,
**
kwargs
)
except
KeyboardInterrupt
:
result
=
INTERRUPTED
,
''
except
BaseException
as
e
:
traceback
.
print_exc
()
result
=
CHILD_ERROR
,
str
(
e
)
sys
.
stdout
.
flush
()
print
()
# Force a newline (just in case)
print
(
json
.
dumps
(
result
))
sys
.
exit
(
0
)
good
=
[]
bad
=
[]
skipped
=
[]
resource_denieds
=
[]
environment_changed
=
[]
interrupted
=
False
if
ns
.
findleaks
:
try
:
import
gc
except
ImportError
:
print
(
'No GC available, disabling findleaks.'
)
ns
.
findleaks
=
False
else
:
# Uncomment the line below to report garbage that is not
# freeable by reference counting alone. By default only
# garbage that is not collectable by the GC is reported.
#gc.set_debug(gc.DEBUG_SAVEALL)
found_garbage
=
[]
if
ns
.
huntrleaks
:
unittest
.
BaseTestSuite
.
_cleanup
=
False
if
ns
.
single
:
filename
=
os
.
path
.
join
(
TEMPDIR
,
'pynexttest'
)
try
:
with
open
(
filename
,
'r'
)
as
fp
:
next_test
=
fp
.
read
()
.
strip
()
tests
=
[
next_test
]
except
OSError
:
pass
if
ns
.
fromfile
:
tests
=
[]
with
open
(
os
.
path
.
join
(
support
.
SAVEDCWD
,
ns
.
fromfile
))
as
fp
:
count_pat
=
re
.
compile
(
r'\[\s*\d+/\s*\d+\]'
)
for
line
in
fp
:
line
=
count_pat
.
sub
(
''
,
line
)
guts
=
line
.
split
()
# assuming no test has whitespace in its name
if
guts
and
not
guts
[
0
]
.
startswith
(
'#'
):
tests
.
extend
(
guts
)
# Strip .py extensions.
removepy
(
ns
.
args
)
removepy
(
tests
)
stdtests
=
STDTESTS
[:]
nottests
=
NOTTESTS
.
copy
()
if
ns
.
exclude
:
for
arg
in
ns
.
args
:
if
arg
in
stdtests
:
stdtests
.
remove
(
arg
)
nottests
.
add
(
arg
)
ns
.
args
=
[]
# For a partial run, we do not need to clutter the output.
if
ns
.
verbose
or
ns
.
header
or
not
(
ns
.
quiet
or
ns
.
single
or
tests
or
ns
.
args
):
# Print basic platform information
print
(
"=="
,
platform
.
python_implementation
(),
*
sys
.
version
.
split
())
print
(
"== "
,
platform
.
platform
(
aliased
=
True
),
"
%
s-endian"
%
sys
.
byteorder
)
print
(
"== "
,
"hash algorithm:"
,
sys
.
hash_info
.
algorithm
,
"64bit"
if
sys
.
maxsize
>
2
**
32
else
"32bit"
)
print
(
"== "
,
os
.
getcwd
())
print
(
"Testing with flags:"
,
sys
.
flags
)
# if testdir is set, then we are not running the python tests suite, so
# don't add default tests to be executed or skipped (pass empty values)
if
ns
.
testdir
:
alltests
=
findtests
(
ns
.
testdir
,
list
(),
set
())
else
:
alltests
=
findtests
(
ns
.
testdir
,
stdtests
,
nottests
)
selected
=
tests
or
ns
.
args
or
alltests
if
ns
.
single
:
selected
=
selected
[:
1
]
try
:
next_single_test
=
alltests
[
alltests
.
index
(
selected
[
0
])
+
1
]
except
IndexError
:
next_single_test
=
None
# Remove all the selected tests that precede start if it's set.
if
ns
.
start
:
try
:
del
selected
[:
selected
.
index
(
ns
.
start
)]
except
ValueError
:
print
(
"Couldn't find starting test (
%
s), using all tests"
%
ns
.
start
)
if
ns
.
randomize
:
if
ns
.
random_seed
is
None
:
ns
.
random_seed
=
random
.
randrange
(
10000000
)
random
.
seed
(
ns
.
random_seed
)
print
(
"Using random seed"
,
ns
.
random_seed
)
random
.
shuffle
(
selected
)
if
ns
.
trace
:
import
trace
,
tempfile
tracer
=
trace
.
Trace
(
ignoredirs
=
[
sys
.
base_prefix
,
sys
.
base_exec_prefix
,
tempfile
.
gettempdir
()],
trace
=
False
,
count
=
True
)
test_times
=
[]
support
.
verbose
=
ns
.
verbose
# Tell tests to be moderately quiet
support
.
use_resources
=
ns
.
use_resources
save_modules
=
sys
.
modules
.
keys
()
def
accumulate_result
(
test
,
result
):
ok
,
test_time
=
result
test_times
.
append
((
test_time
,
test
))
if
ok
==
PASSED
:
good
.
append
(
test
)
elif
ok
==
FAILED
:
bad
.
append
(
test
)
elif
ok
==
ENV_CHANGED
:
environment_changed
.
append
(
test
)
elif
ok
==
SKIPPED
:
skipped
.
append
(
test
)
elif
ok
==
RESOURCE_DENIED
:
skipped
.
append
(
test
)
resource_denieds
.
append
(
test
)
if
ns
.
forever
:
def
test_forever
(
tests
=
list
(
selected
)):
while
True
:
for
test
in
tests
:
yield
test
if
bad
:
return
tests
=
test_forever
()
test_count
=
''
test_count_width
=
3
else
:
tests
=
iter
(
selected
)
test_count
=
'/{}'
.
format
(
len
(
selected
))
test_count_width
=
len
(
test_count
)
-
1
if
ns
.
use_mp
:
try
:
from
threading
import
Thread
except
ImportError
:
print
(
"Multiprocess option requires thread support"
)
sys
.
exit
(
2
)
from
queue
import
Queue
debug_output_pat
=
re
.
compile
(
r"\[\d+ refs, \d+ blocks\]$"
)
output
=
Queue
()
pending
=
MultiprocessTests
(
tests
)
def
work
():
# A worker thread.
try
:
while
True
:
try
:
test
=
next
(
pending
)
except
StopIteration
:
output
.
put
((
None
,
None
,
None
,
None
))
return
retcode
,
stdout
,
stderr
=
run_test_in_subprocess
(
test
,
ns
)
# Strip last refcount output line if it exists, since it
# comes from the shutdown of the interpreter in the subcommand.
stderr
=
debug_output_pat
.
sub
(
""
,
stderr
)
stdout
,
_
,
result
=
stdout
.
strip
()
.
rpartition
(
"
\n
"
)
if
retcode
!=
0
:
result
=
(
CHILD_ERROR
,
"Exit code
%
s"
%
retcode
)
output
.
put
((
test
,
stdout
.
rstrip
(),
stderr
.
rstrip
(),
result
))
return
if
not
result
:
output
.
put
((
None
,
None
,
None
,
None
))
return
result
=
json
.
loads
(
result
)
output
.
put
((
test
,
stdout
.
rstrip
(),
stderr
.
rstrip
(),
result
))
except
BaseException
:
output
.
put
((
None
,
None
,
None
,
None
))
raise
workers
=
[
Thread
(
target
=
work
)
for
i
in
range
(
ns
.
use_mp
)]
for
worker
in
workers
:
worker
.
start
()
finished
=
0
test_index
=
1
try
:
while
finished
<
ns
.
use_mp
:
test
,
stdout
,
stderr
,
result
=
output
.
get
()
if
test
is
None
:
finished
+=
1
continue
accumulate_result
(
test
,
result
)
if
not
ns
.
quiet
:
fmt
=
"[{1:{0}}{2}/{3}] {4}"
if
bad
else
"[{1:{0}}{2}] {4}"
print
(
fmt
.
format
(
test_count_width
,
test_index
,
test_count
,
len
(
bad
),
test
))
if
stdout
:
print
(
stdout
)
if
stderr
:
print
(
stderr
,
file
=
sys
.
stderr
)
sys
.
stdout
.
flush
()
sys
.
stderr
.
flush
()
if
result
[
0
]
==
INTERRUPTED
:
raise
KeyboardInterrupt
if
result
[
0
]
==
CHILD_ERROR
:
raise
Exception
(
"Child error on {}: {}"
.
format
(
test
,
result
[
1
]))
test_index
+=
1
except
KeyboardInterrupt
:
interrupted
=
True
pending
.
interrupted
=
True
for
worker
in
workers
:
worker
.
join
()
else
:
for
test_index
,
test
in
enumerate
(
tests
,
1
):
if
not
ns
.
quiet
:
fmt
=
"[{1:{0}}{2}/{3}] {4}"
if
bad
else
"[{1:{0}}{2}] {4}"
print
(
fmt
.
format
(
test_count_width
,
test_index
,
test_count
,
len
(
bad
),
test
))
sys
.
stdout
.
flush
()
if
ns
.
trace
:
# If we're tracing code coverage, then we don't exit with status
# if on a false return value from main.
tracer
.
runctx
(
'runtest(test, ns.verbose, ns.quiet, timeout=ns.timeout)'
,
globals
=
globals
(),
locals
=
vars
())
else
:
try
:
result
=
runtest
(
test
,
ns
.
verbose
,
ns
.
quiet
,
ns
.
huntrleaks
,
output_on_failure
=
ns
.
verbose3
,
timeout
=
ns
.
timeout
,
failfast
=
ns
.
failfast
,
match_tests
=
ns
.
match_tests
)
accumulate_result
(
test
,
result
)
except
KeyboardInterrupt
:
interrupted
=
True
break
if
ns
.
findleaks
:
gc
.
collect
()
if
gc
.
garbage
:
print
(
"Warning: test created"
,
len
(
gc
.
garbage
),
end
=
' '
)
print
(
"uncollectable object(s)."
)
# move the uncollectable objects somewhere so we don't see
# them again
found_garbage
.
extend
(
gc
.
garbage
)
del
gc
.
garbage
[:]
# Unload the newly imported modules (best effort finalization)
for
module
in
sys
.
modules
.
keys
():
if
module
not
in
save_modules
and
module
.
startswith
(
"test."
):
support
.
unload
(
module
)
if
interrupted
:
# print a newline after ^C
print
()
print
(
"Test suite interrupted by signal SIGINT."
)
omitted
=
set
(
selected
)
-
set
(
good
)
-
set
(
bad
)
-
set
(
skipped
)
print
(
count
(
len
(
omitted
),
"test"
),
"omitted:"
)
printlist
(
omitted
)
if
good
and
not
ns
.
quiet
:
if
not
bad
and
not
skipped
and
not
interrupted
and
len
(
good
)
>
1
:
print
(
"All"
,
end
=
' '
)
print
(
count
(
len
(
good
),
"test"
),
"OK."
)
if
ns
.
print_slow
:
test_times
.
sort
(
reverse
=
True
)
print
(
"10 slowest tests:"
)
for
time
,
test
in
test_times
[:
10
]:
print
(
"
%
s:
%.1
fs"
%
(
test
,
time
))
if
bad
:
print
(
count
(
len
(
bad
),
"test"
),
"failed:"
)
printlist
(
bad
)
if
environment_changed
:
print
(
"{} altered the execution environment:"
.
format
(
count
(
len
(
environment_changed
),
"test"
)))
printlist
(
environment_changed
)
if
skipped
and
not
ns
.
quiet
:
print
(
count
(
len
(
skipped
),
"test"
),
"skipped:"
)
printlist
(
skipped
)
if
ns
.
verbose2
and
bad
:
print
(
"Re-running failed tests in verbose mode"
)
for
test
in
bad
[:]:
print
(
"Re-running test
%
r in verbose mode"
%
test
)
sys
.
stdout
.
flush
()
try
:
ns
.
verbose
=
True
ok
=
runtest
(
test
,
True
,
ns
.
quiet
,
ns
.
huntrleaks
,
timeout
=
ns
.
timeout
)
except
KeyboardInterrupt
:
# print a newline separate from the ^C
print
()
break
else
:
if
ok
[
0
]
in
{
PASSED
,
ENV_CHANGED
,
SKIPPED
,
RESOURCE_DENIED
}:
bad
.
remove
(
test
)
else
:
if
bad
:
print
(
count
(
len
(
bad
),
'test'
),
"failed again:"
)
printlist
(
bad
)
if
ns
.
single
:
if
next_single_test
:
with
open
(
filename
,
'w'
)
as
fp
:
fp
.
write
(
next_single_test
+
'
\n
'
)
else
:
os
.
unlink
(
filename
)
if
ns
.
trace
:
r
=
tracer
.
results
()
r
.
write_results
(
show_missing
=
True
,
summary
=
True
,
coverdir
=
ns
.
coverdir
)
if
ns
.
runleaks
:
os
.
system
(
"leaks
%
d"
%
os
.
getpid
())
sys
.
exit
(
len
(
bad
)
>
0
or
interrupted
)
# small set of tests to determine if we have a basically functioning interpreter
# (i.e. if any of these fail, then anything else is likely to follow)
STDTESTS
=
[
'test_grammar'
,
'test_opcodes'
,
'test_dict'
,
'test_builtin'
,
'test_exceptions'
,
'test_types'
,
'test_unittest'
,
'test_doctest'
,
'test_doctest2'
,
'test_support'
]
# set of tests that we don't want to be executed when using regrtest
NOTTESTS
=
set
()
def
findtests
(
testdir
=
None
,
stdtests
=
STDTESTS
,
nottests
=
NOTTESTS
):
"""Return a list of all applicable test modules."""
testdir
=
findtestdir
(
testdir
)
names
=
os
.
listdir
(
testdir
)
tests
=
[]
others
=
set
(
stdtests
)
|
nottests
for
name
in
names
:
mod
,
ext
=
os
.
path
.
splitext
(
name
)
if
mod
[:
5
]
==
"test_"
and
ext
in
(
".py"
,
""
)
and
mod
not
in
others
:
tests
.
append
(
mod
)
return
stdtests
+
sorted
(
tests
)
# We do not use a generator so multiple threads can call next().
class
MultiprocessTests
(
object
):
"""A thread-safe iterator over tests for multiprocess mode."""
def
__init__
(
self
,
tests
):
self
.
interrupted
=
False
self
.
lock
=
threading
.
Lock
()
self
.
tests
=
tests
def
__iter__
(
self
):
return
self
def
__next__
(
self
):
with
self
.
lock
:
if
self
.
interrupted
:
raise
StopIteration
(
'tests interrupted'
)
return
next
(
self
.
tests
)
def
replace_stdout
():
"""Set stdout encoder error handler to backslashreplace (as stderr error
handler) to avoid UnicodeEncodeError when printing a traceback"""
import
atexit
stdout
=
sys
.
stdout
sys
.
stdout
=
open
(
stdout
.
fileno
(),
'w'
,
encoding
=
stdout
.
encoding
,
errors
=
"backslashreplace"
,
closefd
=
False
,
newline
=
'
\n
'
)
def
restore_stdout
():
sys
.
stdout
.
close
()
sys
.
stdout
=
stdout
atexit
.
register
(
restore_stdout
)
def
runtest
(
test
,
verbose
,
quiet
,
huntrleaks
=
False
,
use_resources
=
None
,
output_on_failure
=
False
,
failfast
=
False
,
match_tests
=
None
,
timeout
=
None
):
"""Run a single test.
test -- the name of the test
verbose -- if true, print more messages
quiet -- if true, don't print 'skipped' messages (probably redundant)
huntrleaks -- run multiple times to test for leaks; requires a debug
build; a triple corresponding to -R's three arguments
use_resources -- list of extra resources to use
output_on_failure -- if true, display test output on failure
timeout -- dump the traceback and exit if a test takes more than
timeout seconds
failfast, match_tests -- See regrtest command-line flags for these.
Returns the tuple result, test_time, where result is one of the constants:
INTERRUPTED KeyboardInterrupt when run under -j
RESOURCE_DENIED test skipped because resource denied
SKIPPED test skipped for some other reason
ENV_CHANGED test failed because it changed the execution environment
FAILED test failed
PASSED test passed
"""
if
use_resources
is
not
None
:
support
.
use_resources
=
use_resources
use_timeout
=
(
timeout
is
not
None
)
if
use_timeout
:
faulthandler
.
dump_traceback_later
(
timeout
,
exit
=
True
)
try
:
support
.
match_tests
=
match_tests
if
failfast
:
support
.
failfast
=
True
if
output_on_failure
:
support
.
verbose
=
True
# Reuse the same instance to all calls to runtest(). Some
# tests keep a reference to sys.stdout or sys.stderr
# (eg. test_argparse).
if
runtest
.
stringio
is
None
:
stream
=
io
.
StringIO
()
runtest
.
stringio
=
stream
else
:
stream
=
runtest
.
stringio
stream
.
seek
(
0
)
stream
.
truncate
()
orig_stdout
=
sys
.
stdout
orig_stderr
=
sys
.
stderr
try
:
sys
.
stdout
=
stream
sys
.
stderr
=
stream
result
=
runtest_inner
(
test
,
verbose
,
quiet
,
huntrleaks
,
display_failure
=
False
)
if
result
[
0
]
==
FAILED
:
output
=
stream
.
getvalue
()
orig_stderr
.
write
(
output
)
orig_stderr
.
flush
()
finally
:
sys
.
stdout
=
orig_stdout
sys
.
stderr
=
orig_stderr
else
:
support
.
verbose
=
verbose
# Tell tests to be moderately quiet
result
=
runtest_inner
(
test
,
verbose
,
quiet
,
huntrleaks
,
display_failure
=
not
verbose
)
return
result
finally
:
if
use_timeout
:
faulthandler
.
cancel_dump_traceback_later
()
cleanup_test_droppings
(
test
,
verbose
)
runtest
.
stringio
=
None
# Unit tests are supposed to leave the execution environment unchanged
# once they complete. But sometimes tests have bugs, especially when
# tests fail, and the changes to environment go on to mess up other
# tests. This can cause issues with buildbot stability, since tests
# are run in random order and so problems may appear to come and go.
# There are a few things we can save and restore to mitigate this, and
# the following context manager handles this task.
class
saved_test_environment
:
"""Save bits of the test environment and restore them at block exit.
with saved_test_environment(testname, verbose, quiet):
#stuff
Unless quiet is True, a warning is printed to stderr if any of
the saved items was changed by the test. The attribute 'changed'
is initially False, but is set to True if a change is detected.
If verbose is more than 1, the before and after state of changed
items is also printed.
"""
changed
=
False
def
__init__
(
self
,
testname
,
verbose
=
0
,
quiet
=
False
):
self
.
testname
=
testname
self
.
verbose
=
verbose
self
.
quiet
=
quiet
# To add things to save and restore, add a name XXX to the resources list
# and add corresponding get_XXX/restore_XXX functions. get_XXX should
# return the value to be saved and compared against a second call to the
# get function when test execution completes. restore_XXX should accept
# the saved value and restore the resource using it. It will be called if
# and only if a change in the value is detected.
#
# Note: XXX will have any '.' replaced with '_' characters when determining
# the corresponding method names.
resources
=
(
'sys.argv'
,
'cwd'
,
'sys.stdin'
,
'sys.stdout'
,
'sys.stderr'
,
'os.environ'
,
'sys.path'
,
'sys.path_hooks'
,
'__import__'
,
'warnings.filters'
,
'asyncore.socket_map'
,
'logging._handlers'
,
'logging._handlerList'
,
'sys.gettrace'
,
'sys.warnoptions'
,
# multiprocessing.process._cleanup() may release ref
# to a thread, so check processes first.
'multiprocessing.process._dangling'
,
'threading._dangling'
,
'sysconfig._CONFIG_VARS'
,
'sysconfig._INSTALL_SCHEMES'
,
'files'
,
'locale'
,
'warnings.showwarning'
,
)
def
get_sys_argv
(
self
):
return
id
(
sys
.
argv
),
sys
.
argv
,
sys
.
argv
[:]
def
restore_sys_argv
(
self
,
saved_argv
):
sys
.
argv
=
saved_argv
[
1
]
sys
.
argv
[:]
=
saved_argv
[
2
]
def
get_cwd
(
self
):
return
os
.
getcwd
()
def
restore_cwd
(
self
,
saved_cwd
):
os
.
chdir
(
saved_cwd
)
def
get_sys_stdout
(
self
):
return
sys
.
stdout
def
restore_sys_stdout
(
self
,
saved_stdout
):
sys
.
stdout
=
saved_stdout
def
get_sys_stderr
(
self
):
return
sys
.
stderr
def
restore_sys_stderr
(
self
,
saved_stderr
):
sys
.
stderr
=
saved_stderr
def
get_sys_stdin
(
self
):
return
sys
.
stdin
def
restore_sys_stdin
(
self
,
saved_stdin
):
sys
.
stdin
=
saved_stdin
def
get_os_environ
(
self
):
return
id
(
os
.
environ
),
os
.
environ
,
dict
(
os
.
environ
)
def
restore_os_environ
(
self
,
saved_environ
):
os
.
environ
=
saved_environ
[
1
]
os
.
environ
.
clear
()
os
.
environ
.
update
(
saved_environ
[
2
])
def
get_sys_path
(
self
):
return
id
(
sys
.
path
),
sys
.
path
,
sys
.
path
[:]
def
restore_sys_path
(
self
,
saved_path
):
sys
.
path
=
saved_path
[
1
]
sys
.
path
[:]
=
saved_path
[
2
]
def
get_sys_path_hooks
(
self
):
return
id
(
sys
.
path_hooks
),
sys
.
path_hooks
,
sys
.
path_hooks
[:]
def
restore_sys_path_hooks
(
self
,
saved_hooks
):
sys
.
path_hooks
=
saved_hooks
[
1
]
sys
.
path_hooks
[:]
=
saved_hooks
[
2
]
def
get_sys_gettrace
(
self
):
return
sys
.
gettrace
()
def
restore_sys_gettrace
(
self
,
trace_fxn
):
sys
.
settrace
(
trace_fxn
)
def
get___import__
(
self
):
return
builtins
.
__import__
def
restore___import__
(
self
,
import_
):
builtins
.
__import__
=
import_
def
get_warnings_filters
(
self
):
return
id
(
warnings
.
filters
),
warnings
.
filters
,
warnings
.
filters
[:]
def
restore_warnings_filters
(
self
,
saved_filters
):
warnings
.
filters
=
saved_filters
[
1
]
warnings
.
filters
[:]
=
saved_filters
[
2
]
def
get_asyncore_socket_map
(
self
):
asyncore
=
sys
.
modules
.
get
(
'asyncore'
)
# XXX Making a copy keeps objects alive until __exit__ gets called.
return
asyncore
and
asyncore
.
socket_map
.
copy
()
or
{}
def
restore_asyncore_socket_map
(
self
,
saved_map
):
asyncore
=
sys
.
modules
.
get
(
'asyncore'
)
if
asyncore
is
not
None
:
asyncore
.
close_all
(
ignore_all
=
True
)
asyncore
.
socket_map
.
update
(
saved_map
)
def
get_shutil_archive_formats
(
self
):
# we could call get_archives_formats() but that only returns the
# registry keys; we want to check the values too (the functions that
# are registered)
return
shutil
.
_ARCHIVE_FORMATS
,
shutil
.
_ARCHIVE_FORMATS
.
copy
()
def
restore_shutil_archive_formats
(
self
,
saved
):
shutil
.
_ARCHIVE_FORMATS
=
saved
[
0
]
shutil
.
_ARCHIVE_FORMATS
.
clear
()
shutil
.
_ARCHIVE_FORMATS
.
update
(
saved
[
1
])
def
get_shutil_unpack_formats
(
self
):
return
shutil
.
_UNPACK_FORMATS
,
shutil
.
_UNPACK_FORMATS
.
copy
()
def
restore_shutil_unpack_formats
(
self
,
saved
):
shutil
.
_UNPACK_FORMATS
=
saved
[
0
]
shutil
.
_UNPACK_FORMATS
.
clear
()
shutil
.
_UNPACK_FORMATS
.
update
(
saved
[
1
])
def
get_logging__handlers
(
self
):
# _handlers is a WeakValueDictionary
return
id
(
logging
.
_handlers
),
logging
.
_handlers
,
logging
.
_handlers
.
copy
()
def
restore_logging__handlers
(
self
,
saved_handlers
):
# Can't easily revert the logging state
pass
def
get_logging__handlerList
(
self
):
# _handlerList is a list of weakrefs to handlers
return
id
(
logging
.
_handlerList
),
logging
.
_handlerList
,
logging
.
_handlerList
[:]
def
restore_logging__handlerList
(
self
,
saved_handlerList
):
# Can't easily revert the logging state
pass
def
get_sys_warnoptions
(
self
):
return
id
(
sys
.
warnoptions
),
sys
.
warnoptions
,
sys
.
warnoptions
[:]
def
restore_sys_warnoptions
(
self
,
saved_options
):
sys
.
warnoptions
=
saved_options
[
1
]
sys
.
warnoptions
[:]
=
saved_options
[
2
]
# Controlling dangling references to Thread objects can make it easier
# to track reference leaks.
def
get_threading__dangling
(
self
):
if
not
threading
:
return
None
# This copies the weakrefs without making any strong reference
return
threading
.
_dangling
.
copy
()
def
restore_threading__dangling
(
self
,
saved
):
if
not
threading
:
return
threading
.
_dangling
.
clear
()
threading
.
_dangling
.
update
(
saved
)
# Same for Process objects
def
get_multiprocessing_process__dangling
(
self
):
if
not
multiprocessing
:
return
None
# Unjoined process objects can survive after process exits
multiprocessing
.
process
.
_cleanup
()
# This copies the weakrefs without making any strong reference
return
multiprocessing
.
process
.
_dangling
.
copy
()
def
restore_multiprocessing_process__dangling
(
self
,
saved
):
if
not
multiprocessing
:
return
multiprocessing
.
process
.
_dangling
.
clear
()
multiprocessing
.
process
.
_dangling
.
update
(
saved
)
def
get_sysconfig__CONFIG_VARS
(
self
):
# make sure the dict is initialized
sysconfig
.
get_config_var
(
'prefix'
)
return
(
id
(
sysconfig
.
_CONFIG_VARS
),
sysconfig
.
_CONFIG_VARS
,
dict
(
sysconfig
.
_CONFIG_VARS
))
def
restore_sysconfig__CONFIG_VARS
(
self
,
saved
):
sysconfig
.
_CONFIG_VARS
=
saved
[
1
]
sysconfig
.
_CONFIG_VARS
.
clear
()
sysconfig
.
_CONFIG_VARS
.
update
(
saved
[
2
])
def
get_sysconfig__INSTALL_SCHEMES
(
self
):
return
(
id
(
sysconfig
.
_INSTALL_SCHEMES
),
sysconfig
.
_INSTALL_SCHEMES
,
sysconfig
.
_INSTALL_SCHEMES
.
copy
())
def
restore_sysconfig__INSTALL_SCHEMES
(
self
,
saved
):
sysconfig
.
_INSTALL_SCHEMES
=
saved
[
1
]
sysconfig
.
_INSTALL_SCHEMES
.
clear
()
sysconfig
.
_INSTALL_SCHEMES
.
update
(
saved
[
2
])
def
get_files
(
self
):
return
sorted
(
fn
+
(
'/'
if
os
.
path
.
isdir
(
fn
)
else
''
)
for
fn
in
os
.
listdir
())
def
restore_files
(
self
,
saved_value
):
fn
=
support
.
TESTFN
if
fn
not
in
saved_value
and
(
fn
+
'/'
)
not
in
saved_value
:
if
os
.
path
.
isfile
(
fn
):
support
.
unlink
(
fn
)
elif
os
.
path
.
isdir
(
fn
):
support
.
rmtree
(
fn
)
_lc
=
[
getattr
(
locale
,
lc
)
for
lc
in
dir
(
locale
)
if
lc
.
startswith
(
'LC_'
)]
def
get_locale
(
self
):
pairings
=
[]
for
lc
in
self
.
_lc
:
try
:
pairings
.
append
((
lc
,
locale
.
setlocale
(
lc
,
None
)))
except
(
TypeError
,
ValueError
):
continue
return
pairings
def
restore_locale
(
self
,
saved
):
for
lc
,
setting
in
saved
:
locale
.
setlocale
(
lc
,
setting
)
def
get_warnings_showwarning
(
self
):
return
warnings
.
showwarning
def
restore_warnings_showwarning
(
self
,
fxn
):
warnings
.
showwarning
=
fxn
def
resource_info
(
self
):
for
name
in
self
.
resources
:
method_suffix
=
name
.
replace
(
'.'
,
'_'
)
get_name
=
'get_'
+
method_suffix
restore_name
=
'restore_'
+
method_suffix
yield
name
,
getattr
(
self
,
get_name
),
getattr
(
self
,
restore_name
)
def
__enter__
(
self
):
self
.
saved_values
=
dict
((
name
,
get
())
for
name
,
get
,
restore
in
self
.
resource_info
())
return
self
def
__exit__
(
self
,
exc_type
,
exc_val
,
exc_tb
):
saved_values
=
self
.
saved_values
del
self
.
saved_values
for
name
,
get
,
restore
in
self
.
resource_info
():
current
=
get
()
original
=
saved_values
.
pop
(
name
)
# Check for changes to the resource's value
if
current
!=
original
:
self
.
changed
=
True
restore
(
original
)
if
not
self
.
quiet
:
print
(
"Warning -- {} was modified by {}"
.
format
(
name
,
self
.
testname
),
file
=
sys
.
stderr
)
if
self
.
verbose
>
1
:
print
(
" Before: {}
\n
After: {} "
.
format
(
original
,
current
),
file
=
sys
.
stderr
)
return
False
def
runtest_inner
(
test
,
verbose
,
quiet
,
huntrleaks
=
False
,
display_failure
=
True
):
support
.
unload
(
test
)
test_time
=
0.0
refleak
=
False
# True if the test leaked references.
try
:
if
test
.
startswith
(
'test.'
):
abstest
=
test
else
:
# Always import it from the test package
abstest
=
'test.'
+
test
with
saved_test_environment
(
test
,
verbose
,
quiet
)
as
environment
:
start_time
=
time
.
time
()
the_module
=
importlib
.
import_module
(
abstest
)
# If the test has a test_main, that will run the appropriate
# tests. If not, use normal unittest test loading.
test_runner
=
getattr
(
the_module
,
"test_main"
,
None
)
if
test_runner
is
None
:
def
test_runner
():
loader
=
unittest
.
TestLoader
()
tests
=
loader
.
loadTestsFromModule
(
the_module
)
for
error
in
loader
.
errors
:
print
(
error
,
file
=
sys
.
stderr
)
if
loader
.
errors
:
raise
Exception
(
"errors while loading tests"
)
support
.
run_unittest
(
tests
)
test_runner
()
if
huntrleaks
:
refleak
=
dash_R
(
the_module
,
test
,
test_runner
,
huntrleaks
)
test_time
=
time
.
time
()
-
start_time
except
support
.
ResourceDenied
as
msg
:
if
not
quiet
:
print
(
test
,
"skipped --"
,
msg
)
sys
.
stdout
.
flush
()
return
RESOURCE_DENIED
,
test_time
except
unittest
.
SkipTest
as
msg
:
if
not
quiet
:
print
(
test
,
"skipped --"
,
msg
)
sys
.
stdout
.
flush
()
return
SKIPPED
,
test_time
except
KeyboardInterrupt
:
raise
except
support
.
TestFailed
as
msg
:
if
display_failure
:
print
(
"test"
,
test
,
"failed --"
,
msg
,
file
=
sys
.
stderr
)
else
:
print
(
"test"
,
test
,
"failed"
,
file
=
sys
.
stderr
)
sys
.
stderr
.
flush
()
return
FAILED
,
test_time
except
:
msg
=
traceback
.
format_exc
()
print
(
"test"
,
test
,
"crashed --"
,
msg
,
file
=
sys
.
stderr
)
sys
.
stderr
.
flush
()
return
FAILED
,
test_time
else
:
if
refleak
:
return
FAILED
,
test_time
if
environment
.
changed
:
return
ENV_CHANGED
,
test_time
return
PASSED
,
test_time
def
cleanup_test_droppings
(
testname
,
verbose
):
import
shutil
import
stat
import
gc
# First kill any dangling references to open files etc.
# This can also issue some ResourceWarnings which would otherwise get
# triggered during the following test run, and possibly produce failures.
gc
.
collect
()
# Try to clean up junk commonly left behind. While tests shouldn't leave
# any files or directories behind, when a test fails that can be tedious
# for it to arrange. The consequences can be especially nasty on Windows,
# since if a test leaves a file open, it cannot be deleted by name (while
# there's nothing we can do about that here either, we can display the
# name of the offending test, which is a real help).
for
name
in
(
support
.
TESTFN
,
"db_home"
,
):
if
not
os
.
path
.
exists
(
name
):
continue
if
os
.
path
.
isdir
(
name
):
kind
,
nuker
=
"directory"
,
shutil
.
rmtree
elif
os
.
path
.
isfile
(
name
):
kind
,
nuker
=
"file"
,
os
.
unlink
else
:
raise
SystemError
(
"os.path says
%
r exists but is neither "
"directory nor file"
%
name
)
if
verbose
:
print
(
"
%
r left behind
%
s
%
r"
%
(
testname
,
kind
,
name
))
try
:
# if we have chmod, fix possible permissions problems
# that might prevent cleanup
if
(
hasattr
(
os
,
'chmod'
)):
os
.
chmod
(
name
,
stat
.
S_IRWXU
|
stat
.
S_IRWXG
|
stat
.
S_IRWXO
)
nuker
(
name
)
except
Exception
as
msg
:
print
((
"
%
r left behind
%
s
%
r and it couldn't be "
"removed:
%
s"
%
(
testname
,
kind
,
name
,
msg
)),
file
=
sys
.
stderr
)
def
dash_R
(
the_module
,
test
,
indirect_test
,
huntrleaks
):
"""Run a test multiple times, looking for reference leaks.
Returns:
False if the test didn't leak references; True if we detected refleaks.
"""
# This code is hackish and inelegant, but it seems to do the job.
import
copyreg
import
collections.abc
if
not
hasattr
(
sys
,
'gettotalrefcount'
):
raise
Exception
(
"Tracking reference leaks requires a debug build "
"of Python"
)
# Save current values for dash_R_cleanup() to restore.
fs
=
warnings
.
filters
[:]
ps
=
copyreg
.
dispatch_table
.
copy
()
pic
=
sys
.
path_importer_cache
.
copy
()
try
:
import
zipimport
except
ImportError
:
zdc
=
None
# Run unmodified on platforms without zipimport support
else
:
zdc
=
zipimport
.
_zip_directory_cache
.
copy
()
abcs
=
{}
for
abc
in
[
getattr
(
collections
.
abc
,
a
)
for
a
in
collections
.
abc
.
__all__
]:
if
not
isabstract
(
abc
):
continue
for
obj
in
abc
.
__subclasses__
()
+
[
abc
]:
abcs
[
obj
]
=
obj
.
_abc_registry
.
copy
()
nwarmup
,
ntracked
,
fname
=
huntrleaks
fname
=
os
.
path
.
join
(
support
.
SAVEDCWD
,
fname
)
repcount
=
nwarmup
+
ntracked
rc_deltas
=
[
0
]
*
repcount
alloc_deltas
=
[
0
]
*
repcount
print
(
"beginning"
,
repcount
,
"repetitions"
,
file
=
sys
.
stderr
)
print
((
"1234567890"
*
(
repcount
//
10
+
1
))[:
repcount
],
file
=
sys
.
stderr
)
sys
.
stderr
.
flush
()
for
i
in
range
(
repcount
):
indirect_test
()
alloc_after
,
rc_after
=
dash_R_cleanup
(
fs
,
ps
,
pic
,
zdc
,
abcs
)
sys
.
stderr
.
write
(
'.'
)
sys
.
stderr
.
flush
()
if
i
>=
nwarmup
:
rc_deltas
[
i
]
=
rc_after
-
rc_before
alloc_deltas
[
i
]
=
alloc_after
-
alloc_before
alloc_before
,
rc_before
=
alloc_after
,
rc_after
print
(
file
=
sys
.
stderr
)
# These checkers return False on success, True on failure
def
check_rc_deltas
(
deltas
):
return
any
(
deltas
)
def
check_alloc_deltas
(
deltas
):
# At least 1/3rd of 0s
if
3
*
deltas
.
count
(
0
)
<
len
(
deltas
):
return
True
# Nothing else than 1s, 0s and -1s
if
not
set
(
deltas
)
<=
{
1
,
0
,
-
1
}:
return
True
return
False
failed
=
False
for
deltas
,
item_name
,
checker
in
[
(
rc_deltas
,
'references'
,
check_rc_deltas
),
(
alloc_deltas
,
'memory blocks'
,
check_alloc_deltas
)]:
if
checker
(
deltas
):
msg
=
'
%
s leaked
%
s
%
s, sum=
%
s'
%
(
test
,
deltas
[
nwarmup
:],
item_name
,
sum
(
deltas
))
print
(
msg
,
file
=
sys
.
stderr
)
sys
.
stderr
.
flush
()
with
open
(
fname
,
"a"
)
as
refrep
:
print
(
msg
,
file
=
refrep
)
refrep
.
flush
()
failed
=
True
return
failed
def
dash_R_cleanup
(
fs
,
ps
,
pic
,
zdc
,
abcs
):
import
gc
,
copyreg
import
_strptime
,
linecache
import
urllib.parse
,
urllib
.
request
,
mimetypes
,
doctest
import
struct
,
filecmp
,
collections
.
abc
from
distutils.dir_util
import
_path_created
from
weakref
import
WeakSet
# Clear the warnings registry, so they can be displayed again
for
mod
in
sys
.
modules
.
values
():
if
hasattr
(
mod
,
'__warningregistry__'
):
del
mod
.
__warningregistry__
# Restore some original values.
warnings
.
filters
[:]
=
fs
copyreg
.
dispatch_table
.
clear
()
copyreg
.
dispatch_table
.
update
(
ps
)
sys
.
path_importer_cache
.
clear
()
sys
.
path_importer_cache
.
update
(
pic
)
try
:
import
zipimport
except
ImportError
:
pass
# Run unmodified on platforms without zipimport support
else
:
zipimport
.
_zip_directory_cache
.
clear
()
zipimport
.
_zip_directory_cache
.
update
(
zdc
)
# clear type cache
sys
.
_clear_type_cache
()
# Clear ABC registries, restoring previously saved ABC registries.
for
abc
in
[
getattr
(
collections
.
abc
,
a
)
for
a
in
collections
.
abc
.
__all__
]:
if
not
isabstract
(
abc
):
continue
for
obj
in
abc
.
__subclasses__
()
+
[
abc
]:
obj
.
_abc_registry
=
abcs
.
get
(
obj
,
WeakSet
())
.
copy
()
obj
.
_abc_cache
.
clear
()
obj
.
_abc_negative_cache
.
clear
()
# Flush standard output, so that buffered data is sent to the OS and
# associated Python objects are reclaimed.
for
stream
in
(
sys
.
stdout
,
sys
.
stderr
,
sys
.
__stdout__
,
sys
.
__stderr__
):
if
stream
is
not
None
:
stream
.
flush
()
# Clear assorted module caches.
_path_created
.
clear
()
re
.
purge
()
_strptime
.
_regex_cache
.
clear
()
urllib
.
parse
.
clear_cache
()
urllib
.
request
.
urlcleanup
()
linecache
.
clearcache
()
mimetypes
.
_default_mime_types
()
filecmp
.
_cache
.
clear
()
struct
.
_clearcache
()
doctest
.
master
=
None
try
:
import
ctypes
except
ImportError
:
# Don't worry about resetting the cache if ctypes is not supported
pass
else
:
ctypes
.
_reset_cache
()
# Collect cyclic trash and read memory statistics immediately after.
func1
=
sys
.
getallocatedblocks
func2
=
sys
.
gettotalrefcount
gc
.
collect
()
return
func1
(),
func2
()
def
warm_caches
():
# char cache
s
=
bytes
(
range
(
256
))
for
i
in
range
(
256
):
s
[
i
:
i
+
1
]
# unicode cache
x
=
[
chr
(
i
)
for
i
in
range
(
256
)]
# int cache
x
=
list
(
range
(
-
5
,
257
))
def
findtestdir
(
path
=
None
):
return
path
or
os
.
path
.
dirname
(
__file__
)
or
os
.
curdir
def
removepy
(
names
):
if
not
names
:
return
for
idx
,
name
in
enumerate
(
names
):
basename
,
ext
=
os
.
path
.
splitext
(
name
)
if
ext
==
'.py'
:
names
[
idx
]
=
basename
def
count
(
n
,
word
):
if
n
==
1
:
return
"
%
d
%
s"
%
(
n
,
word
)
else
:
return
"
%
d
%
ss"
%
(
n
,
word
)
def
printlist
(
x
,
width
=
70
,
indent
=
4
):
"""Print the elements of iterable x to stdout.
Optional arg width (default 70) is the maximum line length.
Optional arg indent (default 4) is the number of blanks with which to
begin each line.
"""
from
textwrap
import
fill
blanks
=
' '
*
indent
# Print the sorted list: 'x' may be a '--random' list or a set()
print
(
fill
(
' '
.
join
(
str
(
elt
)
for
elt
in
sorted
(
x
)),
width
,
initial_indent
=
blanks
,
subsequent_indent
=
blanks
))
def
main_in_temp_cwd
():
"""Run main() in a temporary working directory."""
if
sysconfig
.
is_python_build
():
try
:
os
.
mkdir
(
TEMPDIR
)
except
FileExistsError
:
pass
# Define a writable temp dir that will be used as cwd while running
# the tests. The name of the dir includes the pid to allow parallel
# testing (see the -j option).
test_cwd
=
'test_python_{}'
.
format
(
os
.
getpid
())
test_cwd
=
os
.
path
.
join
(
TEMPDIR
,
test_cwd
)
# Run the tests in a context manager that temporarily changes the CWD to a
# temporary and writable directory. If it's not possible to create or
# change the CWD, the original CWD will be used. The original CWD is
# available from support.SAVEDCWD.
with
support
.
temp_cwd
(
test_cwd
,
quiet
=
True
):
main
()
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
...
...
Lib/test/test_regrtest.py
Dosyayı görüntüle @
3844fe5e
...
@@ -7,7 +7,8 @@ import faulthandler
...
@@ -7,7 +7,8 @@ import faulthandler
import
getopt
import
getopt
import
os.path
import
os.path
import
unittest
import
unittest
from
test
import
regrtest
,
support
,
libregrtest
from
test
import
libregrtest
from
test
import
support
class
ParseArgsTestCase
(
unittest
.
TestCase
):
class
ParseArgsTestCase
(
unittest
.
TestCase
):
...
@@ -15,7 +16,7 @@ class ParseArgsTestCase(unittest.TestCase):
...
@@ -15,7 +16,7 @@ class ParseArgsTestCase(unittest.TestCase):
def
checkError
(
self
,
args
,
msg
):
def
checkError
(
self
,
args
,
msg
):
with
support
.
captured_stderr
()
as
err
,
self
.
assertRaises
(
SystemExit
):
with
support
.
captured_stderr
()
as
err
,
self
.
assertRaises
(
SystemExit
):
regrtest
.
_parse_args
(
args
)
lib
regrtest
.
_parse_args
(
args
)
self
.
assertIn
(
msg
,
err
.
getvalue
())
self
.
assertIn
(
msg
,
err
.
getvalue
())
def
test_help
(
self
):
def
test_help
(
self
):
...
@@ -23,82 +24,82 @@ class ParseArgsTestCase(unittest.TestCase):
...
@@ -23,82 +24,82 @@ class ParseArgsTestCase(unittest.TestCase):
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
with
support
.
captured_stdout
()
as
out
,
\
with
support
.
captured_stdout
()
as
out
,
\
self
.
assertRaises
(
SystemExit
):
self
.
assertRaises
(
SystemExit
):
regrtest
.
_parse_args
([
opt
])
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertIn
(
'Run Python regression tests.'
,
out
.
getvalue
())
self
.
assertIn
(
'Run Python regression tests.'
,
out
.
getvalue
())
@unittest.skipUnless
(
hasattr
(
faulthandler
,
'dump_traceback_later'
),
@unittest.skipUnless
(
hasattr
(
faulthandler
,
'dump_traceback_later'
),
"faulthandler.dump_traceback_later() required"
)
"faulthandler.dump_traceback_later() required"
)
def
test_timeout
(
self
):
def
test_timeout
(
self
):
ns
=
regrtest
.
_parse_args
([
'--timeout'
,
'4.2'
])
ns
=
lib
regrtest
.
_parse_args
([
'--timeout'
,
'4.2'
])
self
.
assertEqual
(
ns
.
timeout
,
4.2
)
self
.
assertEqual
(
ns
.
timeout
,
4.2
)
self
.
checkError
([
'--timeout'
],
'expected one argument'
)
self
.
checkError
([
'--timeout'
],
'expected one argument'
)
self
.
checkError
([
'--timeout'
,
'foo'
],
'invalid float value'
)
self
.
checkError
([
'--timeout'
,
'foo'
],
'invalid float value'
)
def
test_wait
(
self
):
def
test_wait
(
self
):
ns
=
regrtest
.
_parse_args
([
'--wait'
])
ns
=
lib
regrtest
.
_parse_args
([
'--wait'
])
self
.
assertTrue
(
ns
.
wait
)
self
.
assertTrue
(
ns
.
wait
)
def
test_slaveargs
(
self
):
def
test_slaveargs
(
self
):
ns
=
regrtest
.
_parse_args
([
'--slaveargs'
,
'[[], {}]'
])
ns
=
lib
regrtest
.
_parse_args
([
'--slaveargs'
,
'[[], {}]'
])
self
.
assertEqual
(
ns
.
slaveargs
,
'[[], {}]'
)
self
.
assertEqual
(
ns
.
slaveargs
,
'[[], {}]'
)
self
.
checkError
([
'--slaveargs'
],
'expected one argument'
)
self
.
checkError
([
'--slaveargs'
],
'expected one argument'
)
def
test_start
(
self
):
def
test_start
(
self
):
for
opt
in
'-S'
,
'--start'
:
for
opt
in
'-S'
,
'--start'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
,
'foo'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'foo'
])
self
.
assertEqual
(
ns
.
start
,
'foo'
)
self
.
assertEqual
(
ns
.
start
,
'foo'
)
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
],
'expected one argument'
)
def
test_verbose
(
self
):
def
test_verbose
(
self
):
ns
=
regrtest
.
_parse_args
([
'-v'
])
ns
=
lib
regrtest
.
_parse_args
([
'-v'
])
self
.
assertEqual
(
ns
.
verbose
,
1
)
self
.
assertEqual
(
ns
.
verbose
,
1
)
ns
=
regrtest
.
_parse_args
([
'-vvv'
])
ns
=
lib
regrtest
.
_parse_args
([
'-vvv'
])
self
.
assertEqual
(
ns
.
verbose
,
3
)
self
.
assertEqual
(
ns
.
verbose
,
3
)
ns
=
regrtest
.
_parse_args
([
'--verbose'
])
ns
=
lib
regrtest
.
_parse_args
([
'--verbose'
])
self
.
assertEqual
(
ns
.
verbose
,
1
)
self
.
assertEqual
(
ns
.
verbose
,
1
)
ns
=
regrtest
.
_parse_args
([
'--verbose'
]
*
3
)
ns
=
lib
regrtest
.
_parse_args
([
'--verbose'
]
*
3
)
self
.
assertEqual
(
ns
.
verbose
,
3
)
self
.
assertEqual
(
ns
.
verbose
,
3
)
ns
=
regrtest
.
_parse_args
([])
ns
=
lib
regrtest
.
_parse_args
([])
self
.
assertEqual
(
ns
.
verbose
,
0
)
self
.
assertEqual
(
ns
.
verbose
,
0
)
def
test_verbose2
(
self
):
def
test_verbose2
(
self
):
for
opt
in
'-w'
,
'--verbose2'
:
for
opt
in
'-w'
,
'--verbose2'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
])
ns
=
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertTrue
(
ns
.
verbose2
)
self
.
assertTrue
(
ns
.
verbose2
)
def
test_verbose3
(
self
):
def
test_verbose3
(
self
):
for
opt
in
'-W'
,
'--verbose3'
:
for
opt
in
'-W'
,
'--verbose3'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
])
ns
=
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertTrue
(
ns
.
verbose3
)
self
.
assertTrue
(
ns
.
verbose3
)
def
test_quiet
(
self
):
def
test_quiet
(
self
):
for
opt
in
'-q'
,
'--quiet'
:
for
opt
in
'-q'
,
'--quiet'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
])
ns
=
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertTrue
(
ns
.
quiet
)
self
.
assertTrue
(
ns
.
quiet
)
self
.
assertEqual
(
ns
.
verbose
,
0
)
self
.
assertEqual
(
ns
.
verbose
,
0
)
def
test_slow
(
self
):
def
test_slow
(
self
):
for
opt
in
'-o'
,
'--slow'
:
for
opt
in
'-o'
,
'--slow'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
])
ns
=
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertTrue
(
ns
.
print_slow
)
self
.
assertTrue
(
ns
.
print_slow
)
def
test_header
(
self
):
def
test_header
(
self
):
ns
=
regrtest
.
_parse_args
([
'--header'
])
ns
=
lib
regrtest
.
_parse_args
([
'--header'
])
self
.
assertTrue
(
ns
.
header
)
self
.
assertTrue
(
ns
.
header
)
def
test_randomize
(
self
):
def
test_randomize
(
self
):
for
opt
in
'-r'
,
'--randomize'
:
for
opt
in
'-r'
,
'--randomize'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
])
ns
=
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertTrue
(
ns
.
randomize
)
self
.
assertTrue
(
ns
.
randomize
)
def
test_randseed
(
self
):
def
test_randseed
(
self
):
ns
=
regrtest
.
_parse_args
([
'--randseed'
,
'12345'
])
ns
=
lib
regrtest
.
_parse_args
([
'--randseed'
,
'12345'
])
self
.
assertEqual
(
ns
.
random_seed
,
12345
)
self
.
assertEqual
(
ns
.
random_seed
,
12345
)
self
.
assertTrue
(
ns
.
randomize
)
self
.
assertTrue
(
ns
.
randomize
)
self
.
checkError
([
'--randseed'
],
'expected one argument'
)
self
.
checkError
([
'--randseed'
],
'expected one argument'
)
...
@@ -107,7 +108,7 @@ class ParseArgsTestCase(unittest.TestCase):
...
@@ -107,7 +108,7 @@ class ParseArgsTestCase(unittest.TestCase):
def
test_fromfile
(
self
):
def
test_fromfile
(
self
):
for
opt
in
'-f'
,
'--fromfile'
:
for
opt
in
'-f'
,
'--fromfile'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
,
'foo'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'foo'
])
self
.
assertEqual
(
ns
.
fromfile
,
'foo'
)
self
.
assertEqual
(
ns
.
fromfile
,
'foo'
)
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
,
'foo'
,
'-s'
],
"don't go together"
)
self
.
checkError
([
opt
,
'foo'
,
'-s'
],
"don't go together"
)
...
@@ -115,42 +116,42 @@ class ParseArgsTestCase(unittest.TestCase):
...
@@ -115,42 +116,42 @@ class ParseArgsTestCase(unittest.TestCase):
def
test_exclude
(
self
):
def
test_exclude
(
self
):
for
opt
in
'-x'
,
'--exclude'
:
for
opt
in
'-x'
,
'--exclude'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
])
ns
=
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertTrue
(
ns
.
exclude
)
self
.
assertTrue
(
ns
.
exclude
)
def
test_single
(
self
):
def
test_single
(
self
):
for
opt
in
'-s'
,
'--single'
:
for
opt
in
'-s'
,
'--single'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
])
ns
=
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertTrue
(
ns
.
single
)
self
.
assertTrue
(
ns
.
single
)
self
.
checkError
([
opt
,
'-f'
,
'foo'
],
"don't go together"
)
self
.
checkError
([
opt
,
'-f'
,
'foo'
],
"don't go together"
)
def
test_match
(
self
):
def
test_match
(
self
):
for
opt
in
'-m'
,
'--match'
:
for
opt
in
'-m'
,
'--match'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
,
'pattern'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'pattern'
])
self
.
assertEqual
(
ns
.
match_tests
,
'pattern'
)
self
.
assertEqual
(
ns
.
match_tests
,
'pattern'
)
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
],
'expected one argument'
)
def
test_failfast
(
self
):
def
test_failfast
(
self
):
for
opt
in
'-G'
,
'--failfast'
:
for
opt
in
'-G'
,
'--failfast'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
,
'-v'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'-v'
])
self
.
assertTrue
(
ns
.
failfast
)
self
.
assertTrue
(
ns
.
failfast
)
ns
=
regrtest
.
_parse_args
([
opt
,
'-W'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'-W'
])
self
.
assertTrue
(
ns
.
failfast
)
self
.
assertTrue
(
ns
.
failfast
)
self
.
checkError
([
opt
],
'-G/--failfast needs either -v or -W'
)
self
.
checkError
([
opt
],
'-G/--failfast needs either -v or -W'
)
def
test_use
(
self
):
def
test_use
(
self
):
for
opt
in
'-u'
,
'--use'
:
for
opt
in
'-u'
,
'--use'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
,
'gui,network'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'gui,network'
])
self
.
assertEqual
(
ns
.
use_resources
,
[
'gui'
,
'network'
])
self
.
assertEqual
(
ns
.
use_resources
,
[
'gui'
,
'network'
])
ns
=
regrtest
.
_parse_args
([
opt
,
'gui,none,network'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'gui,none,network'
])
self
.
assertEqual
(
ns
.
use_resources
,
[
'network'
])
self
.
assertEqual
(
ns
.
use_resources
,
[
'network'
])
expected
=
list
(
libregrtest
.
RESOURCE_NAMES
)
expected
=
list
(
libregrtest
.
RESOURCE_NAMES
)
expected
.
remove
(
'gui'
)
expected
.
remove
(
'gui'
)
ns
=
regrtest
.
_parse_args
([
opt
,
'all,-gui'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'all,-gui'
])
self
.
assertEqual
(
ns
.
use_resources
,
expected
)
self
.
assertEqual
(
ns
.
use_resources
,
expected
)
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
,
'foo'
],
'invalid resource'
)
self
.
checkError
([
opt
,
'foo'
],
'invalid resource'
)
...
@@ -158,31 +159,31 @@ class ParseArgsTestCase(unittest.TestCase):
...
@@ -158,31 +159,31 @@ class ParseArgsTestCase(unittest.TestCase):
def
test_memlimit
(
self
):
def
test_memlimit
(
self
):
for
opt
in
'-M'
,
'--memlimit'
:
for
opt
in
'-M'
,
'--memlimit'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
,
'4G'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'4G'
])
self
.
assertEqual
(
ns
.
memlimit
,
'4G'
)
self
.
assertEqual
(
ns
.
memlimit
,
'4G'
)
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
],
'expected one argument'
)
def
test_testdir
(
self
):
def
test_testdir
(
self
):
ns
=
regrtest
.
_parse_args
([
'--testdir'
,
'foo'
])
ns
=
lib
regrtest
.
_parse_args
([
'--testdir'
,
'foo'
])
self
.
assertEqual
(
ns
.
testdir
,
os
.
path
.
join
(
support
.
SAVEDCWD
,
'foo'
))
self
.
assertEqual
(
ns
.
testdir
,
os
.
path
.
join
(
support
.
SAVEDCWD
,
'foo'
))
self
.
checkError
([
'--testdir'
],
'expected one argument'
)
self
.
checkError
([
'--testdir'
],
'expected one argument'
)
def
test_runleaks
(
self
):
def
test_runleaks
(
self
):
for
opt
in
'-L'
,
'--runleaks'
:
for
opt
in
'-L'
,
'--runleaks'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
])
ns
=
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertTrue
(
ns
.
runleaks
)
self
.
assertTrue
(
ns
.
runleaks
)
def
test_huntrleaks
(
self
):
def
test_huntrleaks
(
self
):
for
opt
in
'-R'
,
'--huntrleaks'
:
for
opt
in
'-R'
,
'--huntrleaks'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
,
':'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
':'
])
self
.
assertEqual
(
ns
.
huntrleaks
,
(
5
,
4
,
'reflog.txt'
))
self
.
assertEqual
(
ns
.
huntrleaks
,
(
5
,
4
,
'reflog.txt'
))
ns
=
regrtest
.
_parse_args
([
opt
,
'6:'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'6:'
])
self
.
assertEqual
(
ns
.
huntrleaks
,
(
6
,
4
,
'reflog.txt'
))
self
.
assertEqual
(
ns
.
huntrleaks
,
(
6
,
4
,
'reflog.txt'
))
ns
=
regrtest
.
_parse_args
([
opt
,
':3'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
':3'
])
self
.
assertEqual
(
ns
.
huntrleaks
,
(
5
,
3
,
'reflog.txt'
))
self
.
assertEqual
(
ns
.
huntrleaks
,
(
5
,
3
,
'reflog.txt'
))
ns
=
regrtest
.
_parse_args
([
opt
,
'6:3:leaks.log'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'6:3:leaks.log'
])
self
.
assertEqual
(
ns
.
huntrleaks
,
(
6
,
3
,
'leaks.log'
))
self
.
assertEqual
(
ns
.
huntrleaks
,
(
6
,
3
,
'leaks.log'
))
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
,
'6'
],
self
.
checkError
([
opt
,
'6'
],
...
@@ -193,7 +194,7 @@ class ParseArgsTestCase(unittest.TestCase):
...
@@ -193,7 +194,7 @@ class ParseArgsTestCase(unittest.TestCase):
def
test_multiprocess
(
self
):
def
test_multiprocess
(
self
):
for
opt
in
'-j'
,
'--multiprocess'
:
for
opt
in
'-j'
,
'--multiprocess'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
,
'2'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'2'
])
self
.
assertEqual
(
ns
.
use_mp
,
2
)
self
.
assertEqual
(
ns
.
use_mp
,
2
)
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
,
'foo'
],
'invalid int value'
)
self
.
checkError
([
opt
,
'foo'
],
'invalid int value'
)
...
@@ -204,13 +205,13 @@ class ParseArgsTestCase(unittest.TestCase):
...
@@ -204,13 +205,13 @@ class ParseArgsTestCase(unittest.TestCase):
def
test_coverage
(
self
):
def
test_coverage
(
self
):
for
opt
in
'-T'
,
'--coverage'
:
for
opt
in
'-T'
,
'--coverage'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
])
ns
=
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertTrue
(
ns
.
trace
)
self
.
assertTrue
(
ns
.
trace
)
def
test_coverdir
(
self
):
def
test_coverdir
(
self
):
for
opt
in
'-D'
,
'--coverdir'
:
for
opt
in
'-D'
,
'--coverdir'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
,
'foo'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'foo'
])
self
.
assertEqual
(
ns
.
coverdir
,
self
.
assertEqual
(
ns
.
coverdir
,
os
.
path
.
join
(
support
.
SAVEDCWD
,
'foo'
))
os
.
path
.
join
(
support
.
SAVEDCWD
,
'foo'
))
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
],
'expected one argument'
)
...
@@ -218,13 +219,13 @@ class ParseArgsTestCase(unittest.TestCase):
...
@@ -218,13 +219,13 @@ class ParseArgsTestCase(unittest.TestCase):
def
test_nocoverdir
(
self
):
def
test_nocoverdir
(
self
):
for
opt
in
'-N'
,
'--nocoverdir'
:
for
opt
in
'-N'
,
'--nocoverdir'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
])
ns
=
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertIsNone
(
ns
.
coverdir
)
self
.
assertIsNone
(
ns
.
coverdir
)
def
test_threshold
(
self
):
def
test_threshold
(
self
):
for
opt
in
'-t'
,
'--threshold'
:
for
opt
in
'-t'
,
'--threshold'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
,
'1000'
])
ns
=
lib
regrtest
.
_parse_args
([
opt
,
'1000'
])
self
.
assertEqual
(
ns
.
threshold
,
1000
)
self
.
assertEqual
(
ns
.
threshold
,
1000
)
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
],
'expected one argument'
)
self
.
checkError
([
opt
,
'foo'
],
'invalid int value'
)
self
.
checkError
([
opt
,
'foo'
],
'invalid int value'
)
...
@@ -232,13 +233,13 @@ class ParseArgsTestCase(unittest.TestCase):
...
@@ -232,13 +233,13 @@ class ParseArgsTestCase(unittest.TestCase):
def
test_nowindows
(
self
):
def
test_nowindows
(
self
):
for
opt
in
'-n'
,
'--nowindows'
:
for
opt
in
'-n'
,
'--nowindows'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
])
ns
=
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertTrue
(
ns
.
nowindows
)
self
.
assertTrue
(
ns
.
nowindows
)
def
test_forever
(
self
):
def
test_forever
(
self
):
for
opt
in
'-F'
,
'--forever'
:
for
opt
in
'-F'
,
'--forever'
:
with
self
.
subTest
(
opt
=
opt
):
with
self
.
subTest
(
opt
=
opt
):
ns
=
regrtest
.
_parse_args
([
opt
])
ns
=
lib
regrtest
.
_parse_args
([
opt
])
self
.
assertTrue
(
ns
.
forever
)
self
.
assertTrue
(
ns
.
forever
)
...
@@ -246,26 +247,26 @@ class ParseArgsTestCase(unittest.TestCase):
...
@@ -246,26 +247,26 @@ class ParseArgsTestCase(unittest.TestCase):
self
.
checkError
([
'--xxx'
],
'usage:'
)
self
.
checkError
([
'--xxx'
],
'usage:'
)
def
test_long_option__partial
(
self
):
def
test_long_option__partial
(
self
):
ns
=
regrtest
.
_parse_args
([
'--qui'
])
ns
=
lib
regrtest
.
_parse_args
([
'--qui'
])
self
.
assertTrue
(
ns
.
quiet
)
self
.
assertTrue
(
ns
.
quiet
)
self
.
assertEqual
(
ns
.
verbose
,
0
)
self
.
assertEqual
(
ns
.
verbose
,
0
)
def
test_two_options
(
self
):
def
test_two_options
(
self
):
ns
=
regrtest
.
_parse_args
([
'--quiet'
,
'--exclude'
])
ns
=
lib
regrtest
.
_parse_args
([
'--quiet'
,
'--exclude'
])
self
.
assertTrue
(
ns
.
quiet
)
self
.
assertTrue
(
ns
.
quiet
)
self
.
assertEqual
(
ns
.
verbose
,
0
)
self
.
assertEqual
(
ns
.
verbose
,
0
)
self
.
assertTrue
(
ns
.
exclude
)
self
.
assertTrue
(
ns
.
exclude
)
def
test_option_with_empty_string_value
(
self
):
def
test_option_with_empty_string_value
(
self
):
ns
=
regrtest
.
_parse_args
([
'--start'
,
''
])
ns
=
lib
regrtest
.
_parse_args
([
'--start'
,
''
])
self
.
assertEqual
(
ns
.
start
,
''
)
self
.
assertEqual
(
ns
.
start
,
''
)
def
test_arg
(
self
):
def
test_arg
(
self
):
ns
=
regrtest
.
_parse_args
([
'foo'
])
ns
=
lib
regrtest
.
_parse_args
([
'foo'
])
self
.
assertEqual
(
ns
.
args
,
[
'foo'
])
self
.
assertEqual
(
ns
.
args
,
[
'foo'
])
def
test_option_and_arg
(
self
):
def
test_option_and_arg
(
self
):
ns
=
regrtest
.
_parse_args
([
'--quiet'
,
'foo'
])
ns
=
lib
regrtest
.
_parse_args
([
'--quiet'
,
'foo'
])
self
.
assertTrue
(
ns
.
quiet
)
self
.
assertTrue
(
ns
.
quiet
)
self
.
assertEqual
(
ns
.
verbose
,
0
)
self
.
assertEqual
(
ns
.
verbose
,
0
)
self
.
assertEqual
(
ns
.
args
,
[
'foo'
])
self
.
assertEqual
(
ns
.
args
,
[
'foo'
])
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment