Author: Matti Picus <matti.pi...@gmail.com>
Branch: 
Changeset: r97180:df138cf691c1
Date: 2019-08-14 19:35 +0300
http://bitbucket.org/pypy/pypy/changeset/df138cf691c1/

Log:    tweak test, add missing get_cipher

diff too long, truncating to 2000 out of 4071 lines

diff --git a/lib-python/3/test/test_tools.py b/lib-python/3.2/test/test_tools.py
rename from lib-python/3/test/test_tools.py
rename to lib-python/3.2/test/test_tools.py
diff --git a/lib-python/3/test/test_ssl.py b/lib-python/3/test/test_ssl.py
new file mode 100644
--- /dev/null
+++ b/lib-python/3/test/test_ssl.py
@@ -0,0 +1,3987 @@
+# Test the support for SSL and sockets
+
+import sys
+import unittest
+from test import support
+import socket
+import select
+import time
+import datetime
+import gc
+import os
+import errno
+import pprint
+import tempfile
+import urllib.request
+import traceback
+import asyncore
+import weakref
+import platform
+import re
+import functools
+try:
+    import ctypes
+except ImportError:
+    ctypes = None
+
+ssl = support.import_module("ssl")
+
+try:
+    import threading
+except ImportError:
+    _have_threads = False
+else:
+    _have_threads = True
+
+# Sorted result of iterating ssl._PROTOCOL_NAMES (the protocol identifiers
+# known to the ssl module).
+PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
+# Host value taken from test.support.
+HOST = support.HOST
+# Which TLS library the ssl module reports, derived from its version string.
+IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
+# True only for genuine OpenSSL (not LibreSSL) at version 1.1.0 or newer.
+IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
+
+
+def data_file(*name):
+    return os.path.join(os.path.dirname(__file__), *name)
+
+# The custom key and certificate files used in test_ssl are generated
+# using Lib/test/make_ssl_certs.py.
+# Other certificates are simply fetched from the Internet servers they
+# are meant to authenticate.
+
+CERTFILE = data_file("keycert.pem")
+BYTES_CERTFILE = os.fsencode(CERTFILE)
+ONLYCERT = data_file("ssl_cert.pem")
+ONLYKEY = data_file("ssl_key.pem")
+BYTES_ONLYCERT = os.fsencode(ONLYCERT)
+BYTES_ONLYKEY = os.fsencode(ONLYKEY)
+CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
+ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
+KEY_PASSWORD = "somepass"
+CAPATH = data_file("capath")
+BYTES_CAPATH = os.fsencode(CAPATH)
+CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
+CAFILE_CACERT = data_file("capath", "5ed36f99.0")
+
+# empty CRL
+CRLFILE = data_file("revocation.crl")
+
+# Two keys and certs signed by the same CA (for SNI tests)
+SIGNED_CERTFILE = data_file("keycert3.pem")
+SIGNED_CERTFILE2 = data_file("keycert4.pem")
+# Same certificate as pycacert.pem, but without extra text in file
+SIGNING_CA = data_file("capath", "ceff1710.0")
+# cert with all kinds of subject alt names
+ALLSANFILE = data_file("allsans.pem")
+# cert with all kinds of subject alt names
+ALLSANFILE = data_file("allsans.pem")
+
+REMOTE_HOST = "self-signed.pythontest.net"
+
+EMPTYCERT = data_file("nullcert.pem")
+BADCERT = data_file("badcert.pem")
+NONEXISTINGCERT = data_file("XXXnonexisting.pem")
+BADKEY = data_file("badkey.pem")
+NOKIACERT = data_file("nokia.pem")
+NULLBYTECERT = data_file("nullbytecert.pem")
+TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
+
+DHFILE = data_file("ffdh3072.pem")
+BYTES_DHFILE = os.fsencode(DHFILE)
+
+# Not defined in all versions of OpenSSL
+OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
+OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
+OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
+OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
+OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
+
+
+def handle_error(prefix):
+    exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
+    if support.verbose:
+        sys.stdout.write(prefix + exc_format)
+
+def can_clear_options():
+    # 0.9.8m or higher
+    return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
+
+def no_sslv2_implies_sslv3_hello():
+    # 0.9.7h or higher
+    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
+
+def have_verify_flags():
+    # 0.9.8 or higher
+    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
+
+def utc_offset(): #NOTE: ignore issues like #1647654
+    # local time = utc time + utc offset
+    if time.daylight and time.localtime().tm_isdst > 0:
+        return -time.altzone  # seconds
+    return -time.timezone
+
+def asn1time(cert_time):
+    # Some versions of OpenSSL ignore seconds, see #18207
+    # 0.9.8.i
+    if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
+        fmt = "%b %d %H:%M:%S %Y GMT"
+        dt = datetime.datetime.strptime(cert_time, fmt)
+        dt = dt.replace(second=0)
+        cert_time = dt.strftime(fmt)
+        # %d adds leading zero but ASN1_TIME_print() uses leading space
+        if cert_time[4] == "0":
+            cert_time = cert_time[:4] + " " + cert_time[5:]
+
+    return cert_time
+
+# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
+def skip_if_broken_ubuntu_ssl(func):
+    if hasattr(ssl, 'PROTOCOL_SSLv2'):
+        @functools.wraps(func)
+        def f(*args, **kwargs):
+            try:
+                ssl.SSLContext(ssl.PROTOCOL_SSLv2)
+            except ssl.SSLError:
+                if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
+                    platform.linux_distribution() == ('debian', 'squeeze/sid', 
'')):
+                    raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks 
behaviour")
+            return func(*args, **kwargs)
+        return f
+    else:
+        return func
+
+def skip_if_openssl_cnf_minprotocol_gt_tls1(func):
+    """Skip a test if the OpenSSL config MinProtocol is > TLSv1.
+
+    OS distros with an /etc/ssl/openssl.cnf and MinProtocol set often do so to
+    require TLSv1.2 or higher (Debian Buster).  Some of our tests for older
+    protocol versions will fail under such a config.
+
+    Alternative workaround: Run this test in a process with
+    OPENSSL_CONF=/dev/null in the environment.
+    """
+    @functools.wraps(func)
+    def f(*args, **kwargs):
+        openssl_cnf = os.environ.get("OPENSSL_CONF", "/etc/ssl/openssl.cnf")
+        try:
+            with open(openssl_cnf, "r") as config:
+                for line in config:
+                    match = re.match(r"MinProtocol\s*=\s*(TLSv\d+\S*)", line)
+                    if match:
+                        tls_ver = match.group(1)
+                        if tls_ver > "TLSv1":
+                            raise unittest.SkipTest(
+                                "%s has MinProtocol = %s which is > TLSv1." %
+                                (openssl_cnf, tls_ver))
+        except (EnvironmentError, UnicodeDecodeError) as err:
+            # no config file found, etc.
+            if support.verbose:
+                sys.stdout.write("\n Could not scan %s for MinProtocol: %s\n"
+                                 % (openssl_cnf, err))
+        return func(*args, **kwargs)
+    return f
+
+
+needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this 
test")
+
+
+def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
+                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
+                     ciphers=None, certfile=None, keyfile=None,
+                     **kwargs):
+    context = ssl.SSLContext(ssl_version)
+    if cert_reqs is not None:
+        context.verify_mode = cert_reqs
+    if ca_certs is not None:
+        context.load_verify_locations(ca_certs)
+    if certfile is not None or keyfile is not None:
+        context.load_cert_chain(certfile, keyfile)
+    if ciphers is not None:
+        context.set_ciphers(ciphers)
+    return context.wrap_socket(sock, **kwargs)
+
+class BasicSocketTests(unittest.TestCase):
+
+    def test_constants(self):
+        ssl.CERT_NONE
+        ssl.CERT_OPTIONAL
+        ssl.CERT_REQUIRED
+        ssl.OP_CIPHER_SERVER_PREFERENCE
+        ssl.OP_SINGLE_DH_USE
+        if ssl.HAS_ECDH:
+            ssl.OP_SINGLE_ECDH_USE
+        if ssl.OPENSSL_VERSION_INFO >= (1, 0):
+            ssl.OP_NO_COMPRESSION
+        self.assertIn(ssl.HAS_SNI, {True, False})
+        self.assertIn(ssl.HAS_ECDH, {True, False})
+        ssl.OP_NO_SSLv2
+        ssl.OP_NO_SSLv3
+        ssl.OP_NO_TLSv1
+        ssl.OP_NO_TLSv1_3
+    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
+        ssl.OP_NO_TLSv1_1
+        ssl.OP_NO_TLSv1_2
+
+    def test_str_for_enums(self):
+        # Make sure that the PROTOCOL_* constants have enum-like string
+        # reprs.
+        proto = ssl.PROTOCOL_TLS
+        self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
+        ctx = ssl.SSLContext(proto)
+        self.assertIs(ctx.protocol, proto)
+
+    def test_random(self):
+        v = ssl.RAND_status()
+        if support.verbose:
+            sys.stdout.write("\n RAND_status is %d (%s)\n"
+                             % (v, (v and "sufficient randomness") or
+                                "insufficient randomness"))
+
+        data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
+        self.assertEqual(len(data), 16)
+        self.assertEqual(is_cryptographic, v == 1)
+        if v:
+            data = ssl.RAND_bytes(16)
+            self.assertEqual(len(data), 16)
+        else:
+            self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
+
+        # negative num is invalid
+        self.assertRaises(ValueError, ssl.RAND_bytes, -5)
+        self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
+
+        if hasattr(ssl, 'RAND_egd'):
+            self.assertRaises(TypeError, ssl.RAND_egd, 1)
+            self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
+        ssl.RAND_add("this is a random string", 75.0)
+        ssl.RAND_add(b"this is a random bytes object", 75.0)
+        ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
+
+    @unittest.skipUnless(os.name == 'posix', 'requires posix')
+    def test_random_fork(self):
+        # After os.fork() the child and the parent must not produce the same
+        # 16 pseudo-random bytes.
+        status = ssl.RAND_status()
+        if not status:
+            self.fail("OpenSSL's PRNG has insufficient randomness")
+
+        # Pipe used by the child to hand its random bytes to the parent.
+        rfd, wfd = os.pipe()
+        pid = os.fork()
+        if pid == 0:
+            # Child: write 16 pseudo-random bytes to the pipe and leave via
+            # os._exit(); the exit status (0 or 1) tells the parent whether
+            # anything raised.
+            try:
+                os.close(rfd)
+                child_random = ssl.RAND_pseudo_bytes(16)[0]
+                self.assertEqual(len(child_random), 16)
+                os.write(wfd, child_random)
+                os.close(wfd)
+            except BaseException:
+                os._exit(1)
+            else:
+                os._exit(0)
+        else:
+            # Parent: close the unused write end, wait for the child and
+            # require a zero wait status.
+            os.close(wfd)
+            self.addCleanup(os.close, rfd)
+            _, status = os.waitpid(pid, 0)
+            self.assertEqual(status, 0)
+
+            child_random = os.read(rfd, 16)
+            self.assertEqual(len(child_random), 16)
+            parent_random = ssl.RAND_pseudo_bytes(16)[0]
+            self.assertEqual(len(parent_random), 16)
+
+            # The two processes must not have generated identical output.
+            self.assertNotEqual(child_random, parent_random)
+
+    # unittest setting: None removes the length limit on assertion diffs.
+    maxDiff = None
+
+    def test_parse_cert(self):
+        # note that this uses an 'unofficial' function in _ssl.c,
+        # provided solely for this test, to exercise the certificate
+        # parsing code
+        p = ssl._ssl._test_decode_cert(CERTFILE)
+        if support.verbose:
+            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
+        self.assertEqual(p['issuer'],
+                         ((('countryName', 'XY'),),
+                          (('localityName', 'Castle Anthrax'),),
+                          (('organizationName', 'Python Software 
Foundation'),),
+                          (('commonName', 'localhost'),))
+                        )
+        # Note the next three asserts will fail if the keys are regenerated
+        self.assertEqual(p['notAfter'], asn1time('Aug 26 14:23:15 2028 GMT'))
+        self.assertEqual(p['notBefore'], asn1time('Aug 29 14:23:15 2018 GMT'))
+        self.assertEqual(p['serialNumber'], '98A7CF88C74A32ED')
+        self.assertEqual(p['subject'],
+                         ((('countryName', 'XY'),),
+                          (('localityName', 'Castle Anthrax'),),
+                          (('organizationName', 'Python Software 
Foundation'),),
+                          (('commonName', 'localhost'),))
+                        )
+        self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
+        # Issue #13034: the subjectAltName in some certificates
+        # (notably projects.developer.nokia.com:443) wasn't parsed
+        p = ssl._ssl._test_decode_cert(NOKIACERT)
+        if support.verbose:
+            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
+        self.assertEqual(p['subjectAltName'],
+                         (('DNS', 'projects.developer.nokia.com'),
+                          ('DNS', 'projects.forum.nokia.com'))
+                        )
+        # extra OCSP and AIA fields
+        self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
+        self.assertEqual(p['caIssuers'],
+                         ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
+        self.assertEqual(p['crlDistributionPoints'],
+                         ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
+
+    def test_parse_cert_CVE_2019_5010(self):
+        p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
+        if support.verbose:
+            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
+        self.assertEqual(
+            p,
+            {
+                'issuer': (
+                    (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
+                'notAfter': 'Jun 14 18:00:58 2028 GMT',
+                'notBefore': 'Jun 18 18:00:58 2018 GMT',
+                'serialNumber': '02',
+                'subject': ((('countryName', 'UK'),),
+                            (('commonName',
+                              'codenomicon-vm-2.test.lal.cisco.com'),)),
+                'subjectAltName': (
+                    ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
+                'version': 3
+            }
+        )
+
+    def test_parse_cert_CVE_2013_4238(self):
+        p = ssl._ssl._test_decode_cert(NULLBYTECERT)
+        if support.verbose:
+            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
+        subject = ((('countryName', 'US'),),
+                   (('stateOrProvinceName', 'Oregon'),),
+                   (('localityName', 'Beaverton'),),
+                   (('organizationName', 'Python Software Foundation'),),
+                   (('organizationalUnitName', 'Python Core Development'),),
+                   (('commonName', 'null.python.org\x00example.org'),),
+                   (('emailAddress', 'python-...@python.org'),))
+        self.assertEqual(p['subject'], subject)
+        self.assertEqual(p['issuer'], subject)
+        if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
+            san = (('DNS', 'altnull.python.org\x00example.com'),
+                   ('email', 'n...@python.org\x00u...@example.org'),
+                   ('URI', 'http://null.python.org\x00http://example.org'),
+                   ('IP Address', '192.0.2.1'),
+                   ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
+        else:
+            # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
+            san = (('DNS', 'altnull.python.org\x00example.com'),
+                   ('email', 'n...@python.org\x00u...@example.org'),
+                   ('URI', 'http://null.python.org\x00http://example.org'),
+                   ('IP Address', '192.0.2.1'),
+                   ('IP Address', '<invalid>'))
+
+        self.assertEqual(p['subjectAltName'], san)
+
+    def test_parse_all_sans(self):
+        p = ssl._ssl._test_decode_cert(ALLSANFILE)
+        self.assertEqual(p['subjectAltName'],
+            (
+                ('DNS', 'allsans'),
+                ('othername', '<unsupported>'),
+                ('othername', '<unsupported>'),
+                ('email', 'u...@example.org'),
+                ('DNS', 'www.example.org'),
+                ('DirName',
+                    ((('countryName', 'XY'),),
+                    (('localityName', 'Castle Anthrax'),),
+                    (('organizationName', 'Python Software Foundation'),),
+                    (('commonName', 'dirname example'),))),
+                ('URI', 'https://www.python.org/'),
+                ('IP Address', '127.0.0.1'),
+                ('IP Address', '0:0:0:0:0:0:0:1\n'),
+                ('Registered ID', '1.2.3.4.5')
+            )
+        )
+
+    def test_DER_to_PEM(self):
+        with open(CAFILE_CACERT, 'r') as f:
+            pem = f.read()
+        d1 = ssl.PEM_cert_to_DER_cert(pem)
+        p2 = ssl.DER_cert_to_PEM_cert(d1)
+        d2 = ssl.PEM_cert_to_DER_cert(p2)
+        self.assertEqual(d1, d2)
+        if not p2.startswith(ssl.PEM_HEADER + '\n'):
+            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
+        if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
+            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
+
+    def test_openssl_version(self):
+        n = ssl.OPENSSL_VERSION_NUMBER
+        t = ssl.OPENSSL_VERSION_INFO
+        s = ssl.OPENSSL_VERSION
+        self.assertIsInstance(n, int)
+        self.assertIsInstance(t, tuple)
+        self.assertIsInstance(s, str)
+        # Some sanity checks follow
+        # >= 0.9
+        self.assertGreaterEqual(n, 0x900000)
+        # < 3.0
+        self.assertLess(n, 0x30000000)
+        major, minor, fix, patch, status = t
+        self.assertGreaterEqual(major, 0)
+        self.assertLess(major, 3)
+        self.assertGreaterEqual(minor, 0)
+        self.assertLess(minor, 256)
+        self.assertGreaterEqual(fix, 0)
+        self.assertLess(fix, 256)
+        self.assertGreaterEqual(patch, 0)
+        self.assertLessEqual(patch, 63)
+        self.assertGreaterEqual(status, 0)
+        self.assertLessEqual(status, 15)
+        # Version string as returned by {Open,Libre}SSL, the format might 
change
+        if IS_LIBRESSL:
+            self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
+                            (s, t, hex(n)))
+        else:
+            self.assertTrue(s.startswith("OpenSSL 
{:d}.{:d}.{:d}".format(major, minor, fix)),
+                            (s, t, hex(n)))
+
+    @support.cpython_only
+    def test_refcycle(self):
+        # Issue #7943: an SSL object doesn't create reference cycles with
+        # itself.
+        s = socket.socket(socket.AF_INET)
+        ss = test_wrap_socket(s)
+        wr = weakref.ref(ss)
+        with support.check_warnings(("", ResourceWarning)):
+            del ss
+        self.assertEqual(wr(), None)
+
+    def test_wrapped_unconnected(self):
+        # Methods on an unconnected SSLSocket propagate the original
+        # OSError raise by the underlying socket object.
+        s = socket.socket(socket.AF_INET)
+        with test_wrap_socket(s) as ss:
+            self.assertRaises(OSError, ss.recv, 1)
+            self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
+            self.assertRaises(OSError, ss.recvfrom, 1)
+            self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
+            self.assertRaises(OSError, ss.send, b'x')
+            self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
+            self.assertRaises(NotImplementedError, ss.dup)
+            self.assertRaises(NotImplementedError, ss.sendmsg,
+                              [b'x'], (), 0, ('0.0.0.0', 0))
+            self.assertRaises(NotImplementedError, ss.recvmsg, 100)
+            self.assertRaises(NotImplementedError, ss.recvmsg_into,
+                              [bytearray(100)])
+
+    def test_timeout(self):
+        # Issue #8524: when creating an SSL socket, the timeout of the
+        # original socket should be retained.
+        for timeout in (None, 0.0, 5.0):
+            s = socket.socket(socket.AF_INET)
+            s.settimeout(timeout)
+            with test_wrap_socket(s) as ss:
+                self.assertEqual(timeout, ss.gettimeout())
+
+    def test_errors_sslwrap(self):
+        sock = socket.socket()
+        self.assertRaisesRegex(ValueError,
+                        "certfile must be specified",
+                        ssl.wrap_socket, sock, keyfile=CERTFILE)
+        self.assertRaisesRegex(ValueError,
+                        "certfile must be specified for server-side 
operations",
+                        ssl.wrap_socket, sock, server_side=True)
+        self.assertRaisesRegex(ValueError,
+                        "certfile must be specified for server-side 
operations",
+                         ssl.wrap_socket, sock, server_side=True, certfile="")
+        with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
+            self.assertRaisesRegex(ValueError, "can't connect in server-side 
mode",
+                                     s.connect, (HOST, 8080))
+        with self.assertRaises(OSError) as cm:
+            with socket.socket() as sock:
+                ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
+        self.assertEqual(cm.exception.errno, errno.ENOENT)
+        with self.assertRaises(OSError) as cm:
+            with socket.socket() as sock:
+                ssl.wrap_socket(sock,
+                    certfile=CERTFILE, keyfile=NONEXISTINGCERT)
+        self.assertEqual(cm.exception.errno, errno.ENOENT)
+        with self.assertRaises(OSError) as cm:
+            with socket.socket() as sock:
+                ssl.wrap_socket(sock,
+                    certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
+        self.assertEqual(cm.exception.errno, errno.ENOENT)
+
+    def bad_cert_test(self, certfile):
+        """Check that trying to use the given client certificate fails"""
+        certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
+                                   certfile)
+        sock = socket.socket()
+        self.addCleanup(sock.close)
+        with self.assertRaises(ssl.SSLError):
+            test_wrap_socket(sock,
+                            certfile=certfile,
+                            ssl_version=ssl.PROTOCOL_TLSv1)
+
+    def test_empty_cert(self):
+        """Wrapping with an empty cert file"""
+        self.bad_cert_test("nullcert.pem")
+
+    def test_malformed_cert(self):
+        """Wrapping with a badly formatted certificate (syntax error)"""
+        self.bad_cert_test("badcert.pem")
+
+    def test_malformed_key(self):
+        """Wrapping with a badly formatted key (syntax error)"""
+        self.bad_cert_test("badkey.pem")
+
+    def test_match_hostname(self):
+        def ok(cert, hostname):
+            ssl.match_hostname(cert, hostname)
+        def fail(cert, hostname):
+            self.assertRaises(ssl.CertificateError,
+                              ssl.match_hostname, cert, hostname)
+
+        # -- Hostname matching --
+
+        cert = {'subject': ((('commonName', 'example.com'),),)}
+        ok(cert, 'example.com')
+        ok(cert, 'ExAmple.cOm')
+        fail(cert, 'www.example.com')
+        fail(cert, '.example.com')
+        fail(cert, 'example.org')
+        fail(cert, 'exampleXcom')
+
+        cert = {'subject': ((('commonName', '*.a.com'),),)}
+        ok(cert, 'foo.a.com')
+        fail(cert, 'bar.foo.a.com')
+        fail(cert, 'a.com')
+        fail(cert, 'Xa.com')
+        fail(cert, '.a.com')
+
+        # only match one left-most wildcard
+        cert = {'subject': ((('commonName', 'f*.com'),),)}
+        ok(cert, 'foo.com')
+        ok(cert, 'f.com')
+        fail(cert, 'bar.com')
+        fail(cert, 'foo.a.com')
+        fail(cert, 'bar.foo.com')
+
+        # NULL bytes are bad, CVE-2013-4073
+        cert = {'subject': ((('commonName',
+                              'null.python.org\x00example.org'),),)}
+        ok(cert, 'null.python.org\x00example.org') # or raise an error?
+        fail(cert, 'example.org')
+        fail(cert, 'null.python.org')
+
+        # error cases with wildcards
+        cert = {'subject': ((('commonName', '*.*.a.com'),),)}
+        fail(cert, 'bar.foo.a.com')
+        fail(cert, 'a.com')
+        fail(cert, 'Xa.com')
+        fail(cert, '.a.com')
+
+        cert = {'subject': ((('commonName', 'a.*.com'),),)}
+        fail(cert, 'a.foo.com')
+        fail(cert, 'a..com')
+        fail(cert, 'a.com')
+
+        # wildcard doesn't match IDNA prefix 'xn--'
+        idna = 'p&#252;thon.python.org'.encode("idna").decode("ascii")
+        cert = {'subject': ((('commonName', idna),),)}
+        ok(cert, idna)
+        cert = {'subject': ((('commonName', 'x*.python.org'),),)}
+        fail(cert, idna)
+        cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
+        fail(cert, idna)
+
+        # wildcard in first fragment and  IDNA A-labels in sequent fragments
+        # are supported.
+        idna = 'www*.pyth&#246;n.org'.encode("idna").decode("ascii")
+        cert = {'subject': ((('commonName', idna),),)}
+        ok(cert, 'www.pyth&#246;n.org'.encode("idna").decode("ascii"))
+        ok(cert, 'www1.pyth&#246;n.org'.encode("idna").decode("ascii"))
+        fail(cert, 'ftp.pyth&#246;n.org'.encode("idna").decode("ascii"))
+        fail(cert, 'pyth&#246;n.org'.encode("idna").decode("ascii"))
+
+        # Slightly fake real-world example
+        cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
+                'subject': ((('commonName', 'linuxfrz.org'),),),
+                'subjectAltName': (('DNS', 'linuxfr.org'),
+                                   ('DNS', 'linuxfr.com'),
+                                   ('othername', '<unsupported>'))}
+        ok(cert, 'linuxfr.org')
+        ok(cert, 'linuxfr.com')
+        # Not a "DNS" entry
+        fail(cert, '<unsupported>')
+        # When there is a subjectAltName, commonName isn't used
+        fail(cert, 'linuxfrz.org')
+
+        # A pristine real-world example
+        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
+                'subject': ((('countryName', 'US'),),
+                            (('stateOrProvinceName', 'California'),),
+                            (('localityName', 'Mountain View'),),
+                            (('organizationName', 'Google Inc'),),
+                            (('commonName', 'mail.google.com'),))}
+        ok(cert, 'mail.google.com')
+        fail(cert, 'gmail.com')
+        # Only commonName is considered
+        fail(cert, 'California')
+
+        # -- IPv4 matching --
+        cert = {'subject': ((('commonName', 'example.com'),),),
+                'subjectAltName': (('DNS', 'example.com'),
+                                   ('IP Address', '10.11.12.13'),
+                                   ('IP Address', '14.15.16.17'))}
+        ok(cert, '10.11.12.13')
+        ok(cert, '14.15.16.17')
+        fail(cert, '14.15.16.18')
+        fail(cert, 'example.net')
+
+        # -- IPv6 matching --
+        cert = {'subject': ((('commonName', 'example.com'),),),
+                'subjectAltName': (('DNS', 'example.com'),
+                                   ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
+                                   ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
+        ok(cert, '2001::cafe')
+        ok(cert, '2003::baba')
+        fail(cert, '2003::bebe')
+        fail(cert, 'example.net')
+
+        # -- Miscellaneous --
+
+        # Neither commonName nor subjectAltName
+        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
+                'subject': ((('countryName', 'US'),),
+                            (('stateOrProvinceName', 'California'),),
+                            (('localityName', 'Mountain View'),),
+                            (('organizationName', 'Google Inc'),))}
+        fail(cert, 'mail.google.com')
+
+        # No DNS entry in subjectAltName but a commonName
+        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
+                'subject': ((('countryName', 'US'),),
+                            (('stateOrProvinceName', 'California'),),
+                            (('localityName', 'Mountain View'),),
+                            (('commonName', 'mail.google.com'),)),
+                'subjectAltName': (('othername', 'blabla'), )}
+        ok(cert, 'mail.google.com')
+
+        # No DNS entry subjectAltName and no commonName
+        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
+                'subject': ((('countryName', 'US'),),
+                            (('stateOrProvinceName', 'California'),),
+                            (('localityName', 'Mountain View'),),
+                            (('organizationName', 'Google Inc'),)),
+                'subjectAltName': (('othername', 'blabla'),)}
+        fail(cert, 'google.com')
+
+        # Empty cert / no cert
+        self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
+        self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
+
+        # Issue #17980: avoid denials of service by refusing more than one
+        # wildcard per fragment.
+        cert = {'subject': ((('commonName', 'a*b.com'),),)}
+        ok(cert, 'axxb.com')
+        cert = {'subject': ((('commonName', 'a*b.co*'),),)}
+        fail(cert, 'axxb.com')
+        cert = {'subject': ((('commonName', 'a*b*.com'),),)}
+        with self.assertRaises(ssl.CertificateError) as cm:
+            ssl.match_hostname(cert, 'axxbxxc.com')
+        self.assertIn("too many wildcards", str(cm.exception))
+
+    def test_server_side(self):
+        # server_hostname doesn't work for server sockets
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        with socket.socket() as sock:
+            self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
+                              server_hostname="some.hostname")
+
+    def test_unknown_channel_binding(self):
+        # should raise ValueError for unknown type
+        s = socket.socket(socket.AF_INET)
+        s.bind(('127.0.0.1', 0))
+        s.listen()
+        c = socket.socket(socket.AF_INET)
+        c.connect(s.getsockname())
+        with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
+            with self.assertRaises(ValueError):
+                ss.get_channel_binding("unknown-type")
+        s.close()
+
+    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
+                         "'tls-unique' channel binding not available")
+    def test_tls_unique_channel_binding(self):
+        # unconnected should return None for known type
+        s = socket.socket(socket.AF_INET)
+        with test_wrap_socket(s) as ss:
+            self.assertIsNone(ss.get_channel_binding("tls-unique"))
+        # the same for server-side
+        s = socket.socket(socket.AF_INET)
+        with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
+            self.assertIsNone(ss.get_channel_binding("tls-unique"))
+
+    def test_dealloc_warn(self):
+        ss = test_wrap_socket(socket.socket(socket.AF_INET))
+        r = repr(ss)
+        with self.assertWarns(ResourceWarning) as cm:
+            ss = None
+            support.gc_collect()
+        self.assertIn(r, str(cm.warning.args[0]))
+
+    def test_get_default_verify_paths(self):
+        paths = ssl.get_default_verify_paths()
+        self.assertEqual(len(paths), 6)
+        self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
+
+        with support.EnvironmentVarGuard() as env:
+            env["SSL_CERT_DIR"] = CAPATH
+            env["SSL_CERT_FILE"] = CERTFILE
+            paths = ssl.get_default_verify_paths()
+            self.assertEqual(paths.cafile, CERTFILE)
+            self.assertEqual(paths.capath, CAPATH)
+
+    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
+    def test_enum_certificates(self):
+        # Both system stores must yield a truthy result.
+        for name in ("CA", "ROOT"):
+            self.assertTrue(ssl.enum_certificates(name))
+
+        # The store name is mandatory; an empty name raises WindowsError.
+        with self.assertRaises(TypeError):
+            ssl.enum_certificates()
+        with self.assertRaises(WindowsError):
+            ssl.enum_certificates("")
+
+        # Every entry is a (cert, encoding, trust) triple; collect the OIDs
+        # found in set-valued trust fields.
+        seen_oids = set()
+        for name in ("CA", "ROOT"):
+            entries = ssl.enum_certificates(name)
+            self.assertIsInstance(entries, list)
+            for entry in entries:
+                self.assertIsInstance(entry, tuple)
+                self.assertEqual(len(entry), 3)
+                cert, encoding, trust = entry
+                self.assertIsInstance(cert, bytes)
+                self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
+                self.assertIsInstance(trust, (set, bool))
+                if isinstance(trust, set):
+                    seen_oids |= trust
+
+        # The serverAuth OID has to appear somewhere in the trust data.
+        self.assertIn("1.3.6.1.5.5.7.3.1", seen_oids)
+
+    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
+    def test_enum_crls(self):
+        self.assertTrue(ssl.enum_crls("CA"))
+        # The store name is mandatory; an empty name raises WindowsError.
+        with self.assertRaises(TypeError):
+            ssl.enum_crls()
+        with self.assertRaises(WindowsError):
+            ssl.enum_crls("")
+
+        # Each CRL entry is a (data, encoding) pair.
+        entries = ssl.enum_crls("CA")
+        self.assertIsInstance(entries, list)
+        for entry in entries:
+            self.assertIsInstance(entry, tuple)
+            self.assertEqual(len(entry), 2)
+            data, encoding = entry
+            self.assertIsInstance(data, bytes)
+            self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
+
+
+    def test_asn1object(self):
+        server_auth_oid = '1.3.6.1.5.5.7.3.1'
+        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
+                    server_auth_oid)
+
+        # Construction from a dotted OID string; a short name is rejected.
+        obj = ssl._ASN1Object(server_auth_oid)
+        self.assertEqual(obj, expected)
+        self.assertEqual(obj.nid, 129)
+        self.assertEqual(obj.shortname, 'serverAuth')
+        self.assertEqual(obj.longname, 'TLS Web Server Authentication')
+        self.assertEqual(obj.oid, server_auth_oid)
+        self.assertIsInstance(obj, ssl._ASN1Object)
+        with self.assertRaises(ValueError):
+            ssl._ASN1Object('serverAuth')
+
+        # Lookup by numeric NID.
+        obj = ssl._ASN1Object.fromnid(129)
+        self.assertEqual(obj, expected)
+        self.assertIsInstance(obj, ssl._ASN1Object)
+        with self.assertRaises(ValueError):
+            ssl._ASN1Object.fromnid(-1)
+        with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
+            ssl._ASN1Object.fromnid(100000)
+        # Sweep the first thousand NIDs: those raising ValueError are
+        # skipped, all others must expose correctly typed fields.
+        for nid in range(1000):
+            try:
+                candidate = ssl._ASN1Object.fromnid(nid)
+            except ValueError:
+                continue
+            self.assertIsInstance(candidate.nid, int)
+            self.assertIsInstance(candidate.shortname, str)
+            self.assertIsInstance(candidate.longname, str)
+            self.assertIsInstance(candidate.oid, (str, type(None)))
+
+        # Lookup by long name, short name or OID text; a short name in the
+        # wrong case is not recognised.
+        obj = ssl._ASN1Object.fromname('TLS Web Server Authentication')
+        self.assertEqual(obj, expected)
+        self.assertIsInstance(obj, ssl._ASN1Object)
+        self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
+        self.assertEqual(ssl._ASN1Object.fromname(server_auth_oid),
+                         expected)
+        with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
+            ssl._ASN1Object.fromname('serverauth')
+
+    def test_purpose_enum(self):
+        # Each Purpose member is an _ASN1Object equal to the object built
+        # from its OID, with matching nid / shortname / oid fields.
+        cases = (
+            (ssl.Purpose.SERVER_AUTH, '1.3.6.1.5.5.7.3.1', 129, 'serverAuth'),
+            (ssl.Purpose.CLIENT_AUTH, '1.3.6.1.5.5.7.3.2', 130, 'clientAuth'),
+        )
+        for purpose, oid, nid, shortname in cases:
+            reference = ssl._ASN1Object(oid)
+            self.assertIsInstance(purpose, ssl._ASN1Object)
+            self.assertEqual(purpose, reference)
+            self.assertEqual(purpose.nid, nid)
+            self.assertEqual(purpose.shortname, shortname)
+            self.assertEqual(purpose.oid, oid)
+
+    def test_unsupported_dtls(self):
+        # Datagram sockets cannot be wrapped: both test_wrap_socket() and
+        # SSLContext.wrap_socket() must raise NotImplementedError with the
+        # same message.
+        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        self.addCleanup(s.close)
+        with self.assertRaises(NotImplementedError) as cx:
+            test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
+        self.assertEqual(str(cx.exception),
+                         "only stream sockets are supported")
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        with self.assertRaises(NotImplementedError) as cx:
+            ctx.wrap_socket(s)
+        self.assertEqual(str(cx.exception),
+                         "only stream sockets are supported")
+
+    def cert_time_ok(self, timestring, timestamp):
+        # Helper: *timestring* must convert to exactly *timestamp*.
+        seconds = ssl.cert_time_to_seconds(timestring)
+        self.assertEqual(seconds, timestamp)
+
+    def cert_time_fail(self, timestring):
+        # Helper: *timestring* must be rejected with ValueError.
+        self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
+
+    @unittest.skipUnless(utc_offset(),
+                         'local time needs to be different from UTC')
+    def test_cert_time_to_seconds_timezone(self):
+        # Issue #19940: ssl.cert_time_to_seconds() returns wrong
+        #               results if local timezone is not UTC
+        samples = (
+            ("May  9 00:00:00 2007 GMT", 1178668800.0),
+            ("Jan  5 09:34:43 2018 GMT", 1515144883.0),
+        )
+        for cert_time, seconds in samples:
+            self.cert_time_ok(cert_time, seconds)
+
+    def test_cert_time_to_seconds(self):
+        reference = "Jan  5 09:34:43 2018 GMT"
+        reference_ts = 1515144883.0
+        self.cert_time_ok(reference, reference_ts)
+        # accept keyword parameter, assert its name
+        self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference),
+                         reference_ts)
+        # accept both %e and %d (space or zero generated by strftime)
+        self.cert_time_ok("Jan 05 09:34:43 2018 GMT", reference_ts)
+        # case-insensitive
+        self.cert_time_ok("JaN  5 09:34:43 2018 GmT", reference_ts)
+        malformed = (
+            "Jan  5 09:34 2018 GMT",     # no seconds
+            "Jan  5 09:34:43 2018",      # no GMT
+            "Jan  5 09:34:43 2018 UTC",  # not GMT timezone
+            "Jan 35 09:34:43 2018 GMT",  # invalid day
+            "Jon  5 09:34:43 2018 GMT",  # invalid month
+            "Jan  5 24:00:00 2018 GMT",  # invalid hour
+            "Jan  5 09:60:43 2018 GMT",  # invalid minute
+        )
+        for bad in malformed:
+            self.cert_time_fail(bad)
+
+        newyear_ts = 1230768000.0
+        newyear_spellings = (
+            "Dec 31 23:59:60 2008 GMT",  # leap seconds
+            "Jan  1 00:00:00 2009 GMT",  # same timestamp
+        )
+        for spelling in newyear_spellings:
+            self.cert_time_ok(spelling, newyear_ts)
+
+        self.cert_time_ok("Jan  5 09:34:59 2018 GMT", 1515144899)
+        #  allow 60th second (even if it is not a leap second)
+        self.cert_time_ok("Jan  5 09:34:60 2018 GMT", 1515144900)
+        #  allow 2nd leap second for compatibility with time.strptime()
+        self.cert_time_ok("Jan  5 09:34:61 2018 GMT", 1515144901)
+        # invalid seconds
+        self.cert_time_fail("Jan  5 09:34:62 2018 GMT")
+
+        # no special treatment for the special value:
+        #   99991231235959Z (rfc 5280)
+        self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
+
+    @support.run_with_locale('LC_ALL', '')
+    def test_cert_time_to_seconds_locale(self):
+        # `cert_time_to_seconds()` should be locale independent
+
+        # Abbreviated name of February as rendered by the active locale.
+        february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
+        if february.lower() == 'feb':
+            self.skipTest("locale-specific month name needs to be "
+                          "different from C locale")
+
+        # The C-locale spelling is always accepted, the localized one never.
+        self.cert_time_ok("Feb  9 00:00:00 2007 GMT", 1170979200.0)
+        self.cert_time_fail(february + "  9 00:00:00 2007 GMT")
+
+    def test_connect_ex_error(self):
+        # Reserve a port but never listen on it, then check the error code
+        # connect_ex() returns for it.
+        listener = socket.socket(socket.AF_INET)
+        self.addCleanup(listener.close)
+        port = support.bind_port(listener)
+        client = test_wrap_socket(socket.socket(socket.AF_INET),
+                                  cert_reqs=ssl.CERT_REQUIRED)
+        self.addCleanup(client.close)
+        rc = client.connect_ex((HOST, port))
+        # Issue #19919: Windows machines or VMs hosted on Windows
+        # machines sometimes return EWOULDBLOCK.
+        acceptable = (
+            errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
+            errno.EWOULDBLOCK,
+        )
+        self.assertIn(rc, acceptable)
+
+
+class ContextTests(unittest.TestCase):
+
+    @skip_if_broken_ubuntu_ssl
+    def test_constructor(self):
+        # Every known protocol constant is accepted.
+        for proto in PROTOCOLS:
+            ssl.SSLContext(proto)
+        # Without an argument the context reports PROTOCOL_TLS.
+        default_ctx = ssl.SSLContext()
+        self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
+        # Integers that are not protocol constants are rejected.
+        for bogus in (-1, 42):
+            with self.assertRaises(ValueError):
+                ssl.SSLContext(bogus)
+
+    @skip_if_broken_ubuntu_ssl
+    def test_protocol(self):
+        # The protocol attribute echoes the constructor argument.
+        for requested in PROTOCOLS:
+            context = ssl.SSLContext(requested)
+            self.assertEqual(context.protocol, requested)
+
+    def test_ciphers(self):
+        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        # The "ALL" and "DEFAULT" cipher strings are accepted ...
+        for spec in ("ALL", "DEFAULT"):
+            context.set_ciphers(spec)
+        # ... while a string that selects nothing raises SSLError.
+        with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
+            context.set_ciphers("^$:,;?*'dorothyx")
+
+    @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0),
+                     'OpenSSL too old')
+    def test_get_ciphers(self):
+        # After restricting the context to 'AESGCM', get_ciphers() must
+        # list both the AES256 and the AES128 GCM suite by name.
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        ctx.set_ciphers('AESGCM')
+        names = {d['name'] for d in ctx.get_ciphers()}
+        self.assertIn('AES256-GCM-SHA384', names)
+        self.assertIn('AES128-GCM-SHA256', names)
+
+    @skip_if_broken_ubuntu_ssl
+    def test_options(self):
+        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+        # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
+        baseline = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
+        # SSLContext also enables these by default
+        for extra in (OP_NO_COMPRESSION, OP_CIPHER_SERVER_PREFERENCE,
+                      OP_SINGLE_DH_USE, OP_SINGLE_ECDH_USE,
+                      OP_ENABLE_MIDDLEBOX_COMPAT):
+            baseline |= extra
+        self.assertEqual(baseline, context.options)
+        # Adding an option is always possible.
+        context.options |= ssl.OP_NO_TLSv1
+        self.assertEqual(baseline | ssl.OP_NO_TLSv1, context.options)
+        if not can_clear_options():
+            # Clearing is refused when can_clear_options() says so.
+            with self.assertRaises(ValueError):
+                context.options = 0
+            return
+        context.options = (context.options & ~ssl.OP_NO_TLSv1)
+        self.assertEqual(baseline, context.options)
+        context.options = 0
+        # Ubuntu has OP_NO_SSLv3 forced on by default
+        self.assertEqual(0, context.options & ~ssl.OP_NO_SSLv3)
+
+    def test_verify_mode(self):
+        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        # Default value
+        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
+        # Each valid mode round-trips through the attribute.
+        for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
+            context.verify_mode = mode
+            self.assertEqual(context.verify_mode, mode)
+        # None is rejected with TypeError, the integer 42 with ValueError.
+        with self.assertRaises(TypeError):
+            context.verify_mode = None
+        with self.assertRaises(ValueError):
+            context.verify_mode = 42
+
+    @unittest.skipUnless(have_verify_flags(),
+                         "verify_flags need OpenSSL > 0.9.8")
+    def test_verify_flags(self):
+        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        # default value
+        trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
+        self.assertEqual(context.verify_flags,
+                         ssl.VERIFY_DEFAULT | trusted_first)
+        # Single flags first, then a combination: supports any value
+        settings = (
+            ssl.VERIFY_CRL_CHECK_LEAF,
+            ssl.VERIFY_CRL_CHECK_CHAIN,
+            ssl.VERIFY_DEFAULT,
+            ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
+        )
+        for flags in settings:
+            context.verify_flags = flags
+            self.assertEqual(context.verify_flags, flags)
+        with self.assertRaises(TypeError):
+            context.verify_flags = None
+
+    def test_load_cert_chain(self):
+        # Exercises SSLContext.load_cert_chain() with combined and separate
+        # key/cert files, a mismatching pair, and passwords supplied either
+        # directly (str, bytes, bytearray) or through a callable.
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        # Combined key and cert in a single file
+        ctx.load_cert_chain(CERTFILE, keyfile=None)
+        ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
+        # certfile is mandatory
+        self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
+        # A missing file is reported as OSError with ENOENT
+        with self.assertRaises(OSError) as cm:
+            ctx.load_cert_chain(NONEXISTINGCERT)
+        self.assertEqual(cm.exception.errno, errno.ENOENT)
+        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
+            ctx.load_cert_chain(BADCERT)
+        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
+            ctx.load_cert_chain(EMPTYCERT)
+        # Separate key and cert
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        ctx.load_cert_chain(ONLYCERT, ONLYKEY)
+        ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
+        ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
+        # Only one half given, or the two halves swapped, is an error
+        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
+            ctx.load_cert_chain(ONLYCERT)
+        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
+            ctx.load_cert_chain(ONLYKEY)
+        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
+            ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
+        # Mismatching key and cert
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
+            ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
+        # Password protected key and cert
+        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
+        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
+        ctx.load_cert_chain(CERTFILE_PROTECTED,
+                            password=bytearray(KEY_PASSWORD.encode()))
+        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
+        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
+        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
+                            bytearray(KEY_PASSWORD.encode()))
+        # Wrong password type, wrong password value, over-long password
+        with self.assertRaisesRegex(TypeError, "should be a string"):
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
+        with self.assertRaises(ssl.SSLError):
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
+        with self.assertRaisesRegex(ValueError, "cannot be longer"):
+            # openssl has a fixed limit on the password buffer.
+            # PEM_BUFSIZE is generally set to 1kb.
+            # Return a string larger than this.
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
+        # Password callback
+        def getpass_unicode():
+            return KEY_PASSWORD
+        def getpass_bytes():
+            return KEY_PASSWORD.encode()
+        def getpass_bytearray():
+            return bytearray(KEY_PASSWORD.encode())
+        def getpass_badpass():
+            return "badpass"
+        def getpass_huge():
+            return b'a' * (1024 * 1024)
+        def getpass_bad_type():
+            return 9
+        def getpass_exception():
+            raise Exception('getpass error')
+        # Covers both a callable instance and a bound method as callback
+        class GetPassCallable:
+            def __call__(self):
+                return KEY_PASSWORD
+            def getpass(self):
+                return KEY_PASSWORD
+        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
+        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
+        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
+        ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
+        ctx.load_cert_chain(CERTFILE_PROTECTED,
+                            password=GetPassCallable().getpass)
+        with self.assertRaises(ssl.SSLError):
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
+        with self.assertRaisesRegex(ValueError, "cannot be longer"):
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
+        with self.assertRaisesRegex(TypeError, "must return a string"):
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
+        # An exception raised inside the callback propagates to the caller
+        with self.assertRaisesRegex(Exception, "getpass error"):
+            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
+        # Make sure the password function isn't called if it isn't needed
+        ctx.load_cert_chain(CERTFILE, password=getpass_exception)
+
+    def test_load_verify_locations(self):
+        # cafile / capath are accepted as str or bytes, positionally or by
+        # keyword; at least one source has to be given.
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        ctx.load_verify_locations(CERTFILE)
+        ctx.load_verify_locations(cafile=CERTFILE, capath=None)
+        ctx.load_verify_locations(BYTES_CERTFILE)
+        ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
+        self.assertRaises(TypeError, ctx.load_verify_locations)
+        self.assertRaises(TypeError, ctx.load_verify_locations,
+                          None, None, None)
+        # A missing file is reported as OSError with ENOENT.
+        with self.assertRaises(OSError) as cm:
+            ctx.load_verify_locations(NONEXISTINGCERT)
+        self.assertEqual(cm.exception.errno, errno.ENOENT)
+        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
+            ctx.load_verify_locations(BADCERT)
+        ctx.load_verify_locations(CERTFILE, CAPATH)
+        ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
+
+        # Issue #10989: crash if the second argument type is invalid
+        self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
+
+    def test_load_verify_cadata(self):
+        # test cadata
+        # Two CA certificates are read as PEM text and converted to DER so
+        # that both input forms of the cadata argument can be exercised.
+        with open(CAFILE_CACERT) as f:
+            cacert_pem = f.read()
+        cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
+        with open(CAFILE_NEURONIO) as f:
+            neuronio_pem = f.read()
+        neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
+
+        # test PEM
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
+        ctx.load_verify_locations(cadata=cacert_pem)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
+        ctx.load_verify_locations(cadata=neuronio_pem)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+        # cert already in hash table
+        ctx.load_verify_locations(cadata=neuronio_pem)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+        # combined
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        combined = "\n".join((cacert_pem, neuronio_pem))
+        ctx.load_verify_locations(cadata=combined)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+        # with junk around the certs
+        # (the duplicated certificate must still be counted only once)
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        combined = ["head", cacert_pem, "other", neuronio_pem, "again",
+                    neuronio_pem, "tail"]
+        ctx.load_verify_locations(cadata="\n".join(combined))
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+        # test DER
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        ctx.load_verify_locations(cadata=cacert_der)
+        ctx.load_verify_locations(cadata=neuronio_der)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+        # cert already in hash table
+        ctx.load_verify_locations(cadata=cacert_der)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+        # combined
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        combined = b"".join((cacert_der, neuronio_der))
+        ctx.load_verify_locations(cadata=combined)
+        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
+
+        # error cases
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
+
+        # str input that is not PEM and bytes input that is not DER fail
+        # with different error messages
+        with self.assertRaisesRegex(ssl.SSLError, "no start line"):
+            ctx.load_verify_locations(cadata="broken")
+        with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
+            ctx.load_verify_locations(cadata=b"broken")
+
+
+    def test_load_dh_params(self):
+        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        context.load_dh_params(DHFILE)
+        # The bytes-path variant is not exercised on Windows.
+        if os.name != 'nt':
+            context.load_dh_params(BYTES_DHFILE)
+        # The path is mandatory and may not be None.
+        with self.assertRaises(TypeError):
+            context.load_dh_params()
+        with self.assertRaises(TypeError):
+            context.load_dh_params(None)
+        # A missing file surfaces as FileNotFoundError carrying ENOENT.
+        with self.assertRaises(FileNotFoundError) as missing:
+            context.load_dh_params(NONEXISTINGCERT)
+        self.assertEqual(missing.exception.errno, errno.ENOENT)
+        # A certificate file is not accepted as a DH parameter file.
+        with self.assertRaises(ssl.SSLError):
+            context.load_dh_params(CERTFILE)
+
+    @skip_if_broken_ubuntu_ssl
+    def test_session_stats(self):
+        # A freshly created context reports zero for every counter.
+        counters = (
+            'number',
+            'connect', 'connect_good', 'connect_renegotiate',
+            'accept', 'accept_good', 'accept_renegotiate',
+            'hits', 'misses', 'timeouts', 'cache_full',
+        )
+        all_zero = dict.fromkeys(counters, 0)
+        for proto in PROTOCOLS:
+            context = ssl.SSLContext(proto)
+            self.assertEqual(context.session_stats(), all_zero)
+
+    def test_set_default_verify_paths(self):
+        # Smoke test only: there is little to observe about the effect, so
+        # the call merely has to complete without raising.
+        ssl.SSLContext(ssl.PROTOCOL_TLSv1).set_default_verify_paths()
+
+    @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
+    def test_set_ecdh_curve(self):
+        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        # A known curve name is accepted as str or bytes.
+        for curve in ("prime256v1", b"prime256v1"):
+            context.set_ecdh_curve(curve)
+        # The name is mandatory and may not be None.
+        with self.assertRaises(TypeError):
+            context.set_ecdh_curve()
+        with self.assertRaises(TypeError):
+            context.set_ecdh_curve(None)
+        # Unknown curve names are rejected, again as str or bytes.
+        for curve in ("foo", b"foo"):
+            with self.assertRaises(ValueError):
+                context.set_ecdh_curve(curve)
+
+    @needs_sni
+    def test_sni_callback(self):
+        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+
+        # set_servername_callback expects a callable, or None
+        with self.assertRaises(TypeError):
+            context.set_servername_callback()
+        for not_callable in (4, "", context):
+            with self.assertRaises(TypeError):
+                context.set_servername_callback(not_callable)
+
+        def noop_callback(sock, servername, ctx):
+            pass
+        # Both None and a real callable are accepted.
+        context.set_servername_callback(None)
+        context.set_servername_callback(noop_callback)
+
+    @needs_sni
+    def test_sni_callback_refcycle(self):
+        # Reference cycles through the servername callback are detected
+        # and cleared.
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        # The default argument makes the callback hold a reference to the
+        # context, which in turn holds the callback once it is installed.
+        def dummycallback(sock, servername, ctx, cycle=ctx):
+            pass
+        ctx.set_servername_callback(dummycallback)
+        # Keep only a weak reference, drop both local names, then collect;
+        # the weak reference must be dead afterwards.
+        wr = weakref.ref(ctx)
+        del ctx, dummycallback
+        gc.collect()
+        self.assertIs(wr(), None)
+
+    def test_cert_store_stats(self):
+        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        empty_store = {'x509_ca': 0, 'crl': 0, 'x509': 0}
+        self.assertEqual(context.cert_store_stats(), empty_store)
+        # Loading the context's own certificate chain leaves the store
+        # counters untouched.
+        context.load_cert_chain(CERTFILE)
+        self.assertEqual(context.cert_store_stats(), empty_store)
+        # CERTFILE is counted as a certificate but not as a CA.
+        context.load_verify_locations(CERTFILE)
+        self.assertEqual(context.cert_store_stats(),
+                         {'x509_ca': 0, 'crl': 0, 'x509': 1})
+        # CAFILE_CACERT bumps both the certificate and the CA counter.
+        context.load_verify_locations(CAFILE_CACERT)
+        self.assertEqual(context.cert_store_stats(),
+                         {'x509_ca': 1, 'crl': 0, 'x509': 2})
+
+    def test_get_ca_certs(self):
+        # get_ca_certs() lists only certificates flagged as CA, either as
+        # decoded dicts (default) or as DER bytes (argument True).
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        self.assertEqual(ctx.get_ca_certs(), [])
+        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
+        ctx.load_verify_locations(CERTFILE)
+        self.assertEqual(ctx.get_ca_certs(), [])
+        # but CAFILE_CACERT is a CA cert
+        ctx.load_verify_locations(CAFILE_CACERT)
+        self.assertEqual(ctx.get_ca_certs(),
+            [{'issuer': ((('organizationName', 'Root CA'),),
+                         (('organizationalUnitName',
+                           'http://www.cacert.org'),),
+                         (('commonName', 'CA Cert Signing Authority'),),
+                         (('emailAddress', 'support@cacert.org'),)),
+              'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
+              'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
+              'serialNumber': '00',
+              'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
+              'subject': ((('organizationName', 'Root CA'),),
+                          (('organizationalUnitName',
+                            'http://www.cacert.org'),),
+                          (('commonName', 'CA Cert Signing Authority'),),
+                          (('emailAddress', 'support@cacert.org'),)),
+              'version': 3}])
+
+        # The binary form must equal the DER encoding of the PEM file.
+        with open(CAFILE_CACERT) as f:
+            pem = f.read()
+        der = ssl.PEM_cert_to_DER_cert(pem)
+        self.assertEqual(ctx.get_ca_certs(True), [der])
+
+    def test_load_default_certs(self):
+        # No purpose argument.
+        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        context.load_default_certs()
+
+        # Explicit SERVER_AUTH, then the no-argument form on the same
+        # context.
+        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        context.load_default_certs(ssl.Purpose.SERVER_AUTH)
+        context.load_default_certs()
+
+        # Explicit CLIENT_AUTH.
+        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        context.load_default_certs(ssl.Purpose.CLIENT_AUTH)
+
+        # None and a plain string are rejected with TypeError.
+        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        for bad_purpose in (None, 'SERVER_AUTH'):
+            with self.assertRaises(TypeError):
+                context.load_default_certs(bad_purpose)
+
+    @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
+    @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
+    def test_load_default_certs_env(self):
+        # With SSL_CERT_DIR / SSL_CERT_FILE pointing at the test data,
+        # load_default_certs() must leave exactly one non-CA certificate
+        # in the store.
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        with support.EnvironmentVarGuard() as env:
+            env["SSL_CERT_DIR"] = CAPATH
+            env["SSL_CERT_FILE"] = CERTFILE
+            ctx.load_default_certs()
+            self.assertEqual(ctx.cert_store_stats(),
+                             {"crl": 0, "x509": 1, "x509_ca": 0})
+
+    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
+    def test_load_default_certs_env_windows(self):
+        # Baseline: store statistics after loading the defaults without
+        # any environment override.
+        baseline_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        baseline_ctx.load_default_certs()
+        expected = baseline_ctx.cert_store_stats()
+
+        # With the environment variables set, exactly one more x509 entry
+        # is expected on top of the baseline.
+        env_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        with support.EnvironmentVarGuard() as environ:
+            environ["SSL_CERT_DIR"] = CAPATH
+            environ["SSL_CERT_FILE"] = CERTFILE
+            env_ctx.load_default_certs()
+            expected["x509"] += 1
+            self.assertEqual(env_ctx.cert_store_stats(), expected)
+
+    def _assert_context_options(self, ctx):
+        # Helper: check that *ctx* has the expected option bits set.
+        # OP_NO_SSLv2 is required unconditionally.
+        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
+        # The remaining flags are only checked when their value is
+        # non-zero (presumably zero means "not available" - TODO confirm).
+        optional_flags = (
+            OP_NO_COMPRESSION,
+            OP_SINGLE_DH_USE,
+            OP_SINGLE_ECDH_USE,
+            OP_CIPHER_SERVER_PREFERENCE,
+        )
+        for flag in optional_flags:
+            if flag != 0:
+                self.assertEqual(ctx.options & flag, flag)
+
+    def test_create_default_context(self):
+        # Default purpose: certificate and hostname checks are enabled.
+        context = ssl.create_default_context()
+        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
+        self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
+        self.assertTrue(context.check_hostname)
+        self._assert_context_options(context)
+
+        # CA material supplied through all three keyword arguments at once.
+        with open(SIGNING_CA) as ca_file:
+            ca_text = ca_file.read()
+        context = ssl.create_default_context(cafile=SIGNING_CA,
+                                             capath=CAPATH,
+                                             cadata=ca_text)
+        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
+        self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
+        self._assert_context_options(context)
+
+        # CLIENT_AUTH purpose: verify_mode defaults to CERT_NONE.
+        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
+        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
+        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
+        self._assert_context_options(context)
+
+    def test__create_stdlib_context(self):
+        # No arguments: SSLv23, no verification, no hostname check.
+        context = ssl._create_stdlib_context()
+        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
+        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
+        self.assertFalse(context.check_hostname)
+        self._assert_context_options(context)
+
+        # An explicit protocol is honoured.
+        context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
+        self.assertEqual(context.protocol, ssl.PROTOCOL_TLSv1)
+        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
+        self._assert_context_options(context)
+
+        # Verification settings passed by keyword are honoured as well.
+        context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
+                                             cert_reqs=ssl.CERT_REQUIRED,
+                                             check_hostname=True)
+        self.assertEqual(context.protocol, ssl.PROTOCOL_TLSv1)
+        self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
+        self.assertTrue(context.check_hostname)
+        self._assert_context_options(context)
+
+        # CLIENT_AUTH purpose given by keyword.
+        context = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
+        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
+        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
+        self._assert_context_options(context)
+
+    def test_check_hostname(self):
+        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        self.assertFalse(context.check_hostname)
+
+        # Requires CERT_REQUIRED or CERT_OPTIONAL
+        with self.assertRaises(ValueError):
+            context.check_hostname = True
+        # Raising the verify mode does not enable the hostname check by
+        # itself; it only makes enabling it legal.
+        context.verify_mode = ssl.CERT_REQUIRED
+        self.assertFalse(context.check_hostname)
+        context.check_hostname = True
+        self.assertTrue(context.check_hostname)
+
+        context.verify_mode = ssl.CERT_OPTIONAL
+        context.check_hostname = True
+        self.assertTrue(context.check_hostname)
+
+        # Cannot set CERT_NONE with check_hostname enabled
+        with self.assertRaises(ValueError):
+            context.verify_mode = ssl.CERT_NONE
+        context.check_hostname = False
+        self.assertFalse(context.check_hostname)
+
+    def test_context_client_server(self):
+        # PROTOCOL_TLS_CLIENT has sane defaults
+        client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+        self.assertTrue(client_ctx.check_hostname)
+        self.assertEqual(client_ctx.verify_mode, ssl.CERT_REQUIRED)
+
+        # PROTOCOL_TLS_SERVER has different but also sane defaults
+        server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+        self.assertFalse(server_ctx.check_hostname)
+        self.assertEqual(server_ctx.verify_mode, ssl.CERT_NONE)
+
+
+class SSLErrorTests(unittest.TestCase):
+
+    def test_str(self):
+        # The str() of a SSLError doesn't include the errno
+        e = ssl.SSLError(1, "foo")
+        self.assertEqual(str(e), "foo")
+        self.assertEqual(e.errno, 1)
+        # Same for a subclass
+        e = ssl.SSLZeroReturnError(1, "foo")
+        self.assertEqual(str(e), "foo")
+        self.assertEqual(e.errno, 1)
+
+    def test_lib_reason(self):
+        # Test the library and reason attributes
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        with self.assertRaises(ssl.SSLError) as cm:
+            ctx.load_dh_params(CERTFILE)
+        self.assertEqual(cm.exception.library, 'PEM')
+        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
+        s = str(cm.exception)
+        self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
+
+    def test_subclass(self):
+        # Check that the appropriate SSLError subclass is raised
+        # (this only tests one of them)
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+        with socket.socket() as s:
+            s.bind(("127.0.0.1", 0))
+            s.listen()
+            c = socket.socket()
+            c.connect(s.getsockname())
+            c.setblocking(False)
+            with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
+                with self.assertRaises(ssl.SSLWantReadError) as cm:
+                    c.do_handshake()
+                s = str(cm.exception)
+                self.assertTrue(s.startswith("The operation did not complete (read)"), s)
+                # For compatibility
+                self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
+
+
+class MemoryBIOTests(unittest.TestCase):
+
+    def test_read_write(self):
+        bio = ssl.MemoryBIO()
+        bio.write(b'foo')
+        self.assertEqual(bio.read(), b'foo')
+        self.assertEqual(bio.read(), b'')
+        bio.write(b'foo')
+        bio.write(b'bar')
+        self.assertEqual(bio.read(), b'foobar')
+        self.assertEqual(bio.read(), b'')
+        bio.write(b'baz')
+        self.assertEqual(bio.read(2), b'ba')
+        self.assertEqual(bio.read(1), b'z')
+        self.assertEqual(bio.read(1), b'')
+
+    def test_eof(self):
+        bio = ssl.MemoryBIO()
+        self.assertFalse(bio.eof)
+        self.assertEqual(bio.read(), b'')
+        self.assertFalse(bio.eof)
+        bio.write(b'foo')
+        self.assertFalse(bio.eof)
+        bio.write_eof()
+        self.assertFalse(bio.eof)
+        self.assertEqual(bio.read(2), b'fo')
+        self.assertFalse(bio.eof)
+        self.assertEqual(bio.read(1), b'o')
+        self.assertTrue(bio.eof)
+        self.assertEqual(bio.read(), b'')
+        self.assertTrue(bio.eof)
+
+    def test_pending(self):
+        bio = ssl.MemoryBIO()
+        self.assertEqual(bio.pending, 0)
+        bio.write(b'foo')
+        self.assertEqual(bio.pending, 3)
+        for i in range(3):
+            bio.read(1)
+            self.assertEqual(bio.pending, 3-i-1)
+        for i in range(3):
+            bio.write(b'x')
+            self.assertEqual(bio.pending, i+1)
+        bio.read()
+        self.assertEqual(bio.pending, 0)
+
+    def test_buffer_types(self):
+        bio = ssl.MemoryBIO()
+        bio.write(b'foo')
+        self.assertEqual(bio.read(), b'foo')
+        bio.write(bytearray(b'bar'))
+        self.assertEqual(bio.read(), b'bar')
+        bio.write(memoryview(b'baz'))
+        self.assertEqual(bio.read(), b'baz')
+
+    def test_error_types(self):
+        bio = ssl.MemoryBIO()
+        self.assertRaises(TypeError, bio.write, 'foo')
+        self.assertRaises(TypeError, bio.write, None)
+        self.assertRaises(TypeError, bio.write, True)
+        self.assertRaises(TypeError, bio.write, 1)
+
+
+@unittest.skipUnless(_have_threads, "Needs threading module")
+class SimpleBackgroundTests(unittest.TestCase):
+
+    """Tests that connect to a simple server running in the background"""
+
+    def setUp(self):
+        server = ThreadedEchoServer(SIGNED_CERTFILE)
+        self.server_addr = (HOST, server.port)
+        server.__enter__()
+        self.addCleanup(server.__exit__, None, None, None)
+
+    def test_connect(self):
+        with test_wrap_socket(socket.socket(socket.AF_INET),
+                            cert_reqs=ssl.CERT_NONE) as s:
+            s.connect(self.server_addr)
+            self.assertEqual({}, s.getpeercert())
+            self.assertFalse(s.server_side)
+
+        # this should succeed because we specify the root cert
+        with test_wrap_socket(socket.socket(socket.AF_INET),
+                            cert_reqs=ssl.CERT_REQUIRED,
+                            ca_certs=SIGNING_CA) as s:
+            s.connect(self.server_addr)
+            self.assertTrue(s.getpeercert())
+            self.assertFalse(s.server_side)
+
+    def test_connect_fail(self):
+        # This should fail because we have no verification certs. Connection
+        # failure crashes ThreadedEchoServer, so run this in an independent
+        # test method.
+        s = test_wrap_socket(socket.socket(socket.AF_INET),
+                            cert_reqs=ssl.CERT_REQUIRED)
+        self.addCleanup(s.close)
+        self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
+                               s.connect, self.server_addr)
+
+    def test_connect_ex(self):
+        # Issue #11326: check connect_ex() implementation
+        s = test_wrap_socket(socket.socket(socket.AF_INET),
+                            cert_reqs=ssl.CERT_REQUIRED,
+                            ca_certs=SIGNING_CA)
+        self.addCleanup(s.close)
+        self.assertEqual(0, s.connect_ex(self.server_addr))
+        self.assertTrue(s.getpeercert())
+
+    def test_non_blocking_connect_ex(self):
+        # Issue #11326: non-blocking connect_ex() should allow handshake
+        # to proceed after the socket gets ready.
+        s = test_wrap_socket(socket.socket(socket.AF_INET),
+                            cert_reqs=ssl.CERT_REQUIRED,
+                            ca_certs=SIGNING_CA,
+                            do_handshake_on_connect=False)
+        self.addCleanup(s.close)
+        s.setblocking(False)
+        rc = s.connect_ex(self.server_addr)
+        # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
+        self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
+        # Wait for connect to finish
+        select.select([], [s], [], 5.0)
+        # Non-blocking handshake
+        while True:
+            try:
+                s.do_handshake()
+                break
+            except ssl.SSLWantReadError:
+                select.select([s], [], [], 5.0)
+            except ssl.SSLWantWriteError:
+                select.select([], [s], [], 5.0)
+        # SSL established
+        self.assertTrue(s.getpeercert())
+
+    def test_connect_with_context(self):
+        # Same as test_connect, but with a separately created context
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+            s.connect(self.server_addr)
+            self.assertEqual({}, s.getpeercert())
+        # Same with a server hostname
+        with ctx.wrap_socket(socket.socket(socket.AF_INET),
+                            server_hostname="dummy") as s:
+            s.connect(self.server_addr)
+        ctx.verify_mode = ssl.CERT_REQUIRED
+        # This should succeed because we specify the root cert
+        ctx.load_verify_locations(SIGNING_CA)
+        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+            s.connect(self.server_addr)
+            cert = s.getpeercert()
+            self.assertTrue(cert)
+
+    def test_connect_with_context_fail(self):
+        # This should fail because we have no verification certs. Connection
+        # failure crashes ThreadedEchoServer, so run this in an independent
+        # test method.
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        ctx.verify_mode = ssl.CERT_REQUIRED
+        s = ctx.wrap_socket(socket.socket(socket.AF_INET))
+        self.addCleanup(s.close)
+        self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
+                                s.connect, self.server_addr)
+
+    def test_connect_capath(self):
+        # Verify server certificates using the `capath` argument
+        # NOTE: the subject hashing algorithm has been changed between
+        # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
+        # contain both versions of each certificate (same content, different
+        # filename) for this test to be portable across OpenSSL releases.
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        ctx.verify_mode = ssl.CERT_REQUIRED
+        ctx.load_verify_locations(capath=CAPATH)
+        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+            s.connect(self.server_addr)
+            cert = s.getpeercert()
+            self.assertTrue(cert)
+        # Same with a bytes `capath` argument
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        ctx.verify_mode = ssl.CERT_REQUIRED
+        ctx.load_verify_locations(capath=BYTES_CAPATH)
+        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+            s.connect(self.server_addr)
+            cert = s.getpeercert()
+            self.assertTrue(cert)
+
+    def test_connect_cadata(self):
+        with open(SIGNING_CA) as f:
+            pem = f.read()
+        der = ssl.PEM_cert_to_DER_cert(pem)
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        ctx.verify_mode = ssl.CERT_REQUIRED
+        ctx.load_verify_locations(cadata=pem)
+        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+            s.connect(self.server_addr)
+            cert = s.getpeercert()
+            self.assertTrue(cert)
+
+        # same with DER
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        ctx.verify_mode = ssl.CERT_REQUIRED
+        ctx.load_verify_locations(cadata=der)
+        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+            s.connect(self.server_addr)
+            cert = s.getpeercert()
+            self.assertTrue(cert)
+
+    @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
+    def test_makefile_close(self):
+        # Issue #5238: creating a file-like object with makefile() shouldn't
+        # delay closing the underlying "real socket" (here tested with its
+        # file descriptor, hence skipping the test under Windows).
+        ss = test_wrap_socket(socket.socket(socket.AF_INET))
+        ss.connect(self.server_addr)
+        fd = ss.fileno()
+        f = ss.makefile()
+        f.close()
+        # The fd is still open
+        os.read(fd, 0)
+        # Closing the SSL socket should close the fd too
+        ss.close()
+        gc.collect()
+        with self.assertRaises(OSError) as e:
+            os.read(fd, 0)
+        self.assertEqual(e.exception.errno, errno.EBADF)
+
+    def test_non_blocking_handshake(self):
+        s = socket.socket(socket.AF_INET)
+        s.connect(self.server_addr)
+        s.setblocking(False)
+        s = test_wrap_socket(s,
+                            cert_reqs=ssl.CERT_NONE,
+                            do_handshake_on_connect=False)
+        self.addCleanup(s.close)
+        count = 0
+        while True:
+            try:
+                count += 1
+                s.do_handshake()
+                break
+            except ssl.SSLWantReadError:
+                select.select([s], [], [])
+            except ssl.SSLWantWriteError:
+                select.select([], [s], [])
+        if support.verbose:
+            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
+
+    def test_get_server_certificate(self):
+        _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
+
+    def test_get_server_certificate_fail(self):
+        # Connection failure crashes ThreadedEchoServer, so run this in an
+        # independent test method
+        _test_get_server_certificate_fail(self, *self.server_addr)
+
+    def test_ciphers(self):
+        with test_wrap_socket(socket.socket(socket.AF_INET),
+                             cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
+            s.connect(self.server_addr)
+        with test_wrap_socket(socket.socket(socket.AF_INET),
+                             cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
+            s.connect(self.server_addr)
+        # Error checking can happen at instantiation or when connecting
+        with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
+            with socket.socket(socket.AF_INET) as sock:
+                s = test_wrap_socket(sock,
+                                    cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
+                s.connect(self.server_addr)
+
+    def test_get_ca_certs_capath(self):
+        # capath certs are loaded on request
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        ctx.verify_mode = ssl.CERT_REQUIRED
+        ctx.load_verify_locations(capath=CAPATH)
+        self.assertEqual(ctx.get_ca_certs(), [])
+        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+            s.connect(self.server_addr)
+            cert = s.getpeercert()
+            self.assertTrue(cert)
+        self.assertEqual(len(ctx.get_ca_certs()), 1)
+
+    @needs_sni
+    @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"), "needs TLS 1.2")
+    def test_context_setget(self):
+        # Check that the context of a connected socket can be replaced.
+        ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
+        ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        s = socket.socket(socket.AF_INET)
+        with ctx1.wrap_socket(s) as ss:
+            ss.connect(self.server_addr)
+            self.assertIs(ss.context, ctx1)
+            self.assertIs(ss._sslobj.context, ctx1)
+            ss.context = ctx2
+            self.assertIs(ss.context, ctx2)
+            self.assertIs(ss._sslobj.context, ctx2)
+
+    def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
+        # A simple IO loop. Call func(*args) depending on the error we get
+        # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
+        timeout = kwargs.get('timeout', 10)
+        count = 0
+        while True:
+            errno = None
+            count += 1
+            try:
+                ret = func(*args)
+            except ssl.SSLError as e:
+                if e.errno not in (ssl.SSL_ERROR_WANT_READ,
+                                   ssl.SSL_ERROR_WANT_WRITE):
+                    raise
+                errno = e.errno
+            # Get any data from the outgoing BIO irrespective of any error, and
+            # send it to the socket.
+            buf = outgoing.read()
+            sock.sendall(buf)
+            # If there's no error, we're done. For WANT_READ, we need to get
+            # data from the socket and put it in the incoming BIO.
+            if errno is None:
+                break
+            elif errno == ssl.SSL_ERROR_WANT_READ:
+                buf = sock.recv(32768)
+                if buf:
+                    incoming.write(buf)
+                else:
+                    incoming.write_eof()
+        if support.verbose:
+            sys.stdout.write("Needed %d calls to complete %s().\n"
+                             % (count, func.__name__))
+        return ret
+
+    def test_bio_handshake(self):
+        sock = socket.socket(socket.AF_INET)
+        self.addCleanup(sock.close)
+        sock.connect(self.server_addr)
+        incoming = ssl.MemoryBIO()
+        outgoing = ssl.MemoryBIO()
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        ctx.verify_mode = ssl.CERT_REQUIRED
+        ctx.load_verify_locations(SIGNING_CA)
+        ctx.check_hostname = True
+        sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost')
+        self.assertIs(sslobj._sslobj.owner, sslobj)
+        self.assertIsNone(sslobj.cipher())
+        # CPython implementation detail
+        # self.assertIsNone(sslobj.version())
+        self.assertIsNotNone(sslobj.shared_ciphers())
+        self.assertRaises(ValueError, sslobj.getpeercert)
+        if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
+            self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
+        self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
+        self.assertTrue(sslobj.cipher())
+        self.assertIsNotNone(sslobj.shared_ciphers())
+        self.assertIsNotNone(sslobj.version())
+        self.assertTrue(sslobj.getpeercert())
+        if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
+            self.assertTrue(sslobj.get_channel_binding('tls-unique'))
+        try:
+            self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
+        except ssl.SSLSyscallError:
+            # If the server shuts down the TCP connection without sending a
+            # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
+            pass
+        self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
+
+    def test_bio_read_write_data(self):
+        sock = socket.socket(socket.AF_INET)
+        self.addCleanup(sock.close)
+        sock.connect(self.server_addr)
+        incoming = ssl.MemoryBIO()
+        outgoing = ssl.MemoryBIO()
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        ctx.verify_mode = ssl.CERT_NONE
+        sslobj = ctx.wrap_bio(incoming, outgoing, False)
+        self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
+        req = b'FOO\n'
+        self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
+        buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
+        self.assertEqual(buf, b'foo\n')
+        self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
+
+
+class NetworkedTests(unittest.TestCase):
+
+    def test_timeout_connect_ex(self):
+        # Issue #12065: on a timeout, connect_ex() should return the original
+        # errno (mimicking the behaviour of non-SSL sockets).
+        with support.transient_internet(REMOTE_HOST):
+            s = test_wrap_socket(socket.socket(socket.AF_INET),
+                                cert_reqs=ssl.CERT_REQUIRED,
+                                do_handshake_on_connect=False)
+            self.addCleanup(s.close)
+            s.settimeout(0.0000001)
+            rc = s.connect_ex((REMOTE_HOST, 443))
+            if rc == 0:
+                self.skipTest("REMOTE_HOST responded too quickly")
+            self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
+
+    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
+    def test_get_server_certificate_ipv6(self):
+        with support.transient_internet('ipv6.google.com'):
+            _test_get_server_certificate(self, 'ipv6.google.com', 443)
+            _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
+
+
+def _test_get_server_certificate(test, host, port, cert=None):
+    pem = ssl.get_server_certificate((host, port))
+    if not pem:
+        test.fail("No server certificate on %s:%s!" % (host, port))
+
+    pem = ssl.get_server_certificate((host, port), ca_certs=cert)
+    if not pem:
+        test.fail("No server certificate on %s:%s!" % (host, port))
+    if support.verbose:
+        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
+
+def _test_get_server_certificate_fail(test, host, port):
+    try:
+        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
+    except ssl.SSLError as x:
+        #should fail
+        if support.verbose:
+            sys.stdout.write("%s\n" % x)
+    else:
+        test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
+
+
+if _have_threads:
+    from test.ssl_servers import make_https_server
+
+    class ThreadedEchoServer(threading.Thread):
+
+        class ConnectionHandler(threading.Thread):
+
+            """A mildly complicated class, because we want it to work both
+            with and without the SSL wrapper around the socket connection, so
+            that we can test the STARTTLS functionality."""
+
+            def __init__(self, server, connsock, addr):
+                self.server = server
+                self.running = False
+                self.sock = connsock
+                self.addr = addr
+                self.sock.setblocking(1)
+                self.sslconn = None
+                threading.Thread.__init__(self)
+                self.daemon = True
+
+            def wrap_conn(self):
+                try:
+                    self.sslconn = self.server.context.wrap_socket(
+                        self.sock, server_side=True)
+                    self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
+                    self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
+                except (ConnectionResetError, BrokenPipeError) as e:
+                    # We treat ConnectionResetError as though it were an
+                    # SSLError - OpenSSL on Ubuntu abruptly closes the
+                    # connection when asked to use an unsupported protocol.
+                    #
+                    # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
+                    # tries to send session tickets after handshake.
+                    # https://github.com/openssl/openssl/issues/6342
+                    self.server.conn_errors.append(str(e))
+                    if self.server.chatty:
+                        handle_error(
+                            "\n server:  bad connection attempt from " + repr(
+                                self.addr) + ":\n")
+                    self.running = False
+                    self.close()
+                    return False
+                except (ssl.SSLError, OSError) as e:
+                    # OSError may occur with wrong protocols, e.g. both
+                    # sides use PROTOCOL_TLS_SERVER.
+                    #
+                    # XXX Various errors can have happened here, for example
+                    # a mismatching protocol version, an invalid certificate,
+                    # or a low-level bug. This should be made more 
discriminating.
+                    #
+                    # bpo-31323: Store the exception as string to prevent
+                    # a reference leak: server -> conn_errors -> exception
+                    # -> traceback -> self (ConnectionHandler) -> server
+                    self.server.conn_errors.append(str(e))
+                    if self.server.chatty:
+                        handle_error("\n server:  bad connection attempt from " + repr(self.addr) + ":\n")
+                    self.running = False
+                    self.server.stop()
+                    self.close()
+                    return False
+                else:
+                    self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
+                    if self.server.context.verify_mode == ssl.CERT_REQUIRED:
+                        cert = self.sslconn.getpeercert()
+                        if support.verbose and self.server.chatty:
+                            sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
+                        cert_binary = self.sslconn.getpeercert(True)
+                        if support.verbose and self.server.chatty:
+                            sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
+                    cipher = self.sslconn.cipher()
+                    if support.verbose and self.server.chatty:
+                        sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
+                        sys.stdout.write(" server: selected protocol is now "
+                                + str(self.sslconn.selected_npn_protocol()) + "\n")
+                    return True
+
+            def read(self):
+                if self.sslconn:
+                    return self.sslconn.read()
+                else:
+                    return self.sock.recv(1024)
+
+            def write(self, bytes):
+                if self.sslconn:
+                    return self.sslconn.write(bytes)
+                else:
+                    return self.sock.send(bytes)
+
+            def close(self):
+                if self.sslconn:
+                    self.sslconn.close()
+                else:
_______________________________________________
pypy-commit mailing list
pypy-commit@python.org
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to