Skip to content
Projeler
Gruplar
Parçacıklar
Yardım
Yükleniyor...
Oturum aç / Kaydol
Gezinmeyi değiştir
C
cpython
Proje
Proje
Ayrıntılar
Etkinlik
Cycle Analytics
Depo (repository)
Depo (repository)
Dosyalar
Kayıtlar (commit)
Dallar (branch)
Etiketler
Katkıda bulunanlar
Grafik
Karşılaştır
Grafikler
Konular (issue)
0
Konular (issue)
0
Liste
Pano
Etiketler
Kilometre Taşları
Birleştirme (merge) Talepleri
0
Birleştirme (merge) Talepleri
0
CI / CD
CI / CD
İş akışları (pipeline)
İşler
Zamanlamalar
Grafikler
Paketler
Paketler
Wiki
Wiki
Parçacıklar
Parçacıklar
Üyeler
Üyeler
Collapse sidebar
Close sidebar
Etkinlik
Grafik
Grafikler
Yeni bir konu (issue) oluştur
İşler
Kayıtlar (commit)
Konu (issue) Panoları
Kenar çubuğunu aç
Batuhan Osman TASKAYA
cpython
Commits
82733fac
Kaydet (Commit)
82733fac
authored
Eyl 08, 2016
tarafından
Senthil Kumaran
Dosyalara gözat
Seçenekler
Dosyalara Gözat
İndir
Eposta Yamaları
Sade Fark
Issue11551 - Increase the test coverage of _dummy_thread module to 100%.
Initial patch contributed by Denver Coneybeare.
üst
bfac23a4
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
115 additions
and
41 deletions
+115
-41
test_dummy_thread.py
Lib/test/test_dummy_thread.py
+115
-41
No files found.
Lib/test/test_dummy_thread.py
Dosyayı görüntüle @
82733fac
"""Generic thread tests.
Meant to be used by dummy_thread and thread. To allow for different modules
to be used, test_main() can be called with the module to use as the thread
implementation as its sole argument.
"""
import
_dummy_thread
as
_thread
import
time
import
queue
import
random
import
unittest
from
test
import
support
from
unittest
import
mock
DELAY
=
0
DELAY
=
0
# Set > 0 when testing a module other than _dummy_thread, such as
# the '_thread' module.
class
LockTests
(
unittest
.
TestCase
):
"""Test lock objects."""
...
...
@@ -34,6 +28,12 @@ class LockTests(unittest.TestCase):
self
.
assertFalse
(
self
.
lock
.
locked
(),
"Lock object did not release properly."
)
def
test_LockType_context_manager
(
self
):
with
_thread
.
LockType
():
pass
self
.
assertFalse
(
self
.
lock
.
locked
(),
"Acquired Lock was not released"
)
def
test_improper_release
(
self
):
#Make sure release of an unlocked thread raises RuntimeError
self
.
assertRaises
(
RuntimeError
,
self
.
lock
.
release
)
...
...
@@ -83,39 +83,72 @@ class LockTests(unittest.TestCase):
self
.
assertGreaterEqual
(
end_time
-
start_time
,
DELAY
,
"Blocking by unconditional acquiring failed."
)
@mock.patch
(
'time.sleep'
)
def
test_acquire_timeout
(
self
,
mock_sleep
):
"""Test invoking acquire() with a positive timeout when the lock is
already acquired. Ensure that time.sleep() is invoked with the given
timeout and that False is returned."""
self
.
lock
.
acquire
()
retval
=
self
.
lock
.
acquire
(
waitflag
=
0
,
timeout
=
1
)
self
.
assertTrue
(
mock_sleep
.
called
)
mock_sleep
.
assert_called_once_with
(
1
)
self
.
assertEqual
(
retval
,
False
)
def
test_lock_representation
(
self
):
self
.
lock
.
acquire
()
self
.
assertIn
(
"locked"
,
repr
(
self
.
lock
))
self
.
lock
.
release
()
self
.
assertIn
(
"unlocked"
,
repr
(
self
.
lock
))
class
MiscTests
(
unittest
.
TestCase
):
"""Miscellaneous tests."""
def
test_exit
(
self
):
#Make sure _thread.exit() raises SystemExit
self
.
assertRaises
(
SystemExit
,
_thread
.
exit
)
def
test_ident
(
self
):
#Test sanity of _thread.get_ident()
self
.
assertIsInstance
(
_thread
.
get_ident
(),
int
,
"_thread.get_ident() returned a non-integer"
)
self
.
assertNotEqual
(
_thread
.
get_ident
(),
0
,
"_thread.get_ident() returned 0"
)
def
test_LockType
(
self
):
#Make sure _thread.LockType is the same type as _thread.allocate_locke()
self
.
assertIsInstance
(
_thread
.
allocate_lock
(),
_thread
.
LockType
,
"_thread.LockType is not an instance of what "
"is returned by _thread.allocate_lock()"
)
def
test_set_sentinel
(
self
):
self
.
assertIsInstance
(
_thread
.
_set_sentinel
(),
_thread
.
LockType
,
"_thread._set_sentinel() did not return a "
"LockType instance."
)
def
test_interrupt_main
(
self
):
#Calling start_new_thread with a function that executes interrupt_main
# should raise KeyboardInterrupt upon completion.
def
call_interrupt
():
_thread
.
interrupt_main
()
self
.
assertRaises
(
KeyboardInterrupt
,
_thread
.
start_new_thread
,
call_interrupt
,
tuple
())
self
.
assertRaises
(
KeyboardInterrupt
,
_thread
.
start_new_thread
,
call_interrupt
,
tuple
())
def
test_interrupt_in_main
(
self
):
# Make sure that if interrupt_main is called in main threat that
# KeyboardInterrupt is raised instantly.
self
.
assertRaises
(
KeyboardInterrupt
,
_thread
.
interrupt_main
)
def
test_stack_size_None
(
self
):
retval
=
_thread
.
stack_size
(
None
)
self
.
assertEqual
(
retval
,
0
)
def
test_stack_size_not_None
(
self
):
with
self
.
assertRaises
(
_thread
.
error
)
as
cm
:
_thread
.
stack_size
(
""
)
self
.
assertEqual
(
cm
.
exception
.
args
[
0
],
"setting thread stack size not supported"
)
class
ThreadTests
(
unittest
.
TestCase
):
"""Test thread creation."""
...
...
@@ -129,31 +162,43 @@ class ThreadTests(unittest.TestCase):
_thread
.
start_new_thread
(
arg_tester
,
(
testing_queue
,
True
,
True
))
result
=
testing_queue
.
get
()
self
.
assertTrue
(
result
[
0
]
and
result
[
1
],
"Argument passing for thread creation using tuple failed"
)
_thread
.
start_new_thread
(
arg_tester
,
tuple
(),
{
'queue'
:
testing_queue
,
'arg1'
:
True
,
'arg2'
:
True
})
"Argument passing for thread creation "
"using tuple failed"
)
_thread
.
start_new_thread
(
arg_tester
,
tuple
(),
{
'queue'
:
testing_queue
,
'arg1'
:
True
,
'arg2'
:
True
})
result
=
testing_queue
.
get
()
self
.
assertTrue
(
result
[
0
]
and
result
[
1
],
"Argument passing for thread creation using kwargs failed"
)
_thread
.
start_new_thread
(
arg_tester
,
(
testing_queue
,
True
),
{
'arg2'
:
True
})
"Argument passing for thread creation "
"using kwargs failed"
)
_thread
.
start_new_thread
(
arg_tester
,
(
testing_queue
,
True
),
{
'arg2'
:
True
})
result
=
testing_queue
.
get
()
self
.
assertTrue
(
result
[
0
]
and
result
[
1
],
"Argument passing for thread creation using both tuple"
" and kwargs failed"
)
def
test_multi_creation
(
self
):
#Make sure multiple threads can be created.
def
test_multi_thread_creation
(
self
):
def
queue_mark
(
queue
,
delay
):
"""Wait for ``delay`` seconds and then put something into ``queue``"""
time
.
sleep
(
delay
)
queue
.
put
(
_thread
.
get_ident
())
thread_count
=
5
testing_queue
=
queue
.
Queue
(
thread_count
)
if
support
.
verbose
:
print
()
print
(
"*** Testing multiple thread creation "
\
"(will take approx.
%
s to
%
s sec.) ***"
%
(
DELAY
,
thread_count
))
print
(
"*** Testing multiple thread creation "
"(will take approx.
%
s to
%
s sec.) ***"
%
(
DELAY
,
thread_count
))
for
count
in
range
(
thread_count
):
if
DELAY
:
local_delay
=
round
(
random
.
random
(),
1
)
...
...
@@ -165,18 +210,47 @@ class ThreadTests(unittest.TestCase):
if
support
.
verbose
:
print
(
'done'
)
self
.
assertEqual
(
testing_queue
.
qsize
(),
thread_count
,
"Not all
%
s threads executed properly after
%
s sec."
%
(
thread_count
,
DELAY
))
def
test_main
(
imported_module
=
None
):
global
_thread
,
DELAY
if
imported_module
:
_thread
=
imported_module
DELAY
=
2
if
support
.
verbose
:
print
()
print
(
"*** Using
%
s as _thread module ***"
%
_thread
)
support
.
run_unittest
(
LockTests
,
MiscTests
,
ThreadTests
)
if
__name__
==
'__main__'
:
test_main
()
"Not all
%
s threads executed properly "
"after
%
s sec."
%
(
thread_count
,
DELAY
))
def
test_args_not_tuple
(
self
):
"""
Test invoking start_new_thread() with a non-tuple value for "args".
Expect TypeError with a meaningful error message to be raised.
"""
with
self
.
assertRaises
(
TypeError
)
as
cm
:
_thread
.
start_new_thread
(
mock
.
Mock
(),
[])
self
.
assertEqual
(
cm
.
exception
.
args
[
0
],
"2nd arg must be a tuple"
)
def
test_kwargs_not_dict
(
self
):
"""
Test invoking start_new_thread() with a non-dict value for "kwargs".
Expect TypeError with a meaningful error message to be raised.
"""
with
self
.
assertRaises
(
TypeError
)
as
cm
:
_thread
.
start_new_thread
(
mock
.
Mock
(),
tuple
(),
kwargs
=
[])
self
.
assertEqual
(
cm
.
exception
.
args
[
0
],
"3rd arg must be a dict"
)
def
test_SystemExit
(
self
):
"""
Test invoking start_new_thread() with a function that raises
SystemExit.
The exception should be discarded.
"""
func
=
mock
.
Mock
(
side_effect
=
SystemExit
())
try
:
_thread
.
start_new_thread
(
func
,
tuple
())
except
SystemExit
:
self
.
fail
(
"start_new_thread raised SystemExit."
)
@mock.patch
(
'traceback.print_exc'
)
def
test_RaiseException
(
self
,
mock_print_exc
):
"""
Test invoking start_new_thread() with a function that raises exception.
The exception should be discarded and the traceback should be printed
via traceback.print_exc()
"""
func
=
mock
.
Mock
(
side_effect
=
Exception
)
_thread
.
start_new_thread
(
func
,
tuple
())
self
.
assertTrue
(
mock_print_exc
.
called
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment