Kaydet (Commit) 16ed5b4b authored tarafından Facundo Batista's avatar Facundo Batista

New tests for basic behavior of smtplib.SMTP and

smtpd.DebuggingServer. Change to use global host & port number
variables. Modified the 'server' to take a string to send back in
order to vary test server responses. Added a test for the reaction of
smtplib.SMTP to a non-200 HELO response. [GSoC - Alan McIntyre]
üst 12adef9b
import asyncore
import socket import socket
import threading import threading
import smtpd
import smtplib import smtplib
import StringIO
import sys
import time import time
import select
from unittest import TestCase from unittest import TestCase
from test import test_support from test import test_support
HOST = "localhost"
PORT = 54328
def server(evt): def server(evt, buf):
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.settimeout(3) serv.settimeout(3)
serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serv.bind(("", 9091)) serv.bind(("", PORT))
serv.listen(5) serv.listen(5)
try: try:
conn, addr = serv.accept() conn, addr = serv.accept()
except socket.timeout: except socket.timeout:
pass pass
else: else:
conn.send("220 Hola mundo\n") n = 200
while buf and n > 0:
r, w, e = select.select([], [conn], [])
if w:
sent = conn.send(buf)
buf = buf[sent:]
n -= 1
time.sleep(0.01)
conn.close() conn.close()
finally: finally:
serv.close() serv.close()
...@@ -28,26 +44,42 @@ class GeneralTests(TestCase): ...@@ -28,26 +44,42 @@ class GeneralTests(TestCase):
def setUp(self): def setUp(self):
self.evt = threading.Event() self.evt = threading.Event()
threading.Thread(target=server, args=(self.evt,)).start() servargs = (self.evt, "220 Hola mundo\n")
threading.Thread(target=server, args=servargs).start()
time.sleep(.1) time.sleep(.1)
def tearDown(self): def tearDown(self):
self.evt.wait() self.evt.wait()
def testBasic(self): def testBasic1(self):
# connects # connects
smtp = smtplib.SMTP("localhost", 9091) smtp = smtplib.SMTP(HOST, PORT)
smtp.sock.close() smtp.sock.close()
def testBasic2(self):
# connects, include port in host name
smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
smtp.sock.close()
def testLocalHostName(self):
# check that supplied local_hostname is used
smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
self.assertEqual(smtp.local_hostname, "testhost")
smtp.sock.close()
def testNonnumericPort(self):
# check that non-numeric port raises ValueError
self.assertRaises(socket.error, smtplib.SMTP, "localhost", "bogus")
def testTimeoutDefault(self): def testTimeoutDefault(self):
# default # default
smtp = smtplib.SMTP("localhost", 9091) smtp = smtplib.SMTP(HOST, PORT)
self.assertTrue(smtp.sock.gettimeout() is None) self.assertTrue(smtp.sock.gettimeout() is None)
smtp.sock.close() smtp.sock.close()
def testTimeoutValue(self): def testTimeoutValue(self):
# a value # a value
smtp = smtplib.SMTP("localhost", 9091, timeout=30) smtp = smtplib.SMTP(HOST, PORT, timeout=30)
self.assertEqual(smtp.sock.gettimeout(), 30) self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close() smtp.sock.close()
...@@ -56,16 +88,95 @@ class GeneralTests(TestCase): ...@@ -56,16 +88,95 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout() previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30) socket.setdefaulttimeout(30)
try: try:
smtp = smtplib.SMTP("localhost", 9091, timeout=None) smtp = smtplib.SMTP(HOST, PORT, timeout=None)
finally: finally:
socket.setdefaulttimeout(previous) socket.setdefaulttimeout(previous)
self.assertEqual(smtp.sock.gettimeout(), 30) self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close() smtp.sock.close()
# Test server using smtpd.DebuggingServer
def debugging_server(evt):
serv = smtpd.DebuggingServer(("", PORT), ('nowhere', -1))
try:
asyncore.loop(timeout=.01, count=300)
except socket.timeout:
pass
finally:
# allow some time for the client to read the result
time.sleep(0.5)
asyncore.close_all()
evt.set()
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'
# Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(TestCase):
def setUp(self):
self.old_stdout = sys.stdout
self.output = StringIO.StringIO()
sys.stdout = self.output
self.evt = threading.Event()
threading.Thread(target=debugging_server, args=(self.evt,)).start()
time.sleep(.5)
def tearDown(self):
self.evt.wait()
sys.stdout = self.old_stdout
def testBasic(self):
# connect
smtp = smtplib.SMTP(HOST, PORT)
smtp.sock.close()
def testEHLO(self):
smtp = smtplib.SMTP(HOST, PORT)
self.assertEqual(smtp.ehlo(), (502, 'Error: command "EHLO" not implemented'))
smtp.sock.close()
def testHELP(self):
smtp = smtplib.SMTP(HOST, PORT)
self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
smtp.sock.close()
def testSend(self):
# connect and send mail
m = 'A test message'
smtp = smtplib.SMTP(HOST, PORT)
smtp.sendmail('John', 'Sally', m)
smtp.sock.close()
self.evt.wait()
self.output.flush()
mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
self.assertEqual(self.output.getvalue(), mexpect)
class BadHELOServerTests(TestCase):
def setUp(self):
self.old_stdout = sys.stdout
self.output = StringIO.StringIO()
sys.stdout = self.output
self.evt = threading.Event()
servargs = (self.evt, "199 no hello for you!\n")
threading.Thread(target=server, args=servargs).start()
time.sleep(.5)
def tearDown(self):
self.evt.wait()
sys.stdout = self.old_stdout
def testFailingHELO(self):
self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, HOST, PORT)
def test_main(verbose=None): def test_main(verbose=None):
test_support.run_unittest(GeneralTests) test_support.run_unittest(GeneralTests, DebuggingServerTests, BadHELOServerTests)
if __name__ == '__main__': if __name__ == '__main__':
test_main() test_main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment