Kaydet (Commit) 9a5395ae authored tarafından Christian Heimes's avatar Christian Heimes

Issue #18147: Add diagnostic functions to ssl.SSLContext().

get_ca_list() lists all loaded CA certificates and cert_store_stats() returns
amount of loaded X.509 certs, X.509 CA certs and CRLs.
üst 9424bb4a
......@@ -791,6 +791,19 @@ to speed up repeated connections from the same clients.
:class:`SSLContext` objects have the following methods and attributes:
.. method:: SSLContext.cert_store_stats()
Get statistics about quantities of loaded X.509 certificates, count of
X.509 certificates flagged as CA certificates and certificate revocation
lists as dictionary.
Example for a context with one CA cert and one other cert::
>>> context.cert_store_stats()
{'crl': 0, 'x509_ca': 1, 'x509': 2}
.. versionadded:: 3.4
.. method:: SSLContext.load_cert_chain(certfile, keyfile=None, password=None)
Load a private key and the corresponding certificate. The *certfile*
......@@ -837,6 +850,17 @@ to speed up repeated connections from the same clients.
following an `OpenSSL specific layout
<http://www.openssl.org/docs/ssl/SSL_CTX_load_verify_locations.html>`_.
.. method:: SSLContext.get_ca_certs(binary_form=False)
Get a list of loaded "certification authority" (CA) certificates. If the
``binary_form`` parameter is :const:`False` each list
entry is a dict like the output of :meth:`SSLSocket.getpeercert`. Otherwise
the method returns a list of DER-encoded certificates. The returned list
does not contain certificates from *capath* unless a certificate was
requested and loaded by a SSL connection.
..versionadded:: 3.4
.. method:: SSLContext.set_default_verify_paths()
Load a set of default "certification authority" (CA) certificates from
......
......@@ -680,6 +680,47 @@ class ContextTests(unittest.TestCase):
gc.collect()
self.assertIs(wr(), None)
def test_cert_store_stats(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertEqual(ctx.cert_store_stats(),
{'x509_ca': 0, 'crl': 0, 'x509': 0})
ctx.load_cert_chain(CERTFILE)
self.assertEqual(ctx.cert_store_stats(),
{'x509_ca': 0, 'crl': 0, 'x509': 0})
ctx.load_verify_locations(CERTFILE)
self.assertEqual(ctx.cert_store_stats(),
{'x509_ca': 0, 'crl': 0, 'x509': 1})
ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
self.assertEqual(ctx.cert_store_stats(),
{'x509_ca': 1, 'crl': 0, 'x509': 2})
def test_get_ca_certs(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertEqual(ctx.get_ca_certs(), [])
# CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
ctx.load_verify_locations(CERTFILE)
self.assertEqual(ctx.get_ca_certs(), [])
# but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
self.assertEqual(ctx.get_ca_certs(),
[{'issuer': ((('organizationName', 'Root CA'),),
(('organizationalUnitName', 'http://www.cacert.org'),),
(('commonName', 'CA Cert Signing Authority'),),
(('emailAddress', 'support@cacert.org'),)),
'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
'serialNumber': '00',
'subject': ((('organizationName', 'Root CA'),),
(('organizationalUnitName', 'http://www.cacert.org'),),
(('commonName', 'CA Cert Signing Authority'),),
(('emailAddress', 'support@cacert.org'),)),
'version': 3}])
with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
pem = f.read()
der = ssl.PEM_cert_to_DER_cert(pem)
self.assertEqual(ctx.get_ca_certs(True), [der])
class SSLErrorTests(unittest.TestCase):
......@@ -995,6 +1036,22 @@ class NetworkedTests(unittest.TestCase):
finally:
s.close()
def test_get_ca_certs_capath(self):
# capath certs are loaded on request
with support.transient_internet("svn.python.org"):
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=CAPATH)
self.assertEqual(ctx.get_ca_certs(), [])
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
s.connect(("svn.python.org", 443))
try:
cert = s.getpeercert()
self.assertTrue(cert)
finally:
s.close()
self.assertEqual(len(ctx.get_ca_certs()), 1)
try:
import threading
......
......@@ -123,6 +123,10 @@ Core and Builtins
Library
-------
- Issue #18147: Add diagnostic functions to ssl.SSLContext(). get_ca_list()
lists all loaded CA certificates and cert_store_stats() returns amount of
loaded X.509 certs, X.509 CA certs and CRLs.
- Issue #18076: Introduce importlib.util.decode_source().
- importlib.abc.SourceLoader.get_source() no longer changes SyntaxError or
......
......@@ -1023,6 +1023,24 @@ _decode_certificate(X509 *certificate) {
return NULL;
}
static PyObject *
_certificate_to_der(X509 *certificate)
{
unsigned char *bytes_buf = NULL;
int len;
PyObject *retval;
bytes_buf = NULL;
len = i2d_X509(certificate, &bytes_buf);
if (len < 0) {
_setSSLError(NULL, 0, __FILE__, __LINE__);
return NULL;
}
/* this is actually an immutable bytes sequence */
retval = PyBytes_FromStringAndSize((const char *) bytes_buf, len);
OPENSSL_free(bytes_buf);
return retval;
}
static PyObject *
PySSL_test_decode_certificate (PyObject *mod, PyObject *args) {
......@@ -1068,8 +1086,6 @@ PySSL_test_decode_certificate (PyObject *mod, PyObject *args) {
static PyObject *
PySSL_peercert(PySSLSocket *self, PyObject *args)
{
PyObject *retval = NULL;
int len;
int verification;
int binary_mode = 0;
......@@ -1081,21 +1097,7 @@ PySSL_peercert(PySSLSocket *self, PyObject *args)
if (binary_mode) {
/* return cert in DER-encoded format */
unsigned char *bytes_buf = NULL;
bytes_buf = NULL;
len = i2d_X509(self->peer_cert, &bytes_buf);
if (len < 0) {
PySSL_SetError(self, len, __FILE__, __LINE__);
return NULL;
}
/* this is actually an immutable bytes sequence */
retval = PyBytes_FromStringAndSize
((const char *) bytes_buf, len);
OPENSSL_free(bytes_buf);
return retval;
return _certificate_to_der(self->peer_cert);
} else {
verification = SSL_CTX_get_verify_mode(SSL_get_SSL_CTX(self->ssl));
if ((verification & SSL_VERIFY_PEER) == 0)
......@@ -2555,6 +2557,110 @@ set_servername_callback(PySSLContext *self, PyObject *args)
#endif
}
PyDoc_STRVAR(PySSL_get_stats_doc,
"cert_store_stats() -> {'crl': int, 'x509_ca': int, 'x509': int}\n\
\n\
Returns quantities of loaded X.509 certificates. X.509 certificates with a\n\
CA extension and certificate revocation lists inside the context's cert\n\
store.\n\
NOTE: Certificates in a capath directory aren't loaded unless they have\n\
been used at least once.");
static PyObject *
cert_store_stats(PySSLContext *self)
{
X509_STORE *store;
X509_OBJECT *obj;
int x509 = 0, crl = 0, pkey = 0, ca = 0, i;
store = SSL_CTX_get_cert_store(self->ctx);
for (i = 0; i < sk_X509_OBJECT_num(store->objs); i++) {
obj = sk_X509_OBJECT_value(store->objs, i);
switch (obj->type) {
case X509_LU_X509:
x509++;
if (X509_check_ca(obj->data.x509)) {
ca++;
}
break;
case X509_LU_CRL:
crl++;
break;
case X509_LU_PKEY:
pkey++;
break;
default:
/* Ignore X509_LU_FAIL, X509_LU_RETRY, X509_LU_PKEY.
* As far as I can tell they are internal states and never
* stored in a cert store */
break;
}
}
return Py_BuildValue("{sisisi}", "x509", x509, "crl", crl,
"x509_ca", ca);
}
PyDoc_STRVAR(PySSL_get_ca_certs_doc,
"get_ca_certs([der=False]) -> list of loaded certificate\n\
\n\
Returns a list of dicts with information of loaded CA certs. If the\n\
optional argument is True, returns a DER-encoded copy of the CA certificate.\n\
NOTE: Certificates in a capath directory aren't loaded unless they have\n\
been used at least once.");
static PyObject *
get_ca_certs(PySSLContext *self, PyObject *args)
{
X509_STORE *store;
PyObject *ci = NULL, *rlist = NULL;
int i;
int binary_mode = 0;
if (!PyArg_ParseTuple(args, "|p:get_ca_certs", &binary_mode)) {
return NULL;
}
if ((rlist = PyList_New(0)) == NULL) {
return NULL;
}
store = SSL_CTX_get_cert_store(self->ctx);
for (i = 0; i < sk_X509_OBJECT_num(store->objs); i++) {
X509_OBJECT *obj;
X509 *cert;
obj = sk_X509_OBJECT_value(store->objs, i);
if (obj->type != X509_LU_X509) {
/* not a x509 cert */
continue;
}
/* CA for any purpose */
cert = obj->data.x509;
if (!X509_check_ca(cert)) {
continue;
}
if (binary_mode) {
ci = _certificate_to_der(cert);
} else {
ci = _decode_certificate(cert);
}
if (ci == NULL) {
goto error;
}
if (PyList_Append(rlist, ci) == -1) {
goto error;
}
Py_CLEAR(ci);
}
return rlist;
error:
Py_XDECREF(ci);
Py_XDECREF(rlist);
return NULL;
}
static PyGetSetDef context_getsetlist[] = {
{"options", (getter) get_options,
(setter) set_options, NULL},
......@@ -2586,6 +2692,10 @@ static struct PyMethodDef context_methods[] = {
#endif
{"set_servername_callback", (PyCFunction) set_servername_callback,
METH_VARARGS, PySSL_set_servername_callback_doc},
{"cert_store_stats", (PyCFunction) cert_store_stats,
METH_NOARGS, PySSL_get_stats_doc},
{"get_ca_certs", (PyCFunction) get_ca_certs,
METH_VARARGS, PySSL_get_ca_certs_doc},
{NULL, NULL} /* sentinel */
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment