test_smtplib.py 30.5 KB
Newer Older
import asyncore
import email.mime.text
import email.utils
import socket
import smtpd
import smtplib
import io
import re
import sys
import time
import select
import errno

import unittest
from test import support, mock_socket

# The threaded test classes below are skipped when threading is unavailable.
try:
    import threading
except ImportError:
    threading = None

# Host name every test server binds to and every client connects to.
HOST = support.HOST

# Platform workaround: on darwin, replace SMTPChannel.handle_expt with a no-op
# for the whole test module.
if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    def handle_expt(self):
        pass
    # Patch the class itself so the no-op applies to every SMTPChannel.
    smtpd.SMTPChannel.handle_expt = handle_expt


Christian Heimes's avatar
Christian Heimes committed
32
def server(evt, buf, serv):
    """Send *buf* to the first client that connects to *serv*, then shut down.

    evt  -- event object; set once the socket is listening and set again
            just before the function returns.
    buf  -- bytes to send to the accepted connection.
    serv -- bound server socket; it is put into listening mode here and is
            always closed on exit.

    A socket.timeout raised by accept() is swallowed: the function simply
    returns without having served anyone.
    """
    serv.listen(5)
    evt.set()
    try:
        conn, _ = serv.accept()
    except socket.timeout:
        pass
    else:
        try:
            # Cap the number of send attempts so the loop always terminates.
            n = 500
            while buf and n > 0:
                _, writable, _ = select.select([], [conn], [])
                if writable:
                    sent = conn.send(buf)
                    buf = buf[sent:]

                n -= 1
        finally:
            # Close the connection even if select()/send() raised.
            conn.close()
    finally:
        serv.close()
        evt.set()

54
class GeneralTests(unittest.TestCase):
    # Constructor-level checks of smtplib.SMTP, run with smtplib's socket
    # module swapped for mock_socket.

    def setUp(self):
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket

    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        teststr  = "abc\n.jkl\rfoo\r\n..blue"
        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(expected, smtplib.quotedata(teststr))

    def testBasic1(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        smtp = smtplib.SMTP(HOST, self.port)
        smtp.close()

    def testSourceAddress(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        smtp = smtplib.SMTP(HOST, self.port,
                source_address=('127.0.0.1',19876))
        self.assertEqual(smtp.source_address, ('127.0.0.1', 19876))
        smtp.close()

    def testBasic2(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects, include port in host name
        smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
        smtp.close()

    def testLocalHostName(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # check that supplied local_hostname is used
        smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
        self.assertEqual(smtp.local_hostname, "testhost")
        smtp.close()

    def testTimeoutDefault(self):
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            smtp = smtplib.SMTP(HOST, self.port)
        finally:
            # Always restore the default so later tests are unaffected.
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def testTimeoutNone(self):
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            smtp = smtplib.SMTP(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(smtp.sock.gettimeout())
        smtp.close()

    def testTimeoutValue(self):
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = smtplib.SMTP(HOST, self.port, timeout=30)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()
125 126


127
# Test server thread using the specified SMTP server class
Christian Heimes's avatar
Christian Heimes committed
128
def debugging_server(serv, serv_evt, client_evt):
    """Drive the asyncore loop for *serv* until the client is finished.

    serv       -- server object; closed here once the client is done (or on
                  the way out if the loop ended first).
    serv_evt   -- event set once on entry and again just before returning.
    client_evt -- event the test sets when the client conversation is over.

    The loop runs at most 1000 polls of 0.01s each, and stops early when
    asyncore's socket map becomes empty or *client_evt* is set.
    """
    serv_evt.set()

    try:
        if hasattr(select, 'poll'):
            poll_fun = asyncore.poll2
        else:
            poll_fun = asyncore.poll

        n = 1000
        while asyncore.socket_map and n > 0:
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

            n -= 1

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()

# Banner lines the tests below expect before and after each message in the
# captured stdout output (see the mexpect values built from them).
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'

162 163 164
# NOTE: Some SMTP objects in the tests below are created with a non-default
# local_hostname argument to the constructor, since (on some systems) the FQDN
# lookup caused by the default local_hostname sometimes takes so long that the
# test server times out, causing the test to fail.
166 167

# Test behavior of smtpd.DebuggingServer
168 169
@unittest.skipUnless(threading, 'Threading required for this test.')
class DebuggingServerTests(unittest.TestCase):
    # Each test talks to a smtpd.DebuggingServer running in a helper thread;
    # the server's stdout and smtpd's debug stream are captured for checking.

    maxDiff = None

    def setUp(self):
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()
        # restore sys.stdout
        sys.stdout = self.old_stdout
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM

    def testBasic(self):
        # connect
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.quit()

    def testSourceAddress(self):
        # connect
        port = support.find_unused_port()
        try:
            smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                    timeout=3, source_address=('127.0.0.1', port))
            self.assertEqual(smtp.source_address, ('127.0.0.1', port))
            self.assertEqual(smtp.local_hostname, 'localhost')
            smtp.quit()
        except IOError as e:
            # The "unused" port may have been taken in the meantime.
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to port %d" % port)
            raise

    def testNOOP(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (250, b'OK')
        self.assertEqual(smtp.noop(), expected)
        smtp.quit()

    def testRSET(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (250, b'OK')
        self.assertEqual(smtp.rset(), expected)
        smtp.quit()

    # NOTE(review): this test was also named testNotImplemented and was
    # silently replaced by the EXPN test defined after it, so it never ran.
    # It now has its own name but is skipped rather than enabled: testHELP
    # below expects EHLO among the supported commands, which suggests the
    # 502 expectation here is stale -- TODO confirm and update the expected
    # reply, then drop the skip.
    @unittest.skip('EHLO expectation unverified; test was previously shadowed '
                   'by a duplicate method name')
    def testEHLONotImplemented(self):
        # EHLO isn't implemented in DebuggingServer
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (502, b'Error: command "EHLO" not implemented')
        self.assertEqual(smtp.ehlo(), expected)
        smtp.quit()

    def testNotImplemented(self):
        # EXPN isn't implemented in DebuggingServer
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (502, b'EXPN not implemented')
        smtp.putcmd('EXPN')
        self.assertEqual(smtp.getreply(), expected)
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        expected = (252, b'Cannot VRFY user, but will accept message ' + \
                         b'and attempt delivery')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
        smtp.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.helo()
        expected = (503, b'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()

    def testHELP(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
                                      b'RCPT DATA RSET NOOP QUIT VRFY')
        smtp.quit()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('John', 'Sally', m)
        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
        # in asyncore.  This sleep might help, but should really be fixed
        # properly by using an Event variable.
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendBinary(self):
        m = b'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('John', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendNeedingDotQuote(self):
        # Issue 12283
        m = '.A test\n.mes.sage.'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('John', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendNullSender(self):
        m = 'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.sendmail('<>', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: <>$", re.MULTILINE)
        self.assertRegex(debugout, sender)

    def testSendMessage(self):
        m = email.mime.text.MIMEText('A test message')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m, from_addr='John', to_addrs='Sally')
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendMessageWithAddresses(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()
        # make sure the Bcc header is still in the message.
        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                    '<warped@silly.walks.com>')

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        # The Bcc header should not be transmitted.
        del m['Bcc']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
                     'warped@silly.walks.com'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertNotRegex(debugout, to_addr)
        recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
        self.assertRegex(debugout, recip)

    def testSendMessageWithMultipleFrom(self):
        # Sender overrides To
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'Bernard, Bianca'
        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageResent(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # The Resent-Bcc headers are deleted before serialization.
        del m['Bcc']
        del m['Resent-Bcc']
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageMultipleResentRaises(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
        m['Resent-To'] = 'holy@grail.net'
        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
        with self.assertRaises(ValueError):
            smtp.send_message(m)
        smtp.close()
524

525
class NonConnectingTests(unittest.TestCase):
    # Tests that never establish a connection; smtplib's socket module is
    # swapped for mock_socket for their duration.

    def setUp(self):
        smtplib.socket = mock_socket

    def tearDown(self):
        smtplib.socket = socket

    def testNotConnected(self):
        # Operations on an unconnected SMTP object must raise
        # SMTPServerDisconnected.
        smtp = smtplib.SMTP()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
        self.assertRaises(smtplib.SMTPServerDisconnected,
                          smtp.send, 'test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises socket.error
        self.assertRaises(mock_socket.error, smtplib.SMTP,
                          "localhost", "bogus")
        self.assertRaises(mock_socket.error, smtplib.SMTP,
                          "localhost:bogus")


551
# test response of client to a non-successful HELO message
552 553
@unittest.skipUnless(threading, 'Threading required for this test.')
class BadHELOServerTests(unittest.TestCase):
    # The mock socket is primed with a 199 greeting, which the SMTP
    # constructor is expected to reject.

    def setUp(self):
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")
        # Capture stdout for the duration of the test.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
                            HOST, self.port, 'localhost', 3)
570

571 572

# Fixture data for the simulated SMTP server defined below.

# Known mailboxes: address -> display name (looked up by VRFY and EXPN).
sim_users = {'Mr.A@somewhere.com':'John A',
             'Ms.B@xn--fo-fka.com':'Sally B',
             'Mrs.C@somewhereesle.com':'Ruth C',
            }

# (user, password) pair and the matching per-mechanism AUTH values.
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
sim_auth_credentials = {
    'login': 'TXIuQUBzb21ld2hlcmUuY29t',
    'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
    'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
                 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
    }
sim_auth_login_password = 'C29TZXBHC3N3B3JK'

# Mailing lists: list name -> member addresses (every member is a key of
# sim_users).
sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
             'list-2':['Ms.B@xn--fo-fka.com',],
            }

# Simulated SMTP channel & server
class SimSMTPChannel(smtpd.SMTPChannel):
    """SMTP channel with canned EHLO/VRFY/EXPN/AUTH/QUIT behaviour for tests."""

    # For testing failures in QUIT when using the context manager API.
    quit_response = None

    def __init__(self, extra_features, *args, **kw):
        # Pre-render the extra EHLO feature lines once per channel.
        self._extrafeatures = ''.join(
            [ "250-{0}\r\n".format(x) for x in extra_features ])
        super().__init__(*args, **kw)

    def smtp_EHLO(self, arg):
        # Fixed feature list, then any extra features, ending with the
        # final (non-continuation) 250 line.
        resp = ('250-testhost\r\n'
                '250-EXPN\r\n'
                '250-SIZE 20000000\r\n'
                '250-STARTTLS\r\n'
                '250-DELIVERBY\r\n')
        resp = resp + self._extrafeatures + '250 HELP'
        self.push(resp)

    def smtp_VRFY(self, arg):
        # For max compatibility smtplib should be sending the raw address.
        if arg in sim_users:
            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
        else:
            self.push('550 No such user: %s' % arg)

    def smtp_EXPN(self, arg):
        list_name = arg.lower()
        if list_name in sim_lists:
            user_list = sim_lists[list_name]
            for n, user_email in enumerate(user_list):
                quoted_addr = smtplib.quoteaddr(user_email)
                # Every line but the last is a "250-" continuation line.
                if n < len(user_list) - 1:
                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
                else:
                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
        else:
            self.push('550 No access for you!')

    def smtp_AUTH(self, arg):
        if arg.strip().lower()=='cram-md5':
            self.push('334 {}'.format(sim_cram_md5_challenge))
            return
        # Any other request must be exactly "<mechanism> <initial-response>";
        # the unpacking below raises ValueError otherwise.
        mech, auth = arg.split()
        mech = mech.lower()
        if mech not in sim_auth_credentials:
            self.push('504 auth type unimplemented')
            return
        if mech == 'plain' and auth==sim_auth_credentials['plain']:
            self.push('235 plain auth ok')
        elif mech=='login' and auth==sim_auth_credentials['login']:
            self.push('334 Password:')
        else:
            self.push('550 No access for you!')

    def smtp_QUIT(self, arg):
        # arg is ignored
        if self.quit_response is None:
            super().smtp_QUIT(arg)
        else:
            # Send the configured reply instead of the default one.
            self.push(self.quit_response)
            self.close_when_done()

    def handle_error(self):
        # Re-raise the active exception instead of handling it here.
        raise

659 660

class SimSMTPServer(smtpd.SMTPServer):
    """SMTP server that hands every accepted connection to a SimSMTPChannel."""

    # For testing failures in QUIT when using the context manager API.
    quit_response = None

    def __init__(self, *args, **kw):
        # Extra EHLO features; populated through add_feature().
        self._extra_features = []
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        self._SMTPchannel = SimSMTPChannel(
            self._extra_features, self, conn, addr)
        # Propagate the server-level QUIT override to the new channel.
        self._SMTPchannel.quit_response = self.quit_response

    def process_message(self, peer, mailfrom, rcpttos, data):
        # Messages are accepted and discarded.
        pass

    def add_feature(self, feature):
        self._extra_features.append(feature)

    def handle_error(self):
        # Re-raise the active exception instead of handling it here.
        raise

683 684 685

# Test various SMTP & ESMTP commands/behaviors that require a simulated server
# (i.e., something with more features than DebuggingServer)
686 687
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPSimTests(unittest.TestCase):
    """Client tests run against a SimSMTPServer served by a helper thread."""

    def setUp(self):
        # Replace socket.getfqdn with the mock for the duration of the test.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()

    def testBasic(self):
        # smoke test
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        smtp.quit()

    def testEHLO(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        # The loop variable is not called "email" so that it does not shadow
        # the imported email package.
        for address, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(address)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(address), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        for listname, members in sim_lists.items():
            users = ['%s %s' % (sim_users[m], smtplib.quoteaddr(m))
                     for m in members]
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        expected_auth_ok = (235, b'plain auth ok')
        self.assertEqual(smtp.login(sim_auth[0], sim_auth[1]), expected_auth_ok)
        smtp.close()

    # SimSMTPChannel doesn't fully support LOGIN or CRAM-MD5 auth because they
    # require a synchronous read to obtain the credentials...so instead smtpd
    # sees the credential sent by smtplib's login method as an unknown command,
    # which results in smtplib raising an auth error.  Fortunately the error
    # message contains the encoded credential, so we can partially check that it
    # was generated correctly (partially, because the 'word' is uppercased in
    # the error message).

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        try:
            # assertRaises makes the test fail if login() unexpectedly
            # succeeds, instead of passing without checking anything.
            with self.assertRaises(smtplib.SMTPAuthenticationError) as cm:
                smtp.login(sim_auth[0], sim_auth[1])
            self.assertIn(sim_auth_login_password, str(cm.exception))
        finally:
            # Close the connection even when an assertion above fails.
            smtp.close()

    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        try:
            # assertRaises makes the test fail if login() unexpectedly
            # succeeds, instead of passing without checking anything.
            with self.assertRaises(smtplib.SMTPAuthenticationError) as cm:
                smtp.login(sim_auth[0], sim_auth[1])
            self.assertIn(sim_auth_credentials['cram-md5'], str(cm.exception))
        finally:
            # Close the connection even when an assertion above fails.
            smtp.close()

    def test_with_statement(self):
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')

    def test_with_statement_QUIT_failure(self):
        self.serv.quit_response = '421 QUIT FAILED'
        with self.assertRaises(smtplib.SMTPResponseException) as error:
            with smtplib.SMTP(HOST, self.port) as smtp:
                smtp.noop()
        self.assertEqual(error.exception.smtp_code, 421)
        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')
        # We don't need to clean up self.serv.quit_response because a new
        # server is always instantiated in the setUp().

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

824

825
@support.reap_threads
def test_main(verbose=None):
    """Run the listed test case classes through ``support.run_unittest``.

    *verbose* is accepted but not used by this function.
    """
    support.run_unittest(
        GeneralTests,
        DebuggingServerTests,
        NonConnectingTests,
        BadHELOServerTests,
        SMTPSimTests,
    )
830 831 832

# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()