Kaydet (Commit) ee6ec4c6 authored tarafından Joffrey F's avatar Joffrey F

Merge branch 'master' of https://github.com/little-dude/docker-py into little-dude-master

......@@ -32,7 +32,7 @@ from ..errors import (
from ..tls import TLSConfig
from ..transport import SSLAdapter, UnixAdapter
from ..utils import utils, check_resource, update_headers, config
from ..utils.socket import frames_iter, socket_raw_iter
from ..utils.socket import frames_iter, consume_socket_output, demux_adaptor
from ..utils.json_stream import json_stream
try:
from ..transport import NpipeAdapter
......@@ -381,19 +381,23 @@ class APIClient(
for out in response.iter_content(chunk_size, decode):
yield out
def _read_from_socket(self, response, stream, tty=False):
def _read_from_socket(self, response, stream, tty=True, demux=False):
socket = self._get_raw_response_socket(response)
gen = None
if tty is False:
gen = frames_iter(socket)
gen = frames_iter(socket, tty)
if demux:
# The generator will output tuples (stdout, stderr)
gen = (demux_adaptor(*frame) for frame in gen)
else:
gen = socket_raw_iter(socket)
# The generator will output strings
gen = (data for (_, data) in gen)
if stream:
return gen
else:
return six.binary_type().join(gen)
# Wait for all the frames, concatenate them, and return the result
return consume_socket_output(gen, demux=demux)
def _disable_socket_timeout(self, socket):
""" Depending on the combination of python version and whether we're
......
......@@ -13,7 +13,7 @@ from ..types import (
class ContainerApiMixin(object):
@utils.check_resource('container')
def attach(self, container, stdout=True, stderr=True,
stream=False, logs=False):
stream=False, logs=False, demux=False):
"""
Attach to a container.
......@@ -28,11 +28,15 @@ class ContainerApiMixin(object):
stream (bool): Return container output progressively as an iterator
of strings, rather than a single string.
logs (bool): Include the container's previous output.
demux (bool): Keep stdout and stderr separate.
Returns:
By default, the container's output as a single string.
By default, the container's output as a single string (two if
``demux=True``: one for stdout and one for stderr).
If ``stream=True``, an iterator of output strings.
If ``stream=True``, an iterator of output strings. If
``demux=True``, two iterators are returned: one for stdout and one
for stderr.
Raises:
:py:class:`docker.errors.APIError`
......@@ -54,8 +58,7 @@ class ContainerApiMixin(object):
response = self._post(u, headers=headers, params=params, stream=True)
output = self._read_from_socket(
response, stream, self._check_is_tty(container)
)
response, stream, self._check_is_tty(container), demux=demux)
if stream:
return CancellableStream(output, response)
......
......@@ -118,7 +118,7 @@ class ExecApiMixin(object):
@utils.check_resource('exec_id')
def exec_start(self, exec_id, detach=False, tty=False, stream=False,
socket=False):
socket=False, demux=False):
"""
Start a previously set up exec instance.
......@@ -130,11 +130,14 @@ class ExecApiMixin(object):
stream (bool): Stream response data. Default: False
socket (bool): Return the connection socket to allow custom
read/write operations.
demux (bool): Return stdout and stderr separately
Returns:
(generator or str): If ``stream=True``, a generator yielding
response chunks. If ``socket=True``, a socket object for the
connection. A string containing response data otherwise.
(generator or str or tuple): If ``stream=True``, a generator
yielding response chunks. If ``socket=True``, a socket object for
the connection. A string containing response data otherwise. If
``demux=True``, stdout and stderr are separated.
Raises:
:py:class:`docker.errors.APIError`
......@@ -162,4 +165,4 @@ class ExecApiMixin(object):
return self._result(res)
if socket:
return self._get_raw_response_socket(res)
return self._read_from_socket(res, stream, tty)
return self._read_from_socket(res, stream, tty=tty, demux=demux)
......@@ -144,7 +144,7 @@ class Container(Model):
def exec_run(self, cmd, stdout=True, stderr=True, stdin=False, tty=False,
privileged=False, user='', detach=False, stream=False,
socket=False, environment=None, workdir=None):
socket=False, environment=None, workdir=None, demux=False):
"""
Run a command inside this container. Similar to
``docker exec``.
......@@ -166,6 +166,7 @@ class Container(Model):
the following format ``["PASSWORD=xxx"]`` or
``{"PASSWORD": "xxx"}``.
workdir (str): Path to working directory for this exec session
demux (bool): Return stdout and stderr separately
Returns:
(ExecResult): A tuple of (exit_code, output)
......@@ -180,6 +181,70 @@ class Container(Model):
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
Example:
Create a container that runs in the background
>>> client = docker.from_env()
>>> container = client.containers.run(
... 'bfirsh/reticulate-splines', detach=True)
Prepare the command we are going to use. It prints "hello stdout"
in `stdout`, followed by "hello stderr" in `stderr`:
>>> cmd = '/bin/sh -c "echo hello stdout ; echo hello stderr >&2"'
We'll run this command with all four the combinations of ``stream``
and ``demux``.
With ``stream=False`` and ``demux=False``, the output is a string
that contains both the `stdout` and the `stderr` output:
>>> res = container.exec_run(cmd, stream=False, demux=False)
>>> res.output
b'hello stderr\nhello stdout\n'
With ``stream=True``, and ``demux=False``, the output is a
generator that yields strings containing the output of both
`stdout` and `stderr`:
>>> res = container.exec_run(cmd, stream=True, demux=False)
>>> next(res.output)
b'hello stdout\n'
>>> next(res.output)
b'hello stderr\n'
>>> next(res.output)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
With ``stream=True`` and ``demux=True``, the generator now
separates the streams, and yield tuples
``(stdout, stderr)``:
>>> res = container.exec_run(cmd, stream=True, demux=True)
>>> next(res.output)
(b'hello stdout\n', None)
>>> next(res.output)
(None, b'hello stderr\n')
>>> next(res.output)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
Finally, with ``stream=False`` and ``demux=True``, the whole output
is returned, but the streams are still separated:
>>> res = container.exec_run(cmd, stream=True, demux=True)
>>> next(res.output)
(b'hello stdout\n', None)
>>> next(res.output)
(None, b'hello stderr\n')
>>> next(res.output)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
"""
resp = self.client.api.exec_create(
self.id, cmd, stdout=stdout, stderr=stderr, stdin=stdin, tty=tty,
......@@ -187,7 +252,8 @@ class Container(Model):
workdir=workdir
)
exec_output = self.client.api.exec_start(
resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket
resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket,
demux=demux
)
if socket or stream:
return ExecResult(None, exec_output)
......
......@@ -12,6 +12,10 @@ except ImportError:
NpipeSocket = type(None)
STDOUT = 1
STDERR = 2
class SocketError(Exception):
pass
......@@ -51,28 +55,43 @@ def read_exactly(socket, n):
return data
def next_frame_size(socket):
def next_frame_header(socket):
"""
Returns the size of the next frame of data waiting to be read from socket,
according to the protocol defined here:
Returns the stream and size of the next frame of data waiting to be read
from socket, according to the protocol defined here:
https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/attach-to-a-container
https://docs.docker.com/engine/api/v1.24/#attach-to-a-container
"""
try:
data = read_exactly(socket, 8)
except SocketError:
return -1
return (-1, -1)
stream, actual = struct.unpack('>BxxxL', data)
return (stream, actual)
_, actual = struct.unpack('>BxxxL', data)
return actual
def frames_iter(socket, tty):
"""
Return a generator of frames read from socket. A frame is a tuple where
the first item is the stream number and the second item is a chunk of data.
If the tty setting is enabled, the streams are multiplexed into the stdout
stream.
"""
if tty:
return ((STDOUT, frame) for frame in frames_iter_tty(socket))
else:
return frames_iter_no_tty(socket)
def frames_iter(socket):
def frames_iter_no_tty(socket):
"""
Returns a generator of frames read from socket
Returns a generator of data read from the socket when the tty setting is
not enabled.
"""
while True:
n = next_frame_size(socket)
(stream, n) = next_frame_header(socket)
if n < 0:
break
while n > 0:
......@@ -84,13 +103,13 @@ def frames_iter(socket):
# We have reached EOF
return
n -= data_length
yield result
yield (stream, result)
def socket_raw_iter(socket):
def frames_iter_tty(socket):
"""
Returns a generator of data read from the socket.
This is used for non-multiplexed streams.
Return a generator of data read from the socket when the tty setting is
enabled.
"""
while True:
result = read(socket)
......@@ -98,3 +117,53 @@ def socket_raw_iter(socket):
# We have reached EOF
return
yield result
def consume_socket_output(frames, demux=False):
"""
Iterate through frames read from the socket and return the result.
Args:
demux (bool):
If False, stdout and stderr are multiplexed, and the result is the
concatenation of all the frames. If True, the streams are
demultiplexed, and the result is a 2-tuple where each item is the
concatenation of frames belonging to the same stream.
"""
if demux is False:
# If the streams are multiplexed, the generator returns strings, that
# we just need to concatenate.
return six.binary_type().join(frames)
# If the streams are demultiplexed, the generator yields tuples
# (stdout, stderr)
out = [None, None]
for frame in frames:
# It is guaranteed that for each frame, one and only one stream
# is not None.
assert frame != (None, None)
if frame[0] is not None:
if out[0] is None:
out[0] = frame[0]
else:
out[0] += frame[0]
else:
if out[1] is None:
out[1] = frame[1]
else:
out[1] += frame[1]
return tuple(out)
def demux_adaptor(stream_id, data):
"""
Utility to demultiplex stdout and stderr when reading frames from the
socket.
"""
if stream_id == STDOUT:
return (data, None)
elif stream_id == STDERR:
return (None, data)
else:
raise ValueError('{0} is not a valid stream'.format(stream_id))
......@@ -7,7 +7,7 @@ from datetime import datetime
import docker
from docker.constants import IS_WINDOWS_PLATFORM
from docker.utils.socket import next_frame_size
from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly
import pytest
......@@ -1242,7 +1242,8 @@ class AttachContainerTest(BaseAPIIntegrationTest):
self.client.start(container)
next_size = next_frame_size(pty_stdout)
(stream, next_size) = next_frame_header(pty_stdout)
assert stream == 1 # correspond to stdout
assert next_size == len(line)
data = read_exactly(pty_stdout, next_size)
assert data.decode('utf-8') == line
......
from docker.utils.socket import next_frame_size
from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly
from .base import BaseAPIIntegrationTest, BUSYBOX
......@@ -75,6 +75,75 @@ class ExecTest(BaseAPIIntegrationTest):
res += chunk
assert res == b'hello\nworld\n'
def test_exec_command_demux(self):
container = self.client.create_container(
BUSYBOX, 'cat', detach=True, stdin_open=True)
id = container['Id']
self.client.start(id)
self.tmp_containers.append(id)
script = ' ; '.join([
# Write something on stdout
'echo hello out',
# Busybox's sleep does not handle sub-second times.
# This loops takes ~0.3 second to execute on my machine.
'for i in $(seq 1 50000); do echo $i>/dev/null; done',
# Write something on stderr
'echo hello err >&2'])
cmd = 'sh -c "{}"'.format(script)
# tty=False, stream=False, demux=False
res = self.client.exec_create(id, cmd)
exec_log = self.client.exec_start(res)
assert exec_log == b'hello out\nhello err\n'
# tty=False, stream=True, demux=False
res = self.client.exec_create(id, cmd)
exec_log = self.client.exec_start(res, stream=True)
assert next(exec_log) == b'hello out\n'
assert next(exec_log) == b'hello err\n'
with self.assertRaises(StopIteration):
next(exec_log)
# tty=False, stream=False, demux=True
res = self.client.exec_create(id, cmd)
exec_log = self.client.exec_start(res, demux=True)
assert exec_log == (b'hello out\n', b'hello err\n')
# tty=False, stream=True, demux=True
res = self.client.exec_create(id, cmd)
exec_log = self.client.exec_start(res, demux=True, stream=True)
assert next(exec_log) == (b'hello out\n', None)
assert next(exec_log) == (None, b'hello err\n')
with self.assertRaises(StopIteration):
next(exec_log)
# tty=True, stream=False, demux=False
res = self.client.exec_create(id, cmd, tty=True)
exec_log = self.client.exec_start(res)
assert exec_log == b'hello out\r\nhello err\r\n'
# tty=True, stream=True, demux=False
res = self.client.exec_create(id, cmd, tty=True)
exec_log = self.client.exec_start(res, stream=True)
assert next(exec_log) == b'hello out\r\n'
assert next(exec_log) == b'hello err\r\n'
with self.assertRaises(StopIteration):
next(exec_log)
# tty=True, stream=False, demux=True
res = self.client.exec_create(id, cmd, tty=True)
exec_log = self.client.exec_start(res, demux=True)
assert exec_log == (b'hello out\r\nhello err\r\n', None)
# tty=True, stream=True, demux=True
res = self.client.exec_create(id, cmd, tty=True)
exec_log = self.client.exec_start(res, demux=True, stream=True)
assert next(exec_log) == (b'hello out\r\n', None)
assert next(exec_log) == (b'hello err\r\n', None)
with self.assertRaises(StopIteration):
next(exec_log)
def test_exec_start_socket(self):
container = self.client.create_container(BUSYBOX, 'cat',
detach=True, stdin_open=True)
......@@ -91,7 +160,8 @@ class ExecTest(BaseAPIIntegrationTest):
socket = self.client.exec_start(exec_id, socket=True)
self.addCleanup(socket.close)
next_size = next_frame_size(socket)
(stream, next_size) = next_frame_header(socket)
assert stream == 1 # stdout (0 = stdin, 1 = stdout, 2 = stderr)
assert next_size == len(line)
data = read_exactly(socket, next_size)
assert data.decode('utf-8') == line
......
......@@ -15,6 +15,7 @@ from docker.api import APIClient
import requests
from requests.packages import urllib3
import six
import struct
from . import fake_api
......@@ -83,7 +84,7 @@ def fake_delete(self, url, *args, **kwargs):
return fake_request('DELETE', url, *args, **kwargs)
def fake_read_from_socket(self, response, stream, tty=False):
def fake_read_from_socket(self, response, stream, tty=False, demux=False):
return six.binary_type()
......@@ -467,24 +468,25 @@ class UnixSocketStreamTest(unittest.TestCase):
class TCPSocketStreamTest(unittest.TestCase):
text_data = b'''
stdout_data = b'''
Now, those children out there, they're jumping through the
flames in the hope that the god of the fire will make them fruitful.
Really, you can't blame them. After all, what girl would not prefer the
child of a god to that of some acne-scarred artisan?
'''
stderr_data = b'''
And what of the true God? To whose glory churches and monasteries have been
built on these islands for generations past? Now shall what of Him?
'''
def setUp(self):
self.server = six.moves.socketserver.ThreadingTCPServer(
('', 0), self.get_handler_class()
)
('', 0), self.get_handler_class())
self.thread = threading.Thread(target=self.server.serve_forever)
self.thread.setDaemon(True)
self.thread.start()
self.address = 'http://{}:{}'.format(
socket.gethostname(), self.server.server_address[1]
)
socket.gethostname(), self.server.server_address[1])
def tearDown(self):
self.server.shutdown()
......@@ -492,31 +494,95 @@ class TCPSocketStreamTest(unittest.TestCase):
self.thread.join()
def get_handler_class(self):
text_data = self.text_data
stdout_data = self.stdout_data
stderr_data = self.stderr_data
class Handler(six.moves.BaseHTTPServer.BaseHTTPRequestHandler, object):
def do_POST(self):
resp_data = self.get_resp_data()
self.send_response(101)
self.send_header(
'Content-Type', 'application/vnd.docker.raw-stream'
)
'Content-Type', 'application/vnd.docker.raw-stream')
self.send_header('Connection', 'Upgrade')
self.send_header('Upgrade', 'tcp')
self.end_headers()
self.wfile.flush()
time.sleep(0.2)
self.wfile.write(text_data)
self.wfile.write(resp_data)
self.wfile.flush()
def get_resp_data(self):
path = self.path.split('/')[-1]
if path == 'tty':
return stdout_data + stderr_data
elif path == 'no-tty':
data = b''
data += self.frame_header(1, stdout_data)
data += stdout_data
data += self.frame_header(2, stderr_data)
data += stderr_data
return data
else:
raise Exception('Unknown path {0}'.format(path))
@staticmethod
def frame_header(stream, data):
return struct.pack('>BxxxL', stream, len(data))
return Handler
def test_read_from_socket(self):
def request(self, stream=None, tty=None, demux=None):
assert stream is not None and tty is not None and demux is not None
with APIClient(base_url=self.address) as client:
resp = client._post(client._url('/dummy'), stream=True)
data = client._read_from_socket(resp, stream=True, tty=True)
results = b''.join(data)
assert results == self.text_data
if tty:
url = client._url('/tty')
else:
url = client._url('/no-tty')
resp = client._post(url, stream=True)
return client._read_from_socket(
resp, stream=stream, tty=tty, demux=demux)
def test_read_from_socket_1(self):
res = self.request(stream=True, tty=True, demux=False)
assert next(res) == self.stdout_data + self.stderr_data
with self.assertRaises(StopIteration):
next(res)
def test_read_from_socket_2(self):
res = self.request(stream=True, tty=True, demux=True)
assert next(res) == (self.stdout_data + self.stderr_data, None)
with self.assertRaises(StopIteration):
next(res)
def test_read_from_socket_3(self):
res = self.request(stream=True, tty=False, demux=False)
assert next(res) == self.stdout_data
assert next(res) == self.stderr_data
with self.assertRaises(StopIteration):
next(res)
def test_read_from_socket_4(self):
res = self.request(stream=True, tty=False, demux=True)
assert (self.stdout_data, None) == next(res)
assert (None, self.stderr_data) == next(res)
with self.assertRaises(StopIteration):
next(res)
def test_read_from_socket_5(self):
res = self.request(stream=False, tty=True, demux=False)
assert res == self.stdout_data + self.stderr_data
def test_read_from_socket_6(self):
res = self.request(stream=False, tty=True, demux=True)
assert res == (self.stdout_data + self.stderr_data, None)
def test_read_from_socket_7(self):
res = self.request(stream=False, tty=False, demux=False)
res == self.stdout_data + self.stderr_data
def test_read_from_socket_8(self):
res = self.request(stream=False, tty=False, demux=True)
assert res == (self.stdout_data, self.stderr_data)
class UserAgentTest(unittest.TestCase):
......
......@@ -419,7 +419,8 @@ class ContainerTest(unittest.TestCase):
workdir=None
)
client.api.exec_start.assert_called_with(
FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False
FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False,
demux=False,
)
def test_exec_run_failure(self):
......@@ -432,7 +433,8 @@ class ContainerTest(unittest.TestCase):
workdir=None
)
client.api.exec_start.assert_called_with(
FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False
FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False,
demux=False,
)
def test_export(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment