Kaydet (Commit) eabfb5b1 authored tarafından Benjamin Peterson's avatar Benjamin Peterson

merge heads

...@@ -9,7 +9,7 @@ import struct ...@@ -9,7 +9,7 @@ import struct
import subprocess import subprocess
import traceback import traceback
import sys, os, time, errno import sys, os, time, errno
from test.script_helper import assert_python_ok from test.script_helper import assert_python_ok, spawn_python
try: try:
import threading import threading
except ImportError: except ImportError:
...@@ -314,100 +314,76 @@ class WakeupSignalTests(unittest.TestCase): ...@@ -314,100 +314,76 @@ class WakeupSignalTests(unittest.TestCase):
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase): class SiginterruptTest(unittest.TestCase):
def setUp(self): def readpipe_interrupted(self, interrupt):
"""Install a no-op signal handler that can be set to allow
interrupts or not, and arrange for the original signal handler to be
re-installed when the test is finished.
"""
self.signum = signal.SIGUSR1
oldhandler = signal.signal(self.signum, lambda x,y: None)
self.addCleanup(signal.signal, self.signum, oldhandler)
def readpipe_interrupted(self):
"""Perform a read during which a signal will arrive. Return True if the """Perform a read during which a signal will arrive. Return True if the
read is interrupted by the signal and raises an exception. Return False read is interrupted by the signal and raises an exception. Return False
if it returns normally. if it returns normally.
""" """
# Create a pipe that can be used for the read. Also clean it up # use a subprocess to have only one thread, to have a timeout on the
# when the test is over, since nothing else will (but see below for # blocking read and to not touch signal handling in this process
# the write end). code = """if 1:
r, w = os.pipe() import errno
self.addCleanup(os.close, r) import os
import signal
# Create another process which can send a signal to this one to try import sys
# to interrupt the read.
ppid = os.getpid() interrupt = %r
pid = os.fork() r, w = os.pipe()
if pid == 0: def handler(signum, frame):
# Child code: sleep to give the parent enough time to enter the pass
# read() call (there's a race here, but it's really tricky to
# eliminate it); then signal the parent process. Also, sleep signal.signal(signal.SIGALRM, handler)
# again to make it likely that the signal is delivered to the if interrupt is not None:
# parent process before the child exits. If the child exits signal.siginterrupt(signal.SIGALRM, interrupt)
# first, the write end of the pipe will be closed and the test
# is invalid. # run the test twice
try: for loop in range(2):
time.sleep(0.2) # send a SIGALRM in a second (during the read)
os.kill(ppid, self.signum) signal.alarm(1)
time.sleep(0.2) try:
finally: # blocking call: read from a pipe without data
# No matter what, just exit as fast as possible now. os.read(r, 1)
exit_subprocess() except OSError as err:
else: if err.errno != errno.EINTR:
# Parent code. raise
# Make sure the child is eventually reaped, else it'll be a else:
# zombie for the rest of the test suite run. sys.exit(2)
self.addCleanup(os.waitpid, pid, 0) sys.exit(3)
""" % (interrupt,)
# Close the write end of the pipe. The child has a copy, so with spawn_python('-c', code) as process:
# it's not really closed until the child exits. We need it to
# close when the child exits so that in the non-interrupt case
# the read eventually completes, otherwise we could just close
# it *after* the test.
os.close(w)
# Try the read and report whether it is interrupted or not to
# the caller.
try: try:
d = os.read(r, 1) stdout, stderr = process.communicate(timeout=3.0)
except subprocess.TimeoutExpired:
process.kill()
return False return False
except OSError as err: else:
if err.errno != errno.EINTR: exitcode = process.wait()
raise if exitcode not in (2, 3):
return True raise Exception("Child error (exit code %s): %s"
% (exitcode, stdout))
return (exitcode == 3)
def test_without_siginterrupt(self): def test_without_siginterrupt(self):
# If a signal handler is installed and siginterrupt is not called # If a signal handler is installed and siginterrupt is not called
# at all, when that signal arrives, it interrupts a syscall that's in # at all, when that signal arrives, it interrupts a syscall that's in
# progress. # progress.
i = self.readpipe_interrupted() interrupted = self.readpipe_interrupted(None)
self.assertTrue(i) self.assertTrue(interrupted)
# Arrival of the signal shouldn't have changed anything.
i = self.readpipe_interrupted()
self.assertTrue(i)
def test_siginterrupt_on(self): def test_siginterrupt_on(self):
# If a signal handler is installed and siginterrupt is called with # If a signal handler is installed and siginterrupt is called with
# a true value for the second argument, when that signal arrives, it # a true value for the second argument, when that signal arrives, it
# interrupts a syscall that's in progress. # interrupts a syscall that's in progress.
signal.siginterrupt(self.signum, 1) interrupted = self.readpipe_interrupted(True)
i = self.readpipe_interrupted() self.assertTrue(interrupted)
self.assertTrue(i)
# Arrival of the signal shouldn't have changed anything.
i = self.readpipe_interrupted()
self.assertTrue(i)
def test_siginterrupt_off(self): def test_siginterrupt_off(self):
# If a signal handler is installed and siginterrupt is called with # If a signal handler is installed and siginterrupt is called with
# a false value for the second argument, when that signal arrives, it # a false value for the second argument, when that signal arrives, it
# does not interrupt a syscall that's in progress. # does not interrupt a syscall that's in progress.
signal.siginterrupt(self.signum, 0) interrupted = self.readpipe_interrupted(False)
i = self.readpipe_interrupted() self.assertFalse(interrupted)
self.assertFalse(i)
# Arrival of the signal shouldn't have changed anything.
i = self.readpipe_interrupted()
self.assertFalse(i)
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment