Kaydet (Commit) 5cf565dd authored tarafından Phillip J. Eby's avatar Phillip J. Eby

Import wsgiref into the stdlib, as of the external version 0.1-r2181.

üst dbeaa699
...@@ -292,6 +292,7 @@ and how to embed it in other applications. ...@@ -292,6 +292,7 @@ and how to embed it in other applications.
\input{libwebbrowser} \input{libwebbrowser}
\input{libcgi} \input{libcgi}
\input{libcgitb} \input{libcgitb}
\input{libwsgiref}
\input{liburllib} \input{liburllib}
\input{liburllib2} \input{liburllib2}
\input{libhttplib} \input{libhttplib}
......
This diff is collapsed.
This diff is collapsed.
Metadata-Version: 1.0
Name: wsgiref
Version: 0.1
Summary: WSGI (PEP 333) Reference Library
Author: Phillip J. Eby
Author-email: web-sig@python.org
License: PSF or ZPL
Platform: UNKNOWN
"""wsgiref -- a WSGI (PEP 333) Reference Library
Current Contents:
* util -- Miscellaneous useful functions and wrappers
* headers -- Manage response headers
* handlers -- base classes for server/gateway implementations
* simple_server -- a simple BaseHTTPServer that supports WSGI
* validate -- validation wrapper that sits between an app and a server
to detect errors in either
To-Do:
* cgi_gateway -- Run WSGI apps under CGI (pending a deployment standard)
* cgi_wrapper -- Run CGI apps under WSGI
* router -- a simple middleware component that handles URL traversal
"""
This diff is collapsed.
"""Manage HTTP Response Headers
Much of this module is red-handedly pilfered from email.Message in the stdlib,
so portions are Copyright (C) 2001,2002 Python Software Foundation, and were
written by Barry Warsaw.
"""
from types import ListType, TupleType
# Regular expression that matches `special' characters in parameters, the
# existance of which force quoting of the parameter value.
import re
tspecials = re.compile(r'[ \(\)<>@,;:\\"/\[\]\?=]')
def _formatparam(param, value=None, quote=1):
"""Convenience function to format and return a key=value pair.
This will quote the value if needed or if quote is true.
"""
if value is not None and len(value) > 0:
if quote or tspecials.search(value):
value = value.replace('\\', '\\\\').replace('"', r'\"')
return '%s="%s"' % (param, value)
else:
return '%s=%s' % (param, value)
else:
return param
class Headers:
"""Manage a collection of HTTP response headers"""
def __init__(self,headers):
if type(headers) is not ListType:
raise TypeError("Headers must be a list of name/value tuples")
self._headers = headers
def __len__(self):
"""Return the total number of headers, including duplicates."""
return len(self._headers)
def __setitem__(self, name, val):
"""Set the value of a header."""
del self[name]
self._headers.append((name, val))
def __delitem__(self,name):
"""Delete all occurrences of a header, if present.
Does *not* raise an exception if the header is missing.
"""
name = name.lower()
self._headers[:] = [kv for kv in self._headers if kv[0].lower()<>name]
def __getitem__(self,name):
"""Get the first header value for 'name'
Return None if the header is missing instead of raising an exception.
Note that if the header appeared multiple times, the first exactly which
occurrance gets returned is undefined. Use getall() to get all
the values matching a header field name.
"""
return self.get(name)
def has_key(self, name):
"""Return true if the message contains the header."""
return self.get(name) is not None
__contains__ = has_key
def get_all(self, name):
"""Return a list of all the values for the named field.
These will be sorted in the order they appeared in the original header
list or were added to this instance, and may contain duplicates. Any
fields deleted and re-inserted are always appended to the header list.
If no fields exist with the given name, returns an empty list.
"""
name = name.lower()
return [kv[1] for kv in self._headers if kv[0].lower()==name]
def get(self,name,default=None):
"""Get the first header value for 'name', or return 'default'"""
name = name.lower()
for k,v in self._headers:
if k.lower()==name:
return v
return default
def keys(self):
"""Return a list of all the header field names.
These will be sorted in the order they appeared in the original header
list, or were added to this instance, and may contain duplicates.
Any fields deleted and re-inserted are always appended to the header
list.
"""
return [k for k, v in self._headers]
def values(self):
"""Return a list of all header values.
These will be sorted in the order they appeared in the original header
list, or were added to this instance, and may contain duplicates.
Any fields deleted and re-inserted are always appended to the header
list.
"""
return [v for k, v in self._headers]
def items(self):
"""Get all the header fields and values.
These will be sorted in the order they were in the original header
list, or were added to this instance, and may contain duplicates.
Any fields deleted and re-inserted are always appended to the header
list.
"""
return self._headers[:]
def __repr__(self):
return "Headers(%s)" % `self._headers`
def __str__(self):
"""str() returns the formatted headers, complete with end line,
suitable for direct HTTP transmission."""
return '\r\n'.join(["%s: %s" % kv for kv in self._headers]+['',''])
def setdefault(self,name,value):
"""Return first matching header value for 'name', or 'value'
If there is no header named 'name', add a new header with name 'name'
and value 'value'."""
result = self.get(name)
if result is None:
self._headers.append((name,value))
return value
else:
return result
def add_header(self, _name, _value, **_params):
"""Extended header setting.
_name is the header field to add. keyword arguments can be used to set
additional parameters for the header field, with underscores converted
to dashes. Normally the parameter will be added as key="value" unless
value is None, in which case only the key will be added.
Example:
h.add_header('content-disposition', 'attachment', filename='bud.gif')
Note that unlike the corresponding 'email.Message' method, this does
*not* handle '(charset, language, value)' tuples: all values must be
strings or None.
"""
parts = []
if _value is not None:
parts.append(_value)
for k, v in _params.items():
if v is None:
parts.append(k.replace('_', '-'))
else:
parts.append(_formatparam(k.replace('_', '-'), v))
self._headers.append((_name, "; ".join(parts)))
"""BaseHTTPServer that implements the Python WSGI protocol (PEP 333, rev 1.21)
This is both an example of how WSGI can be implemented, and a basis for running
simple web applications on a local machine, such as might be done when testing
or debugging an application. It has not been reviewed for security issues,
however, and we strongly recommend that you use a "real" web server for
production use.
For example usage, see the 'if __name__=="__main__"' block at the end of the
module. See also the BaseHTTPServer module docs for other API information.
"""
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import urllib, sys
from wsgiref.handlers import SimpleHandler
__version__ = "0.1"
__all__ = ['WSGIServer', 'WSGIRequestHandler', 'demo_app', 'make_server']
server_version = "WSGIServer/" + __version__
sys_version = "Python/" + sys.version.split()[0]
software_version = server_version + ' ' + sys_version
class ServerHandler(SimpleHandler):
server_software = software_version
def close(self):
try:
self.request_handler.log_request(
self.status.split(' ',1)[0], self.bytes_sent
)
finally:
SimpleHandler.close(self)
class WSGIServer(HTTPServer):
"""BaseHTTPServer that implements the Python WSGI protocol"""
application = None
def server_bind(self):
"""Override server_bind to store the server name."""
HTTPServer.server_bind(self)
self.setup_environ()
def setup_environ(self):
# Set up base environment
env = self.base_environ = {}
env['SERVER_NAME'] = self.server_name
env['GATEWAY_INTERFACE'] = 'CGI/1.1'
env['SERVER_PORT'] = str(self.server_port)
env['REMOTE_HOST']=''
env['CONTENT_LENGTH']=''
env['SCRIPT_NAME'] = ''
def get_app(self):
return self.application
def set_app(self,application):
self.application = application
class WSGIRequestHandler(BaseHTTPRequestHandler):
server_version = "WSGIServer/" + __version__
def get_environ(self):
env = self.server.base_environ.copy()
env['SERVER_PROTOCOL'] = self.request_version
env['REQUEST_METHOD'] = self.command
if '?' in self.path:
path,query = self.path.split('?',1)
else:
path,query = self.path,''
env['PATH_INFO'] = urllib.unquote(path)
env['QUERY_STRING'] = query
host = self.address_string()
if host != self.client_address[0]:
env['REMOTE_HOST'] = host
env['REMOTE_ADDR'] = self.client_address[0]
if self.headers.typeheader is None:
env['CONTENT_TYPE'] = self.headers.type
else:
env['CONTENT_TYPE'] = self.headers.typeheader
length = self.headers.getheader('content-length')
if length:
env['CONTENT_LENGTH'] = length
for h in self.headers.headers:
k,v = h.split(':',1)
k=k.replace('-','_').upper(); v=v.strip()
if k in env:
continue # skip content length, type,etc.
if 'HTTP_'+k in env:
env['HTTP_'+k] += ','+v # comma-separate multiple headers
else:
env['HTTP_'+k] = v
return env
def get_stderr(self):
return sys.stderr
def handle(self):
"""Handle a single HTTP request"""
self.raw_requestline = self.rfile.readline()
if not self.parse_request(): # An error code has been sent, just exit
return
handler = ServerHandler(
self.rfile, self.wfile, self.get_stderr(), self.get_environ()
)
handler.request_handler = self # backpointer for logging
handler.run(self.server.get_app())
def demo_app(environ,start_response):
from StringIO import StringIO
stdout = StringIO()
print >>stdout, "Hello world!"
print >>stdout
h = environ.items(); h.sort()
for k,v in h:
print >>stdout, k,'=',`v`
start_response("200 OK", [('Content-Type','text/plain')])
return [stdout.getvalue()]
def make_server(
host, port, app, server_class=WSGIServer, handler_class=WSGIRequestHandler
):
"""Create a new WSGI server listening on `host` and `port` for `app`"""
server = server_class((host, port), handler_class)
server.set_app(app)
return server
if __name__ == '__main__':
server_address = ('', 8000)
httpd = make_server('', 8000, demo_app)
sa = httpd.socket.getsockname()
print "Serving HTTP on", sa[0], "port", sa[1], "..."
import webbrowser
webbrowser.open('http://localhost:8000/xyz?abc')
httpd.handle_request() # serve one request, then exit
"""Miscellaneous WSGI-related Utilities"""
import posixpath
__all__ = [
'FileWrapper', 'guess_scheme', 'application_uri', 'request_uri',
'shift_path_info', 'setup_testing_defaults',
]
class FileWrapper:
"""Wrapper to convert file-like objects to iterables"""
def __init__(self, filelike, blksize=8192):
self.filelike = filelike
self.blksize = blksize
if hasattr(filelike,'close'):
self.close = filelike.close
def __getitem__(self,key):
data = self.filelike.read(self.blksize)
if data:
return data
raise IndexError
def __iter__(self):
return self
def next(self):
data = self.filelike.read(self.blksize)
if data:
return data
raise StopIteration
def guess_scheme(environ):
"""Return a guess for whether 'wsgi.url_scheme' should be 'http' or 'https'
"""
if environ.get("HTTPS") in ('yes','on','1'):
return 'https'
else:
return 'http'
def application_uri(environ):
"""Return the application's base URI (no PATH_INFO or QUERY_STRING)"""
url = environ['wsgi.url_scheme']+'://'
from urllib import quote
if environ.get('HTTP_HOST'):
url += environ['HTTP_HOST']
else:
url += environ['SERVER_NAME']
if environ['wsgi.url_scheme'] == 'https':
if environ['SERVER_PORT'] != '443':
url += ':' + environ['SERVER_PORT']
else:
if environ['SERVER_PORT'] != '80':
url += ':' + environ['SERVER_PORT']
url += quote(environ.get('SCRIPT_NAME') or '/')
return url
def request_uri(environ, include_query=1):
"""Return the full request URI, optionally including the query string"""
url = application_uri(environ)
from urllib import quote
path_info = quote(environ.get('PATH_INFO',''))
if not environ.get('SCRIPT_NAME'):
url += path_info[1:]
else:
url += path_info
if include_query and environ.get('QUERY_STRING'):
url += '?' + environ['QUERY_STRING']
return url
def shift_path_info(environ):
"""Shift a name from PATH_INFO to SCRIPT_NAME, returning it
If there are no remaining path segments in PATH_INFO, return None.
Note: 'environ' is modified in-place; use a copy if you need to keep
the original PATH_INFO or SCRIPT_NAME.
Note: when PATH_INFO is just a '/', this returns '' and appends a trailing
'/' to SCRIPT_NAME, even though empty path segments are normally ignored,
and SCRIPT_NAME doesn't normally end in a '/'. This is intentional
behavior, to ensure that an application can tell the difference between
'/x' and '/x/' when traversing to objects.
"""
path_info = environ.get('PATH_INFO','')
if not path_info:
return None
path_parts = path_info.split('/')
path_parts[1:-1] = [p for p in path_parts[1:-1] if p and p<>'.']
name = path_parts[1]
del path_parts[1]
script_name = environ.get('SCRIPT_NAME','')
script_name = posixpath.normpath(script_name+'/'+name)
if script_name.endswith('/'):
script_name = script_name[:-1]
if not name and not script_name.endswith('/'):
script_name += '/'
environ['SCRIPT_NAME'] = script_name
environ['PATH_INFO'] = '/'.join(path_parts)
# Special case: '/.' on PATH_INFO doesn't get stripped,
# because we don't strip the last element of PATH_INFO
# if there's only one path part left. Instead of fixing this
# above, we fix it here so that PATH_INFO gets normalized to
# an empty string in the environ.
if name=='.':
name = None
return name
def setup_testing_defaults(environ):
"""Update 'environ' with trivial defaults for testing purposes
This adds various parameters required for WSGI, including HTTP_HOST,
SERVER_NAME, SERVER_PORT, REQUEST_METHOD, SCRIPT_NAME, PATH_INFO,
and all of the wsgi.* variables. It only supplies default values,
and does not replace any existing settings for these variables.
This routine is intended to make it easier for unit tests of WSGI
servers and applications to set up dummy environments. It should *not*
be used by actual WSGI servers or applications, since the data is fake!
"""
environ.setdefault('SERVER_NAME','127.0.0.1')
environ.setdefault('SERVER_PROTOCOL','HTTP/1.0')
environ.setdefault('HTTP_HOST',environ['SERVER_NAME'])
environ.setdefault('REQUEST_METHOD','GET')
if 'SCRIPT_NAME' not in environ and 'PATH_INFO' not in environ:
environ.setdefault('SCRIPT_NAME','')
environ.setdefault('PATH_INFO','/')
environ.setdefault('wsgi.version', (1,0))
environ.setdefault('wsgi.run_once', 0)
environ.setdefault('wsgi.multithread', 0)
environ.setdefault('wsgi.multiprocess', 0)
from StringIO import StringIO
environ.setdefault('wsgi.input', StringIO(""))
environ.setdefault('wsgi.errors', StringIO())
environ.setdefault('wsgi.url_scheme',guess_scheme(environ))
if environ['wsgi.url_scheme']=='http':
environ.setdefault('SERVER_PORT', '80')
elif environ['wsgi.url_scheme']=='https':
environ.setdefault('SERVER_PORT', '443')
_hoppish = {
'connection':1, 'keep-alive':1, 'proxy-authenticate':1,
'proxy-authorization':1, 'te':1, 'trailers':1, 'transfer-encoding':1,
'upgrade':1
}.has_key
def is_hop_by_hop(header_name):
"""Return true if 'header_name' is an HTTP/1.1 "Hop-by-Hop" header"""
return _hoppish(header_name.lower())
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment