Kaydet (Commit) 45cde4a0 authored tarafından Maxime Petazzoni's avatar Maxime Petazzoni

Fix build() streaming and stream methods cleanup

Signed-off-by: 's avatarMaxime Petazzoni <max@signalfuse.com>
üst af1bb34d
...@@ -226,21 +226,13 @@ class Client(requests.Session): ...@@ -226,21 +226,13 @@ class Client(requests.Session):
def _create_websocket_connection(self, url): def _create_websocket_connection(self, url):
return websocket.create_connection(url) return websocket.create_connection(url)
def _stream_result(self, response): def _get_raw_response_socket(self, response):
"""Generator for straight-out, non chunked-encoded HTTP responses."""
self._raise_for_status(response)
for line in response.iter_lines(chunk_size=1, decode_unicode=True):
# filter out keep-alive new lines
if line:
yield line + '\n'
def _stream_result_socket(self, response):
self._raise_for_status(response) self._raise_for_status(response)
return response.raw._fp.fp._sock return response.raw._fp.fp._sock
def _stream_helper(self, response): def _stream_helper(self, response):
"""Generator for data coming from a chunked-encoded HTTP response.""" """Generator for data coming from a chunked-encoded HTTP response."""
socket_fp = self._stream_result_socket(response) socket_fp = self._get_raw_response_socket(response)
socket_fp.setblocking(1) socket_fp.setblocking(1)
socket = socket_fp.makefile() socket = socket_fp.makefile()
while True: while True:
...@@ -269,7 +261,7 @@ class Client(requests.Session): ...@@ -269,7 +261,7 @@ class Client(requests.Session):
def _multiplexed_socket_stream_helper(self, response): def _multiplexed_socket_stream_helper(self, response):
"""A generator of multiplexed data blocks coming from a response """A generator of multiplexed data blocks coming from a response
socket.""" socket."""
socket = self._stream_result_socket(response) socket = self._get_raw_response_socket(response)
def recvall(socket, size): def recvall(socket, size):
data = '' data = ''
...@@ -308,9 +300,18 @@ class Client(requests.Session): ...@@ -308,9 +300,18 @@ class Client(requests.Session):
u = self._url("/containers/{0}/attach".format(container)) u = self._url("/containers/{0}/attach".format(container))
response = self._post(u, params=params, stream=stream) response = self._post(u, params=params, stream=stream)
# Stream multi-plexing was introduced in API v1.6. # Stream multi-plexing was only introduced in API v1.6. Anything before
# that needs old-style streaming.
if utils.compare_version('1.6', self._version) < 0: if utils.compare_version('1.6', self._version) < 0:
return stream and self._stream_result(response) or \ def stream_result():
self._raise_for_status(response)
for line in response.iter_lines(chunk_size=1,
decode_unicode=True):
# filter out keep-alive new lines
if line:
yield line
return stream and stream_result(response) or \
self._result(response, binary=True) self._result(response, binary=True)
return stream and self._multiplexed_socket_stream_helper(response) or \ return stream and self._multiplexed_socket_stream_helper(response) or \
...@@ -323,13 +324,15 @@ class Client(requests.Session): ...@@ -323,13 +324,15 @@ class Client(requests.Session):
'stderr': 1, 'stderr': 1,
'stream': 1 'stream': 1
} }
if ws: if ws:
return self._attach_websocket(container, params) return self._attach_websocket(container, params)
if isinstance(container, dict): if isinstance(container, dict):
container = container.get('Id') container = container.get('Id')
u = self._url("/containers/{0}/attach".format(container)) u = self._url("/containers/{0}/attach".format(container))
return self._stream_result_socket(self.post( return self._get_raw_response_socket(self.post(
u, None, params=self._attach_params(params), stream=True)) u, None, params=self._attach_params(params), stream=True))
def build(self, path=None, tag=None, quiet=False, fileobj=None, def build(self, path=None, tag=None, quiet=False, fileobj=None,
...@@ -367,8 +370,9 @@ class Client(requests.Session): ...@@ -367,8 +370,9 @@ class Client(requests.Session):
if context is not None: if context is not None:
context.close() context.close()
if stream or utils.compare_version('1.8', self._version) >= 0: if stream or utils.compare_version('1.8', self._version) >= 0:
return self._stream_result(response) return self._stream_helper(response)
else: else:
output = self._result(response) output = self._result(response)
srch = r'Successfully built ([0-9a-f]+)' srch = r'Successfully built ([0-9a-f]+)'
...@@ -446,14 +450,13 @@ class Client(requests.Session): ...@@ -446,14 +450,13 @@ class Client(requests.Session):
format(container))), True) format(container))), True)
def events(self): def events(self):
u = self._url("/events") socket = self._get_raw_response_socket(self.get(self._url('/events'),
stream=True))
socket = self._stream_result_socket(self.get(u, stream=True))
while True: while True:
chunk = socket.recv(4096) chunk = socket.recv(4096)
if chunk: if chunk:
# Messages come in the format of length, data, newline. # Messages come in the format of length, data, newline.
# XXX: do they really?
length, data = chunk.split("\n", 1) length, data = chunk.split("\n", 1)
length = int(length, 16) length = int(length, 16)
if length > len(data): if length > len(data):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment