test_smtplib.py 30.3 KB
Newer Older
1
import asyncore
2
import email.mime.text
3
import email.utils
4
import socket
5
import smtpd
6
import smtplib
7
import io
8
import re
9
import sys
10
import time
11
import select
12

13
import unittest
14
from test import support, mock_socket
15

16 17 18 19 20
try:
    import threading
except ImportError:
    threading = None

21
HOST = support.HOST
Christian Heimes's avatar
Christian Heimes committed
22

23 24 25 26 27 28 29 30
if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    def handle_expt(self):
        pass
    smtpd.SMTPChannel.handle_expt = handle_expt


Christian Heimes's avatar
Christian Heimes committed
31
def server(evt, buf, serv):
Christian Heimes's avatar
Christian Heimes committed
32 33
    serv.listen(5)
    evt.set()
34 35 36 37 38
    try:
        conn, addr = serv.accept()
    except socket.timeout:
        pass
    else:
39 40 41 42 43 44 45 46 47
        n = 500
        while buf and n > 0:
            r, w, e = select.select([], [conn], [])
            if w:
                sent = conn.send(buf)
                buf = buf[sent:]

            n -= 1

48 49 50 51 52
        conn.close()
    finally:
        serv.close()
        evt.set()

53
class GeneralTests(unittest.TestCase):
54 55

    def setUp(self):
56 57
        smtplib.socket = mock_socket
        self.port = 25
58 59

    def tearDown(self):
60
        smtplib.socket = socket
61

62 63 64 65 66 67 68
    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        teststr  = "abc\n.jkl\rfoo\r\n..blue"
        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(expected, smtplib.quotedata(teststr))

69
    def testBasic1(self):
70
        mock_socket.reply_with(b"220 Hola mundo")
71
        # connects
Christian Heimes's avatar
Christian Heimes committed
72
        smtp = smtplib.SMTP(HOST, self.port)
Georg Brandl's avatar
Georg Brandl committed
73
        smtp.close()
74

75 76 77 78 79 80 81 82
    def testSourceAddress(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        smtp = smtplib.SMTP(HOST, self.port,
                source_address=('127.0.0.1',19876))
        self.assertEqual(smtp.source_address, ('127.0.0.1', 19876))
        smtp.close()

83
    def testBasic2(self):
84
        mock_socket.reply_with(b"220 Hola mundo")
85
        # connects, include port in host name
Christian Heimes's avatar
Christian Heimes committed
86
        smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
Georg Brandl's avatar
Georg Brandl committed
87
        smtp.close()
88 89

    def testLocalHostName(self):
90
        mock_socket.reply_with(b"220 Hola mundo")
91
        # check that supplied local_hostname is used
Christian Heimes's avatar
Christian Heimes committed
92
        smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
93
        self.assertEqual(smtp.local_hostname, "testhost")
Georg Brandl's avatar
Georg Brandl committed
94
        smtp.close()
95 96

    def testTimeoutDefault(self):
97
        mock_socket.reply_with(b"220 Hola mundo")
98 99 100
        self.assertTrue(mock_socket.getdefaulttimeout() is None)
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
Georg Brandl's avatar
Georg Brandl committed
101 102 103
        try:
            smtp = smtplib.SMTP(HOST, self.port)
        finally:
104
            mock_socket.setdefaulttimeout(None)
105
        self.assertEqual(smtp.sock.gettimeout(), 30)
Georg Brandl's avatar
Georg Brandl committed
106
        smtp.close()
107 108

    def testTimeoutNone(self):
109
        mock_socket.reply_with(b"220 Hola mundo")
Georg Brandl's avatar
Georg Brandl committed
110
        self.assertTrue(socket.getdefaulttimeout() is None)
111 112
        socket.setdefaulttimeout(30)
        try:
Christian Heimes's avatar
Christian Heimes committed
113
            smtp = smtplib.SMTP(HOST, self.port, timeout=None)
114
        finally:
Georg Brandl's avatar
Georg Brandl committed
115 116 117 118 119
            socket.setdefaulttimeout(None)
        self.assertTrue(smtp.sock.gettimeout() is None)
        smtp.close()

    def testTimeoutValue(self):
120
        mock_socket.reply_with(b"220 Hola mundo")
Georg Brandl's avatar
Georg Brandl committed
121
        smtp = smtplib.SMTP(HOST, self.port, timeout=30)
122
        self.assertEqual(smtp.sock.gettimeout(), 30)
Georg Brandl's avatar
Georg Brandl committed
123
        smtp.close()
124 125


126
# Test server thread using the specified SMTP server class
Christian Heimes's avatar
Christian Heimes committed
127
def debugging_server(serv, serv_evt, client_evt):
Christian Heimes's avatar
Christian Heimes committed
128
    serv_evt.set()
129 130 131 132 133 134 135 136 137 138 139 140 141

    try:
        if hasattr(select, 'poll'):
            poll_fun = asyncore.poll2
        else:
            poll_fun = asyncore.poll

        n = 1000
        while asyncore.socket_map and n > 0:
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
142
            if client_evt.is_set():
143 144 145 146 147 148 149 150
                serv.close()
                break

            n -= 1

    except socket.timeout:
        pass
    finally:
151
        if not client_evt.is_set():
Christian Heimes's avatar
Christian Heimes committed
152 153 154
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
155 156 157 158 159 160
        asyncore.close_all()
        serv_evt.set()

MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'

161 162 163
# NOTE: Some SMTP objects in the tests below are created with a non-default
# local_hostname argument to the constructor, since (on some systems) the FQDN
# lookup caused by the default local_hostname sometimes takes so long that the
164
# test server times out, causing the test to fail.
165 166

# Test behavior of smtpd.DebuggingServer
167 168
@unittest.skipUnless(threading, 'Threading required for this test.')
class DebuggingServerTests(unittest.TestCase):
169

170 171
    maxDiff = None

172
    def setUp(self):
173 174
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
175 176 177 178 179
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

180
        self._threads = support.threading_setup()
181 182
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
183 184 185
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
186 187 188 189
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
Christian Heimes's avatar
Christian Heimes committed
190
        serv_args = (self.serv, self.serv_evt, self.client_evt)
191 192
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()
193 194

        # wait until server thread has assigned a port number
Christian Heimes's avatar
Christian Heimes committed
195 196
        self.serv_evt.wait()
        self.serv_evt.clear()
197 198

    def tearDown(self):
199
        socket.getfqdn = self.real_getfqdn
200 201 202 203
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
204 205
        self.thread.join()
        support.threading_cleanup(*self._threads)
206 207
        # restore sys.stdout
        sys.stdout = self.old_stdout
208 209 210
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
211 212 213

    def testBasic(self):
        # connect
Christian Heimes's avatar
Christian Heimes committed
214
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
215 216
        smtp.quit()

217 218
    def testSourceAddress(self):
        # connect
219 220 221 222 223 224 225 226 227 228 229
        port = support.find_unused_port()
        try:
            smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                    timeout=3, source_address=('127.0.0.1', port))
            self.assertEqual(smtp.source_address, ('127.0.0.1', port))
            self.assertEqual(smtp.local_hostname, 'localhost')
            smtp.quit()
        except IOError as e:
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise
230

231
    def testNOOP(self):
Christian Heimes's avatar
Christian Heimes committed
232
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
233 234 235 236 237
        expected = (250, b'Ok')
        self.assertEqual(smtp.noop(), expected)
        smtp.quit()

    def testRSET(self):
Christian Heimes's avatar
Christian Heimes committed
238
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
239 240 241 242 243 244
        expected = (250, b'Ok')
        self.assertEqual(smtp.rset(), expected)
        smtp.quit()

    def testNotImplemented(self):
        # EHLO isn't implemented in DebuggingServer
Christian Heimes's avatar
Christian Heimes committed
245
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
246 247 248 249
        expected = (502, b'Error: command "EHLO" not implemented')
        self.assertEqual(smtp.ehlo(), expected)
        smtp.quit()

250 251
    def testVRFY(self):
        # VRFY isn't implemented in DebuggingServer
Christian Heimes's avatar
Christian Heimes committed
252
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
253 254 255 256 257 258 259 260
        expected = (502, b'Error: command "VRFY" not implemented')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
        smtp.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
Christian Heimes's avatar
Christian Heimes committed
261
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
262 263 264 265 266
        smtp.helo()
        expected = (503, b'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()

267
    def testHELP(self):
Christian Heimes's avatar
Christian Heimes committed
268
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
269 270 271 272 273 274
        self.assertEqual(smtp.help(), b'Error: command "HELP" not implemented')
        smtp.quit()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
Christian Heimes's avatar
Christian Heimes committed
275
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
276
        smtp.sendmail('John', 'Sally', m)
277 278 279 280
        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
        # in asyncore.  This sleep might help, but should really be fixed
        # properly by using an Event variable.
        time.sleep(0.01)
281 282 283 284 285 286 287 288
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

289 290 291 292 293 294 295 296 297 298 299 300 301 302
    def testSendBinary(self):
        m = b'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('John', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
    def testSendNeedingDotQuote(self):
        # Issue 12283
        m = '.A test\n.mes.sage.'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('John', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
    def testSendNullSender(self):
        m = 'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('<>', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: <>$", re.MULTILINE)
        self.assertRegex(debugout, sender)

335 336 337 338 339 340 341 342 343 344 345 346
    def testSendMessage(self):
        m = email.mime.text.MIMEText('A test message')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m, from_addr='John', to_addrs='Sally')
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
347
        m['X-Peer'] = socket.gethostbyname('localhost')
348 349 350 351 352 353 354 355 356 357 358 359 360 361
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendMessageWithAddresses(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()
362 363 364
        # make sure the Bcc header is still in the message.
        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                    '<warped@silly.walks.com>')
365 366 367 368 369

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
370
        m['X-Peer'] = socket.gethostbyname('localhost')
371
        # The Bcc header should not be transmitted.
372 373 374 375 376
        del m['Bcc']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
377
        self.assertRegex(debugout, sender)
378 379 380 381
        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
                     'warped@silly.walks.com'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
382
            self.assertRegex(debugout, to_addr)
383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
399
        m['X-Peer'] = socket.gethostbyname('localhost')
400 401 402 403
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
404
        self.assertRegex(debugout, sender)
405 406 407
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
408
            self.assertRegex(debugout, to_addr)
409

410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515
    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertNotRegex(debugout, to_addr)
        recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
        self.assertRegex(debugout, recip)

    def testSendMessageWithMultipleFrom(self):
        # Sender overrides To
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'Bernard, Bianca'
        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageResent(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # The Resent-Bcc headers are deleted before serialization.
        del m['Bcc']
        del m['Resent-Bcc']
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageMultipleResentRaises(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
        m['Resent-To'] = 'holy@grail.net'
        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        with self.assertRaises(ValueError):
            smtp.send_message(m)
        smtp.close()
516

517
class NonConnectingTests(unittest.TestCase):
Christian Heimes's avatar
Christian Heimes committed
518

519 520 521 522 523 524
    def setUp(self):
        smtplib.socket = mock_socket

    def tearDown(self):
        smtplib.socket = socket

Christian Heimes's avatar
Christian Heimes committed
525 526 527 528 529 530 531 532 533 534 535 536
    def testNotConnected(self):
        # Test various operations on an unconnected SMTP object that
        # should raise exceptions (at present the attempt in SMTP.send
        # to reference the nonexistent 'sock' attribute of the SMTP object
        # causes an AttributeError)
        smtp = smtplib.SMTP()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
        self.assertRaises(smtplib.SMTPServerDisconnected,
                          smtp.send, 'test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises socket.error
537
        self.assertRaises(mock_socket.error, smtplib.SMTP,
Christian Heimes's avatar
Christian Heimes committed
538
                          "localhost", "bogus")
539
        self.assertRaises(mock_socket.error, smtplib.SMTP,
Christian Heimes's avatar
Christian Heimes committed
540 541 542
                          "localhost:bogus")


543
# test response of client to a non-successful HELO message
544 545
@unittest.skipUnless(threading, 'Threading required for this test.')
class BadHELOServerTests(unittest.TestCase):
546 547

    def setUp(self):
548 549
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")
550 551 552
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
553
        self.port = 25
554 555

    def tearDown(self):
556
        smtplib.socket = socket
557 558 559 560
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
Christian Heimes's avatar
Christian Heimes committed
561
                            HOST, self.port, 'localhost', 3)
562

563 564

sim_users = {'Mr.A@somewhere.com':'John A',
565
             'Ms.B@xn--fo-fka.com':'Sally B',
566 567 568
             'Mrs.C@somewhereesle.com':'Ruth C',
            }

569
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
570 571 572 573 574 575 576 577 578
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
sim_auth_credentials = {
    'login': 'TXIuQUBzb21ld2hlcmUuY29t',
    'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
    'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
                 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
    }
sim_auth_login_password = 'C29TZXBHC3N3B3JK'
579

580
sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
581
             'list-2':['Ms.B@xn--fo-fka.com',],
582 583 584 585
            }

# Simulated SMTP channel & server
class SimSMTPChannel(smtpd.SMTPChannel):
586

587 588 589
    # For testing failures in QUIT when using the context manager API.
    quit_response = None

590 591 592
    def __init__(self, extra_features, *args, **kw):
        self._extrafeatures = ''.join(
            [ "250-{0}\r\n".format(x) for x in extra_features ])
593 594
        super(SimSMTPChannel, self).__init__(*args, **kw)

595
    def smtp_EHLO(self, arg):
596 597 598 599 600 601
        resp = ('250-testhost\r\n'
                '250-EXPN\r\n'
                '250-SIZE 20000000\r\n'
                '250-STARTTLS\r\n'
                '250-DELIVERBY\r\n')
        resp = resp + self._extrafeatures + '250 HELP'
602 603 604
        self.push(resp)

    def smtp_VRFY(self, arg):
605 606 607
        # For max compatibility smtplib should be sending the raw address.
        if arg in sim_users:
            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
608 609 610 611
        else:
            self.push('550 No such user: %s' % arg)

    def smtp_EXPN(self, arg):
612
        list_name = arg.lower()
613 614 615 616 617 618 619 620 621 622 623
        if list_name in sim_lists:
            user_list = sim_lists[list_name]
            for n, user_email in enumerate(user_list):
                quoted_addr = smtplib.quoteaddr(user_email)
                if n < len(user_list) - 1:
                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
                else:
                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
        else:
            self.push('550 No access for you!')

624
    def smtp_AUTH(self, arg):
625 626 627
        if arg.strip().lower()=='cram-md5':
            self.push('334 {}'.format(sim_cram_md5_challenge))
            return
628
        mech, auth = arg.split()
629 630
        mech = mech.lower()
        if mech not in sim_auth_credentials:
631
            self.push('504 auth type unimplemented')
632 633 634 635 636 637 638
            return
        if mech == 'plain' and auth==sim_auth_credentials['plain']:
            self.push('235 plain auth ok')
        elif mech=='login' and auth==sim_auth_credentials['login']:
            self.push('334 Password:')
        else:
            self.push('550 No access for you!')
639

640 641 642 643 644 645 646 647
    def smtp_QUIT(self, arg):
        # args is ignored
        if self.quit_response is None:
            super(SimSMTPChannel, self).smtp_QUIT(arg)
        else:
            self.push(self.quit_response)
            self.close_when_done()

648 649 650
    def handle_error(self):
        raise

651 652

class SimSMTPServer(smtpd.SMTPServer):
653

654 655 656
    # For testing failures in QUIT when using the context manager API.
    quit_response = None

657 658 659 660
    def __init__(self, *args, **kw):
        self._extra_features = []
        smtpd.SMTPServer.__init__(self, *args, **kw)

661
    def handle_accepted(self, conn, addr):
662 663 664
        self._SMTPchannel = SimSMTPChannel(
            self._extra_features, self, conn, addr)
        self._SMTPchannel.quit_response = self.quit_response
665 666 667 668

    def process_message(self, peer, mailfrom, rcpttos, data):
        pass

669
    def add_feature(self, feature):
670
        self._extra_features.append(feature)
671

672 673 674
    def handle_error(self):
        raise

675 676 677

# Test various SMTP & ESMTP commands/behaviors that require a simulated server
# (i.e., something with more features than DebuggingServer)
678 679
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPSimTests(unittest.TestCase):
680 681

    def setUp(self):
682 683
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
684
        self._threads = support.threading_setup()
685 686
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
687 688 689 690
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
Christian Heimes's avatar
Christian Heimes committed
691
        serv_args = (self.serv, self.serv_evt, self.client_evt)
692 693
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()
694 695

        # wait until server thread has assigned a port number
Christian Heimes's avatar
Christian Heimes committed
696 697
        self.serv_evt.wait()
        self.serv_evt.clear()
698 699

    def tearDown(self):
700
        socket.getfqdn = self.real_getfqdn
701 702 703 704
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
705 706
        self.thread.join()
        support.threading_cleanup(*self._threads)
707 708 709

    def testBasic(self):
        # smoke test
Christian Heimes's avatar
Christian Heimes committed
710
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
711 712 713
        smtp.quit()

    def testEHLO(self):
Christian Heimes's avatar
Christian Heimes committed
714
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
Christian Heimes's avatar
Christian Heimes committed
735
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
736 737 738

        for email, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
739 740
                                         (name, smtplib.quoteaddr(email)),
                                         "ascii"))
741 742 743
            self.assertEqual(smtp.vrfy(email), expected_known)

        u = 'nobody@nowhere.com'
744
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
745 746 747 748
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
Christian Heimes's avatar
Christian Heimes committed
749
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
750 751 752 753 754

        for listname, members in sim_lists.items():
            users = []
            for m in members:
                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
755
            expected_known = (250, bytes('\n'.join(users), "ascii"))
756 757 758 759 760 761 762
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

763 764
    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
765
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
766

767
        expected_auth_ok = (235, b'plain auth ok')
768
        self.assertEqual(smtp.login(sim_auth[0], sim_auth[1]), expected_auth_ok)
769
        smtp.close()
770

771 772 773 774 775 776 777 778 779 780
    # SimSMTPChannel doesn't fully support LOGIN or CRAM-MD5 auth because they
    # require a synchronous read to obtain the credentials...so instead smtpd
    # sees the credential sent by smtplib's login method as an unknown command,
    # which results in smtplib raising an auth error.  Fortunately the error
    # message contains the encoded credential, so we can partially check that it
    # was generated correctly (partially, because the 'word' is uppercased in
    # the error message).

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
781
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
782 783
        try: smtp.login(sim_auth[0], sim_auth[1])
        except smtplib.SMTPAuthenticationError as err:
784
            self.assertIn(sim_auth_login_password, str(err))
785
        smtp.close()
786 787 788

    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
789
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
790 791 792

        try: smtp.login(sim_auth[0], sim_auth[1])
        except smtplib.SMTPAuthenticationError as err:
793
            self.assertIn(sim_auth_credentials['cram-md5'], str(err))
794
        smtp.close()
795

796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814
    def test_with_statement(self):
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')

    def test_with_statement_QUIT_failure(self):
        self.serv.quit_response = '421 QUIT FAILED'
        with self.assertRaises(smtplib.SMTPResponseException) as error:
            with smtplib.SMTP(HOST, self.port) as smtp:
                smtp.noop()
        self.assertEqual(error.exception.smtp_code, 421)
        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')
        # We don't need to clean up self.serv.quit_response because a new
        # server is always instantiated in the setUp().

815 816 817
    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

818

819
def test_main(verbose=None):
820
    support.run_unittest(GeneralTests, DebuggingServerTests,
Christian Heimes's avatar
Christian Heimes committed
821
                              NonConnectingTests,
822
                              BadHELOServerTests, SMTPSimTests)
823 824 825

if __name__ == '__main__':
    test_main()