Kaydet (Commit) dd743db4 authored tarafından Viktor Adam's avatar Viktor Adam Kaydeden (comit) Joffrey F

Allow cancelling the streams from other threads

Signed-off-by: 's avatarViktor Adam <rycus86@gmail.com>
üst d310d95f
......@@ -5,7 +5,8 @@ from .. import errors
from .. import utils
from ..constants import DEFAULT_DATA_CHUNK_SIZE
from ..types import (
ContainerConfig, EndpointConfig, HostConfig, NetworkingConfig
CancellableStream, ContainerConfig, EndpointConfig, HostConfig,
NetworkingConfig
)
......@@ -52,10 +53,15 @@ class ContainerApiMixin(object):
u = self._url("/containers/{0}/attach", container)
response = self._post(u, headers=headers, params=params, stream=True)
return self._read_from_socket(
output = self._read_from_socket(
response, stream, self._check_is_tty(container)
)
if stream:
return CancellableStream(output, response)
else:
return output
@utils.check_resource('container')
def attach_socket(self, container, params=None, ws=False):
"""
......@@ -815,7 +821,12 @@ class ContainerApiMixin(object):
url = self._url("/containers/{0}/logs", container)
res = self._get(url, params=params, stream=stream)
return self._get_result(container, stream, res)
output = self._get_result(container, stream, res)
if stream:
return CancellableStream(output, res)
else:
return output
@utils.check_resource('container')
def pause(self, container):
......
import os
from datetime import datetime
from .. import auth, utils
from .. import auth, types, utils
class DaemonApiMixin(object):
......@@ -34,8 +34,7 @@ class DaemonApiMixin(object):
the fly. False by default.
Returns:
(generator): A blocking generator you can iterate over to retrieve
events as they happen.
A :py:class:`docker.types.daemon.CancellableStream` generator
Raises:
:py:class:`docker.errors.APIError`
......@@ -50,6 +49,14 @@ class DaemonApiMixin(object):
u'status': u'start',
u'time': 1423339459}
...
or
>>> events = client.events()
>>> for event in events:
... print event
>>> # and cancel from another thread
>>> events.close()
"""
if isinstance(since, datetime):
......@@ -68,10 +75,10 @@ class DaemonApiMixin(object):
}
url = self._url('/events')
return self._stream_helper(
self._get(url, params=params, stream=True, timeout=None),
decode=decode
)
response = self._get(url, params=params, stream=True, timeout=None)
stream = self._stream_helper(response, decode=decode)
return types.CancellableStream(stream, response)
def info(self):
"""
......
# flake8: noqa
from .containers import ContainerConfig, HostConfig, LogConfig, Ulimit
from .daemon import CancellableStream
from .healthcheck import Healthcheck
from .networks import EndpointConfig, IPAMConfig, IPAMPool, NetworkingConfig
from .services import (
......
import socket
try:
import requests.packages.urllib3 as urllib3
except ImportError:
import urllib3
class CancellableStream(object):
"""
Stream wrapper for real-time events, logs, etc. from the server.
Example:
>>> events = client.events()
>>> for event in events:
... print event
>>> # and cancel from another thread
>>> events.close()
"""
def __init__(self, stream, response):
self._stream = stream
self._response = response
def __iter__(self):
return self
def __next__(self):
try:
return next(self._stream)
except urllib3.exceptions.ProtocolError:
raise StopIteration
except socket.error:
raise StopIteration
next = __next__
def close(self):
"""
Closes the event streaming.
"""
if not self._response.raw.closed:
# find the underlying socket object
# based on api.client._get_raw_response_socket
sock_fp = self._response.raw._fp.fp
if hasattr(sock_fp, 'raw'):
sock_raw = sock_fp.raw
if hasattr(sock_raw, 'sock'):
sock = sock_raw.sock
elif hasattr(sock_raw, '_sock'):
sock = sock_raw._sock
else:
sock = sock_fp._sock
sock.shutdown(socket.SHUT_RDWR)
sock.makefile().close()
sock.close()
......@@ -2,6 +2,7 @@ import os
import re
import signal
import tempfile
import threading
from datetime import datetime
import docker
......@@ -880,6 +881,30 @@ Line2'''
assert logs == (snippet + '\n').encode(encoding='ascii')
def test_logs_streaming_and_follow_and_cancel(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container(
BUSYBOX, 'sh -c "echo \\"{0}\\" && sleep 3"'.format(snippet)
)
id = container['Id']
self.tmp_containers.append(id)
self.client.start(id)
logs = six.binary_type()
generator = self.client.logs(id, stream=True, follow=True)
exit_timer = threading.Timer(3, os._exit, args=[1])
exit_timer.start()
threading.Timer(1, generator.close).start()
for chunk in generator:
logs += chunk
exit_timer.cancel()
assert logs == (snippet + '\n').encode(encoding='ascii')
def test_logs_with_dict_instead_of_id(self):
snippet = 'Flowering Nights (Sakuya Iyazoi)'
container = self.client.create_container(
......@@ -1226,6 +1251,29 @@ class AttachContainerTest(BaseAPIIntegrationTest):
output = self.client.attach(container, stream=False, logs=True)
assert output == 'hello\n'.encode(encoding='ascii')
def test_attach_stream_and_cancel(self):
container = self.client.create_container(
BUSYBOX, 'sh -c "echo hello && sleep 60"',
tty=True
)
self.tmp_containers.append(container)
self.client.start(container)
output = self.client.attach(container, stream=True, logs=True)
exit_timer = threading.Timer(3, os._exit, args=[1])
exit_timer.start()
threading.Timer(1, output.close).start()
lines = []
for line in output:
lines.append(line)
exit_timer.cancel()
assert len(lines) == 1
assert lines[0] == 'hello\r\n'.encode(encoding='ascii')
def test_detach_with_default(self):
container = self.client.create_container(
BUSYBOX, 'cat',
......
import threading
import unittest
import docker
from datetime import datetime, timedelta
from ..helpers import requires_api_version
from .base import TEST_API_VERSION
......@@ -27,3 +30,20 @@ class ClientTest(unittest.TestCase):
assert 'Containers' in data
assert 'Volumes' in data
assert 'Images' in data
class CancellableEventsTest(unittest.TestCase):
client = docker.from_env(version=TEST_API_VERSION)
def test_cancel_events(self):
start = datetime.now()
events = self.client.events(until=start + timedelta(seconds=5))
cancel_thread = threading.Timer(2, events.close)
cancel_thread.start()
for _ in events:
pass
self.assertLess(datetime.now() - start, timedelta(seconds=3))
import os
import tempfile
import threading
import docker
import pytest
......@@ -141,6 +143,25 @@ class ContainerCollectionTest(BaseIntegrationTest):
assert logs[0] == b'hello\n'
assert logs[1] == b'world\n'
def test_run_with_streamed_logs_and_cancel(self):
client = docker.from_env(version=TEST_API_VERSION)
out = client.containers.run(
'alpine', 'sh -c "echo hello && echo world"', stream=True
)
exit_timer = threading.Timer(3, os._exit, args=[1])
exit_timer.start()
threading.Timer(1, out.close).start()
logs = [line for line in out]
exit_timer.cancel()
assert len(logs) == 2
assert logs[0] == b'hello\n'
assert logs[1] == b'world\n'
def test_get(self):
client = docker.from_env(version=TEST_API_VERSION)
container = client.containers.run("alpine", "sleep 300", detach=True)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment