Kaydet (Commit) 9676feba authored tarafından Davanum Srinivas's avatar Davanum Srinivas

Fix to enable streaming container logs reliably

Started a ubuntu container that just runs "ping 8.8.8.8" and tried
the sample code in https://gist.github.com/dims/c3327f633c526847c8e5
to recreate the problem mentioned in:
https://github.com/docker/docker-py/issues/300

To debug the problem i printed the byte array read in recvall
when reading STREAM_HEADER_SIZE_BYTES and realized that the data
being read was far ahead of the actual start of the header documented
in the vnd.docker.raw-stream of the docker remote api. This is
possibly because the requests/urllib3 is reading ahead a bit more
and we shouldn't be trying to hack the internals of those projects.
So just using the documented file-like response.raw is good enough
for us to get the functionality we need which is being able to
read for exactly where the stream header starts. With this change
i can reliably stream the logs just like "docker logs --follow".

Note that we still need to access the underlying socket to set
the timeout to prevent read time outs. The original fix was for
client.logs() only but on further review it made sense to replace
all occurances of _multiplexed_socket_stream_helper with the
new method.
üst d3a2d900
...@@ -320,40 +320,26 @@ class Client(requests.Session): ...@@ -320,40 +320,26 @@ class Client(requests.Session):
walker = end walker = end
yield buf[start:end] yield buf[start:end]
def _multiplexed_socket_stream_helper(self, response): def _multiplexed_response_stream_helper(self, response):
"""A generator of multiplexed data blocks coming from a response """A generator of multiplexed data blocks coming from a response
socket.""" stream."""
socket = self._get_raw_response_socket(response)
def recvall(socket, size):
blocks = []
while size > 0:
if six.PY3:
block = socket._sock.recv(size)
else:
block = socket.recv(size)
if not block:
return None
blocks.append(block)
size -= len(block)
sep = bytes() if six.PY3 else str() # Disable timeout on the underlying socket to prevent
data = sep.join(blocks) # Read timed out(s) for long running processes
return data socket = self._get_raw_response_socket(response)
if six.PY3:
socket._sock.settimeout(None)
else:
socket.settimeout(None)
while True: while True:
if six.PY3: header = response.raw.read(STREAM_HEADER_SIZE_BYTES)
socket._sock.settimeout(None)
else:
socket.settimeout(None)
header = recvall(socket, STREAM_HEADER_SIZE_BYTES)
if not header: if not header:
break break
_, length = struct.unpack('>BxxxL', header) _, length = struct.unpack('>BxxxL', header)
if not length: if not length:
break break
data = recvall(socket, length) data = response.raw.read(length)
if not data: if not data:
break break
yield data yield data
...@@ -387,7 +373,7 @@ class Client(requests.Session): ...@@ -387,7 +373,7 @@ class Client(requests.Session):
sep = bytes() if six.PY3 else str() sep = bytes() if six.PY3 else str()
return stream and self._multiplexed_socket_stream_helper(response) or \ return stream and self._multiplexed_response_stream_helper(response) or \
sep.join([x for x in self._multiplexed_buffer_helper(response)]) sep.join([x for x in self._multiplexed_buffer_helper(response)])
def attach_socket(self, container, params=None, ws=False): def attach_socket(self, container, params=None, ws=False):
...@@ -604,7 +590,7 @@ class Client(requests.Session): ...@@ -604,7 +590,7 @@ class Client(requests.Session):
data=data, stream=stream) data=data, stream=stream)
self._raise_for_status(res) self._raise_for_status(res)
if stream: if stream:
return self._multiplexed_socket_stream_helper(res) return self._multiplexed_response_stream_helper(res)
elif six.PY3: elif six.PY3:
return bytes().join( return bytes().join(
[x for x in self._multiplexed_buffer_helper(res)] [x for x in self._multiplexed_buffer_helper(res)]
...@@ -774,7 +760,7 @@ class Client(requests.Session): ...@@ -774,7 +760,7 @@ class Client(requests.Session):
url = self._url("/containers/{0}/logs".format(container)) url = self._url("/containers/{0}/logs".format(container))
res = self._get(url, params=params, stream=stream) res = self._get(url, params=params, stream=stream)
if stream: if stream:
return self._multiplexed_socket_stream_helper(res) return self._multiplexed_response_stream_helper(res)
elif six.PY3: elif six.PY3:
return bytes().join( return bytes().join(
[x for x in self._multiplexed_buffer_helper(res)] [x for x in self._multiplexed_buffer_helper(res)]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment