Kaydet (Commit) d6284963 authored tarafından Victor Stinner's avatar Victor Stinner

Close #12363: fix a race condition in siginterrupt() tests

The previous tests used time.sleep() to synchronize two processes. If the host
was too slow, the test could fail.

The new tests only use one process, but they use a subprocess to:

 - have only one thread
 - have a timeout on the blocking read (select cannot be used in the test,
   select always fail with EINTR, the kernel doesn't restart it)
 - not touch signal handling of the parent process
üst 395dc58e
......@@ -9,7 +9,7 @@ import struct
import subprocess
import traceback
import sys, os, time, errno
from test.script_helper import assert_python_ok
from test.script_helper import assert_python_ok, spawn_python
try:
import threading
except ImportError:
......@@ -314,100 +314,76 @@ class WakeupSignalTests(unittest.TestCase):
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase):
def setUp(self):
"""Install a no-op signal handler that can be set to allow
interrupts or not, and arrange for the original signal handler to be
re-installed when the test is finished.
"""
self.signum = signal.SIGUSR1
oldhandler = signal.signal(self.signum, lambda x,y: None)
self.addCleanup(signal.signal, self.signum, oldhandler)
def readpipe_interrupted(self):
def readpipe_interrupted(self, interrupt):
"""Perform a read during which a signal will arrive. Return True if the
read is interrupted by the signal and raises an exception. Return False
if it returns normally.
"""
# Create a pipe that can be used for the read. Also clean it up
# when the test is over, since nothing else will (but see below for
# the write end).
r, w = os.pipe()
self.addCleanup(os.close, r)
# Create another process which can send a signal to this one to try
# to interrupt the read.
ppid = os.getpid()
pid = os.fork()
if pid == 0:
# Child code: sleep to give the parent enough time to enter the
# read() call (there's a race here, but it's really tricky to
# eliminate it); then signal the parent process. Also, sleep
# again to make it likely that the signal is delivered to the
# parent process before the child exits. If the child exits
# first, the write end of the pipe will be closed and the test
# is invalid.
try:
time.sleep(0.2)
os.kill(ppid, self.signum)
time.sleep(0.2)
finally:
# No matter what, just exit as fast as possible now.
exit_subprocess()
else:
# Parent code.
# Make sure the child is eventually reaped, else it'll be a
# zombie for the rest of the test suite run.
self.addCleanup(os.waitpid, pid, 0)
# Close the write end of the pipe. The child has a copy, so
# it's not really closed until the child exits. We need it to
# close when the child exits so that in the non-interrupt case
# the read eventually completes, otherwise we could just close
# it *after* the test.
os.close(w)
# Try the read and report whether it is interrupted or not to
# the caller.
# use a subprocess to have only one thread, to have a timeout on the
# blocking read and to not touch signal handling in this process
code = """if 1:
import errno
import os
import signal
import sys
interrupt = %r
r, w = os.pipe()
def handler(signum, frame):
pass
signal.signal(signal.SIGALRM, handler)
if interrupt is not None:
signal.siginterrupt(signal.SIGALRM, interrupt)
# run the test twice
for loop in range(2):
# send a SIGALRM in a second (during the read)
signal.alarm(1)
try:
# blocking call: read from a pipe without data
os.read(r, 1)
except OSError as err:
if err.errno != errno.EINTR:
raise
else:
sys.exit(2)
sys.exit(3)
""" % (interrupt,)
with spawn_python('-c', code) as process:
try:
d = os.read(r, 1)
stdout, stderr = process.communicate(timeout=3.0)
except subprocess.TimeoutExpired:
process.kill()
return False
except OSError as err:
if err.errno != errno.EINTR:
raise
return True
else:
exitcode = process.wait()
if exitcode not in (2, 3):
raise Exception("Child error (exit code %s): %s"
% (exitcode, stdout))
return (exitcode == 3)
def test_without_siginterrupt(self):
# If a signal handler is installed and siginterrupt is not called
# at all, when that signal arrives, it interrupts a syscall that's in
# progress.
i = self.readpipe_interrupted()
self.assertTrue(i)
# Arrival of the signal shouldn't have changed anything.
i = self.readpipe_interrupted()
self.assertTrue(i)
interrupted = self.readpipe_interrupted(None)
self.assertTrue(interrupted)
def test_siginterrupt_on(self):
# If a signal handler is installed and siginterrupt is called with
# a true value for the second argument, when that signal arrives, it
# interrupts a syscall that's in progress.
signal.siginterrupt(self.signum, 1)
i = self.readpipe_interrupted()
self.assertTrue(i)
# Arrival of the signal shouldn't have changed anything.
i = self.readpipe_interrupted()
self.assertTrue(i)
interrupted = self.readpipe_interrupted(True)
self.assertTrue(interrupted)
def test_siginterrupt_off(self):
# If a signal handler is installed and siginterrupt is called with
# a false value for the second argument, when that signal arrives, it
# does not interrupt a syscall that's in progress.
signal.siginterrupt(self.signum, 0)
i = self.readpipe_interrupted()
self.assertFalse(i)
# Arrival of the signal shouldn't have changed anything.
i = self.readpipe_interrupted()
self.assertFalse(i)
interrupted = self.readpipe_interrupted(False)
self.assertFalse(interrupted)
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment