Kaydet (Commit) 67f7a388 authored tarafından Guido van Rossum's avatar Guido van Rossum

SF patch 555085 (timeout socket implementation) by Michael Gilfix.

I've made considerable changes to Michael's code, specifically to use
the select() system call directly and to store the timeout as a C
double instead of a Python object; internally, -1.0 (or anything
negative) represents the None from the API.

I'm not 100% sure that all corner cases are covered correctly, so
please keep an eye on this.  Next I'm going to try it Windows before
Tim complains.

No way is this a bugfix candidate. :-)
üst c9a55776
...@@ -134,7 +134,8 @@ def getfqdn(name=''): ...@@ -134,7 +134,8 @@ def getfqdn(name=''):
_socketmethods = ( _socketmethods = (
'bind', 'connect', 'connect_ex', 'fileno', 'listen', 'bind', 'connect', 'connect_ex', 'fileno', 'listen',
'getpeername', 'getsockname', 'getsockopt', 'setsockopt', 'getpeername', 'getsockname', 'getsockopt', 'setsockopt',
'recv', 'recvfrom', 'send', 'sendall', 'sendto', 'setblocking', 'shutdown') 'recv', 'recvfrom', 'send', 'sendall', 'sendto', 'setblocking',
'settimeout', 'gettimeout', 'shutdown')
class _socketobject: class _socketobject:
...@@ -168,94 +169,108 @@ class _socketobject: ...@@ -168,94 +169,108 @@ class _socketobject:
class _fileobject: class _fileobject:
"""Implements a file object on top of a regular socket object."""
def __init__(self, sock, mode, bufsize): def __init__(self, sock, mode='rb', bufsize=8192):
self._sock = sock self._sock = sock
self._mode = mode self._mode = mode
if bufsize < 0: if bufsize <= 0:
bufsize = 512 bufsize = 512
self._rbufsize = max(1, bufsize) self._rbufsize = bufsize
self._wbufsize = bufsize self._wbufsize = bufsize
self._wbuf = self._rbuf = "" self._rbuf = [ ]
self._wbuf = [ ]
def close(self): def close(self):
try: try:
if self._sock: if self._sock:
self.flush() self.flush()
finally: finally:
self._sock = 0 self._sock = None
def __del__(self): def __del__(self):
self.close() self.close()
def flush(self): def flush(self):
if self._wbuf: if self._wbuf:
self._sock.sendall(self._wbuf) buffer = ''.join(self._wbuf)
self._wbuf = "" self._sock.sendall(buffer)
self._wbuf = [ ]
def fileno(self): def fileno (self):
return self._sock.fileno() return self._sock.fileno()
def write(self, data): def write(self, data):
self._wbuf = self._wbuf + data self._wbuf.append (data)
# A _wbufsize of 1 means we're doing unbuffered IO.
# Flush accordingly.
if self._wbufsize == 1: if self._wbufsize == 1:
if '\n' in data: if '\n' in data:
self.flush() self.flush ()
else: elif self.__get_wbuf_len() >= self._wbufsize:
if len(self._wbuf) >= self._wbufsize: self.flush()
self.flush()
def writelines(self, list): def writelines(self, list):
filter(self._sock.sendall, list) filter(self._sock.sendall, list)
self.flush() self.flush()
def read(self, n=-1): def __get_wbuf_len (self):
if n >= 0: buf_len = 0
k = len(self._rbuf) for i in [len(x) for x in self._wbuf]:
if n <= k: buf_len += i
data = self._rbuf[:n] return buf_len
self._rbuf = self._rbuf[n:]
return data def __get_rbuf_len(self):
n = n - k buf_len = 0
L = [self._rbuf] for i in [len(x) for x in self._rbuf]:
self._rbuf = "" buf_len += i
while n > 0: return buf_len
new = self._sock.recv(max(n, self._rbufsize))
if not new: break def read(self, size=-1):
k = len(new) buf_len = self.__get_rbuf_len()
if k > n: while size < 0 or buf_len < size:
L.append(new[:n]) recv_size = max(self._rbufsize, size - buf_len)
self._rbuf = new[n:] data = self._sock.recv(recv_size)
break if not data:
L.append(new) break
n = n - k buf_len += len(data)
return "".join(L) self._rbuf.append(data)
k = max(512, self._rbufsize) data = ''.join(self._rbuf)
L = [self._rbuf] # Clear the rbuf at the end so we're not affected by
self._rbuf = "" # an exception during a recv
while 1: self._rbuf = [ ]
new = self._sock.recv(k) if buf_len > size and size >= 0:
if not new: break self._rbuf.append(data[size:])
L.append(new) data = data[:size]
k = min(k*2, 1024**2) return data
return "".join(L)
def readline(self, size=-1):
def readline(self, limit=-1): index = -1
data = "" buf_len = self.__get_rbuf_len()
i = self._rbuf.find('\n') if len (self._rbuf):
while i < 0 and not (0 < limit <= len(self._rbuf)): index = min([x.find('\n') for x in self._rbuf])
new = self._sock.recv(self._rbufsize) while index < 0 and (size < 0 or buf_len < size):
if not new: break recv_size = max(self._rbufsize, size - buf_len)
i = new.find('\n') data = self._sock.recv(recv_size)
if i >= 0: i = i + len(self._rbuf) if not data:
self._rbuf = self._rbuf + new break
if i < 0: i = len(self._rbuf) buf_len += len(data)
else: i = i+1 self._rbuf.append(data)
if 0 <= limit < len(self._rbuf): i = limit index = data.find('\n')
data, self._rbuf = self._rbuf[:i], self._rbuf[i:] data = ''.join(self._rbuf)
self._rbuf = [ ]
index = data.find('\n')
if index >= 0:
index += 1
elif buf_len > size:
index = size
else:
index = buf_len
self._rbuf.append(data[index:])
data = data[:index]
return data return data
def readlines(self, sizehint = 0): def readlines(self, sizehint=0):
total = 0 total = 0
list = [] list = []
while 1: while 1:
......
...@@ -109,6 +109,7 @@ except socket.error: ...@@ -109,6 +109,7 @@ except socket.error:
canfork = hasattr(os, 'fork') canfork = hasattr(os, 'fork')
try: try:
PORT = 50007 PORT = 50007
msg = 'socket test\n'
if not canfork or os.fork(): if not canfork or os.fork():
# parent is server # parent is server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
...@@ -133,13 +134,52 @@ try: ...@@ -133,13 +134,52 @@ try:
f = conn.makefile() f = conn.makefile()
if verbose: if verbose:
print 'file obj:', f print 'file obj:', f
data = conn.recv(1024)
if verbose:
print 'received:', data
conn.sendall(data)
# Perform a few tests on the windows file object
if verbose:
print "Staring _fileobject tests..."
f = socket._fileobject (conn, 'rb', 8192)
first_seg = f.read(7)
second_seg = f.read(5)
if not first_seg == 'socket ' or not second_seg == 'test\n':
print "Error performing read with the python _fileobject class"
os._exit (1)
elif verbose:
print "_fileobject buffered read works"
f.write (data)
f.flush ()
buf = ''
while 1: while 1:
data = conn.recv(1024) char = f.read(1)
if not data: if not char:
print "Error performing unbuffered read with the python ", \
"_fileobject class"
os._exit (1)
buf += char
if buf == msg:
if verbose:
print "__fileobject unbuffered read works"
break break
if verbose: if verbose:
print 'received:', data # If we got this far, write() must work as well
conn.sendall(data) print "__fileobject write works"
f.write(buf)
f.flush()
line = f.readline()
if not line == msg:
print "Error perferming readline with the python _fileobject class"
os._exit (1)
f.write(line)
f.flush()
if verbose:
print "__fileobject readline works"
conn.close() conn.close()
else: else:
try: try:
...@@ -149,11 +189,18 @@ try: ...@@ -149,11 +189,18 @@ try:
if verbose: if verbose:
print 'child connecting' print 'child connecting'
s.connect(("127.0.0.1", PORT)) s.connect(("127.0.0.1", PORT))
msg = 'socket test'
s.send(msg) iteration = 0
data = s.recv(1024) while 1:
if msg != data: s.send(msg)
print 'parent/client mismatch' data = s.recv(12)
if not data:
break
if msg != data:
print "parent/client mismatch. Failed in %s iteration. Received: [%s]" \
%(iteration, data)
time.sleep (1)
iteration += 1
s.close() s.close()
finally: finally:
os._exit(1) os._exit(1)
......
#!/home/bernie/src/python23/dist/src/python
import unittest
import time
import socket
class creationTestCase(unittest.TestCase):
"""Test Case for socket.gettimeout() and socket.settimeout()"""
def setUp(self):
self.__s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def tearDown(self):
self.__s.close()
def testObjectCreation(self):
"Test Socket creation"
self.assertEqual(self.__s.gettimeout(), None,
"Timeout socket not default to disable (None)")
def testFloatReturnValue(self):
"Test return value of getter/setter"
self.__s.settimeout(7.345)
self.assertEqual(self.__s.gettimeout(), 7.345,
"settimeout() and gettimeout() return different result")
self.__s.settimeout(3)
self.assertEqual(self.__s.gettimeout(), 3,
"settimeout() and gettimeout() return different result")
def testReturnType(self):
"Test return type of getter/setter"
self.__s.settimeout(1)
self.assertEqual(type(self.__s.gettimeout()), type(1.0),
"return type of gettimeout() is not FloatType")
self.__s.settimeout(3.9)
self.assertEqual(type(self.__s.gettimeout()), type(1.0),
"return type of gettimeout() is not FloatType")
class timeoutTestCase(unittest.TestCase):
"""Test Case for socket.socket() timeout functions"""
def setUp(self):
self.__s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__addr_remote = ('www.google.com', 80)
self.__addr_local = ('127.0.0.1', 25339)
def tearDown(self):
self.__s.close()
def testConnectTimeout(self):
"Test connect() timeout"
_timeout = 0.02
self.__s.settimeout(_timeout)
_t1 = time.time()
self.failUnlessRaises(socket.error, self.__s.connect,
self.__addr_remote)
_t2 = time.time()
_delta = abs(_t1 - _t2)
self.assert_(_delta < _timeout + 0.5,
"timeout (%f) is 0.5 seconds more than required (%f)"
%(_delta, _timeout))
def testRecvTimeout(self):
"Test recv() timeout"
_timeout = 0.02
self.__s.connect(self.__addr_remote)
self.__s.settimeout(_timeout)
_t1 = time.time()
self.failUnlessRaises(socket.error, self.__s.recv, 1024)
_t2 = time.time()
_delta = abs(_t1 - _t2)
self.assert_(_delta < _timeout + 0.5,
"timeout (%f) is 0.5 seconds more than required (%f)"
%(_delta, _timeout))
def testAcceptTimeout(self):
"Test accept() timeout()"
_timeout = 2
self.__s.settimeout(_timeout)
self.__s.bind(self.__addr_local)
self.__s.listen(5)
_t1 = time.time()
self.failUnlessRaises(socket.error, self.__s.accept)
_t2 = time.time()
_delta = abs(_t1 - _t2)
self.assert_(_delta < _timeout + 0.5,
"timeout (%f) is 0.5 seconds more than required (%f)"
%(_delta, _timeout))
def testRecvfromTimeout(self):
"Test recvfrom() timeout()"
_timeout = 2
self.__s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.__s.settimeout(_timeout)
self.__s.bind(self.__addr_local)
_t1 = time.time()
self.failUnlessRaises(socket.error, self.__s.recvfrom, 8192)
_t2 = time.time()
_delta = abs(_t1 - _t2)
self.assert_(_delta < _timeout + 0.5,
"timeout (%f) is 0.5 seconds more than required (%f)"
%(_delta, _timeout))
def testSend(self):
"Test send() timeout"
# couldn't figure out how to test it
pass
def testSendto(self):
"Test sendto() timeout"
# couldn't figure out how to test it
pass
def testSendall(self):
"Test sendall() timeout"
# couldn't figure out how to test it
pass
def suite():
suite = unittest.TestSuite()
return suite
if __name__ == "__main__":
unittest.main()
This diff is collapsed.
...@@ -83,6 +83,9 @@ typedef struct { ...@@ -83,6 +83,9 @@ typedef struct {
PyObject *(*errorhandler)(void); /* Error handler; checks PyObject *(*errorhandler)(void); /* Error handler; checks
errno, returns NULL and errno, returns NULL and
sets a Python exception */ sets a Python exception */
int sock_blocking; /* Flag indicated whether the
socket is in blocking mode */
double sock_timeout; /* Operation timeout value */
} PySocketSockObject; } PySocketSockObject;
/* --- C API ----------------------------------------------------*/ /* --- C API ----------------------------------------------------*/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment