Kaydet (Commit) c76ec15d authored tarafından Joffrey F's avatar Joffrey F

Several fixes to npipe support

- Fix _get_raw_response_socket to always return the NpipeSocket object
- Override NpipeHTTPConnectionPool._get_conn to avoid crash in urllib3
- Fix NpipeSocket.recv_into for Python 2
- Do not call select() on NpipeSocket objects
Signed-off-by: 's avatarJoffrey F <joffrey@docker.com>
üst 6f7392ea
...@@ -220,7 +220,9 @@ class Client( ...@@ -220,7 +220,9 @@ class Client(
def _get_raw_response_socket(self, response): def _get_raw_response_socket(self, response):
self._raise_for_status(response) self._raise_for_status(response)
if six.PY3: if self.base_url == "http+docker://localnpipe":
sock = response.raw._fp.fp.raw.sock
elif six.PY3:
sock = response.raw._fp.fp.raw sock = response.raw._fp.fp.raw
if self.base_url.startswith("https://"): if self.base_url.startswith("https://"):
sock = sock._sock sock = sock._sock
......
...@@ -2,5 +2,6 @@ ...@@ -2,5 +2,6 @@
from .unixconn import UnixAdapter from .unixconn import UnixAdapter
try: try:
from .npipeconn import NpipeAdapter from .npipeconn import NpipeAdapter
from .npipesocket import NpipeSocket
except ImportError: except ImportError:
pass pass
...@@ -14,7 +14,6 @@ try: ...@@ -14,7 +14,6 @@ try:
except ImportError: except ImportError:
import urllib3 import urllib3
RecentlyUsedContainer = urllib3._collections.RecentlyUsedContainer RecentlyUsedContainer = urllib3._collections.RecentlyUsedContainer
...@@ -46,6 +45,28 @@ class NpipeHTTPConnectionPool(urllib3.connectionpool.HTTPConnectionPool): ...@@ -46,6 +45,28 @@ class NpipeHTTPConnectionPool(urllib3.connectionpool.HTTPConnectionPool):
self.npipe_path, self.timeout self.npipe_path, self.timeout
) )
# When re-using connections, urllib3 tries to call select() on our
# NpipeSocket instance, causing a crash. To circumvent this, we override
# _get_conn, where that check happens.
def _get_conn(self, timeout):
conn = None
try:
conn = self.pool.get(block=self.block, timeout=timeout)
except AttributeError: # self.pool is None
raise urllib3.exceptions.ClosedPoolError(self, "Pool is closed.")
except six.moves.queue.Empty:
if self.block:
raise urllib3.exceptions.EmptyPoolError(
self,
"Pool reached maximum size and no more "
"connections are allowed."
)
pass # Oh well, we'll create a new connection then
return conn or self._new_conn()
class NpipeAdapter(requests.adapters.HTTPAdapter): class NpipeAdapter(requests.adapters.HTTPAdapter):
def __init__(self, base_url, timeout=60, def __init__(self, base_url, timeout=60,
......
import functools import functools
import io import io
import six
import win32file import win32file
import win32pipe import win32pipe
...@@ -115,6 +116,9 @@ class NpipeSocket(object): ...@@ -115,6 +116,9 @@ class NpipeSocket(object):
@check_closed @check_closed
def recv_into(self, buf, nbytes=0): def recv_into(self, buf, nbytes=0):
if six.PY2:
return self._recv_into_py2(buf, nbytes)
readbuf = buf readbuf = buf
if not isinstance(buf, memoryview): if not isinstance(buf, memoryview):
readbuf = memoryview(buf) readbuf = memoryview(buf)
...@@ -125,6 +129,12 @@ class NpipeSocket(object): ...@@ -125,6 +129,12 @@ class NpipeSocket(object):
) )
return len(data) return len(data)
def _recv_into_py2(self, buf, nbytes):
err, data = win32file.ReadFile(self._handle, nbytes or len(buf))
n = len(data)
buf[:n] = data
return n
@check_closed @check_closed
def send(self, string, flags=0): def send(self, string, flags=0):
err, nbytes = win32file.WriteFile(self._handle, string) err, nbytes = win32file.WriteFile(self._handle, string)
......
...@@ -5,6 +5,11 @@ import struct ...@@ -5,6 +5,11 @@ import struct
import six import six
try:
from ..transport import NpipeSocket
except ImportError:
NpipeSocket = type(None)
class SocketError(Exception): class SocketError(Exception):
pass pass
...@@ -14,10 +19,12 @@ def read(socket, n=4096): ...@@ -14,10 +19,12 @@ def read(socket, n=4096):
""" """
Reads at most n bytes from socket Reads at most n bytes from socket
""" """
recoverable_errors = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK) recoverable_errors = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK)
# wait for data to become available # wait for data to become available
select.select([socket], [], []) if not isinstance(socket, NpipeSocket):
select.select([socket], [], [])
try: try:
if hasattr(socket, 'recv'): if hasattr(socket, 'recv'):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment