test_subprocess.py 48.6 KB
Newer Older
1 2 3 4 5 6
import unittest
from test import test_support
import subprocess
import sys
import signal
import os
7
import errno
8 9
import tempfile
import time
10
import re
11
import sysconfig
12 13 14 15 16 17 18 19

mswindows = (sys.platform == "win32")

#
# Depends on the following external programs: Python
#

if mswindows:
Tim Peters's avatar
Tim Peters committed
20 21
    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
                                                'os.O_BINARY);')
22 23 24
else:
    SETBINARY = ''

25 26 27 28 29 30 31 32 33 34

try:
    mkstemp = tempfile.mkstemp
except AttributeError:
    # tempfile.mkstemp is not available
    def mkstemp():
        """Replacement for mkstemp, calling mktemp."""
        fname = tempfile.mktemp()
        return os.open(fname, os.O_RDWR|os.O_CREAT), fname

35

36
class BaseTestCase(unittest.TestCase):
37
    def setUp(self):
Tim Peters's avatar
Tim Peters committed
38 39
        # Try to minimize the number of children we have so this test
        # doesn't crash on some buildbots (Alphas in particular).
40
        test_support.reap_children()
41

42 43 44 45 46 47
    def tearDown(self):
        for inst in subprocess._active:
            inst.wait()
        subprocess._cleanup()
        self.assertFalse(subprocess._active, "subprocess._active not empty")

48 49 50 51 52 53
    def assertStderrEqual(self, stderr, expected, msg=None):
        # In a debug build, stuff like "[6580 refs]" is printed to stderr at
        # shutdown time.  That frustrates tests trying to check stderr produced
        # from a spawned Python process.
        actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
        self.assertEqual(actual, expected, msg)
Tim Peters's avatar
Tim Peters committed
54

55 56 57

class ProcessTestCase(BaseTestCase):

58
    def test_call_seq(self):
59
        # call() function with sequence argument
Tim Peters's avatar
Tim Peters committed
60 61
        rc = subprocess.call([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
62 63
        self.assertEqual(rc, 47)

64 65 66 67 68 69 70 71
    def test_check_call_zero(self):
        # check_call() function with zero return code
        rc = subprocess.check_call([sys.executable, "-c",
                                    "import sys; sys.exit(0)"])
        self.assertEqual(rc, 0)

    def test_check_call_nonzero(self):
        # check_call() function with non-zero return code
72
        with self.assertRaises(subprocess.CalledProcessError) as c:
73 74
            subprocess.check_call([sys.executable, "-c",
                                   "import sys; sys.exit(47)"])
75
        self.assertEqual(c.exception.returncode, 47)
76

77 78 79
    def test_check_output(self):
        # check_output() function with zero return code
        output = subprocess.check_output(
80
                [sys.executable, "-c", "print 'BDFL'"])
81
        self.assertIn('BDFL', output)
82

83
    def test_check_output_nonzero(self):
84
        # check_call() function with non-zero return code
85
        with self.assertRaises(subprocess.CalledProcessError) as c:
86
            subprocess.check_output(
87
                    [sys.executable, "-c", "import sys; sys.exit(5)"])
88
        self.assertEqual(c.exception.returncode, 5)
89

90 91 92
    def test_check_output_stderr(self):
        # check_output() function stderr redirected to stdout
        output = subprocess.check_output(
93 94
                [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
                stderr=subprocess.STDOUT)
95
        self.assertIn('BDFL', output)
96

97 98
    def test_check_output_stdout_arg(self):
        # check_output() function stderr redirected to stdout
99
        with self.assertRaises(ValueError) as c:
100
            output = subprocess.check_output(
101 102 103
                    [sys.executable, "-c", "print 'will not be run'"],
                    stdout=sys.stdout)
            self.fail("Expected ValueError when stdout arg supplied.")
104
        self.assertIn('stdout', c.exception.args[0])
105

106
    def test_call_kwargs(self):
107
        # call() function with keyword args
108 109 110
        newenv = os.environ.copy()
        newenv["FRUIT"] = "banana"
        rc = subprocess.call([sys.executable, "-c",
Florent Xicluna's avatar
Florent Xicluna committed
111 112 113
                              'import sys, os;'
                              'sys.exit(os.getenv("FRUIT")=="banana")'],
                             env=newenv)
114 115
        self.assertEqual(rc, 1)

116 117 118
    def test_invalid_args(self):
        # Popen() called with invalid arguments should raise TypeError
        # but Popen.__del__ should not complain (issue #12085)
119
        with test_support.captured_stderr() as s:
120 121 122 123 124 125
            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
            argcount = subprocess.Popen.__init__.__code__.co_argcount
            too_many_args = [0] * (argcount + 1)
            self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
        self.assertEqual(s.getvalue(), '')

126
    def test_stdin_none(self):
127
        # .stdin is None when not redirected
128 129
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
130 131
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
132 133 134 135
        p.wait()
        self.assertEqual(p.stdin, None)

    def test_stdout_none(self):
136
        # .stdout is None when not redirected
137
        p = subprocess.Popen([sys.executable, "-c",
138 139 140 141
                             'print "    this bit of output is from a '
                             'test of stdout in a different '
                             'process ..."'],
                             stdin=subprocess.PIPE, stderr=subprocess.PIPE)
142 143
        self.addCleanup(p.stdin.close)
        self.addCleanup(p.stderr.close)
144 145 146 147
        p.wait()
        self.assertEqual(p.stdout, None)

    def test_stderr_none(self):
148
        # .stderr is None when not redirected
149 150
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
151 152
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stdin.close)
153 154 155
        p.wait()
        self.assertEqual(p.stderr, None)

156
    def test_executable_with_cwd(self):
157
        python_dir = os.path.dirname(os.path.realpath(sys.executable))
158 159 160 161 162 163 164 165 166 167 168 169 170
        p = subprocess.Popen(["somethingyoudonthave", "-c",
                              "import sys; sys.exit(47)"],
                             executable=sys.executable, cwd=python_dir)
        p.wait()
        self.assertEqual(p.returncode, 47)

    @unittest.skipIf(sysconfig.is_python_build(),
                     "need an installed Python. See #7774")
    def test_executable_without_cwd(self):
        # For a normal installation, it should work without 'cwd'
        # argument.  For test runs in the build directory, see #7774.
        p = subprocess.Popen(["somethingyoudonthave", "-c",
                              "import sys; sys.exit(47)"],
Tim Peters's avatar
Tim Peters committed
171
                             executable=sys.executable)
172 173 174 175
        p.wait()
        self.assertEqual(p.returncode, 47)

    def test_stdin_pipe(self):
176
        # stdin redirection
177 178 179 180 181 182 183 184 185
        p = subprocess.Popen([sys.executable, "-c",
                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
                        stdin=subprocess.PIPE)
        p.stdin.write("pear")
        p.stdin.close()
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_filedes(self):
186
        # stdin is set to open file descriptor
Tim Peters's avatar
Tim Peters committed
187
        tf = tempfile.TemporaryFile()
188 189 190 191 192 193 194 195 196 197
        d = tf.fileno()
        os.write(d, "pear")
        os.lseek(d, 0, 0)
        p = subprocess.Popen([sys.executable, "-c",
                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=d)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_fileobj(self):
198
        # stdin is set to open file object
199 200 201 202 203 204 205 206 207 208
        tf = tempfile.TemporaryFile()
        tf.write("pear")
        tf.seek(0)
        p = subprocess.Popen([sys.executable, "-c",
                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=tf)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdout_pipe(self):
209
        # stdout redirection
210 211 212
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=subprocess.PIPE)
213
        self.addCleanup(p.stdout.close)
214 215 216
        self.assertEqual(p.stdout.read(), "orange")

    def test_stdout_filedes(self):
217
        # stdout is set to open file descriptor
Tim Peters's avatar
Tim Peters committed
218
        tf = tempfile.TemporaryFile()
219 220 221 222 223 224 225 226 227
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=d)
        p.wait()
        os.lseek(d, 0, 0)
        self.assertEqual(os.read(d, 1024), "orange")

    def test_stdout_fileobj(self):
228
        # stdout is set to open file object
Tim Peters's avatar
Tim Peters committed
229
        tf = tempfile.TemporaryFile()
230 231 232 233 234 235 236 237
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stdout.write("orange")'],
                         stdout=tf)
        p.wait()
        tf.seek(0)
        self.assertEqual(tf.read(), "orange")

    def test_stderr_pipe(self):
238
        # stderr redirection
239 240 241
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=subprocess.PIPE)
242
        self.addCleanup(p.stderr.close)
243
        self.assertStderrEqual(p.stderr.read(), "strawberry")
244 245

    def test_stderr_filedes(self):
246
        # stderr is set to open file descriptor
Tim Peters's avatar
Tim Peters committed
247
        tf = tempfile.TemporaryFile()
248 249 250 251 252 253
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=d)
        p.wait()
        os.lseek(d, 0, 0)
254
        self.assertStderrEqual(os.read(d, 1024), "strawberry")
255 256

    def test_stderr_fileobj(self):
257
        # stderr is set to open file object
Tim Peters's avatar
Tim Peters committed
258
        tf = tempfile.TemporaryFile()
259 260 261 262 263
        p = subprocess.Popen([sys.executable, "-c",
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=tf)
        p.wait()
        tf.seek(0)
264
        self.assertStderrEqual(tf.read(), "strawberry")
265 266

    def test_stdout_stderr_pipe(self):
267
        # capture stdout and stderr to the same pipe
268
        p = subprocess.Popen([sys.executable, "-c",
269 270 271
                          'import sys;'
                          'sys.stdout.write("apple");'
                          'sys.stdout.flush();'
272 273 274
                          'sys.stderr.write("orange")'],
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
275
        self.addCleanup(p.stdout.close)
276
        self.assertStderrEqual(p.stdout.read(), "appleorange")
277 278

    def test_stdout_stderr_file(self):
279
        # capture stdout and stderr to the same open file
280 281
        tf = tempfile.TemporaryFile()
        p = subprocess.Popen([sys.executable, "-c",
282 283 284
                          'import sys;'
                          'sys.stdout.write("apple");'
                          'sys.stdout.flush();'
285 286 287 288 289
                          'sys.stderr.write("orange")'],
                         stdout=tf,
                         stderr=tf)
        p.wait()
        tf.seek(0)
290
        self.assertStderrEqual(tf.read(), "appleorange")
291

292 293 294 295
    def test_stdout_filedes_of_stdout(self):
        # stdout is set to 1 (#1531862).
        cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))"
        rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
296
        self.assertEqual(rc, 2)
297

298
    def test_cwd(self):
299
        tmpdir = tempfile.gettempdir()
300 301 302 303 304 305
        # We cannot use os.path.realpath to canonicalize the path,
        # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
        cwd = os.getcwd()
        os.chdir(tmpdir)
        tmpdir = os.getcwd()
        os.chdir(cwd)
306
        p = subprocess.Popen([sys.executable, "-c",
307
                          'import sys,os;'
308 309 310
                          'sys.stdout.write(os.getcwd())'],
                         stdout=subprocess.PIPE,
                         cwd=tmpdir)
311
        self.addCleanup(p.stdout.close)
312 313
        normcase = os.path.normcase
        self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
314 315 316 317 318

    def test_env(self):
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange"
        p = subprocess.Popen([sys.executable, "-c",
319
                          'import sys,os;'
320 321 322
                          'sys.stdout.write(os.getenv("FRUIT"))'],
                         stdout=subprocess.PIPE,
                         env=newenv)
323
        self.addCleanup(p.stdout.close)
324 325
        self.assertEqual(p.stdout.read(), "orange")

326 327
    def test_communicate_stdin(self):
        p = subprocess.Popen([sys.executable, "-c",
328 329
                              'import sys;'
                              'sys.exit(sys.stdin.read() == "pear")'],
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
                             stdin=subprocess.PIPE)
        p.communicate("pear")
        self.assertEqual(p.returncode, 1)

    def test_communicate_stdout(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("pineapple")'],
                             stdout=subprocess.PIPE)
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, "pineapple")
        self.assertEqual(stderr, None)

    def test_communicate_stderr(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("pineapple")'],
                             stderr=subprocess.PIPE)
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, None)
348
        self.assertStderrEqual(stderr, "pineapple")
349

350 351
    def test_communicate(self):
        p = subprocess.Popen([sys.executable, "-c",
352 353
                          'import sys,os;'
                          'sys.stderr.write("pineapple");'
354
                          'sys.stdout.write(sys.stdin.read())'],
Tim Peters's avatar
Tim Peters committed
355 356 357
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
358 359 360
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
361 362
        (stdout, stderr) = p.communicate("banana")
        self.assertEqual(stdout, "banana")
363
        self.assertStderrEqual(stderr, "pineapple")
364

365 366
    # This test is Linux specific for simplicity to at least have
    # some coverage.  It is not a platform specific bug.
367 368 369 370 371 372 373 374 375 376 377 378 379 380
    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
                         "Linux specific")
    # Test for the fd leak reported in http://bugs.python.org/issue2791.
    def test_communicate_pipe_fd_leak(self):
        fd_directory = '/proc/%d/fd' % os.getpid()
        num_fds_before_popen = len(os.listdir(fd_directory))
        p = subprocess.Popen([sys.executable, "-c", "print()"],
                             stdout=subprocess.PIPE)
        p.communicate()
        num_fds_after_communicate = len(os.listdir(fd_directory))
        del p
        num_fds_after_destruction = len(os.listdir(fd_directory))
        self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
        self.assertEqual(num_fds_before_popen, num_fds_after_communicate)
381

382
    def test_communicate_returns(self):
383
        # communicate() should return None if no redirection is active
Tim Peters's avatar
Tim Peters committed
384 385
        p = subprocess.Popen([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
386 387 388 389 390
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, None)
        self.assertEqual(stderr, None)

    def test_communicate_pipe_buf(self):
391
        # communicate() with writes larger than pipe_buf
392
        # This test will probably deadlock rather than fail, if
Tim Peters's avatar
Tim Peters committed
393
        # communicate() does not work properly.
394 395 396 397 398 399 400 401
        x, y = os.pipe()
        if mswindows:
            pipe_buf = 512
        else:
            pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
        os.close(x)
        os.close(y)
        p = subprocess.Popen([sys.executable, "-c",
Tim Peters's avatar
Tim Peters committed
402
                          'import sys,os;'
403 404
                          'sys.stdout.write(sys.stdin.read(47));'
                          'sys.stderr.write("xyz"*%d);'
405
                          'sys.stdout.write(sys.stdin.read())' % pipe_buf],
Tim Peters's avatar
Tim Peters committed
406 407 408
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
409 410 411
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
412 413 414 415 416
        string_to_write = "abc"*pipe_buf
        (stdout, stderr) = p.communicate(string_to_write)
        self.assertEqual(stdout, string_to_write)

    def test_writes_before_communicate(self):
417
        # stdin.write before communicate()
418
        p = subprocess.Popen([sys.executable, "-c",
419
                          'import sys,os;'
420
                          'sys.stdout.write(sys.stdin.read())'],
Tim Peters's avatar
Tim Peters committed
421 422 423
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
424 425 426
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
427 428 429
        p.stdin.write("banana")
        (stdout, stderr) = p.communicate("split")
        self.assertEqual(stdout, "bananasplit")
430
        self.assertStderrEqual(stderr, "")
Tim Peters's avatar
Tim Peters committed
431

432 433
    def test_universal_newlines(self):
        p = subprocess.Popen([sys.executable, "-c",
Tim Peters's avatar
Tim Peters committed
434 435 436 437 438 439 440 441 442
                          'import sys,os;' + SETBINARY +
                          'sys.stdout.write("line1\\n");'
                          'sys.stdout.flush();'
                          'sys.stdout.write("line2\\r");'
                          'sys.stdout.flush();'
                          'sys.stdout.write("line3\\r\\n");'
                          'sys.stdout.flush();'
                          'sys.stdout.write("line4\\r");'
                          'sys.stdout.flush();'
443
                          'sys.stdout.write("\\nline5");'
Tim Peters's avatar
Tim Peters committed
444
                          'sys.stdout.flush();'
445 446 447
                          'sys.stdout.write("\\nline6");'],
                         stdout=subprocess.PIPE,
                         universal_newlines=1)
448
        self.addCleanup(p.stdout.close)
449
        stdout = p.stdout.read()
450
        if hasattr(file, 'newlines'):
451
            # Interpreter with universal newline support
Tim Peters's avatar
Tim Peters committed
452 453
            self.assertEqual(stdout,
                             "line1\nline2\nline3\nline4\nline5\nline6")
454 455
        else:
            # Interpreter without universal newline support
Tim Peters's avatar
Tim Peters committed
456 457
            self.assertEqual(stdout,
                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")
458 459

    def test_universal_newlines_communicate(self):
460
        # universal newlines through communicate()
461
        p = subprocess.Popen([sys.executable, "-c",
Tim Peters's avatar
Tim Peters committed
462 463 464 465 466 467 468 469 470
                          'import sys,os;' + SETBINARY +
                          'sys.stdout.write("line1\\n");'
                          'sys.stdout.flush();'
                          'sys.stdout.write("line2\\r");'
                          'sys.stdout.flush();'
                          'sys.stdout.write("line3\\r\\n");'
                          'sys.stdout.flush();'
                          'sys.stdout.write("line4\\r");'
                          'sys.stdout.flush();'
471
                          'sys.stdout.write("\\nline5");'
Tim Peters's avatar
Tim Peters committed
472
                          'sys.stdout.flush();'
473 474 475
                          'sys.stdout.write("\\nline6");'],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=1)
476 477
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
478
        (stdout, stderr) = p.communicate()
479
        if hasattr(file, 'newlines'):
480
            # Interpreter with universal newline support
Tim Peters's avatar
Tim Peters committed
481 482
            self.assertEqual(stdout,
                             "line1\nline2\nline3\nline4\nline5\nline6")
483 484
        else:
            # Interpreter without universal newline support
485 486
            self.assertEqual(stdout,
                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")
487 488

    def test_no_leaking(self):
489
        # Make sure we leak no resources
490
        if not mswindows:
491 492
            max_handles = 1026 # too much for most UNIX systems
        else:
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523
            max_handles = 2050 # too much for (at least some) Windows setups
        handles = []
        try:
            for i in range(max_handles):
                try:
                    handles.append(os.open(test_support.TESTFN,
                                           os.O_WRONLY | os.O_CREAT))
                except OSError as e:
                    if e.errno != errno.EMFILE:
                        raise
                    break
            else:
                self.skipTest("failed to reach the file descriptor limit "
                    "(tried %d)" % max_handles)
            # Close a couple of them (should be enough for a subprocess)
            for i in range(10):
                os.close(handles.pop())
            # Loop creating some subprocesses. If one of them leaks some fds,
            # the next loop iteration will fail by reaching the max fd limit.
            for i in range(15):
                p = subprocess.Popen([sys.executable, "-c",
                                      "import sys;"
                                      "sys.stdout.write(sys.stdin.read())"],
                                     stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE)
                data = p.communicate(b"lime")[0]
                self.assertEqual(data, b"lime")
        finally:
            for h in handles:
                os.close(h)
524 525 526 527 528 529

    def test_list2cmdline(self):
        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
                         '"a b c" d e')
        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
                         'ab\\"c \\ d')
530 531
        self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
                         'ab\\"c " \\\\" d')
532 533 534 535 536 537 538 539
        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
                         'a\\\\\\b "de fg" h')
        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
                         'a\\\\\\"b c d')
        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
                         '"a\\\\b c" d e')
        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
                         '"a\\\\b\\ c" d e')
540 541
        self.assertEqual(subprocess.list2cmdline(['ab', '']),
                         'ab ""')
542 543 544 545


    def test_poll(self):
        p = subprocess.Popen([sys.executable,
546 547 548 549 550 551 552 553 554
                          "-c", "import time; time.sleep(1)"])
        count = 0
        while p.poll() is None:
            time.sleep(0.1)
            count += 1
        # We expect that the poll loop probably went around about 10 times,
        # but, based on system scheduling we can't control, it's possible
        # poll() never returned None.  It "should be" very rare that it
        # didn't go around at least twice.
555
        self.assertGreaterEqual(count, 2)
556 557 558 559 560 561 562 563 564 565
        # Subsequent invocations should just return the returncode
        self.assertEqual(p.poll(), 0)


    def test_wait(self):
        p = subprocess.Popen([sys.executable,
                          "-c", "import time; time.sleep(2)"])
        self.assertEqual(p.wait(), 0)
        # Subsequent invocations should just return the returncode
        self.assertEqual(p.wait(), 0)
Tim Peters's avatar
Tim Peters committed
566

567 568 569 570

    def test_invalid_bufsize(self):
        # an invalid type of the bufsize argument should raise
        # TypeError.
571
        with self.assertRaises(TypeError):
572 573
            subprocess.Popen([sys.executable, "-c", "pass"], "orange")

574 575 576 577 578 579 580
    def test_leaking_fds_on_error(self):
        # see bug #5179: Popen leaks file descriptors to PIPEs if
        # the child fails to execute; this will eventually exhaust
        # the maximum number of open fds. 1024 seems a very common
        # value for that limit, but Windows has 2048, so we loop
        # 1024 times (each call leaked two fds).
        for i in range(1024):
581 582
            # Windows raises IOError.  Others raise OSError.
            with self.assertRaises(EnvironmentError) as c:
583 584 585
                subprocess.Popen(['nonexisting_i_hope'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
586 587
            # ignore errors that indicate the command was not found
            if c.exception.errno not in (errno.ENOENT, errno.EACCES):
588 589
                raise c.exception

590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609
    def test_handles_closed_on_exception(self):
        # If CreateProcess exits with an error, ensure the
        # duplicate output handles are released
        ifhandle, ifname = mkstemp()
        ofhandle, ofname = mkstemp()
        efhandle, efname = mkstemp()
        try:
            subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
              stderr=efhandle)
        except OSError:
            os.close(ifhandle)
            os.remove(ifname)
            os.close(ofhandle)
            os.remove(ofname)
            os.close(efhandle)
            os.remove(efname)
        self.assertFalse(os.path.exists(ifname))
        self.assertFalse(os.path.exists(ofname))
        self.assertFalse(os.path.exists(efname))

610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627
    def test_communicate_epipe(self):
        # Issue 10963: communicate() should hide EPIPE
        p = subprocess.Popen([sys.executable, "-c", 'pass'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        p.communicate("x" * 2**20)

    def test_communicate_epipe_only_stdin(self):
        # Issue 10963: communicate() should hide EPIPE
        p = subprocess.Popen([sys.executable, "-c", 'pass'],
                             stdin=subprocess.PIPE)
        self.addCleanup(p.stdin.close)
        time.sleep(2)
        p.communicate("x" * 2**20)
628 629 630 631 632 633 634 635 636 637 638 639 640 641 642

# context manager
class _SuppressCoreFiles(object):
    """Try to prevent core files from being created."""
    old_limit = None

    def __enter__(self):
        """Try to save previous ulimit, then set it to (0, 0)."""
        try:
            import resource
            self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
            resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
        except (ImportError, ValueError, resource.error):
            pass

643 644 645 646 647 648 649 650 651 652 653 654 655 656
        if sys.platform == 'darwin':
            # Check if the 'Crash Reporter' on OSX was configured
            # in 'Developer' mode and warn that it will get triggered
            # when it is.
            #
            # This assumes that this context manager is used in tests
            # that might trigger the next manager.
            value = subprocess.Popen(['/usr/bin/defaults', 'read',
                    'com.apple.CrashReporter', 'DialogType'],
                    stdout=subprocess.PIPE).communicate()[0]
            if value.strip() == b'developer':
                print "this tests triggers the Crash Reporter, that is intentional"
                sys.stdout.flush()

657 658 659 660 661 662 663 664 665 666
    def __exit__(self, *args):
        """Return core file behavior to default."""
        if self.old_limit is None:
            return
        try:
            import resource
            resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
        except (ImportError, ValueError, resource.error):
            pass

667 668
    @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
                         "Requires signal.SIGALRM")
669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684
    def test_communicate_eintr(self):
        # Issue #12493: communicate() should handle EINTR
        def handler(signum, frame):
            pass
        old_handler = signal.signal(signal.SIGALRM, handler)
        self.addCleanup(signal.signal, signal.SIGALRM, old_handler)

        # the process is running for 2 seconds
        args = [sys.executable, "-c", 'import time; time.sleep(2)']
        for stream in ('stdout', 'stderr'):
            kw = {stream: subprocess.PIPE}
            with subprocess.Popen(args, **kw) as process:
                signal.alarm(1)
                # communicate() will be interrupted by SIGALRM
                process.communicate()

685

Florent Xicluna's avatar
Florent Xicluna committed
686
@unittest.skipIf(mswindows, "POSIX specific tests")
687
class POSIXProcessTestCase(BaseTestCase):
688

689 690 691 692
    def test_exceptions(self):
        # caught & re-raised exceptions
        with self.assertRaises(OSError) as c:
            p = subprocess.Popen([sys.executable, "-c", ""],
693
                                 cwd="/this/path/does/not/exist")
694 695
        # The attribute child_traceback should contain "os.chdir" somewhere.
        self.assertIn("os.chdir", c.exception.child_traceback)
696

697 698 699
    def test_run_abort(self):
        # returncode handles signal termination
        with _SuppressCoreFiles():
700
            p = subprocess.Popen([sys.executable, "-c",
701 702 703 704 705 706 707 708 709
                                  "import os; os.abort()"])
            p.wait()
        self.assertEqual(-p.returncode, signal.SIGABRT)

    def test_preexec(self):
        # preexec function
        p = subprocess.Popen([sys.executable, "-c",
                              "import sys, os;"
                              "sys.stdout.write(os.getenv('FRUIT'))"],
710 711
                             stdout=subprocess.PIPE,
                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
712
        self.addCleanup(p.stdout.close)
713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745
        self.assertEqual(p.stdout.read(), "apple")

    def test_args_string(self):
        # args is a string
        f, fname = mkstemp()
        os.write(f, "#!/bin/sh\n")
        os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
                    sys.executable)
        os.close(f)
        os.chmod(fname, 0o700)
        p = subprocess.Popen(fname)
        p.wait()
        os.remove(fname)
        self.assertEqual(p.returncode, 47)

    def test_invalid_args(self):
        # invalid arguments should raise ValueError
        self.assertRaises(ValueError, subprocess.call,
                          [sys.executable, "-c",
                           "import sys; sys.exit(47)"],
                          startupinfo=47)
        self.assertRaises(ValueError, subprocess.call,
                          [sys.executable, "-c",
                           "import sys; sys.exit(47)"],
                          creationflags=47)

    def test_shell_sequence(self):
        # Run command through the shell (sequence)
        newenv = os.environ.copy()
        newenv["FRUIT"] = "apple"
        p = subprocess.Popen(["echo $FRUIT"], shell=1,
                             stdout=subprocess.PIPE,
                             env=newenv)
746
        self.addCleanup(p.stdout.close)
747 748 749 750 751 752 753 754 755
        self.assertEqual(p.stdout.read().strip(), "apple")

    def test_shell_string(self):
        # Run command through the shell (string)
        newenv = os.environ.copy()
        newenv["FRUIT"] = "apple"
        p = subprocess.Popen("echo $FRUIT", shell=1,
                             stdout=subprocess.PIPE,
                             env=newenv)
756
        self.addCleanup(p.stdout.close)
757 758 759 760 761 762 763 764 765 766 767 768 769 770
        self.assertEqual(p.stdout.read().strip(), "apple")

    def test_call_string(self):
        # call() function with string argument on UNIX
        f, fname = mkstemp()
        os.write(f, "#!/bin/sh\n")
        os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
                    sys.executable)
        os.close(f)
        os.chmod(fname, 0700)
        rc = subprocess.call(fname)
        os.remove(fname)
        self.assertEqual(rc, 47)

771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787
    def test_specific_shell(self):
        # Issue #9265: Incorrect name passed as arg[0].
        shells = []
        for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
            for name in ['bash', 'ksh']:
                sh = os.path.join(prefix, name)
                if os.path.isfile(sh):
                    shells.append(sh)
        if not shells: # Will probably work for any shell but csh.
            self.skipTest("bash or ksh required for this test")
        sh = '/bin/sh'
        if os.path.isfile(sh) and not os.path.islink(sh):
            # Test will fail if /bin/sh is a symlink to csh.
            shells.append(sh)
        for sh in shells:
            p = subprocess.Popen("echo $0", executable=sh, shell=True,
                                 stdout=subprocess.PIPE)
788
            self.addCleanup(p.stdout.close)
789 790
            self.assertEqual(p.stdout.read().strip(), sh)

791
    def _kill_process(self, method, *args):
792 793
        # Do not inherit file handles from the parent.
        # It should fix failures on some platforms.
794 795 796 797 798 799 800 801 802 803 804 805 806 807
        p = subprocess.Popen([sys.executable, "-c", """if 1:
                             import sys, time
                             sys.stdout.write('x\\n')
                             sys.stdout.flush()
                             time.sleep(30)
                             """],
                             close_fds=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        p.stdout.read(1)
        getattr(p, method)(*args)
808 809 810 811
        return p

    def test_send_signal(self):
        p = self._kill_process('send_signal', signal.SIGINT)
812
        _, stderr = p.communicate()
813
        self.assertIn('KeyboardInterrupt', stderr)
814
        self.assertNotEqual(p.wait(), 0)
815 816

    def test_kill(self):
817
        p = self._kill_process('kill')
818 819
        _, stderr = p.communicate()
        self.assertStderrEqual(stderr, '')
820 821 822
        self.assertEqual(p.wait(), -signal.SIGKILL)

    def test_terminate(self):
823
        p = self._kill_process('terminate')
824 825
        _, stderr = p.communicate()
        self.assertStderrEqual(stderr, '')
826 827
        self.assertEqual(p.wait(), -signal.SIGTERM)

828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879
    def check_close_std_fds(self, fds):
        # Issue #9905: test that subprocess pipes still work properly with
        # some standard fds closed
        stdin = 0
        newfds = []
        for a in fds:
            b = os.dup(a)
            newfds.append(b)
            if a == 0:
                stdin = b
        try:
            for fd in fds:
                os.close(fd)
            out, err = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.stdout.write("apple");'
                              'sys.stdout.flush();'
                              'sys.stderr.write("orange")'],
                       stdin=stdin,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE).communicate()
            err = test_support.strip_python_stderr(err)
            self.assertEqual((out, err), (b'apple', b'orange'))
        finally:
            for b, a in zip(newfds, fds):
                os.dup2(b, a)
            for b in newfds:
                os.close(b)

    def test_close_fd_0(self):
        self.check_close_std_fds([0])

    def test_close_fd_1(self):
        self.check_close_std_fds([1])

    def test_close_fd_2(self):
        self.check_close_std_fds([2])

    def test_close_fds_0_1(self):
        self.check_close_std_fds([0, 1])

    def test_close_fds_0_2(self):
        self.check_close_std_fds([0, 2])

    def test_close_fds_1_2(self):
        self.check_close_std_fds([1, 2])

    def test_close_fds_0_1_2(self):
        # Issue #10806: test that subprocess pipes still work properly with
        # all standard fds closed.
        self.check_close_std_fds([0, 1, 2])

880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937
    def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
        # open up some temporary files
        temps = [mkstemp() for i in range(3)]
        temp_fds = [fd for fd, fname in temps]
        try:
            # unlink the files -- we won't need to reopen them
            for fd, fname in temps:
                os.unlink(fname)

            # save a copy of the standard file descriptors
            saved_fds = [os.dup(fd) for fd in range(3)]
            try:
                # duplicate the temp files over the standard fd's 0, 1, 2
                for fd, temp_fd in enumerate(temp_fds):
                    os.dup2(temp_fd, fd)

                # write some data to what will become stdin, and rewind
                os.write(stdin_no, b"STDIN")
                os.lseek(stdin_no, 0, 0)

                # now use those files in the given order, so that subprocess
                # has to rearrange them in the child
                p = subprocess.Popen([sys.executable, "-c",
                    'import sys; got = sys.stdin.read();'
                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
                    stdin=stdin_no,
                    stdout=stdout_no,
                    stderr=stderr_no)
                p.wait()

                for fd in temp_fds:
                    os.lseek(fd, 0, 0)

                out = os.read(stdout_no, 1024)
                err = test_support.strip_python_stderr(os.read(stderr_no, 1024))
            finally:
                for std, saved in enumerate(saved_fds):
                    os.dup2(saved, std)
                    os.close(saved)

            self.assertEqual(out, b"got STDIN")
            self.assertEqual(err, b"err")

        finally:
            for fd in temp_fds:
                os.close(fd)

    # When duping fds, if there arises a situation where one of the fds is
    # either 0, 1 or 2, it is possible that it is overwritten (#12607).
    # This tests all combinations of this.
    def test_swap_fds(self):
        self.check_swap_fds(0, 1, 2)
        self.check_swap_fds(0, 2, 1)
        self.check_swap_fds(1, 0, 2)
        self.check_swap_fds(1, 2, 0)
        self.check_swap_fds(2, 0, 1)
        self.check_swap_fds(2, 1, 0)

938 939 940 941 942 943 944 945 946 947
    def test_wait_when_sigchild_ignored(self):
        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
        sigchild_ignore = test_support.findfile("sigchild_ignore.py",
                                                subdir="subprocessdata")
        p = subprocess.Popen([sys.executable, sigchild_ignore],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
                         " non-zero with this error:\n%s" % stderr)

948 949 950 951 952 953 954 955 956 957
    def test_zombie_fast_process_del(self):
        # Issue #12650: on Unix, if Popen.__del__() was called before the
        # process exited, it wouldn't be added to subprocess._active, and would
        # remain a zombie.
        # spawn a Popen, and delete its reference before it exits
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys, time;'
                              'time.sleep(0.2)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
958 959
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976
        ident = id(p)
        pid = p.pid
        del p
        # check that p is in the active processes list
        self.assertIn(ident, [id(o) for o in subprocess._active])

    def test_leak_fast_process_del_killed(self):
        # Issue #12650: on Unix, if Popen.__del__() was called before the
        # process exited, and the process got killed by a signal, it would never
        # be removed from subprocess._active, which triggered a FD and memory
        # leak.
        # spawn a Popen, delete its reference and kill it
        p = subprocess.Popen([sys.executable, "-c",
                              'import time;'
                              'time.sleep(3)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
977 978
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997
        ident = id(p)
        pid = p.pid
        del p
        os.kill(pid, signal.SIGKILL)
        # check that p is in the active processes list
        self.assertIn(ident, [id(o) for o in subprocess._active])

        # let some time for the process to exit, and create a new Popen: this
        # should trigger the wait() of p
        time.sleep(0.2)
        with self.assertRaises(EnvironmentError) as c:
            with subprocess.Popen(['nonexisting_i_hope'],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as proc:
                pass
        # p should have been wait()ed on, and removed from the _active list
        self.assertRaises(OSError, os.waitpid, pid, 0)
        self.assertNotIn(ident, [id(o) for o in subprocess._active])

998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028
    def test_pipe_cloexec(self):
        # Issue 12786: check that the communication pipes' FDs are set CLOEXEC,
        # and are not inherited by another child process.
        p1 = subprocess.Popen([sys.executable, "-c",
                               'import os;'
                               'os.read(0, 1)'
                              ],
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)

        p2 = subprocess.Popen([sys.executable, "-c", """if True:
                               import os, errno, sys
                               for fd in %r:
                                   try:
                                       os.close(fd)
                                   except OSError as e:
                                       if e.errno != errno.EBADF:
                                           raise
                                   else:
                                       sys.exit(1)
                               sys.exit(0)
                               """ % [f.fileno() for f in (p1.stdin, p1.stdout,
                                                           p1.stderr)]
                              ],
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, close_fds=False)
        p1.communicate('foo')
        _, stderr = p2.communicate()

        self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr))

1029

Florent Xicluna's avatar
Florent Xicluna committed
1030
@unittest.skipUnless(mswindows, "Windows specific tests")
1031
class Win32ProcessTestCase(BaseTestCase):
1032

1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045
    def test_startupinfo(self):
        # startupinfo argument
        # We uses hardcoded constants, because we do not want to
        # depend on win32all.
        STARTF_USESHOWWINDOW = 1
        SW_MAXIMIZE = 3
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags = STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = SW_MAXIMIZE
        # Since Python is a console process, it won't be affected
        # by wShowWindow, but the argument should be silently
        # ignored
        subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
1046 1047
                        startupinfo=startupinfo)

1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071
    def test_creationflags(self):
        # creationflags argument
        CREATE_NEW_CONSOLE = 16
        sys.stderr.write("    a DOS box should flash briefly ...\n")
        subprocess.call(sys.executable +
                        ' -c "import time; time.sleep(0.25)"',
                        creationflags=CREATE_NEW_CONSOLE)

    def test_invalid_args(self):
        # invalid arguments should raise ValueError
        self.assertRaises(ValueError, subprocess.call,
                          [sys.executable, "-c",
                           "import sys; sys.exit(47)"],
                          preexec_fn=lambda: 1)
        self.assertRaises(ValueError, subprocess.call,
                          [sys.executable, "-c",
                           "import sys; sys.exit(47)"],
                          stdout=subprocess.PIPE,
                          close_fds=True)

    def test_close_fds(self):
        # close file descriptors
        rc = subprocess.call([sys.executable, "-c",
                              "import sys; sys.exit(47)"],
1072
                              close_fds=True)
1073
        self.assertEqual(rc, 47)
1074

1075 1076 1077 1078 1079 1080 1081
    def test_shell_sequence(self):
        # Run command through the shell (sequence)
        newenv = os.environ.copy()
        newenv["FRUIT"] = "physalis"
        p = subprocess.Popen(["set"], shell=1,
                             stdout=subprocess.PIPE,
                             env=newenv)
1082
        self.addCleanup(p.stdout.close)
1083
        self.assertIn("physalis", p.stdout.read())
1084

1085 1086 1087 1088 1089 1090 1091
    def test_shell_string(self):
        # Run command through the shell (string)
        newenv = os.environ.copy()
        newenv["FRUIT"] = "physalis"
        p = subprocess.Popen("set", shell=1,
                             stdout=subprocess.PIPE,
                             env=newenv)
1092
        self.addCleanup(p.stdout.close)
1093
        self.assertIn("physalis", p.stdout.read())
1094

1095 1096 1097 1098 1099
    def test_call_string(self):
        # call() function with string argument on Windows
        rc = subprocess.call(sys.executable +
                             ' -c "import sys; sys.exit(47)"')
        self.assertEqual(rc, 47)
1100

1101
    def _kill_process(self, method, *args):
1102
        # Some win32 buildbot raises EOFError if stdin is inherited
1103 1104 1105 1106 1107 1108 1109 1110 1111
        p = subprocess.Popen([sys.executable, "-c", """if 1:
                             import sys, time
                             sys.stdout.write('x\\n')
                             sys.stdout.flush()
                             time.sleep(30)
                             """],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
1112 1113 1114
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
1115 1116 1117 1118
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        p.stdout.read(1)
        getattr(p, method)(*args)
1119 1120
        _, stderr = p.communicate()
        self.assertStderrEqual(stderr, '')
1121
        returncode = p.wait()
1122
        self.assertNotEqual(returncode, 0)
1123

1124 1125
    def test_send_signal(self):
        self._kill_process('send_signal', signal.SIGTERM)
1126

1127 1128
    def test_kill(self):
        self._kill_process('kill')
1129

1130 1131
    def test_terminate(self):
        self._kill_process('terminate')
1132 1133


1134 1135 1136 1137 1138 1139
@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
                     "poll system call not supported")
class ProcessTestCaseNoPoll(ProcessTestCase):
    def setUp(self):
        subprocess._has_poll = False
        ProcessTestCase.setUp(self)
1140

1141 1142 1143
    def tearDown(self):
        subprocess._has_poll = True
        ProcessTestCase.tearDown(self)
1144 1145


1146
class HelperFunctionTests(unittest.TestCase):
1147
    @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163
    def test_eintr_retry_call(self):
        record_calls = []
        def fake_os_func(*args):
            record_calls.append(args)
            if len(record_calls) == 2:
                raise OSError(errno.EINTR, "fake interrupted system call")
            return tuple(reversed(args))

        self.assertEqual((999, 256),
                         subprocess._eintr_retry_call(fake_os_func, 256, 999))
        self.assertEqual([(256, 999)], record_calls)
        # This time there will be an EINTR so it will loop once.
        self.assertEqual((666,),
                         subprocess._eintr_retry_call(fake_os_func, 666))
        self.assertEqual([(256, 999), (666,), (666,)], record_calls)

1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182
@unittest.skipUnless(mswindows, "mswindows only")
class CommandsWithSpaces (BaseTestCase):

    def setUp(self):
        super(CommandsWithSpaces, self).setUp()
        f, fname = mkstemp(".py", "te st")
        self.fname = fname.lower ()
        os.write(f, b"import sys;"
                    b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
        )
        os.close(f)

    def tearDown(self):
        os.remove(self.fname)
        super(CommandsWithSpaces, self).tearDown()

    def with_spaces(self, *args, **kwargs):
        kwargs['stdout'] = subprocess.PIPE
        p = subprocess.Popen(*args, **kwargs)
1183
        self.addCleanup(p.stdout.close)
1184 1185 1186 1187 1188 1189 1190
        self.assertEqual(
          p.stdout.read ().decode("mbcs"),
          "2 [%r, 'ab cd']" % self.fname
        )

    def test_shell_string_with_spaces(self):
        # call() function with string argument with spaces on Windows
1191 1192
        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
                                             "ab cd"), shell=1)
1193 1194 1195

    def test_shell_sequence_with_spaces(self):
        # call() function with sequence argument with spaces on Windows
1196
        self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
1197 1198 1199 1200 1201 1202 1203 1204 1205 1206

    def test_noshell_string_with_spaces(self):
        # call() function with string argument with spaces on Windows
        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
                             "ab cd"))

    def test_noshell_sequence_with_spaces(self):
        # call() function with sequence argument with spaces on Windows
        self.with_spaces([sys.executable, self.fname, "ab cd"])

1207
def test_main():
1208 1209 1210
    unit_tests = (ProcessTestCase,
                  POSIXProcessTestCase,
                  Win32ProcessTestCase,
1211
                  ProcessTestCaseNoPoll,
1212 1213
                  HelperFunctionTests,
                  CommandsWithSpaces)
1214

1215
    test_support.run_unittest(*unit_tests)
1216
    test_support.reap_children()
1217 1218 1219

if __name__ == "__main__":
    test_main()