# npipesocket.py
import functools
import io

# Python 2/3 compatibility helpers.
import six
# pywin32 bindings for the Win32 file and named-pipe APIs.
import win32file
import win32pipe

# Win32 error code ERROR_PIPE_BUSY (231): compared against the ``winerror``
# of a failed CreateFile call to decide whether a connection attempt should
# be retried.
cERROR_PIPE_BUSY = 0xe7

# Flag values OR-ed together and passed as the flags argument of CreateFile
# when opening the pipe.
cSECURITY_SQOS_PRESENT = 0x100000
cSECURITY_ANONYMOUS = 0

# Timeout handed to WaitNamedPipe before retrying a busy pipe
# (WaitNamedPipe timeouts are expressed in milliseconds).
RETRY_WAIT_TIMEOUT = 10000


def check_closed(f):
    """Decorate a method so it refuses to run on a closed object.

    The wrapper reads ``self._closed`` on every call: when it is falsy the
    call is forwarded to ``f`` unchanged, otherwise ``RuntimeError`` is
    raised and ``f`` is never invoked.
    """
    @functools.wraps(f)
    def guarded(self, *args, **kwargs):
        if not self._closed:
            return f(self, *args, **kwargs)
        raise RuntimeError(
            'Can not reuse socket after connection was closed.'
        )
    return guarded


class NpipeSocket(object):
    """Partial implementation of the socket API over Windows named pipes.

    This implementation is only designed to be used as a client socket;
    server-specific methods (bind, listen, accept...) are not implemented
    and raise ``NotImplementedError``.
    """

    def __init__(self, handle=None):
        """Create a socket, optionally wrapping an existing pipe handle."""
        self._timeout = win32pipe.NMPWAIT_USE_DEFAULT_WAIT
        self._handle = handle
        # Defined up front so getpeername()/getsockname() return None instead
        # of raising AttributeError before connect() has been called.
        self._address = None
        self._closed = False

    def accept(self):
        """Server-side operation; not implemented."""
        raise NotImplementedError()

    def bind(self, address):
        """Server-side operation; not implemented."""
        raise NotImplementedError()

    def close(self):
        """Close the pipe handle (if any) and mark the socket as closed."""
        # A socket that never connected, or that was detached, holds no
        # handle; closing it must still succeed.
        if self._handle is not None:
            self._handle.Close()
        self._closed = True

    @check_closed
    def connect(self, address):
        """Open the named pipe at ``address`` for reading and writing.

        While CreateFile fails with ERROR_PIPE_BUSY, waits (with
        RETRY_WAIT_TIMEOUT) for a pipe instance to become available and
        tries again. Any other error from CreateFile is re-raised.
        """
        # Retry iteratively rather than recursively so sustained contention
        # on a busy pipe cannot exhaust the interpreter's recursion limit.
        while True:
            win32pipe.WaitNamedPipe(address, self._timeout)
            try:
                handle = win32file.CreateFile(
                    address,
                    win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                    0,
                    None,
                    win32file.OPEN_EXISTING,
                    cSECURITY_ANONYMOUS | cSECURITY_SQOS_PRESENT,
                    0
                )
            except win32pipe.error as e:
                # See Remarks:
                # https://msdn.microsoft.com/en-us/library/aa365800.aspx
                if e.winerror != cERROR_PIPE_BUSY:
                    # Bare raise keeps the original traceback intact.
                    raise
                # Another program or thread has grabbed our pipe instance
                # before we got to it. Wait for availability and attempt to
                # connect again.
                win32pipe.WaitNamedPipe(address, RETRY_WAIT_TIMEOUT)
            else:
                break

        self.flags = win32pipe.GetNamedPipeInfo(handle)[0]

        self._handle = handle
        self._address = address

    @check_closed
    def connect_ex(self, address):
        """Alias for connect(); returns whatever connect() returns."""
        return self.connect(address)

    @check_closed
    def detach(self):
        """Mark the socket closed and hand the pipe handle to the caller.

        The handle itself is left open.
        """
        handle = self._handle
        # Drop our reference so a later close() cannot close a handle that
        # now belongs to the caller.
        self._handle = None
        self._closed = True
        return handle

    @check_closed
    def dup(self):
        """Return a new NpipeSocket wrapping the same pipe handle.

        NOTE: the handle is shared, not duplicated, so closing either
        socket closes the handle used by both.
        """
        return NpipeSocket(self._handle)

    def getpeername(self):
        """Return the address passed to connect(), or None if unconnected."""
        return self._address

    def getsockname(self):
        """Return the address passed to connect(), or None if unconnected."""
        return self._address

    def getsockopt(self, level, optname, buflen=None):
        """Socket options are not implemented."""
        raise NotImplementedError()

    def ioctl(self, control, option):
        """ioctl is not implemented."""
        raise NotImplementedError()

    def listen(self, backlog):
        """Server-side operation; not implemented."""
        raise NotImplementedError()

    def makefile(self, mode=None, bufsize=None):
        """Return an ``io.BufferedReader`` reading from this socket.

        Only read modes ('r' / 'rb') are supported; any other mode raises
        ``NotImplementedError``. A missing or non-positive ``bufsize`` falls
        back to ``io.DEFAULT_BUFFER_SIZE``.
        """
        # socket.makefile() defaults to read mode; treat an omitted mode the
        # same way instead of failing on None.strip().
        if mode is None:
            mode = 'r'
        if mode.strip('b') != 'r':
            raise NotImplementedError()
        rawio = NpipeFileIOBase(self)
        if bufsize is None or bufsize <= 0:
            bufsize = io.DEFAULT_BUFFER_SIZE
        return io.BufferedReader(rawio, buffer_size=bufsize)

    @check_closed
    def recv(self, bufsize, flags=0):
        """Read up to ``bufsize`` bytes from the pipe; ``flags`` is ignored."""
        err, data = win32file.ReadFile(self._handle, bufsize)
        return data

    @check_closed
    def recvfrom(self, bufsize, flags=0):
        """Like recv(), but return ``(data, address)``."""
        data = self.recv(bufsize, flags)
        return (data, self._address)

    @check_closed
    def recvfrom_into(self, buf, nbytes=0, flags=0):
        """Like recv_into(), but return ``(nbytes_read, address)``."""
        return self.recv_into(buf, nbytes, flags), self._address

    @check_closed
    def recv_into(self, buf, nbytes=0, flags=0):
        """Read from the pipe into ``buf`` and return the byte count.

        At most ``nbytes`` bytes are requested when ``nbytes`` is non-zero,
        otherwise the whole of ``buf`` is offered. ``flags`` is accepted for
        socket API compatibility (recvfrom_into() passes it) and ignored.
        """
        if six.PY2:
            return self._recv_into_py2(buf, nbytes)

        readbuf = buf
        if not isinstance(buf, memoryview):
            readbuf = memoryview(buf)

        err, data = win32file.ReadFile(
            self._handle,
            readbuf[:nbytes] if nbytes else readbuf
        )
        return len(data)

    def _recv_into_py2(self, buf, nbytes):
        """Python 2 path for recv_into(): read, then copy into ``buf``."""
        err, data = win32file.ReadFile(self._handle, nbytes or len(buf))
        n = len(data)
        buf[:n] = data
        return n

    @check_closed
    def send(self, string, flags=0):
        """Write ``string`` to the pipe and return the number of bytes
        reported by WriteFile; ``flags`` is ignored.
        """
        err, nbytes = win32file.WriteFile(self._handle, string)
        return nbytes

    @check_closed
    def sendall(self, string, flags=0):
        """Delegate to send() with a single write."""
        return self.send(string, flags)

    @check_closed
    def sendto(self, string, address):
        """Connect to ``address`` and then send ``string``."""
        self.connect(address)
        return self.send(string)

    def setblocking(self, flag):
        """Select blocking (truthy ``flag``) or non-blocking mode."""
        if flag:
            return self.settimeout(None)
        return self.settimeout(0)

    def settimeout(self, value):
        """Set the timeout used when waiting for the pipe in connect().

        ``None`` selects blocking mode, ``0`` non-blocking mode, and any
        other non-negative number is a timeout in seconds.

        Raises:
            ValueError: if ``value`` is not an int/float or is negative.
        """
        if value is None:
            # Blocking mode
            self._timeout = win32pipe.NMPWAIT_WAIT_FOREVER
        elif not isinstance(value, (float, int)) or value < 0:
            raise ValueError('Timeout value out of range')
        elif value == 0:
            # Non-blocking mode
            # NOTE(review): the Win32 constant is spelled NMPWAIT_NOWAIT;
            # confirm pywin32 exposes it under the NMPWAIT_NO_WAIT name.
            self._timeout = win32pipe.NMPWAIT_NO_WAIT
        else:
            # Timeout mode - Value converted to whole milliseconds so a
            # float number of seconds never reaches WaitNamedPipe as a float.
            self._timeout = int(value * 1000)

    def gettimeout(self):
        """Return the stored timeout exactly as settimeout() recorded it
        (milliseconds or one of the NMPWAIT_* constants).
        """
        return self._timeout

    def setsockopt(self, level, optname, value):
        """Socket options are not implemented."""
        raise NotImplementedError()

    @check_closed
    def shutdown(self, how):
        """Close the socket; ``how`` is ignored."""
        return self.close()


class NpipeFileIOBase(io.RawIOBase):
    """Read-only raw I/O adapter over a socket-like object.

    Reads are delegated to the wrapped object's ``recv_into``; the stream
    reports itself as readable, not writable, not seekable and not a TTY.
    """

    def __init__(self, npipe_socket):
        # The wrapped object must provide recv_into() (and fileno() if
        # fileno() is ever called on this stream).
        self.sock = npipe_socket

    # Capability queries: fixed answers, independent of stream state.

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return False

    def isatty(self):
        return False

    # Operations forwarded to the wrapped socket.

    def fileno(self):
        """Return whatever the wrapped socket's ``fileno()`` returns."""
        wrapped = self.sock
        return wrapped.fileno()

    def readinto(self, buf):
        """Fill ``buf`` via the socket's ``recv_into`` and return its result."""
        wrapped = self.sock
        return wrapped.recv_into(buf)

    def close(self):
        """Close the stream, then drop the reference to the socket."""
        super(NpipeFileIOBase, self).close()
        self.sock = None