Skip to content
Projeler
Gruplar
Parçacıklar
Yardım
Yükleniyor...
Oturum aç / Kaydol
Gezinmeyi değiştir
C
cpython
Proje
Proje
Ayrıntılar
Etkinlik
Cycle Analytics
Depo (repository)
Depo (repository)
Dosyalar
Kayıtlar (commit)
Dallar (branch)
Etiketler
Katkıda bulunanlar
Grafik
Karşılaştır
Grafikler
Konular (issue)
0
Konular (issue)
0
Liste
Pano
Etiketler
Kilometre Taşları
Birleştirme (merge) Talepleri
0
Birleştirme (merge) Talepleri
0
CI / CD
CI / CD
İş akışları (pipeline)
İşler
Zamanlamalar
Grafikler
Paketler
Paketler
Wiki
Wiki
Parçacıklar
Parçacıklar
Üyeler
Üyeler
Collapse sidebar
Close sidebar
Etkinlik
Grafik
Grafikler
Yeni bir konu (issue) oluştur
İşler
Kayıtlar (commit)
Konu (issue) Panoları
Kenar çubuğunu aç
Batuhan Osman TASKAYA
cpython
Commits
15e5d534
Kaydet (Commit)
15e5d534
authored
Tem 20, 2001
tarafından
Piers Lauder
Dosyalara gözat
Seçenekler
Dosyalara Gözat
İndir
Eposta Yamaları
Sade Fark
apply patch item #416253
üst
34d97059
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
115 additions
and
38 deletions
+115
-38
imaplib.py
Lib/imaplib.py
+115
-38
No files found.
Lib/imaplib.py
Dosyayı görüntüle @
15e5d534
...
...
@@ -14,8 +14,9 @@ Public functions: Internaldate2tuple
#
# Authentication code contributed by Donn Cave <donn@u.washington.edu> June 1998.
# String method conversion by ESR, February 2001.
# GET/SETACL contributed by Anthony Baxter <anthony@interlink.com.au> April 2001.
__version__
=
"2.4
0
"
__version__
=
"2.4
7
"
import
binascii
,
re
,
socket
,
time
,
random
,
sys
...
...
@@ -44,21 +45,24 @@ Commands = {
'EXAMINE'
:
(
'AUTH'
,
'SELECTED'
),
'EXPUNGE'
:
(
'SELECTED'
,),
'FETCH'
:
(
'SELECTED'
,),
'GETACL'
:
(
'AUTH'
,
'SELECTED'
),
'LIST'
:
(
'AUTH'
,
'SELECTED'
),
'LOGIN'
:
(
'NONAUTH'
,),
'LOGOUT'
:
(
'NONAUTH'
,
'AUTH'
,
'SELECTED'
,
'LOGOUT'
),
'LSUB'
:
(
'AUTH'
,
'SELECTED'
),
'NAMESPACE'
:
(
'AUTH'
,
'SELECTED'
),
'NOOP'
:
(
'NONAUTH'
,
'AUTH'
,
'SELECTED'
,
'LOGOUT'
),
'PARTIAL'
:
(
'SELECTED'
,),
'RENAME'
:
(
'AUTH'
,
'SELECTED'
),
'SEARCH'
:
(
'SELECTED'
,),
'SELECT'
:
(
'AUTH'
,
'SELECTED'
),
'SETACL'
:
(
'AUTH'
,
'SELECTED'
),
'SORT'
:
(
'SELECTED'
,),
'STATUS'
:
(
'AUTH'
,
'SELECTED'
),
'STORE'
:
(
'SELECTED'
,),
'SUBSCRIBE'
:
(
'AUTH'
,
'SELECTED'
),
'UID'
:
(
'SELECTED'
,),
'UNSUBSCRIBE'
:
(
'AUTH'
,
'SELECTED'
),
'NAMESPACE'
:
(
'AUTH'
,
'SELECTED'
),
}
# Patterns to match server responses
...
...
@@ -155,6 +159,7 @@ class IMAP4:
if
__debug__
:
if
self
.
debug
>=
1
:
_mesg
(
'imaplib version
%
s'
%
__version__
)
_mesg
(
'new IMAP4 connection, tag=
%
s'
%
self
.
tagpre
)
self
.
welcome
=
self
.
_get_response
()
...
...
@@ -187,21 +192,57 @@ class IMAP4:
def
__getattr__
(
self
,
attr
):
# Allow UPPERCASE variants of IMAP4 command methods.
if
Commands
.
has_key
(
attr
):
return
eval
(
"self.
%
s"
%
attr
.
lower
())
return
getattr
(
self
,
attr
.
lower
())
raise
AttributeError
(
"Unknown IMAP4 command: '
%
s'"
%
attr
)
#
Public
methods
#
Overridable
methods
def
open
(
self
,
host
,
port
):
"""Setup 'self.sock' and 'self.file'."""
"""Setup connection to remote server on "host:port".
This connection will be used by the routines:
read, readline, send, shutdown.
"""
self
.
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
self
.
sock
.
connect
((
self
.
host
,
self
.
port
))
self
.
file
=
self
.
sock
.
makefile
(
'r'
)
def
read
(
self
,
size
):
"""Read 'size' bytes from remote."""
return
self
.
file
.
read
(
size
)
def
readline
(
self
):
"""Read line from remote."""
return
self
.
file
.
readline
()
def
send
(
self
,
data
):
"""Send data to remote."""
self
.
sock
.
send
(
data
)
def
shutdown
(
self
):
"""Close I/O established in "open"."""
self
.
file
.
close
()
self
.
sock
.
close
()
def
socket
(
self
):
"""Return socket instance used to connect to IMAP4 server.
socket = <instance>.socket()
"""
return
self
.
sock
# Utility methods
def
recent
(
self
):
"""Return most recent 'RECENT' responses if any exist,
else prompt server for an update using the 'NOOP' command.
...
...
@@ -229,14 +270,6 @@ class IMAP4:
return
self
.
_untagged_response
(
code
,
[
None
],
code
.
upper
())
def
socket
(
self
):
"""Return socket instance used to connect to IMAP4 server.
socket = <instance>.socket()
"""
return
self
.
sock
# IMAP4 commands
...
...
@@ -368,6 +401,15 @@ class IMAP4:
return
self
.
_untagged_response
(
typ
,
dat
,
name
)
def
getacl
(
self
,
mailbox
):
"""Get the ACLs for a mailbox.
(typ, [data]) = <instance>.getacl(mailbox)
"""
typ
,
dat
=
self
.
_simple_command
(
'GETACL'
,
mailbox
)
return
self
.
_untagged_response
(
typ
,
dat
,
'ACL'
)
def
list
(
self
,
directory
=
'""'
,
pattern
=
'*'
):
"""List mailbox names in directory matching pattern.
...
...
@@ -406,8 +448,7 @@ class IMAP4:
self
.
state
=
'LOGOUT'
try
:
typ
,
dat
=
self
.
_simple_command
(
'LOGOUT'
)
except
:
typ
,
dat
=
'NO'
,
[
'
%
s:
%
s'
%
sys
.
exc_info
()[:
2
]]
self
.
file
.
close
()
self
.
sock
.
close
()
self
.
shutdown
()
if
self
.
untagged_responses
.
has_key
(
'BYE'
):
return
'BYE'
,
self
.
untagged_responses
[
'BYE'
]
return
typ
,
dat
...
...
@@ -425,6 +466,16 @@ class IMAP4:
return
self
.
_untagged_response
(
typ
,
dat
,
name
)
def
namespace
(
self
):
""" Returns IMAP namespaces ala rfc2342
(typ, [data, ...]) = <instance>.namespace()
"""
name
=
'NAMESPACE'
typ
,
dat
=
self
.
_simple_command
(
name
)
return
self
.
_untagged_response
(
typ
,
dat
,
name
)
def
noop
(
self
):
"""Send NOOP command.
...
...
@@ -465,8 +516,9 @@ class IMAP4:
"""
name
=
'SEARCH'
if
charset
:
charset
=
'CHARSET '
+
charset
typ
,
dat
=
apply
(
self
.
_simple_command
,
(
name
,
charset
)
+
criteria
)
typ
,
dat
=
apply
(
self
.
_simple_command
,
(
name
,
'CHARSET'
,
charset
)
+
criteria
)
else
:
typ
,
dat
=
apply
(
self
.
_simple_command
,
(
name
,)
+
criteria
)
return
self
.
_untagged_response
(
typ
,
dat
,
name
)
...
...
@@ -500,14 +552,36 @@ class IMAP4:
return
typ
,
self
.
untagged_responses
.
get
(
'EXISTS'
,
[
None
])
def
setacl
(
self
,
mailbox
,
who
,
what
):
"""Set a mailbox acl.
(typ, [data]) = <instance>.create(mailbox, who, what)
"""
return
self
.
_simple_command
(
'SETACL'
,
mailbox
,
who
,
what
)
def
sort
(
self
,
sort_criteria
,
charset
,
*
search_criteria
):
"""IMAP4rev1 extension SORT command.
(typ, [data]) = <instance>.sort(sort_criteria, charset, search_criteria, ...)
"""
name
=
'SORT'
#if not name in self.capabilities: # Let the server decide!
# raise self.error('unimplemented extension command: %s' % name)
if
(
sort_criteria
[
0
],
sort_criteria
[
-
1
])
!=
(
'('
,
')'
):
sort_criteria
=
'(
%
s)'
%
sort_criteria
typ
,
dat
=
apply
(
self
.
_simple_command
,
(
name
,
sort_criteria
,
charset
)
+
search_criteria
)
return
self
.
_untagged_response
(
typ
,
dat
,
name
)
def
status
(
self
,
mailbox
,
names
):
"""Request named status conditions for mailbox.
(typ, [data]) = <instance>.status(mailbox, names)
"""
name
=
'STATUS'
if
self
.
PROTOCOL_VERSION
==
'IMAP4'
:
raise
self
.
error
(
'
%
s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)'
%
name
)
#if self.PROTOCOL_VERSION == 'IMAP4': # Let the server decide!
#
raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name)
typ
,
dat
=
self
.
_simple_command
(
name
,
mailbox
,
names
)
return
self
.
_untagged_response
(
typ
,
dat
,
name
)
...
...
@@ -547,8 +621,8 @@ class IMAP4:
%
(
command
,
self
.
state
))
name
=
'UID'
typ
,
dat
=
apply
(
self
.
_simple_command
,
(
name
,
command
)
+
args
)
if
command
==
'SEARCH'
:
name
=
'SEARCH'
if
command
in
(
'SEARCH'
,
'SORT'
)
:
name
=
command
else
:
name
=
'FETCH'
return
self
.
_untagged_response
(
typ
,
dat
,
name
)
...
...
@@ -566,18 +640,19 @@ class IMAP4:
"""Allow simple extension commands
notified by server in CAPABILITY response.
Assumes command is legal in current state.
(typ, [data]) = <instance>.xatom(name, arg, ...)
Returns response appropriate to extension command `name'.
"""
if
name
[
0
]
!=
'X'
or
not
name
in
self
.
capabilities
:
raise
self
.
error
(
'unknown extension command:
%
s'
%
name
)
name
=
name
.
upper
()
#if not name in self.capabilities: # Let the server decide!
# raise self.error('unknown extension command: %s' % name)
if
not
Commands
.
has_key
(
name
):
Commands
[
name
]
=
(
self
.
state
,)
return
apply
(
self
.
_simple_command
,
(
name
,)
+
args
)
def
namespace
(
self
):
""" Returns IMAP namespaces ala rfc2342
"""
name
=
'NAMESPACE'
typ
,
dat
=
self
.
_simple_command
(
name
)
return
self
.
_untagged_response
(
typ
,
dat
,
name
)
# Private methods
...
...
@@ -640,8 +715,8 @@ class IMAP4:
_log
(
'>
%
s'
%
data
)
try
:
self
.
s
ock
.
s
end
(
'
%
s
%
s'
%
(
data
,
CRLF
))
except
socket
.
error
,
val
:
self
.
send
(
'
%
s
%
s'
%
(
data
,
CRLF
))
except
(
socket
.
error
,
OSError
)
,
val
:
raise
self
.
abort
(
'socket error:
%
s'
%
val
)
if
literal
is
None
:
...
...
@@ -664,9 +739,9 @@ class IMAP4:
_mesg
(
'write literal size
%
s'
%
len
(
literal
))
try
:
self
.
s
ock
.
s
end
(
literal
)
self
.
s
ock
.
s
end
(
CRLF
)
except
socket
.
error
,
val
:
self
.
send
(
literal
)
self
.
send
(
CRLF
)
except
(
socket
.
error
,
OSError
)
,
val
:
raise
self
.
abort
(
'socket error:
%
s'
%
val
)
if
not
literator
:
...
...
@@ -741,7 +816,7 @@ class IMAP4:
if
__debug__
:
if
self
.
debug
>=
4
:
_mesg
(
'read literal size
%
s'
%
size
)
data
=
self
.
file
.
read
(
size
)
data
=
self
.
read
(
size
)
# Store response with literal as tuple
...
...
@@ -789,7 +864,7 @@ class IMAP4:
def
_get_line
(
self
):
line
=
self
.
file
.
readline
()
line
=
self
.
readline
()
if
not
line
:
raise
self
.
abort
(
'socket error: EOF'
)
...
...
@@ -1059,7 +1134,7 @@ if __name__ == '__main__':
host
=
args
[
0
]
USER
=
getpass
.
getuser
()
PASSWD
=
getpass
.
getpass
(
"IMAP password for
%
s on
%
s:"
%
(
USER
,
host
or
"localhost"
))
PASSWD
=
getpass
.
getpass
(
"IMAP password for
%
s on
%
s:
"
%
(
USER
,
host
or
"localhost"
))
test_mesg
=
'From:
%
s@localhost
\n
Subject: IMAP4 test
\n\n
data...
\n
'
%
USER
test_seq1
=
(
...
...
@@ -1073,6 +1148,7 @@ if __name__ == '__main__':
(
'search'
,
(
None
,
'SUBJECT'
,
'test'
)),
(
'partial'
,
(
'1'
,
'RFC822'
,
1
,
1024
)),
(
'store'
,
(
'1'
,
'FLAGS'
,
'(
\
Deleted)'
)),
(
'namespace'
,
()),
(
'expunge'
,
()),
(
'recent'
,
()),
(
'close'
,
()),
...
...
@@ -1090,13 +1166,14 @@ if __name__ == '__main__':
def
run
(
cmd
,
args
):
_mesg
(
'
%
s
%
s'
%
(
cmd
,
args
))
typ
,
dat
=
apply
(
eval
(
'M.
%
s'
%
cmd
),
args
)
typ
,
dat
=
apply
(
getattr
(
M
,
cmd
),
args
)
_mesg
(
'
%
s =>
%
s
%
s'
%
(
cmd
,
typ
,
dat
))
return
dat
try
:
M
=
IMAP4
(
host
)
_mesg
(
'PROTOCOL_VERSION =
%
s'
%
M
.
PROTOCOL_VERSION
)
_mesg
(
'CAPABILITIES =
%
s'
%
`M.capabilities`
)
for
cmd
,
args
in
test_seq1
:
run
(
cmd
,
args
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment