test_defect_handling.py 11.1 KB
Newer Older
1 2
import textwrap
import unittest
3 4
import contextlib
from email import policy
5 6 7 8
from email import errors
from test.test_email import TestEmailBase


9
class TestDefectsBase:
10

11 12
    policy = policy.default
    raise_expected = False
13

14 15 16
    @contextlib.contextmanager
    def _raise_point(self, defect):
        yield
17

18 19 20 21 22 23 24 25
    def test_same_boundary_inner_outer(self):
        source = textwrap.dedent("""\
            Subject: XX
            From: xx@xx.dk
            To: XX
            Mime-version: 1.0
            Content-type: multipart/mixed;
               boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
26

27 28 29
            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: multipart/alternative;
               boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
30

31 32 33
            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: text/plain; charset="ISO-8859-1"
            Content-transfer-encoding: quoted-printable
34

35
            text
36

37 38 39
            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: text/html; charset="ISO-8859-1"
            Content-transfer-encoding: quoted-printable
40

41
            <HTML></HTML>
42

43
            --MS_Mac_OE_3071477847_720252_MIME_Part--
44

45 46 47 48
            --MS_Mac_OE_3071477847_720252_MIME_Part
            Content-type: image/gif; name="xx.gif";
            Content-disposition: attachment
            Content-transfer-encoding: base64
49

50
            Some removed base64 encoded chars.
51

52 53 54
            --MS_Mac_OE_3071477847_720252_MIME_Part--

            """)
55
        # XXX better would be to actually detect the duplicate.
56 57 58
        with self._raise_point(errors.StartBoundaryNotFoundDefect):
            msg = self._str_msg(source)
        if self.raise_expected: return
59 60 61
        inner = msg.get_payload(0)
        self.assertTrue(hasattr(inner, 'defects'))
        self.assertEqual(len(self.get_defects(inner)), 1)
62 63
        self.assertIsInstance(self.get_defects(inner)[0],
                              errors.StartBoundaryNotFoundDefect)
64

65 66 67 68 69 70 71
    def test_multipart_no_boundary(self):
        source = textwrap.dedent("""\
            Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
            From: foobar
            Subject: broken mail
            MIME-Version: 1.0
            Content-Type: multipart/report; report-type=delivery-status;
72

73
            --JAB03225.986577786/zinfandel.lacita.com
74

75
            One part
76

77 78
            --JAB03225.986577786/zinfandel.lacita.com
            Content-Type: message/delivery-status
79

80
            Header: Another part
81

82 83 84 85 86
            --JAB03225.986577786/zinfandel.lacita.com--
            """)
        with self._raise_point(errors.NoBoundaryInMultipartDefect):
            msg = self._str_msg(source)
        if self.raise_expected: return
87
        self.assertIsInstance(msg.get_payload(), str)
88
        self.assertEqual(len(self.get_defects(msg)), 2)
89 90 91 92
        self.assertIsInstance(self.get_defects(msg)[0],
                              errors.NoBoundaryInMultipartDefect)
        self.assertIsInstance(self.get_defects(msg)[1],
                              errors.MultipartInvariantViolationDefect)
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117

    multipart_msg = textwrap.dedent("""\
        Date: Wed, 14 Nov 2007 12:56:23 GMT
        From: foo@bar.invalid
        To: foo@bar.invalid
        Subject: Content-Transfer-Encoding: base64 and multipart
        MIME-Version: 1.0
        Content-Type: multipart/mixed;
            boundary="===============3344438784458119861=="{}

        --===============3344438784458119861==
        Content-Type: text/plain

        Test message

        --===============3344438784458119861==
        Content-Type: application/octet-stream
        Content-Transfer-Encoding: base64

        YWJj

        --===============3344438784458119861==--
        """)

    def test_multipart_invalid_cte(self):
118 119 120 121 122 123
        with self._raise_point(
                errors.InvalidMultipartContentTransferEncodingDefect):
            msg = self._str_msg(
                    self.multipart_msg.format(
                        "\nContent-Transfer-Encoding: base64"))
        if self.raise_expected: return
124 125 126 127 128
        self.assertEqual(len(self.get_defects(msg)), 1)
        self.assertIsInstance(self.get_defects(msg)[0],
            errors.InvalidMultipartContentTransferEncodingDefect)

    def test_multipart_no_cte_no_defect(self):
129
        if self.raise_expected: return
130 131 132 133
        msg = self._str_msg(self.multipart_msg.format(''))
        self.assertEqual(len(self.get_defects(msg)), 0)

    def test_multipart_valid_cte_no_defect(self):
134
        if self.raise_expected: return
135 136 137 138 139 140
        for cte in ('7bit', '8bit', 'BINary'):
            msg = self._str_msg(
                self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
            self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)

    def test_lying_multipart(self):
141 142 143 144 145 146 147 148 149 150 151 152 153
        source = textwrap.dedent("""\
            From: "Allison Dunlap" <xxx@example.com>
            To: yyy@example.com
            Subject: 64423
            Date: Sun, 11 Jul 2004 16:09:27 -0300
            MIME-Version: 1.0
            Content-Type: multipart/alternative;

            Blah blah blah
            """)
        with self._raise_point(errors.NoBoundaryInMultipartDefect):
            msg = self._str_msg(source)
        if self.raise_expected: return
154 155
        self.assertTrue(hasattr(msg, 'defects'))
        self.assertEqual(len(self.get_defects(msg)), 2)
156 157 158 159
        self.assertIsInstance(self.get_defects(msg)[0],
                              errors.NoBoundaryInMultipartDefect)
        self.assertIsInstance(self.get_defects(msg)[1],
                              errors.MultipartInvariantViolationDefect)
160

161 162 163 164 165
    def test_missing_start_boundary(self):
        source = textwrap.dedent("""\
            Content-Type: multipart/mixed; boundary="AAA"
            From: Mail Delivery Subsystem <xxx@example.com>
            To: yyy@example.com
166

167
            --AAA
168

169
            Stuff
170

171 172
            --AAA
            Content-Type: message/rfc822
173

174 175 176
            From: webmaster@python.org
            To: zzz@example.com
            Content-Type: multipart/mixed; boundary="BBB"
177

178
            --BBB--
179

180
            --AAA--
181

182
            """)
183 184 185 186 187 188 189 190
        # The message structure is:
        #
        # multipart/mixed
        #    text/plain
        #    message/rfc822
        #        multipart/mixed [*]
        #
        # [*] This message is missing its start boundary
191 192 193
        with self._raise_point(errors.StartBoundaryNotFoundDefect):
            outer = self._str_msg(source)
        if self.raise_expected: return
194 195
        bad = outer.get_payload(1).get_payload(0)
        self.assertEqual(len(self.get_defects(bad)), 1)
196 197
        self.assertIsInstance(self.get_defects(bad)[0],
                              errors.StartBoundaryNotFoundDefect)
198 199

    def test_first_line_is_continuation_header(self):
200 201 202
        with self._raise_point(errors.FirstHeaderLineIsContinuationDefect):
            msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
        if self.raise_expected: return
203 204 205 206 207 208 209 210 211 212 213 214
        self.assertEqual(msg.keys(), ['Subject'])
        self.assertEqual(msg.get_payload(), 'body')
        self.assertEqual(len(self.get_defects(msg)), 1)
        self.assertDefectsEqual(self.get_defects(msg),
                                 [errors.FirstHeaderLineIsContinuationDefect])
        self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')

    def test_missing_header_body_separator(self):
        # Our heuristic if we see a line that doesn't look like a header (no
        # leading whitespace but no ':') is to assume that the blank line that
        # separates the header from the body is missing, and to stop parsing
        # headers and start parsing the body.
215 216 217
        with self._raise_point(errors.MissingHeaderBodySeparatorDefect):
            msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
        if self.raise_expected: return
218 219 220 221 222 223
        self.assertEqual(msg.keys(), ['Subject'])
        self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.MissingHeaderBodySeparatorDefect])

    def test_bad_padding_in_base64_payload(self):
224 225 226 227 228 229 230 231 232 233 234 235 236
        source = textwrap.dedent("""\
            Subject: test
            MIME-Version: 1.0
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            dmk
            """)
        msg = self._str_msg(source)
        with self._raise_point(errors.InvalidBase64PaddingDefect):
            payload = msg.get_payload(decode=True)
        if self.raise_expected: return
        self.assertEqual(payload, b'vi')
237 238 239 240
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.InvalidBase64PaddingDefect])

    def test_invalid_chars_in_base64_payload(self):
241 242 243 244 245 246 247 248 249 250 251 252 253
        source = textwrap.dedent("""\
            Subject: test
            MIME-Version: 1.0
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: base64

            dm\x01k===
            """)
        msg = self._str_msg(source)
        with self._raise_point(errors.InvalidBase64CharactersDefect):
            payload = msg.get_payload(decode=True)
        if self.raise_expected: return
        self.assertEqual(payload, b'vi')
254 255 256
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.InvalidBase64CharactersDefect])

257 258 259 260 261 262 263
    def test_missing_ending_boundary(self):
        source = textwrap.dedent("""\
            To: 1@harrydomain4.com
            Subject: Fwd: 1
            MIME-Version: 1.0
            Content-Type: multipart/alternative;
             boundary="------------000101020201080900040301"
264

265 266 267
            --------------000101020201080900040301
            Content-Type: text/plain; charset=ISO-8859-1
            Content-Transfer-Encoding: 7bit
268

269
            Alternative 1
270

271 272 273
            --------------000101020201080900040301
            Content-Type: text/html; charset=ISO-8859-1
            Content-Transfer-Encoding: 7bit
274

275
            Alternative 2
276

277 278 279 280
            """)
        with self._raise_point(errors.CloseBoundaryNotFoundDefect):
            msg = self._str_msg(source)
        if self.raise_expected: return
281 282 283 284 285
        self.assertEqual(len(msg.get_payload()), 2)
        self.assertEqual(msg.get_payload(1).get_payload(), 'Alternative 2\n')
        self.assertDefectsEqual(self.get_defects(msg),
                                [errors.CloseBoundaryNotFoundDefect])

286

287
class TestDefectDetection(TestDefectsBase, TestEmailBase):
288 289 290 291 292

    def get_defects(self, obj):
        return obj.defects


293
class TestDefectCapture(TestDefectsBase, TestEmailBase):
294

295
    class CapturePolicy(policy.EmailPolicy):
296 297 298 299 300 301 302 303 304 305 306
        captured = None
        def register_defect(self, obj, defect):
            self.captured.append(defect)

    def setUp(self):
        self.policy = self.CapturePolicy(captured=list())

    def get_defects(self, obj):
        return self.policy.captured


307 308 309 310 311 312 313 314 315 316 317 318
class TestDefectRaising(TestDefectsBase, TestEmailBase):

    policy = TestDefectsBase.policy
    policy = policy.clone(raise_on_defect=True)
    raise_expected = True

    @contextlib.contextmanager
    def _raise_point(self, defect):
        with self.assertRaises(defect):
            yield


319 320
if __name__ == '__main__':
    unittest.main()