Kaydet (Commit) 0f366bf7 authored tarafından Joffrey F's avatar Joffrey F

Merge pull request #167 from mpetazzoni/build-output

Fix build() and events() streaming
......@@ -226,21 +226,13 @@ class Client(requests.Session):
def _create_websocket_connection(self, url):
return websocket.create_connection(url)
def _stream_result(self, response):
"""Generator for straight-out, non chunked-encoded HTTP responses."""
self._raise_for_status(response)
for line in response.iter_lines(chunk_size=1, decode_unicode=True):
# filter out keep-alive new lines
if line:
yield line + '\n'
def _stream_result_socket(self, response):
def _get_raw_response_socket(self, response):
self._raise_for_status(response)
return response.raw._fp.fp._sock
def _stream_helper(self, response):
"""Generator for data coming from a chunked-encoded HTTP response."""
socket_fp = self._stream_result_socket(response)
socket_fp = self._get_raw_response_socket(response)
socket_fp.setblocking(1)
socket = socket_fp.makefile()
while True:
......@@ -269,7 +261,7 @@ class Client(requests.Session):
def _multiplexed_socket_stream_helper(self, response):
"""A generator of multiplexed data blocks coming from a response
socket."""
socket = self._stream_result_socket(response)
socket = self._get_raw_response_socket(response)
def recvall(socket, size):
data = ''
......@@ -308,9 +300,18 @@ class Client(requests.Session):
u = self._url("/containers/{0}/attach".format(container))
response = self._post(u, params=params, stream=stream)
# Stream multi-plexing was introduced in API v1.6.
# Stream multi-plexing was only introduced in API v1.6. Anything before
# that needs old-style streaming.
if utils.compare_version('1.6', self._version) < 0:
return stream and self._stream_result(response) or \
def stream_result():
self._raise_for_status(response)
for line in response.iter_lines(chunk_size=1,
decode_unicode=True):
# filter out keep-alive new lines
if line:
yield line
return stream and stream_result(response) or \
self._result(response, binary=True)
return stream and self._multiplexed_socket_stream_helper(response) or \
......@@ -323,13 +324,15 @@ class Client(requests.Session):
'stderr': 1,
'stream': 1
}
if ws:
return self._attach_websocket(container, params)
if isinstance(container, dict):
container = container.get('Id')
u = self._url("/containers/{0}/attach".format(container))
return self._stream_result_socket(self.post(
return self._get_raw_response_socket(self.post(
u, None, params=self._attach_params(params), stream=True))
def build(self, path=None, tag=None, quiet=False, fileobj=None,
......@@ -367,8 +370,9 @@ class Client(requests.Session):
if context is not None:
context.close()
if stream or utils.compare_version('1.8', self._version) >= 0:
return self._stream_result(response)
return self._stream_helper(response)
else:
output = self._result(response)
srch = r'Successfully built ([0-9a-f]+)'
......@@ -446,21 +450,7 @@ class Client(requests.Session):
format(container))), True)
def events(self):
u = self._url("/events")
socket = self._stream_result_socket(self.get(u, stream=True))
while True:
chunk = socket.recv(4096)
if chunk:
# Messages come in the format of length, data, newline.
length, data = chunk.split("\n", 1)
length = int(length, 16)
if length > len(data):
data += socket.recv(length - len(data))
yield json.loads(data)
else:
break
return self._stream_helper(self.get(self._url('/events'), stream=True))
def export(self, container):
if isinstance(container, dict):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment