Skip to content
Projeler
Gruplar
Parçacıklar
Yardım
Yükleniyor...
Oturum aç / Kaydol
Gezinmeyi değiştir
C
cpython
Proje
Proje
Ayrıntılar
Etkinlik
Cycle Analytics
Depo (repository)
Depo (repository)
Dosyalar
Kayıtlar (commit)
Dallar (branch)
Etiketler
Katkıda bulunanlar
Grafik
Karşılaştır
Grafikler
Konular (issue)
0
Konular (issue)
0
Liste
Pano
Etiketler
Kilometre Taşları
Birleştirme (merge) Talepleri
0
Birleştirme (merge) Talepleri
0
CI / CD
CI / CD
İş akışları (pipeline)
İşler
Zamanlamalar
Grafikler
Paketler
Paketler
Wiki
Wiki
Parçacıklar
Parçacıklar
Üyeler
Üyeler
Collapse sidebar
Close sidebar
Etkinlik
Grafik
Grafikler
Yeni bir konu (issue) oluştur
İşler
Kayıtlar (commit)
Konu (issue) Panoları
Kenar çubuğunu aç
Batuhan Osman TASKAYA
cpython
Commits
ce7c9781
Kaydet (Commit)
ce7c9781
authored
May 17, 2011
tarafından
Vinay Sajip
Dosyalara gözat
Seçenekler
Dosyalara Gözat
İndir
Eposta Yamaları
Sade Fark
Skip some more tests in the absence of threading.
üst
362ce379
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
263 additions
and
260 deletions
+263
-260
test_logging.py
Lib/test/test_logging.py
+263
-260
No files found.
Lib/test/test_logging.py
Dosyayı görüntüle @
ce7c9781
...
...
@@ -25,24 +25,17 @@ import logging
import
logging.handlers
import
logging.config
import
asynchat
import
asyncore
import
codecs
import
datetime
import
errno
import
pickle
import
io
import
gc
from
http.server
import
HTTPServer
,
BaseHTTPRequestHandler
import
json
import
os
import
queue
import
re
import
select
import
smtpd
import
socket
from
socketserver
import
(
ThreadingUDPServer
,
DatagramRequestHandler
,
ThreadingTCPServer
,
StreamRequestHandler
)
import
struct
import
sys
import
tempfile
...
...
@@ -51,11 +44,19 @@ from test.support import TestHandler, Matcher
import
textwrap
import
time
import
unittest
from
urllib.parse
import
urlparse
,
parse_qs
import
warnings
import
weakref
try
:
import
threading
# The following imports are needed only for tests which
import
asynchat
import
asyncore
import
errno
from
http.server
import
HTTPServer
,
BaseHTTPRequestHandler
import
smtpd
from
urllib.parse
import
urlparse
,
parse_qs
from
socketserver
import
(
ThreadingUDPServer
,
DatagramRequestHandler
,
ThreadingTCPServer
,
StreamRequestHandler
)
except
ImportError
:
threading
=
None
try
:
...
...
@@ -611,284 +612,286 @@ class StreamHandlerTest(BaseTest):
# -- The following section could be moved into a server_helper.py module
# -- if it proves to be of wider utility than just test_logging
class
TestSMTPChannel
(
smtpd
.
SMTPChannel
):
"""
This derived class has had to be created because smtpd does not
support use of custom channel maps, although they are allowed by
asyncore's design. Issue #11959 has been raised to address this,
and if resolved satisfactorily, some of this code can be removed.
"""
def
__init__
(
self
,
server
,
conn
,
addr
,
sockmap
):
asynchat
.
async_chat
.
__init__
(
self
,
conn
,
sockmap
)
self
.
smtp_server
=
server
self
.
conn
=
conn
self
.
addr
=
addr
self
.
received_lines
=
[]
self
.
smtp_state
=
self
.
COMMAND
self
.
seen_greeting
=
''
self
.
mailfrom
=
None
self
.
rcpttos
=
[]
self
.
received_data
=
''
self
.
fqdn
=
socket
.
getfqdn
()
self
.
num_bytes
=
0
try
:
self
.
peer
=
conn
.
getpeername
()
except
socket
.
error
as
err
:
# a race condition may occur if the other end is closing
# before we can get the peername
self
.
close
()
if
err
.
args
[
0
]
!=
errno
.
ENOTCONN
:
raise
return
self
.
push
(
'220
%
s
%
s'
%
(
self
.
fqdn
,
smtpd
.
__version__
))
self
.
set_terminator
(
b
'
\r\n
'
)
class
TestSMTPServer
(
smtpd
.
SMTPServer
):
"""
This class implements a test SMTP server.
:param addr: A (host, port) tuple which the server listens on.
You can specify a port value of zero: the server's
*port* attribute will hold the actual port number
used, which can be used in client connections.
:param handler: A callable which will be called to process
incoming messages. The handler will be passed
the client address tuple, who the message is from,
a list of recipients and the message data.
:param poll_interval: The interval, in seconds, used in the underlying
:func:`select` or :func:`poll` call by
:func:`asyncore.loop`.
:param sockmap: A dictionary which will be used to hold
:class:`asyncore.dispatcher` instances used by
:func:`asyncore.loop`. This avoids changing the
:mod:`asyncore` module's global state.
"""
channel_class
=
TestSMTPChannel
def
__init__
(
self
,
addr
,
handler
,
poll_interval
,
sockmap
):
self
.
_localaddr
=
addr
self
.
_remoteaddr
=
None
self
.
sockmap
=
sockmap
asyncore
.
dispatcher
.
__init__
(
self
,
map
=
sockmap
)
try
:
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
sock
.
setblocking
(
0
)
self
.
set_socket
(
sock
,
map
=
sockmap
)
# try to re-use a server port if possible
self
.
set_reuse_addr
()
self
.
bind
(
addr
)
self
.
port
=
sock
.
getsockname
()[
1
]
self
.
listen
(
5
)
except
:
self
.
close
()
raise
self
.
_handler
=
handler
self
.
_thread
=
None
self
.
poll_interval
=
poll_interval
def
handle_accepted
(
self
,
conn
,
addr
):
if
threading
:
class
TestSMTPChannel
(
smtpd
.
SMTPChannel
):
"""
Redefined only because the base class does not pass in a
map, forcing use of a global in :mod:`asyncore`.
This derived class has had to be created because smtpd does not
support use of custom channel maps, although they are allowed by
asyncore's design. Issue #11959 has been raised to address this,
and if resolved satisfactorily, some of this code can be removed.
"""
channel
=
self
.
channel_class
(
self
,
conn
,
addr
,
self
.
sockmap
)
def
process_message
(
self
,
peer
,
mailfrom
,
rcpttos
,
data
):
"""
Delegates to the handler passed in to the server's constructor.
Typically, this will be a test case method.
:param peer: The client (host, port) tuple.
:param mailfrom: The address of the sender.
:param rcpttos: The addresses of the recipients.
:param data: The message.
"""
self
.
_handler
(
peer
,
mailfrom
,
rcpttos
,
data
)
def
start
(
self
):
"""
Start the server running on a separate daemon thread.
"""
self
.
_thread
=
t
=
threading
.
Thread
(
target
=
self
.
serve_forever
,
args
=
(
self
.
poll_interval
,))
t
.
setDaemon
(
True
)
t
.
start
()
def
serve_forever
(
self
,
poll_interval
):
def
__init__
(
self
,
server
,
conn
,
addr
,
sockmap
):
asynchat
.
async_chat
.
__init__
(
self
,
conn
,
sockmap
)
self
.
smtp_server
=
server
self
.
conn
=
conn
self
.
addr
=
addr
self
.
received_lines
=
[]
self
.
smtp_state
=
self
.
COMMAND
self
.
seen_greeting
=
''
self
.
mailfrom
=
None
self
.
rcpttos
=
[]
self
.
received_data
=
''
self
.
fqdn
=
socket
.
getfqdn
()
self
.
num_bytes
=
0
try
:
self
.
peer
=
conn
.
getpeername
()
except
socket
.
error
as
err
:
# a race condition may occur if the other end is closing
# before we can get the peername
self
.
close
()
if
err
.
args
[
0
]
!=
errno
.
ENOTCONN
:
raise
return
self
.
push
(
'220
%
s
%
s'
%
(
self
.
fqdn
,
smtpd
.
__version__
))
self
.
set_terminator
(
b
'
\r\n
'
)
class
TestSMTPServer
(
smtpd
.
SMTPServer
):
"""
Run the :mod:`asyncore` loop until normal termination
conditions arise.
This class implements a test SMTP server.
:param addr: A (host, port) tuple which the server listens on.
You can specify a port value of zero: the server's
*port* attribute will hold the actual port number
used, which can be used in client connections.
:param handler: A callable which will be called to process
incoming messages. The handler will be passed
the client address tuple, who the message is from,
a list of recipients and the message data.
:param poll_interval: The interval, in seconds, used in the underlying
:func:`select` or :func:`poll` call by
:func:`asyncore.loop`.
:param sockmap: A dictionary which will be used to hold
:class:`asyncore.dispatcher` instances used by
:func:`asyncore.loop`. This avoids changing the
:mod:`asyncore` module's global state.
"""
try
:
asyncore
.
loop
(
poll_interval
,
map
=
self
.
sockmap
)
except
select
.
error
:
# On FreeBSD 8, closing the server repeatably
# raises this error. We swallow it if the
# server has been closed.
if
self
.
connected
or
self
.
accepting
:
raise
def
stop
(
self
,
timeout
=
None
):
"""
Stop the thread by closing the server instance.
Wait for the server thread to terminate.
channel_class
=
TestSMTPChannel
:param timeout: How long to wait for the server thread
to terminate.
"""
self
.
close
()
self
.
_thread
.
join
(
timeout
)
self
.
_thread
=
None
class
ControlMixin
(
object
):
"""
This mixin is used to start a server on a separate thread, and
shut it down programmatically. Request handling is simplified - instead
of needing to derive a suitable RequestHandler subclass, you just
provide a callable which will be passed each received request to be
processed.
:param handler: A handler callable which will be called with a
single parameter - the request - in order to
process the request. This handler is called on the
server thread, effectively meaning that requests are
processed serially. While not quite Web scale ;-),
this should be fine for testing applications.
:param poll_interval: The polling interval in seconds.
"""
def
__init__
(
self
,
handler
,
poll_interval
):
self
.
_thread
=
None
self
.
poll_interval
=
poll_interval
self
.
_handler
=
handler
self
.
ready
=
threading
.
Event
()
def
__init__
(
self
,
addr
,
handler
,
poll_interval
,
sockmap
):
self
.
_localaddr
=
addr
self
.
_remoteaddr
=
None
self
.
sockmap
=
sockmap
asyncore
.
dispatcher
.
__init__
(
self
,
map
=
sockmap
)
try
:
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
sock
.
setblocking
(
0
)
self
.
set_socket
(
sock
,
map
=
sockmap
)
# try to re-use a server port if possible
self
.
set_reuse_addr
()
self
.
bind
(
addr
)
self
.
port
=
sock
.
getsockname
()[
1
]
self
.
listen
(
5
)
except
:
self
.
close
()
raise
self
.
_handler
=
handler
self
.
_thread
=
None
self
.
poll_interval
=
poll_interval
def
handle_accepted
(
self
,
conn
,
addr
):
"""
Redefined only because the base class does not pass in a
map, forcing use of a global in :mod:`asyncore`.
"""
channel
=
self
.
channel_class
(
self
,
conn
,
addr
,
self
.
sockmap
)
def
process_message
(
self
,
peer
,
mailfrom
,
rcpttos
,
data
):
"""
Delegates to the handler passed in to the server's constructor.
Typically, this will be a test case method.
:param peer: The client (host, port) tuple.
:param mailfrom: The address of the sender.
:param rcpttos: The addresses of the recipients.
:param data: The message.
"""
self
.
_handler
(
peer
,
mailfrom
,
rcpttos
,
data
)
def
start
(
self
):
"""
Start the server running on a separate daemon thread.
"""
self
.
_thread
=
t
=
threading
.
Thread
(
target
=
self
.
serve_forever
,
args
=
(
self
.
poll_interval
,))
t
.
setDaemon
(
True
)
t
.
start
()
def
serve_forever
(
self
,
poll_interval
):
"""
Run the :mod:`asyncore` loop until normal termination
conditions arise.
:param poll_interval: The interval, in seconds, used in the underlying
:func:`select` or :func:`poll` call by
:func:`asyncore.loop`.
"""
try
:
asyncore
.
loop
(
poll_interval
,
map
=
self
.
sockmap
)
except
select
.
error
:
# On FreeBSD 8, closing the server repeatably
# raises this error. We swallow it if the
# server has been closed.
if
self
.
connected
or
self
.
accepting
:
raise
def
stop
(
self
,
timeout
=
None
):
"""
Stop the thread by closing the server instance.
Wait for the server thread to terminate.
:param timeout: How long to wait for the server thread
to terminate.
"""
self
.
close
()
self
.
_thread
.
join
(
timeout
)
self
.
_thread
=
None
def
start
(
self
):
class
ControlMixin
(
object
):
"""
Create a daemon thread to run the server, and start it.
This mixin is used to start a server on a separate thread, and
shut it down programmatically. Request handling is simplified - instead
of needing to derive a suitable RequestHandler subclass, you just
provide a callable which will be passed each received request to be
processed.
:param handler: A handler callable which will be called with a
single parameter - the request - in order to
process the request. This handler is called on the
server thread, effectively meaning that requests are
processed serially. While not quite Web scale ;-),
this should be fine for testing applications.
:param poll_interval: The polling interval in seconds.
"""
self
.
_thread
=
t
=
threading
.
Thread
(
target
=
self
.
serve_forever
,
args
=
(
self
.
poll_interval
,))
t
.
setDaemon
(
True
)
t
.
start
()
def
serve_forever
(
self
,
poll_interval
):
def
__init__
(
self
,
handler
,
poll_interval
):
self
.
_thread
=
None
self
.
poll_interval
=
poll_interval
self
.
_handler
=
handler
self
.
ready
=
threading
.
Event
()
def
start
(
self
):
"""
Create a daemon thread to run the server, and start it.
"""
self
.
_thread
=
t
=
threading
.
Thread
(
target
=
self
.
serve_forever
,
args
=
(
self
.
poll_interval
,))
t
.
setDaemon
(
True
)
t
.
start
()
def
serve_forever
(
self
,
poll_interval
):
"""
Run the server. Set the ready flag before entering the
service loop.
"""
self
.
ready
.
set
()
super
(
ControlMixin
,
self
)
.
serve_forever
(
poll_interval
)
def
stop
(
self
,
timeout
=
None
):
"""
Tell the server thread to stop, and wait for it to do so.
:param timeout: How long to wait for the server thread
to terminate.
"""
self
.
shutdown
()
if
self
.
_thread
is
not
None
:
self
.
_thread
.
join
(
timeout
)
self
.
_thread
=
None
self
.
server_close
()
self
.
ready
.
clear
()
class
TestHTTPServer
(
ControlMixin
,
HTTPServer
):
"""
Run the server. Set the ready flag before entering the
service loop.
An HTTP server which is controllable using :class:`ControlMixin`.
:param addr: A tuple with the IP address and port to listen on.
:param handler: A handler callable which will be called with a
single parameter - the request - in order to
process the request.
:param poll_interval: The polling interval in seconds.
:param log: Pass ``True`` to enable log messages.
"""
self
.
ready
.
set
()
super
(
ControlMixin
,
self
)
.
serve_forever
(
poll_interval
)
def
stop
(
self
,
timeout
=
None
):
def
__init__
(
self
,
addr
,
handler
,
poll_interval
=
0.5
,
log
=
False
):
class
DelegatingHTTPRequestHandler
(
BaseHTTPRequestHandler
):
def
__getattr__
(
self
,
name
,
default
=
None
):
if
name
.
startswith
(
'do_'
):
return
self
.
process_request
raise
AttributeError
(
name
)
def
process_request
(
self
):
self
.
server
.
_handler
(
self
)
def
log_message
(
self
,
format
,
*
args
):
if
log
:
super
(
DelegatingHTTPRequestHandler
,
self
)
.
log_message
(
format
,
*
args
)
HTTPServer
.
__init__
(
self
,
addr
,
DelegatingHTTPRequestHandler
)
ControlMixin
.
__init__
(
self
,
handler
,
poll_interval
)
class
TestTCPServer
(
ControlMixin
,
ThreadingTCPServer
):
"""
Tell the server thread to stop, and wait for it to do so.
:param timeout: How long to wait for the server thread
to terminate.
A TCP server which is controllable using :class:`ControlMixin`.
:param addr: A tuple with the IP address and port to listen on.
:param handler: A handler callable which will be called with a single
parameter - the request - in order to process the request.
:param poll_interval: The polling interval in seconds.
:bind_and_activate: If True (the default), binds the server and starts it
listening. If False, you need to call
:meth:`server_bind` and :meth:`server_activate` at
some later time before calling :meth:`start`, so that
the server will set up the socket and listen on it.
"""
self
.
shutdown
()
if
self
.
_thread
is
not
None
:
self
.
_thread
.
join
(
timeout
)
self
.
_thread
=
None
self
.
server_close
()
self
.
ready
.
clear
()
class
TestHTTPServer
(
ControlMixin
,
HTTPServer
):
"""
An HTTP server which is controllable using :class:`ControlMixin`.
:param addr: A tuple with the IP address and port to listen on.
:param handler: A handler callable which will be called with a
single parameter - the request - in order to
process the request.
:param poll_interval: The polling interval in seconds.
:param log: Pass ``True`` to enable log messages.
"""
def
__init__
(
self
,
addr
,
handler
,
poll_interval
=
0.5
,
log
=
False
):
class
DelegatingHTTPRequestHandler
(
BaseHTTPRequestHandler
):
def
__getattr__
(
self
,
name
,
default
=
None
):
if
name
.
startswith
(
'do_'
):
return
self
.
process_request
raise
AttributeError
(
name
)
def
process_request
(
self
):
self
.
server
.
_handler
(
self
)
def
log_message
(
self
,
format
,
*
args
):
if
log
:
super
(
DelegatingHTTPRequestHandler
,
self
)
.
log_message
(
format
,
*
args
)
HTTPServer
.
__init__
(
self
,
addr
,
DelegatingHTTPRequestHandler
)
ControlMixin
.
__init__
(
self
,
handler
,
poll_interval
)
class
TestTCPServer
(
ControlMixin
,
ThreadingTCPServer
):
"""
A TCP server which is controllable using :class:`ControlMixin`.
:param addr: A tuple with the IP address and port to listen on.
:param handler: A handler callable which will be called with a single
parameter - the request - in order to process the request.
:param poll_interval: The polling interval in seconds.
:bind_and_activate: If True (the default), binds the server and starts it
listening. If False, you need to call
:meth:`server_bind` and :meth:`server_activate` at
some later time before calling :meth:`start`, so that
the server will set up the socket and listen on it.
"""
allow_reuse_address
=
True
allow_reuse_address
=
True
def
__init__
(
self
,
addr
,
handler
,
poll_interval
=
0.5
,
bind_and_activate
=
True
):
class
DelegatingTCPRequestHandler
(
StreamRequestHandler
):
def
__init__
(
self
,
addr
,
handler
,
poll_interval
=
0.5
,
bind_and_activate
=
True
):
class
DelegatingTCPRequestHandler
(
StreamRequestHandler
):
def
handle
(
self
):
self
.
server
.
_handler
(
self
)
ThreadingTCPServer
.
__init__
(
self
,
addr
,
DelegatingTCPRequestHandler
,
bind_and_activate
)
ControlMixin
.
__init__
(
self
,
handler
,
poll_interval
)
def
handle
(
self
):
self
.
server
.
_handler
(
self
)
ThreadingTCPServer
.
__init__
(
self
,
addr
,
DelegatingTCPRequestHandler
,
bind_and_activate
)
ControlMixin
.
__init__
(
self
,
handler
,
poll_interval
)
def
server_bind
(
self
):
super
(
TestTCPServer
,
self
)
.
server_bind
()
self
.
port
=
self
.
socket
.
getsockname
()[
1
]
def
server_bind
(
self
):
super
(
TestTCPServer
,
self
)
.
server_bind
()
self
.
port
=
self
.
socket
.
getsockname
()[
1
]
class
TestUDPServer
(
ControlMixin
,
ThreadingUDPServer
):
"""
A UDP server which is controllable using :class:`ControlMixin`.
:param addr: A tuple with the IP address and port to listen on.
:param handler: A handler callable which will be called with a
single parameter - the request - in order to
process the request.
:param poll_interval: The polling interval for shutdown requests,
in seconds.
:bind_and_activate: If True (the default), binds the server and
starts it listening. If False, you need to
call :meth:`server_bind` and
:meth:`server_activate` at some later time
before calling :meth:`start`, so that the server will
set up the socket and listen on it.
"""
def
__init__
(
self
,
addr
,
handler
,
poll_interval
=
0.5
,
bind_and_activate
=
True
):
class
DelegatingUDPRequestHandler
(
DatagramRequestHandler
):
class
TestUDPServer
(
ControlMixin
,
ThreadingUDPServer
):
"""
A UDP server which is controllable using :class:`ControlMixin`.
:param addr: A tuple with the IP address and port to listen on.
:param handler: A handler callable which will be called with a
single parameter - the request - in order to
process the request.
:param poll_interval: The polling interval for shutdown requests,
in seconds.
:bind_and_activate: If True (the default), binds the server and
starts it listening. If False, you need to
call :meth:`server_bind` and
:meth:`server_activate` at some later time
before calling :meth:`start`, so that the server will
set up the socket and listen on it.
"""
def
__init__
(
self
,
addr
,
handler
,
poll_interval
=
0.5
,
bind_and_activate
=
True
):
class
DelegatingUDPRequestHandler
(
DatagramRequestHandler
):
def
handle
(
self
):
self
.
server
.
_handler
(
self
)
ThreadingUDPServer
.
__init__
(
self
,
addr
,
DelegatingUDPRequestHandler
,
bind_and_activate
)
ControlMixin
.
__init__
(
self
,
handler
,
poll_interval
)
def
handle
(
self
):
self
.
server
.
_handler
(
self
)
ThreadingUDPServer
.
__init__
(
self
,
addr
,
DelegatingUDPRequestHandler
,
bind_and_activate
)
ControlMixin
.
__init__
(
self
,
handler
,
poll_interval
)
def
server_bind
(
self
):
super
(
TestUDPServer
,
self
)
.
server_bind
()
self
.
port
=
self
.
socket
.
getsockname
()[
1
]
def
server_bind
(
self
):
super
(
TestUDPServer
,
self
)
.
server_bind
()
self
.
port
=
self
.
socket
.
getsockname
()[
1
]
# - end of server_helper section
@unittest.skipUnless
(
threading
,
'Threading required for this test.'
)
class
SMTPHandlerTest
(
BaseTest
):
def
test_basic
(
self
):
sockmap
=
{}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment