Kaydet (Commit) 76ed9c37 authored tarafından Joffrey F's avatar Joffrey F Kaydeden (comit) Aanand Prasad

Read from socket after sending TCP upgrade headers.

Signed-off-by: 's avatarJoffrey F <joffrey@docker.com>
üst 5464cf2b
...@@ -26,7 +26,7 @@ class ContainerApiMixin(object): ...@@ -26,7 +26,7 @@ class ContainerApiMixin(object):
u = self._url("/containers/{0}/attach", container) u = self._url("/containers/{0}/attach", container)
response = self._post(u, headers=headers, params=params, stream=stream) response = self._post(u, headers=headers, params=params, stream=stream)
return self._get_result(container, stream, response) return self._read_from_socket(response, stream)
@utils.check_resource @utils.check_resource
def attach_socket(self, container, params=None, ws=False): def attach_socket(self, container, params=None, ws=False):
...@@ -40,9 +40,18 @@ class ContainerApiMixin(object): ...@@ -40,9 +40,18 @@ class ContainerApiMixin(object):
if ws: if ws:
return self._attach_websocket(container, params) return self._attach_websocket(container, params)
headers = {
'Connection': 'Upgrade',
'Upgrade': 'tcp'
}
u = self._url("/containers/{0}/attach", container) u = self._url("/containers/{0}/attach", container)
return self._get_raw_response_socket(self.post( return self._get_raw_response_socket(
u, None, params=self._attach_params(params), stream=True)) self.post(
u, None, params=self._attach_params(params), stream=True,
headers=headers
)
)
@utils.check_resource @utils.check_resource
def commit(self, container, repository=None, tag=None, message=None, def commit(self, container, repository=None, tag=None, message=None,
......
...@@ -56,8 +56,6 @@ class ExecApiMixin(object): ...@@ -56,8 +56,6 @@ class ExecApiMixin(object):
def exec_start(self, exec_id, detach=False, tty=False, stream=False, def exec_start(self, exec_id, detach=False, tty=False, stream=False,
socket=False): socket=False):
# we want opened socket if socket == True # we want opened socket if socket == True
if socket:
stream = True
if isinstance(exec_id, dict): if isinstance(exec_id, dict):
exec_id = exec_id.get('Id') exec_id = exec_id.get('Id')
...@@ -75,9 +73,9 @@ class ExecApiMixin(object): ...@@ -75,9 +73,9 @@ class ExecApiMixin(object):
self._url('/exec/{0}/start', exec_id), self._url('/exec/{0}/start', exec_id),
headers=headers, headers=headers,
data=data, data=data,
stream=stream stream=True
) )
if socket: if socket:
return self._get_raw_response_socket(res) return self._get_raw_response_socket(res)
return self._get_result_tty(stream, res, tty) return self._read_from_socket(res, stream)
...@@ -12,7 +12,10 @@ ...@@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import errno
import json import json
import os
import select
import struct import struct
import requests import requests
...@@ -305,6 +308,53 @@ class Client( ...@@ -305,6 +308,53 @@ class Client(
for out in response.iter_content(chunk_size=1, decode_unicode=True): for out in response.iter_content(chunk_size=1, decode_unicode=True):
yield out yield out
def _read_from_socket(self, response, stream):
def read_socket(socket, n=4096):
recoverable_errors = (
errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK
)
# wait for data to become available
select.select([socket], [], [])
try:
if hasattr(socket, 'recv'):
return socket.recv(n)
return os.read(socket.fileno(), n)
except EnvironmentError as e:
if e.errno not in recoverable_errors:
raise
def next_packet_size(socket):
data = six.binary_type()
while len(data) < 8:
next_data = read_socket(socket, 8 - len(data))
if not next_data:
return 0
data = data + next_data
if data is None:
return 0
if len(data) == 8:
_, actual = struct.unpack('>BxxxL', data)
return actual
def read_loop(socket):
n = next_packet_size(socket)
while n > 0:
yield read_socket(socket, n)
n = next_packet_size(socket)
socket = self._get_raw_response_socket(response)
if stream:
return read_loop(socket)
else:
data = six.binary_type()
for d in read_loop(socket):
data += d
return data
def _disable_socket_timeout(self, socket): def _disable_socket_timeout(self, socket):
""" Depending on the combination of python version and whether we're """ Depending on the combination of python version and whether we're
connecting over http or https, we might need to access _sock, which connecting over http or https, we might need to access _sock, which
......
...@@ -54,7 +54,7 @@ def exec_driver_is_native(): ...@@ -54,7 +54,7 @@ def exec_driver_is_native():
c = docker_client() c = docker_client()
EXEC_DRIVER = c.info()['ExecutionDriver'] EXEC_DRIVER = c.info()['ExecutionDriver']
c.close() c.close()
return EXEC_DRIVER.startswith('native') return EXEC_DRIVER.startswith('native') or EXEC_DRIVER == ''
def docker_client(**kwargs): def docker_client(**kwargs):
...@@ -105,7 +105,7 @@ def read_data(socket, packet_size): ...@@ -105,7 +105,7 @@ def read_data(socket, packet_size):
while len(data) < packet_size: while len(data) < packet_size:
next_data = read_socket(socket, packet_size - len(data)) next_data = read_socket(socket, packet_size - len(data))
if not next_data: if not next_data:
assert False, "Failed trying to read in the dataz" assert False, "Failed trying to read in the data"
data += next_data data += next_data
return data return data
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment