Skip to content
Projeler
Gruplar
Parçacıklar
Yardım
Yükleniyor...
Oturum aç / Kaydol
Gezinmeyi değiştir
C
cpython
Proje
Proje
Ayrıntılar
Etkinlik
Cycle Analytics
Depo (repository)
Depo (repository)
Dosyalar
Kayıtlar (commit)
Dallar (branch)
Etiketler
Katkıda bulunanlar
Grafik
Karşılaştır
Grafikler
Konular (issue)
0
Konular (issue)
0
Liste
Pano
Etiketler
Kilometre Taşları
Birleştirme (merge) Talepleri
0
Birleştirme (merge) Talepleri
0
CI / CD
CI / CD
İş akışları (pipeline)
İşler
Zamanlamalar
Grafikler
Paketler
Paketler
Wiki
Wiki
Parçacıklar
Parçacıklar
Üyeler
Üyeler
Collapse sidebar
Close sidebar
Etkinlik
Grafik
Grafikler
Yeni bir konu (issue) oluştur
İşler
Kayıtlar (commit)
Konu (issue) Panoları
Kenar çubuğunu aç
Batuhan Osman TASKAYA
cpython
Commits
48a69b70
Kaydet (Commit)
48a69b70
authored
May 16, 1994
tarafından
Guido van Rossum
Dosyalara gözat
Seçenekler
Dosyalara Gözat
İndir
Eposta Yamaları
Sade Fark
Initial revision
üst
9f2a5b0f
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
497 additions
and
0 deletions
+497
-0
bug.py
Demo/threads/bug.py
+69
-0
sync.py
Demo/threads/sync.py
+428
-0
No files found.
Demo/threads/bug.py
0 → 100644
Dosyayı görüntüle @
48a69b70
# The following self-contained little program usually freezes with most
# threads reporting
#
# Unhandled exception in thread:
# Traceback (innermost last):
# File "importbug.py", line 6
# x = whrandom.randint(1,3)
# AttributeError: randint
#
# Here's the program; it doesn't use anything from the attached module:
import
thread
def
task
():
global
N
import
whrandom
x
=
whrandom
.
randint
(
1
,
3
)
a
.
acquire
()
N
=
N
-
1
if
N
==
0
:
done
.
release
()
a
.
release
()
a
=
thread
.
allocate_lock
()
done
=
thread
.
allocate_lock
()
N
=
10
done
.
acquire
()
for
i
in
range
(
N
):
thread
.
start_new_thread
(
task
,
())
done
.
acquire
()
print
'done'
# Sticking an acquire/release pair around the 'import' statement makes the
# problem go away.
#
# I believe that what happens is:
#
# 1) The first thread to hit the import atomically reaches, and executes
# most of, get_module. In particular, it finds Lib/whrandom.pyc,
# installs its name in sys.modules, and executes
#
# v = eval_code(co, d, d, d, (object *)NULL);
#
# to initialize the module.
#
# 2) eval_code "ticker"-slices the 1st thread out, and gives another thread
# a chance. When this 2nd thread hits the same 'import', import_module
# finds 'whrandom' in sys.modules, so just proceeds.
#
# 3) But the 1st thread is still "in the middle" of executing whrandom.pyc.
# So the 2nd thread has a good chance of trying to look up 'randint'
# before the 1st thread has placed it in whrandom's dict.
#
# 4) The more threads there are, the more likely that at least one of them
# will do this before the 1st thread finishes the import work.
#
# If that's right, a perhaps not-too-bad workaround would be to introduce a
# static "you can't interrupt this thread" flag in ceval.c, check it before
# giving up interpreter_lock, and have IMPORT_NAME set it & restore (plain
# clearing would not work) it around its call to import_module. To its
# credit, there's something wonderfully perverse about fixing a race via an
# unprotected static <grin>.
#
# as-with-most-other-things-(pseudo-)parallel-programming's-more-fun-
# in-python-too!-ly y'rs - tim
#
# Tim Peters tim@ksr.com
# not speaking for Kendall Square Research Corp
Demo/threads/sync.py
0 → 100644
Dosyayı görüntüle @
48a69b70
# Defines classes that provide synchronization objects. Note that use of
# this module requires that your Python support threads.
#
# condition() # a POSIX-like condition-variable object
# barrier(n) # an n-thread barrier
# event() # an event object
#
# CONDITIONS
#
# A condition object is created via
# import this_module
# your_condition_object = this_module.condition()
#
# Methods:
# .acquire()
# acquire the lock associated with the condition
# .release()
# release the lock associated with the condition
# .wait()
# block the thread until such time as some other thread does a
# .signal or .broadcast on the same condition, and release the
# lock associated with the condition. The lock associated with
# the condition MUST be in the acquired state at the time
# .wait is invoked.
# .signal()
# wake up exactly one thread (if any) that previously did a .wait
# on the condition; that thread will awaken with the lock associated
# with the condition in the acquired state. If no threads are
# .wait'ing, this is a nop. If more than one thread is .wait'ing on
# the condition, any of them may be awakened.
# .broadcast()
# wake up all threads (if any) that are .wait'ing on the condition;
# the threads are woken up serially, each with the lock in the
# acquired state, so should .release() as soon as possible. If no
# threads are .wait'ing, this is a nop.
#
# Note that if a thread does a .wait *while* a signal/broadcast is
# in progress, it's guaranteeed to block until a subsequenct
# signal/broadcast.
#
# Secret feature: `broadcast' actually takes an integer argument,
# and will wake up exactly that many waiting threads (or the total
# number waiting, if that's less). Use of this is dubious, though,
# and probably won't be supported if this form of condition is
# reimplemented in C.
#
# DIFFERENCES FROM POSIX
#
# + A separate mutex is not needed to guard condition data. Instead, a
# condition object can (must) be .acquire'ed and .release'ed directly.
# This eliminates a common error in using POSIX conditions.
#
# + Because of implementation difficulties, a POSIX `signal' wakes up
# _at least_ one .wait'ing thread. Race conditions make it difficult
# to stop that. This implementation guarantees to wake up only one,
# but you probably shouldn't rely on that.
#
# PROTOCOL
#
# Condition objects are used to block threads until "some condition" is
# true. E.g., a thread may wish to wait until a producer pumps out data
# for it to consume, or a server may wish to wait until someone requests
# its services, or perhaps a whole bunch of threads want to wait until a
# preceding pass over the data is complete. Early models for conditions
# relied on some other thread figuring out when a blocked thread's
# condition was true, and made the other thread responsible both for
# waking up the blocked thread and guaranteeing that it woke up with all
# data in a correct state. This proved to be very delicate in practice,
# and gave conditions a bad name in some circles.
#
# The POSIX model addresses these problems by making a thread responsible
# for ensuring that its own state is correct when it wakes, and relies
# on a rigid protocol to make this easy; so long as you stick to the
# protocol, POSIX conditions are easy to "get right":
#
# A) The thread that's waiting for some arbitrarily-complex condition
# (ACC) to become true does:
#
# condition.acquire()
# while not (code to evaluate the ACC):
# condition.wait()
# # That blocks the thread, *and* releases the lock. When a
# # condition.signal() happens, it will wake up some thread that
# # did a .wait, *and* acquire the lock again before .wait
# # returns.
# #
# # Because the lock is acquired at this point, the state used
# # in evaluating the ACC is frozen, so it's safe to go back &
# # reevaluate the ACC.
#
# # At this point, ACC is true, and the thread has the condition
# # locked.
# # So code here can safely muck with the shared state that
# # went into evaluating the ACC -- if it wants to.
# # When done mucking with the shared state, do
# condition.release()
#
# B) Threads that are mucking with shared state that may affect the
# ACC do:
#
# condition.acquire()
# # muck with shared state
# condition.release()
# if it's possible that ACC is true now:
# condition.signal() # or .broadcast()
#
# Note: You may prefer to put the "if" clause before the release().
# That's fine, but do note that anyone waiting on the signal will
# stay blocked until the release() is done (since acquiring the
# condition is part of what .wait() does before it returns).
#
# TRICK OF THE TRADE
#
# With simpler forms of conditions, it can be impossible to know when
# a thread that's supposed to do a .wait has actually done it. But
# because this form of condition releases a lock as _part_ of doing a
# wait, the state of that lock can be used to guarantee it.
#
# E.g., suppose thread A spawns thread B and later wants to wait for B to
# complete:
#
# In A: In B:
#
# B_done = condition() ... do work ...
# B_done.acquire() B_done.acquire(); B_done.release()
# spawn B B_done.signal()
# ... some time later ... ... and B exits ...
# B_done.wait()
#
# Because B_done was in the acquire'd state at the time B was spawned,
# B's attempt to acquire B_done can't succeed until A has done its
# B_done.wait() (which releases B_done). So B's B_done.signal() is
# guaranteed to be seen by the .wait(). Without the lock trick, B
# may signal before A .waits, and then A would wait forever.
#
# BARRIERS
#
# A barrier object is created via
# import this_module
# your_barrier = this_module.barrier(num_threads)
#
# Methods:
# .enter()
# the thread blocks until num_threads threads in all have done
# .enter(). Then the num_threads threads that .enter'ed resume,
# and the barrier resets to capture the next num_threads threads
# that .enter it.
#
# EVENTS
#
# An event object is created via
# import this_module
# your_event = this_module.event()
#
# An event has two states, `posted' and `cleared'. An event is
# created in the cleared state.
#
# Methods:
#
# .post()
# Put the event in the posted state, and resume all threads
# .wait'ing on the event (if any).
#
# .clear()
# Put the event in the cleared state.
#
# .is_posted()
# Returns 0 if the event is in the cleared state, or 1 if the event
# is in the posted state.
#
# .wait()
# If the event is in the posted state, returns immediately.
# If the event is in the cleared state, blocks the calling thread
# until the event is .post'ed by another thread.
#
# Note that an event, once posted, remains posted until explicitly
# cleared. Relative to conditions, this is both the strength & weakness
# of events. It's a strength because the .post'ing thread doesn't have to
# worry about whether the threads it's trying to communicate with have
# already done a .wait (a condition .signal is seen only by threads that
# do a .wait _prior_ to the .signal; a .signal does not persist). But
# it's a weakness because .clear'ing an event is error-prone: it's easy
# to mistakenly .clear an event before all the threads you intended to
# see the event get around to .wait'ing on it. But so long as you don't
# need to .clear an event, events are easy to use safely.
#
# Tim Peters tim@ksr.com
# not speaking for Kendall Square Research Corp
import
thread
class
condition
:
def
__init__
(
self
):
# the lock actually used by .acquire() and .release()
self
.
mutex
=
thread
.
allocate_lock
()
# lock used to block threads until a signal
self
.
checkout
=
thread
.
allocate_lock
()
self
.
checkout
.
acquire
()
# internal critical-section lock, & the data it protects
self
.
idlock
=
thread
.
allocate_lock
()
self
.
id
=
0
self
.
waiting
=
0
# num waiters subject to current release
self
.
pending
=
0
# num waiters awaiting next signal
self
.
torelease
=
0
# num waiters to release
self
.
releasing
=
0
# 1 iff release is in progress
def
acquire
(
self
):
self
.
mutex
.
acquire
()
def
release
(
self
):
self
.
mutex
.
release
()
def
wait
(
self
):
mutex
,
checkout
,
idlock
=
self
.
mutex
,
self
.
checkout
,
self
.
idlock
if
not
mutex
.
locked
():
raise
ValueError
,
\
"condition must be .acquire'd when .wait() invoked"
idlock
.
acquire
()
myid
=
self
.
id
self
.
pending
=
self
.
pending
+
1
idlock
.
release
()
mutex
.
release
()
while
1
:
checkout
.
acquire
();
idlock
.
acquire
()
if
myid
<
self
.
id
:
break
checkout
.
release
();
idlock
.
release
()
self
.
waiting
=
self
.
waiting
-
1
self
.
torelease
=
self
.
torelease
-
1
if
self
.
torelease
:
checkout
.
release
()
else
:
self
.
releasing
=
0
if
self
.
waiting
==
self
.
pending
==
0
:
self
.
id
=
0
idlock
.
release
()
mutex
.
acquire
()
def
signal
(
self
):
self
.
broadcast
(
1
)
def
broadcast
(
self
,
num
=
-
1
):
if
num
<
-
1
:
raise
ValueError
,
'.broadcast called with num '
+
`num`
if
num
==
0
:
return
self
.
idlock
.
acquire
()
if
self
.
pending
:
self
.
waiting
=
self
.
waiting
+
self
.
pending
self
.
pending
=
0
self
.
id
=
self
.
id
+
1
if
num
==
-
1
:
self
.
torelease
=
self
.
waiting
else
:
self
.
torelease
=
min
(
self
.
waiting
,
self
.
torelease
+
num
)
if
self
.
torelease
and
not
self
.
releasing
:
self
.
releasing
=
1
self
.
checkout
.
release
()
self
.
idlock
.
release
()
class
barrier
:
def
__init__
(
self
,
n
):
self
.
n
=
n
self
.
togo
=
n
self
.
full
=
condition
()
def
enter
(
self
):
full
=
self
.
full
full
.
acquire
()
self
.
togo
=
self
.
togo
-
1
if
self
.
togo
:
full
.
wait
()
else
:
self
.
togo
=
self
.
n
full
.
broadcast
()
full
.
release
()
class
event
:
def
__init__
(
self
):
self
.
state
=
0
self
.
posted
=
condition
()
def
post
(
self
):
self
.
posted
.
acquire
()
self
.
state
=
1
self
.
posted
.
broadcast
()
self
.
posted
.
release
()
def
clear
(
self
):
self
.
posted
.
acquire
()
self
.
state
=
0
self
.
posted
.
release
()
def
is_posted
(
self
):
self
.
posted
.
acquire
()
answer
=
self
.
state
self
.
posted
.
release
()
return
answer
def
wait
(
self
):
self
.
posted
.
acquire
()
while
not
self
.
state
:
self
.
posted
.
wait
()
self
.
posted
.
release
()
# The rest of the file is a test case, that runs a number of parallelized
# quicksorts in parallel. If it works, you'll get about 600 lines of
# tracing output, with a line like
# test passed! 209 threads created in all
# as the last line. The content and order of preceding lines will
# vary across runs.
def
_new_thread
(
func
,
*
args
):
global
TID
tid
.
acquire
();
id
=
TID
=
TID
+
1
;
tid
.
release
()
io
.
acquire
();
alive
.
append
(
id
);
\
print
'starting thread'
,
id
,
'--'
,
len
(
alive
),
'alive'
;
\
io
.
release
()
thread
.
start_new_thread
(
func
,
(
id
,)
+
args
)
def
_qsort
(
tid
,
a
,
l
,
r
,
finished
):
# sort a[l:r]; post finished when done
io
.
acquire
();
print
'thread'
,
tid
,
'qsort'
,
l
,
r
;
io
.
release
()
if
r
-
l
>
1
:
pivot
=
a
[
l
]
j
=
l
+
1
# make a[l:j] <= pivot, and a[j:r] > pivot
for
i
in
range
(
j
,
r
):
if
a
[
i
]
<=
pivot
:
a
[
j
],
a
[
i
]
=
a
[
i
],
a
[
j
]
j
=
j
+
1
a
[
l
],
a
[
j
-
1
]
=
a
[
j
-
1
],
pivot
l_subarray_sorted
=
event
()
r_subarray_sorted
=
event
()
_new_thread
(
_qsort
,
a
,
l
,
j
-
1
,
l_subarray_sorted
)
_new_thread
(
_qsort
,
a
,
j
,
r
,
r_subarray_sorted
)
l_subarray_sorted
.
wait
()
r_subarray_sorted
.
wait
()
io
.
acquire
();
print
'thread'
,
tid
,
'qsort done'
;
\
alive
.
remove
(
tid
);
io
.
release
()
finished
.
post
()
def
_randarray
(
tid
,
a
,
finished
):
io
.
acquire
();
print
'thread'
,
tid
,
'randomizing array'
;
\
io
.
release
()
for
i
in
range
(
1
,
len
(
a
)):
wh
.
acquire
();
j
=
randint
(
0
,
i
);
wh
.
release
()
a
[
i
],
a
[
j
]
=
a
[
j
],
a
[
i
]
io
.
acquire
();
print
'thread'
,
tid
,
'randomizing done'
;
\
alive
.
remove
(
tid
);
io
.
release
()
finished
.
post
()
def
_check_sort
(
a
):
if
a
!=
range
(
len
(
a
)):
raise
ValueError
,
(
'a not sorted'
,
a
)
def
_run_one_sort
(
tid
,
a
,
bar
,
done
):
# randomize a, and quicksort it
# for variety, all the threads running this enter a barrier
# at the end, and post `done' after the barrier exits
io
.
acquire
();
print
'thread'
,
tid
,
'randomizing'
,
a
;
\
io
.
release
()
finished
=
event
()
_new_thread
(
_randarray
,
a
,
finished
)
finished
.
wait
()
io
.
acquire
();
print
'thread'
,
tid
,
'sorting'
,
a
;
io
.
release
()
finished
.
clear
()
_new_thread
(
_qsort
,
a
,
0
,
len
(
a
),
finished
)
finished
.
wait
()
_check_sort
(
a
)
io
.
acquire
();
print
'thread'
,
tid
,
'entering barrier'
;
\
io
.
release
()
bar
.
enter
()
io
.
acquire
();
print
'thread'
,
tid
,
'leaving barrier'
;
\
io
.
release
()
io
.
acquire
();
alive
.
remove
(
tid
);
io
.
release
()
bar
.
enter
()
# make sure they've all removed themselves from alive
## before 'done' is posted
bar
.
enter
()
# just to be cruel
done
.
post
()
def
test
():
global
TID
,
tid
,
io
,
wh
,
randint
,
alive
import
whrandom
randint
=
whrandom
.
randint
TID
=
0
# thread ID (1, 2, ...)
tid
=
thread
.
allocate_lock
()
# for changing TID
io
=
thread
.
allocate_lock
()
# for printing, and 'alive'
wh
=
thread
.
allocate_lock
()
# for calls to whrandom
alive
=
[]
# IDs of active threads
NSORTS
=
5
arrays
=
[]
for
i
in
range
(
NSORTS
):
arrays
.
append
(
range
(
(
i
+
1
)
*
10
)
)
bar
=
barrier
(
NSORTS
)
finished
=
event
()
for
i
in
range
(
NSORTS
):
_new_thread
(
_run_one_sort
,
arrays
[
i
],
bar
,
finished
)
finished
.
wait
()
print
'all threads done, and checking results ...'
if
alive
:
raise
ValueError
,
(
'threads still alive at end'
,
alive
)
for
i
in
range
(
NSORTS
):
a
=
arrays
[
i
]
if
len
(
a
)
!=
(
i
+
1
)
*
10
:
raise
ValueError
,
(
'length of array'
,
i
,
'screwed up'
)
_check_sort
(
a
)
print
'test passed!'
,
TID
,
'threads created in all'
if
__name__
==
'__main__'
:
test
()
# end of module
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment