Kaydet (Commit) 5f157bba authored tarafından Corentin Henry's avatar Corentin Henry

implement stream demultiplexing for exec commands

fixes https://github.com/docker/docker-py/issues/1952Signed-off-by: 's avatarCorentin Henry <corentinhenry@gmail.com>
üst e1e40487
......@@ -32,7 +32,7 @@ from ..errors import (
from ..tls import TLSConfig
from ..transport import SSLAdapter, UnixAdapter
from ..utils import utils, check_resource, update_headers, config
from ..utils.socket import frames_iter, socket_raw_iter
from ..utils.socket import frames_iter, consume_socket_output, demux_adaptor
from ..utils.json_stream import json_stream
try:
from ..transport import NpipeAdapter
......@@ -381,19 +381,23 @@ class APIClient(
for out in response.iter_content(chunk_size, decode):
yield out
def _read_from_socket(self, response, stream, tty=False):
def _read_from_socket(self, response, stream, tty=True, demux=False):
socket = self._get_raw_response_socket(response)
gen = None
if tty is False:
gen = frames_iter(socket)
gen = frames_iter(socket, tty)
if demux:
# The generator will output tuples (stdout, stderr)
gen = (demux_adaptor(*frame) for frame in gen)
else:
gen = socket_raw_iter(socket)
# The generator will output strings
gen = (data for (_, data) in gen)
if stream:
return gen
else:
return six.binary_type().join(gen)
# Wait for all the frames, concatenate them, and return the result
return consume_socket_output(gen, demux=demux)
def _disable_socket_timeout(self, socket):
""" Depending on the combination of python version and whether we're
......
......@@ -13,7 +13,7 @@ from ..types import (
class ContainerApiMixin(object):
@utils.check_resource('container')
def attach(self, container, stdout=True, stderr=True,
stream=False, logs=False):
stream=False, logs=False, demux=False):
"""
Attach to a container.
......@@ -28,11 +28,15 @@ class ContainerApiMixin(object):
stream (bool): Return container output progressively as an iterator
of strings, rather than a single string.
logs (bool): Include the container's previous output.
demux (bool): Keep stdout and stderr separate.
Returns:
By default, the container's output as a single string.
By default, the container's output as a single string (two if
``demux=True``: one for stdout and one for stderr).
If ``stream=True``, an iterator of output strings.
If ``stream=True``, an iterator of output strings. If
``demux=True``, two iterators are returned: one for stdout and one
for stderr.
Raises:
:py:class:`docker.errors.APIError`
......@@ -54,8 +58,7 @@ class ContainerApiMixin(object):
response = self._post(u, headers=headers, params=params, stream=True)
output = self._read_from_socket(
response, stream, self._check_is_tty(container)
)
response, stream, self._check_is_tty(container), demux=demux)
if stream:
return CancellableStream(output, response)
......
......@@ -118,7 +118,7 @@ class ExecApiMixin(object):
@utils.check_resource('exec_id')
def exec_start(self, exec_id, detach=False, tty=False, stream=False,
socket=False):
socket=False, demux=False):
"""
Start a previously set up exec instance.
......@@ -130,11 +130,14 @@ class ExecApiMixin(object):
stream (bool): Stream response data. Default: False
socket (bool): Return the connection socket to allow custom
read/write operations.
demux (bool): Separate return stdin, stdout and stderr separately
Returns:
(generator or str): If ``stream=True``, a generator yielding
response chunks. If ``socket=True``, a socket object for the
connection. A string containing response data otherwise.
(generator or str or tuple): If ``stream=True``, a generator
yielding response chunks. If ``socket=True``, a socket object for
the connection. A string containing response data otherwise. If
``demux=True``, stdin, stdout and stderr are separated.
Raises:
:py:class:`docker.errors.APIError`
......@@ -162,4 +165,4 @@ class ExecApiMixin(object):
return self._result(res)
if socket:
return self._get_raw_response_socket(res)
return self._read_from_socket(res, stream, tty)
return self._read_from_socket(res, stream, tty=tty, demux=demux)
......@@ -144,7 +144,7 @@ class Container(Model):
def exec_run(self, cmd, stdout=True, stderr=True, stdin=False, tty=False,
privileged=False, user='', detach=False, stream=False,
socket=False, environment=None, workdir=None):
socket=False, environment=None, workdir=None, demux=False):
"""
Run a command inside this container. Similar to
``docker exec``.
......@@ -166,6 +166,7 @@ class Container(Model):
the following format ``["PASSWORD=xxx"]`` or
``{"PASSWORD": "xxx"}``.
workdir (str): Path to working directory for this exec session
demux (bool): Return stdout and stderr separately
Returns:
(ExecResult): A tuple of (exit_code, output)
......@@ -180,6 +181,70 @@ class Container(Model):
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
Example:
Create a container that runs in the background
>>> client = docker.from_env()
>>> container = client.containers.run(
... 'bfirsh/reticulate-splines', detach=True)
Prepare the command we are going to use. It prints "hello stdout"
in `stdout`, followed by "hello stderr" in `stderr`:
>>> cmd = '/bin/sh -c "echo hello stdout ; echo hello stderr >&2"'
We'll run this command with all four the combinations of ``stream``
and ``demux``.
With ``stream=False`` and ``demux=False``, the output is a string
that contains both the `stdout` and the `stderr` output:
>>> res = container.exec_run(cmd, stream=False, demux=False)
>>> res.output
b'hello stderr\nhello stdout\n'
With ``stream=True``, and ``demux=False``, the output is a
generator that yields strings containing the output of both
`stdout` and `stderr`:
>>> res = container.exec_run(cmd, stream=True, demux=False)
>>> next(res.output)
b'hello stdout\n'
>>> next(res.output)
b'hello stderr\n'
>>> next(res.output)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
With ``stream=True`` and ``demux=True``, the generator now
separates the streams, and yield tuples
``(stdout, stderr)``:
>>> res = container.exec_run(cmd, stream=True, demux=True)
>>> next(res.output)
(b'hello stdout\n', None)
>>> next(res.output)
(None, b'hello stderr\n')
>>> next(res.output)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
Finally, with ``stream=False`` and ``demux=True``, the whole output
is returned, but the streams are still separated:
>>> res = container.exec_run(cmd, stream=True, demux=True)
>>> next(res.output)
(b'hello stdout\n', None)
>>> next(res.output)
(None, b'hello stderr\n')
>>> next(res.output)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
"""
resp = self.client.api.exec_create(
self.id, cmd, stdout=stdout, stderr=stderr, stdin=stdin, tty=tty,
......@@ -187,7 +252,8 @@ class Container(Model):
workdir=workdir
)
exec_output = self.client.api.exec_start(
resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket
resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket,
demux=demux
)
if socket or stream:
return ExecResult(None, exec_output)
......
......@@ -12,6 +12,10 @@ except ImportError:
NpipeSocket = type(None)
STDOUT = 1
STDERR = 2
class SocketError(Exception):
pass
......@@ -51,28 +55,43 @@ def read_exactly(socket, n):
return data
def next_frame_size(socket):
def next_frame_header(socket):
"""
Returns the size of the next frame of data waiting to be read from socket,
according to the protocol defined here:
Returns the stream and size of the next frame of data waiting to be read
from socket, according to the protocol defined here:
https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/attach-to-a-container
https://docs.docker.com/engine/api/v1.24/#attach-to-a-container
"""
try:
data = read_exactly(socket, 8)
except SocketError:
return -1
return (-1, -1)
stream, actual = struct.unpack('>BxxxL', data)
return (stream, actual)
_, actual = struct.unpack('>BxxxL', data)
return actual
def frames_iter(socket, tty):
"""
Return a generator of frames read from socket. A frame is a tuple where
the first item is the stream number and the second item is a chunk of data.
If the tty setting is enabled, the streams are multiplexed into the stdout
stream.
"""
if tty:
return ((STDOUT, frame) for frame in frames_iter_tty(socket))
else:
return frames_iter_no_tty(socket)
def frames_iter(socket):
def frames_iter_no_tty(socket):
"""
Returns a generator of frames read from socket
Returns a generator of data read from the socket when the tty setting is
not enabled.
"""
while True:
n = next_frame_size(socket)
(stream, n) = next_frame_header(socket)
if n < 0:
break
while n > 0:
......@@ -84,13 +103,13 @@ def frames_iter(socket):
# We have reached EOF
return
n -= data_length
yield result
yield (stream, result)
def socket_raw_iter(socket):
def frames_iter_tty(socket):
"""
Returns a generator of data read from the socket.
This is used for non-multiplexed streams.
Return a generator of data read from the socket when the tty setting is
enabled.
"""
while True:
result = read(socket)
......@@ -98,3 +117,45 @@ def socket_raw_iter(socket):
# We have reached EOF
return
yield result
def consume_socket_output(frames, demux=False):
"""
Iterate through frames read from the socket and return the result.
Args:
demux (bool):
If False, stdout and stderr are multiplexed, and the result is the
concatenation of all the frames. If True, the streams are
demultiplexed, and the result is a 2-tuple where each item is the
concatenation of frames belonging to the same stream.
"""
if demux is False:
# If the streams are multiplexed, the generator returns strings, that
# we just need to concatenate.
return six.binary_type().join(frames)
# If the streams are demultiplexed, the generator returns tuples
# (stdin, stdout, stderr)
out = [six.binary_type(), six.binary_type()]
for frame in frames:
for stream_id in [STDOUT, STDERR]:
# It is guaranteed that for each frame, one and only one stream
# is not None.
if frame[stream_id] is not None:
out[stream_id] += frame[stream_id]
return tuple(out)
def demux_adaptor(stream_id, data):
"""
Utility to demultiplex stdout and stderr when reading frames from the
socket.
"""
if stream_id == STDOUT:
return (data, None)
elif stream_id == STDERR:
return (None, data)
else:
raise ValueError('{0} is not a valid stream'.format(stream_id))
......@@ -7,7 +7,7 @@ from datetime import datetime
import docker
from docker.constants import IS_WINDOWS_PLATFORM
from docker.utils.socket import next_frame_size
from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly
import pytest
......@@ -1242,7 +1242,8 @@ class AttachContainerTest(BaseAPIIntegrationTest):
self.client.start(container)
next_size = next_frame_size(pty_stdout)
(stream, next_size) = next_frame_header(pty_stdout)
assert stream == 1 # correspond to stdout
assert next_size == len(line)
data = read_exactly(pty_stdout, next_size)
assert data.decode('utf-8') == line
......
from docker.utils.socket import next_frame_size
from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly
from .base import BaseAPIIntegrationTest, BUSYBOX
......@@ -91,7 +91,8 @@ class ExecTest(BaseAPIIntegrationTest):
socket = self.client.exec_start(exec_id, socket=True)
self.addCleanup(socket.close)
next_size = next_frame_size(socket)
(stream, next_size) = next_frame_header(socket)
assert stream == 1 # stdout (0 = stdin, 1 = stdout, 2 = stderr)
assert next_size == len(line)
data = read_exactly(socket, next_size)
assert data.decode('utf-8') == line
......
......@@ -83,7 +83,7 @@ def fake_delete(self, url, *args, **kwargs):
return fake_request('DELETE', url, *args, **kwargs)
def fake_read_from_socket(self, response, stream, tty=False):
def fake_read_from_socket(self, response, stream, tty=False, demux=False):
return six.binary_type()
......
......@@ -417,7 +417,8 @@ class ContainerTest(unittest.TestCase):
workdir=None
)
client.api.exec_start.assert_called_with(
FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False
FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False,
demux=False,
)
def test_exec_run_failure(self):
......@@ -430,7 +431,8 @@ class ContainerTest(unittest.TestCase):
workdir=None
)
client.api.exec_start.assert_called_with(
FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False
FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False,
demux=False,
)
def test_export(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment