Author: Matti Picus <matti.pi...@gmail.com>
Branch:
Changeset: r97180:df138cf691c1
Date: 2019-08-14 19:35 +0300
http://bitbucket.org/pypy/pypy/changeset/df138cf691c1/
Log: tweak test, add missing get_cipher diff too long, truncating to 2000 out of 4071 lines diff --git a/lib-python/3/test/test_tools.py b/lib-python/3.2/test/test_tools.py rename from lib-python/3/test/test_tools.py rename to lib-python/3.2/test/test_tools.py diff --git a/lib-python/3/test/test_ssl.py b/lib-python/3/test/test_ssl.py new file mode 100644 --- /dev/null +++ b/lib-python/3/test/test_ssl.py @@ -0,0 +1,3987 @@ +# Test the support for SSL and sockets + +import sys +import unittest +from test import support +import socket +import select +import time +import datetime +import gc +import os +import errno +import pprint +import tempfile +import urllib.request +import traceback +import asyncore +import weakref +import platform +import re +import functools +try: + import ctypes +except ImportError: + ctypes = None + +ssl = support.import_module("ssl") + +try: + import threading +except ImportError: + _have_threads = False +else: + _have_threads = True + +PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) +HOST = support.HOST +IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL') +IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0) + + +def data_file(*name): + return os.path.join(os.path.dirname(__file__), *name) + +# The custom key and certificate files used in test_ssl are generated +# using Lib/test/make_ssl_certs.py. +# Other certificates are simply fetched from the Internet servers they +# are meant to authenticate. 
+ +CERTFILE = data_file("keycert.pem") +BYTES_CERTFILE = os.fsencode(CERTFILE) +ONLYCERT = data_file("ssl_cert.pem") +ONLYKEY = data_file("ssl_key.pem") +BYTES_ONLYCERT = os.fsencode(ONLYCERT) +BYTES_ONLYKEY = os.fsencode(ONLYKEY) +CERTFILE_PROTECTED = data_file("keycert.passwd.pem") +ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem") +KEY_PASSWORD = "somepass" +CAPATH = data_file("capath") +BYTES_CAPATH = os.fsencode(CAPATH) +CAFILE_NEURONIO = data_file("capath", "4e1295a3.0") +CAFILE_CACERT = data_file("capath", "5ed36f99.0") + +# empty CRL +CRLFILE = data_file("revocation.crl") + +# Two keys and certs signed by the same CA (for SNI tests) +SIGNED_CERTFILE = data_file("keycert3.pem") +SIGNED_CERTFILE2 = data_file("keycert4.pem") +# Same certificate as pycacert.pem, but without extra text in file +SIGNING_CA = data_file("capath", "ceff1710.0") +# cert with all kinds of subject alt names +ALLSANFILE = data_file("allsans.pem") +# cert with all kinds of subject alt names +ALLSANFILE = data_file("allsans.pem") + +REMOTE_HOST = "self-signed.pythontest.net" + +EMPTYCERT = data_file("nullcert.pem") +BADCERT = data_file("badcert.pem") +NONEXISTINGCERT = data_file("XXXnonexisting.pem") +BADKEY = data_file("badkey.pem") +NOKIACERT = data_file("nokia.pem") +NULLBYTECERT = data_file("nullbytecert.pem") +TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem") + +DHFILE = data_file("ffdh3072.pem") +BYTES_DHFILE = os.fsencode(DHFILE) + +# Not defined in all versions of OpenSSL +OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0) +OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0) +OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0) +OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0) +OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0) + + +def handle_error(prefix): + exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) + if support.verbose: + sys.stdout.write(prefix + exc_format) + +def 
can_clear_options(): + # 0.9.8m or higher + return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15) + +def no_sslv2_implies_sslv3_hello(): + # 0.9.7h or higher + return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15) + +def have_verify_flags(): + # 0.9.8 or higher + return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15) + +def utc_offset(): #NOTE: ignore issues like #1647654 + # local time = utc time + utc offset + if time.daylight and time.localtime().tm_isdst > 0: + return -time.altzone # seconds + return -time.timezone + +def asn1time(cert_time): + # Some versions of OpenSSL ignore seconds, see #18207 + # 0.9.8.i + if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15): + fmt = "%b %d %H:%M:%S %Y GMT" + dt = datetime.datetime.strptime(cert_time, fmt) + dt = dt.replace(second=0) + cert_time = dt.strftime(fmt) + # %d adds leading zero but ASN1_TIME_print() uses leading space + if cert_time[4] == "0": + cert_time = cert_time[:4] + " " + cert_time[5:] + + return cert_time + +# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 +def skip_if_broken_ubuntu_ssl(func): + if hasattr(ssl, 'PROTOCOL_SSLv2'): + @functools.wraps(func) + def f(*args, **kwargs): + try: + ssl.SSLContext(ssl.PROTOCOL_SSLv2) + except ssl.SSLError: + if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and + platform.linux_distribution() == ('debian', 'squeeze/sid', '')): + raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour") + return func(*args, **kwargs) + return f + else: + return func + +def skip_if_openssl_cnf_minprotocol_gt_tls1(func): + """Skip a test if the OpenSSL config MinProtocol is > TLSv1. + + OS distros with an /etc/ssl/openssl.cnf and MinProtocol set often do so to + require TLSv1.2 or higher (Debian Buster). Some of our tests for older + protocol versions will fail under such a config. + + Alternative workaround: Run this test in a process with + OPENSSL_CONF=/dev/null in the environment. 
+ """ + @functools.wraps(func) + def f(*args, **kwargs): + openssl_cnf = os.environ.get("OPENSSL_CONF", "/etc/ssl/openssl.cnf") + try: + with open(openssl_cnf, "r") as config: + for line in config: + match = re.match(r"MinProtocol\s*=\s*(TLSv\d+\S*)", line) + if match: + tls_ver = match.group(1) + if tls_ver > "TLSv1": + raise unittest.SkipTest( + "%s has MinProtocol = %s which is > TLSv1." % + (openssl_cnf, tls_ver)) + except (EnvironmentError, UnicodeDecodeError) as err: + # no config file found, etc. + if support.verbose: + sys.stdout.write("\n Could not scan %s for MinProtocol: %s\n" + % (openssl_cnf, err)) + return func(*args, **kwargs) + return f + + +needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test") + + +def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *, + cert_reqs=ssl.CERT_NONE, ca_certs=None, + ciphers=None, certfile=None, keyfile=None, + **kwargs): + context = ssl.SSLContext(ssl_version) + if cert_reqs is not None: + context.verify_mode = cert_reqs + if ca_certs is not None: + context.load_verify_locations(ca_certs) + if certfile is not None or keyfile is not None: + context.load_cert_chain(certfile, keyfile) + if ciphers is not None: + context.set_ciphers(ciphers) + return context.wrap_socket(sock, **kwargs) + +class BasicSocketTests(unittest.TestCase): + + def test_constants(self): + ssl.CERT_NONE + ssl.CERT_OPTIONAL + ssl.CERT_REQUIRED + ssl.OP_CIPHER_SERVER_PREFERENCE + ssl.OP_SINGLE_DH_USE + if ssl.HAS_ECDH: + ssl.OP_SINGLE_ECDH_USE + if ssl.OPENSSL_VERSION_INFO >= (1, 0): + ssl.OP_NO_COMPRESSION + self.assertIn(ssl.HAS_SNI, {True, False}) + self.assertIn(ssl.HAS_ECDH, {True, False}) + ssl.OP_NO_SSLv2 + ssl.OP_NO_SSLv3 + ssl.OP_NO_TLSv1 + ssl.OP_NO_TLSv1_3 + if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1): + ssl.OP_NO_TLSv1_1 + ssl.OP_NO_TLSv1_2 + + def test_str_for_enums(self): + # Make sure that the PROTOCOL_* constants have enum-like string + # reprs. 
+ proto = ssl.PROTOCOL_TLS + self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS') + ctx = ssl.SSLContext(proto) + self.assertIs(ctx.protocol, proto) + + def test_random(self): + v = ssl.RAND_status() + if support.verbose: + sys.stdout.write("\n RAND_status is %d (%s)\n" + % (v, (v and "sufficient randomness") or + "insufficient randomness")) + + data, is_cryptographic = ssl.RAND_pseudo_bytes(16) + self.assertEqual(len(data), 16) + self.assertEqual(is_cryptographic, v == 1) + if v: + data = ssl.RAND_bytes(16) + self.assertEqual(len(data), 16) + else: + self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16) + + # negative num is invalid + self.assertRaises(ValueError, ssl.RAND_bytes, -5) + self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5) + + if hasattr(ssl, 'RAND_egd'): + self.assertRaises(TypeError, ssl.RAND_egd, 1) + self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1) + ssl.RAND_add("this is a random string", 75.0) + ssl.RAND_add(b"this is a random bytes object", 75.0) + ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0) + + @unittest.skipUnless(os.name == 'posix', 'requires posix') + def test_random_fork(self): + status = ssl.RAND_status() + if not status: + self.fail("OpenSSL's PRNG has insufficient randomness") + + rfd, wfd = os.pipe() + pid = os.fork() + if pid == 0: + try: + os.close(rfd) + child_random = ssl.RAND_pseudo_bytes(16)[0] + self.assertEqual(len(child_random), 16) + os.write(wfd, child_random) + os.close(wfd) + except BaseException: + os._exit(1) + else: + os._exit(0) + else: + os.close(wfd) + self.addCleanup(os.close, rfd) + _, status = os.waitpid(pid, 0) + self.assertEqual(status, 0) + + child_random = os.read(rfd, 16) + self.assertEqual(len(child_random), 16) + parent_random = ssl.RAND_pseudo_bytes(16)[0] + self.assertEqual(len(parent_random), 16) + + self.assertNotEqual(child_random, parent_random) + + maxDiff = None + + def test_parse_cert(self): + # note that this uses an 'unofficial' function in _ssl.c, + # provided 
solely for this test, to exercise the certificate + # parsing code + p = ssl._ssl._test_decode_cert(CERTFILE) + if support.verbose: + sys.stdout.write("\n" + pprint.pformat(p) + "\n") + self.assertEqual(p['issuer'], + ((('countryName', 'XY'),), + (('localityName', 'Castle Anthrax'),), + (('organizationName', 'Python Software Foundation'),), + (('commonName', 'localhost'),)) + ) + # Note the next three asserts will fail if the keys are regenerated + self.assertEqual(p['notAfter'], asn1time('Aug 26 14:23:15 2028 GMT')) + self.assertEqual(p['notBefore'], asn1time('Aug 29 14:23:15 2018 GMT')) + self.assertEqual(p['serialNumber'], '98A7CF88C74A32ED') + self.assertEqual(p['subject'], + ((('countryName', 'XY'),), + (('localityName', 'Castle Anthrax'),), + (('organizationName', 'Python Software Foundation'),), + (('commonName', 'localhost'),)) + ) + self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),)) + # Issue #13034: the subjectAltName in some certificates + # (notably projects.developer.nokia.com:443) wasn't parsed + p = ssl._ssl._test_decode_cert(NOKIACERT) + if support.verbose: + sys.stdout.write("\n" + pprint.pformat(p) + "\n") + self.assertEqual(p['subjectAltName'], + (('DNS', 'projects.developer.nokia.com'), + ('DNS', 'projects.forum.nokia.com')) + ) + # extra OCSP and AIA fields + self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',)) + self.assertEqual(p['caIssuers'], + ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',)) + self.assertEqual(p['crlDistributionPoints'], + ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',)) + + def test_parse_cert_CVE_2019_5010(self): + p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP) + if support.verbose: + sys.stdout.write("\n" + pprint.pformat(p) + "\n") + self.assertEqual( + p, + { + 'issuer': ( + (('countryName', 'UK'),), (('commonName', 'cody-ca'),)), + 'notAfter': 'Jun 14 18:00:58 2028 GMT', + 'notBefore': 'Jun 18 18:00:58 2018 GMT', + 'serialNumber': '02', + 'subject': ((('countryName', 'UK'),), + 
(('commonName', + 'codenomicon-vm-2.test.lal.cisco.com'),)), + 'subjectAltName': ( + ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),), + 'version': 3 + } + ) + + def test_parse_cert_CVE_2013_4238(self): + p = ssl._ssl._test_decode_cert(NULLBYTECERT) + if support.verbose: + sys.stdout.write("\n" + pprint.pformat(p) + "\n") + subject = ((('countryName', 'US'),), + (('stateOrProvinceName', 'Oregon'),), + (('localityName', 'Beaverton'),), + (('organizationName', 'Python Software Foundation'),), + (('organizationalUnitName', 'Python Core Development'),), + (('commonName', 'null.python.org\x00example.org'),), + (('emailAddress', 'python-dev@python.org'),)) + self.assertEqual(p['subject'], subject) + self.assertEqual(p['issuer'], subject) + if ssl._OPENSSL_API_VERSION >= (0, 9, 8): + san = (('DNS', 'altnull.python.org\x00example.com'), + ('email', 'null@python.org\x00user@example.org'), + ('URI', 'http://null.python.org\x00http://example.org'), + ('IP Address', '192.0.2.1'), + ('IP Address', '2001:DB8:0:0:0:0:0:1\n')) + else: + # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName + san = (('DNS', 'altnull.python.org\x00example.com'), + ('email', 'null@python.org\x00user@example.org'), + ('URI', 'http://null.python.org\x00http://example.org'), + ('IP Address', '192.0.2.1'), + ('IP Address', '<invalid>')) + + self.assertEqual(p['subjectAltName'], san) + + def test_parse_all_sans(self): + p = ssl._ssl._test_decode_cert(ALLSANFILE) + self.assertEqual(p['subjectAltName'], + ( + ('DNS', 'allsans'), + ('othername', '<unsupported>'), + ('othername', '<unsupported>'), + ('email', 'user@example.org'), + ('DNS', 'www.example.org'), + ('DirName', + ((('countryName', 'XY'),), + (('localityName', 'Castle Anthrax'),), + (('organizationName', 'Python Software Foundation'),), + (('commonName', 'dirname example'),))), + ('URI', 'https://www.python.org/'), + ('IP Address', '127.0.0.1'), + ('IP Address', '0:0:0:0:0:0:0:1\n'), + ('Registered ID', '1.2.3.4.5') + ) + ) + + def 
test_DER_to_PEM(self): + with open(CAFILE_CACERT, 'r') as f: + pem = f.read() + d1 = ssl.PEM_cert_to_DER_cert(pem) + p2 = ssl.DER_cert_to_PEM_cert(d1) + d2 = ssl.PEM_cert_to_DER_cert(p2) + self.assertEqual(d1, d2) + if not p2.startswith(ssl.PEM_HEADER + '\n'): + self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2) + if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'): + self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2) + + def test_openssl_version(self): + n = ssl.OPENSSL_VERSION_NUMBER + t = ssl.OPENSSL_VERSION_INFO + s = ssl.OPENSSL_VERSION + self.assertIsInstance(n, int) + self.assertIsInstance(t, tuple) + self.assertIsInstance(s, str) + # Some sanity checks follow + # >= 0.9 + self.assertGreaterEqual(n, 0x900000) + # < 3.0 + self.assertLess(n, 0x30000000) + major, minor, fix, patch, status = t + self.assertGreaterEqual(major, 0) + self.assertLess(major, 3) + self.assertGreaterEqual(minor, 0) + self.assertLess(minor, 256) + self.assertGreaterEqual(fix, 0) + self.assertLess(fix, 256) + self.assertGreaterEqual(patch, 0) + self.assertLessEqual(patch, 63) + self.assertGreaterEqual(status, 0) + self.assertLessEqual(status, 15) + # Version string as returned by {Open,Libre}SSL, the format might change + if IS_LIBRESSL: + self.assertTrue(s.startswith("LibreSSL {:d}".format(major)), + (s, t, hex(n))) + else: + self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)), + (s, t, hex(n))) + + @support.cpython_only + def test_refcycle(self): + # Issue #7943: an SSL object doesn't create reference cycles with + # itself. + s = socket.socket(socket.AF_INET) + ss = test_wrap_socket(s) + wr = weakref.ref(ss) + with support.check_warnings(("", ResourceWarning)): + del ss + self.assertEqual(wr(), None) + + def test_wrapped_unconnected(self): + # Methods on an unconnected SSLSocket propagate the original + # OSError raise by the underlying socket object. 
+ s = socket.socket(socket.AF_INET) + with test_wrap_socket(s) as ss: + self.assertRaises(OSError, ss.recv, 1) + self.assertRaises(OSError, ss.recv_into, bytearray(b'x')) + self.assertRaises(OSError, ss.recvfrom, 1) + self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1) + self.assertRaises(OSError, ss.send, b'x') + self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0)) + self.assertRaises(NotImplementedError, ss.dup) + self.assertRaises(NotImplementedError, ss.sendmsg, + [b'x'], (), 0, ('0.0.0.0', 0)) + self.assertRaises(NotImplementedError, ss.recvmsg, 100) + self.assertRaises(NotImplementedError, ss.recvmsg_into, + [bytearray(100)]) + + def test_timeout(self): + # Issue #8524: when creating an SSL socket, the timeout of the + # original socket should be retained. + for timeout in (None, 0.0, 5.0): + s = socket.socket(socket.AF_INET) + s.settimeout(timeout) + with test_wrap_socket(s) as ss: + self.assertEqual(timeout, ss.gettimeout()) + + def test_errors_sslwrap(self): + sock = socket.socket() + self.assertRaisesRegex(ValueError, + "certfile must be specified", + ssl.wrap_socket, sock, keyfile=CERTFILE) + self.assertRaisesRegex(ValueError, + "certfile must be specified for server-side operations", + ssl.wrap_socket, sock, server_side=True) + self.assertRaisesRegex(ValueError, + "certfile must be specified for server-side operations", + ssl.wrap_socket, sock, server_side=True, certfile="") + with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s: + self.assertRaisesRegex(ValueError, "can't connect in server-side mode", + s.connect, (HOST, 8080)) + with self.assertRaises(OSError) as cm: + with socket.socket() as sock: + ssl.wrap_socket(sock, certfile=NONEXISTINGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with self.assertRaises(OSError) as cm: + with socket.socket() as sock: + ssl.wrap_socket(sock, + certfile=CERTFILE, keyfile=NONEXISTINGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with 
self.assertRaises(OSError) as cm: + with socket.socket() as sock: + ssl.wrap_socket(sock, + certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + + def bad_cert_test(self, certfile): + """Check that trying to use the given client certificate fails""" + certfile = os.path.join(os.path.dirname(__file__) or os.curdir, + certfile) + sock = socket.socket() + self.addCleanup(sock.close) + with self.assertRaises(ssl.SSLError): + test_wrap_socket(sock, + certfile=certfile, + ssl_version=ssl.PROTOCOL_TLSv1) + + def test_empty_cert(self): + """Wrapping with an empty cert file""" + self.bad_cert_test("nullcert.pem") + + def test_malformed_cert(self): + """Wrapping with a badly formatted certificate (syntax error)""" + self.bad_cert_test("badcert.pem") + + def test_malformed_key(self): + """Wrapping with a badly formatted key (syntax error)""" + self.bad_cert_test("badkey.pem") + + def test_match_hostname(self): + def ok(cert, hostname): + ssl.match_hostname(cert, hostname) + def fail(cert, hostname): + self.assertRaises(ssl.CertificateError, + ssl.match_hostname, cert, hostname) + + # -- Hostname matching -- + + cert = {'subject': ((('commonName', 'example.com'),),)} + ok(cert, 'example.com') + ok(cert, 'ExAmple.cOm') + fail(cert, 'www.example.com') + fail(cert, '.example.com') + fail(cert, 'example.org') + fail(cert, 'exampleXcom') + + cert = {'subject': ((('commonName', '*.a.com'),),)} + ok(cert, 'foo.a.com') + fail(cert, 'bar.foo.a.com') + fail(cert, 'a.com') + fail(cert, 'Xa.com') + fail(cert, '.a.com') + + # only match one left-most wildcard + cert = {'subject': ((('commonName', 'f*.com'),),)} + ok(cert, 'foo.com') + ok(cert, 'f.com') + fail(cert, 'bar.com') + fail(cert, 'foo.a.com') + fail(cert, 'bar.foo.com') + + # NULL bytes are bad, CVE-2013-4073 + cert = {'subject': ((('commonName', + 'null.python.org\x00example.org'),),)} + ok(cert, 'null.python.org\x00example.org') # or raise an error? 
+ fail(cert, 'example.org') + fail(cert, 'null.python.org') + + # error cases with wildcards + cert = {'subject': ((('commonName', '*.*.a.com'),),)} + fail(cert, 'bar.foo.a.com') + fail(cert, 'a.com') + fail(cert, 'Xa.com') + fail(cert, '.a.com') + + cert = {'subject': ((('commonName', 'a.*.com'),),)} + fail(cert, 'a.foo.com') + fail(cert, 'a..com') + fail(cert, 'a.com') + + # wildcard doesn't match IDNA prefix 'xn--' + idna = 'püthon.python.org'.encode("idna").decode("ascii") + cert = {'subject': ((('commonName', idna),),)} + ok(cert, idna) + cert = {'subject': ((('commonName', 'x*.python.org'),),)} + fail(cert, idna) + cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)} + fail(cert, idna) + + # wildcard in first fragment and IDNA A-labels in sequent fragments + # are supported. + idna = 'www*.pythön.org'.encode("idna").decode("ascii") + cert = {'subject': ((('commonName', idna),),)} + ok(cert, 'www.pythön.org'.encode("idna").decode("ascii")) + ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii")) + fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii")) + fail(cert, 'pythön.org'.encode("idna").decode("ascii")) + + # Slightly fake real-world example + cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT', + 'subject': ((('commonName', 'linuxfrz.org'),),), + 'subjectAltName': (('DNS', 'linuxfr.org'), + ('DNS', 'linuxfr.com'), + ('othername', '<unsupported>'))} + ok(cert, 'linuxfr.org') + ok(cert, 'linuxfr.com') + # Not a "DNS" entry + fail(cert, '<unsupported>') + # When there is a subjectAltName, commonName isn't used + fail(cert, 'linuxfrz.org') + + # A pristine real-world example + cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT', + 'subject': ((('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'Mountain View'),), + (('organizationName', 'Google Inc'),), + (('commonName', 'mail.google.com'),))} + ok(cert, 'mail.google.com') + fail(cert, 'gmail.com') + # Only commonName is considered + fail(cert, 'California') + + 
# -- IPv4 matching -- + cert = {'subject': ((('commonName', 'example.com'),),), + 'subjectAltName': (('DNS', 'example.com'), + ('IP Address', '10.11.12.13'), + ('IP Address', '14.15.16.17'))} + ok(cert, '10.11.12.13') + ok(cert, '14.15.16.17') + fail(cert, '14.15.16.18') + fail(cert, 'example.net') + + # -- IPv6 matching -- + cert = {'subject': ((('commonName', 'example.com'),),), + 'subjectAltName': (('DNS', 'example.com'), + ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'), + ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))} + ok(cert, '2001::cafe') + ok(cert, '2003::baba') + fail(cert, '2003::bebe') + fail(cert, 'example.net') + + # -- Miscellaneous -- + + # Neither commonName nor subjectAltName + cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT', + 'subject': ((('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'Mountain View'),), + (('organizationName', 'Google Inc'),))} + fail(cert, 'mail.google.com') + + # No DNS entry in subjectAltName but a commonName + cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT', + 'subject': ((('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'Mountain View'),), + (('commonName', 'mail.google.com'),)), + 'subjectAltName': (('othername', 'blabla'), )} + ok(cert, 'mail.google.com') + + # No DNS entry subjectAltName and no commonName + cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT', + 'subject': ((('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'Mountain View'),), + (('organizationName', 'Google Inc'),)), + 'subjectAltName': (('othername', 'blabla'),)} + fail(cert, 'google.com') + + # Empty cert / no cert + self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com') + self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com') + + # Issue #17980: avoid denials of service by refusing more than one + # wildcard per fragment. 
+ cert = {'subject': ((('commonName', 'a*b.com'),),)} + ok(cert, 'axxb.com') + cert = {'subject': ((('commonName', 'a*b.co*'),),)} + fail(cert, 'axxb.com') + cert = {'subject': ((('commonName', 'a*b*.com'),),)} + with self.assertRaises(ssl.CertificateError) as cm: + ssl.match_hostname(cert, 'axxbxxc.com') + self.assertIn("too many wildcards", str(cm.exception)) + + def test_server_side(self): + # server_hostname doesn't work for server sockets + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + with socket.socket() as sock: + self.assertRaises(ValueError, ctx.wrap_socket, sock, True, + server_hostname="some.hostname") + + def test_unknown_channel_binding(self): + # should raise ValueError for unknown type + s = socket.socket(socket.AF_INET) + s.bind(('127.0.0.1', 0)) + s.listen() + c = socket.socket(socket.AF_INET) + c.connect(s.getsockname()) + with test_wrap_socket(c, do_handshake_on_connect=False) as ss: + with self.assertRaises(ValueError): + ss.get_channel_binding("unknown-type") + s.close() + + @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, + "'tls-unique' channel binding not available") + def test_tls_unique_channel_binding(self): + # unconnected should return None for known type + s = socket.socket(socket.AF_INET) + with test_wrap_socket(s) as ss: + self.assertIsNone(ss.get_channel_binding("tls-unique")) + # the same for server-side + s = socket.socket(socket.AF_INET) + with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss: + self.assertIsNone(ss.get_channel_binding("tls-unique")) + + def test_dealloc_warn(self): + ss = test_wrap_socket(socket.socket(socket.AF_INET)) + r = repr(ss) + with self.assertWarns(ResourceWarning) as cm: + ss = None + support.gc_collect() + self.assertIn(r, str(cm.warning.args[0])) + + def test_get_default_verify_paths(self): + paths = ssl.get_default_verify_paths() + self.assertEqual(len(paths), 6) + self.assertIsInstance(paths, ssl.DefaultVerifyPaths) + + with support.EnvironmentVarGuard() as env: + 
env["SSL_CERT_DIR"] = CAPATH + env["SSL_CERT_FILE"] = CERTFILE + paths = ssl.get_default_verify_paths() + self.assertEqual(paths.cafile, CERTFILE) + self.assertEqual(paths.capath, CAPATH) + + @unittest.skipUnless(sys.platform == "win32", "Windows specific") + def test_enum_certificates(self): + self.assertTrue(ssl.enum_certificates("CA")) + self.assertTrue(ssl.enum_certificates("ROOT")) + + self.assertRaises(TypeError, ssl.enum_certificates) + self.assertRaises(WindowsError, ssl.enum_certificates, "") + + trust_oids = set() + for storename in ("CA", "ROOT"): + store = ssl.enum_certificates(storename) + self.assertIsInstance(store, list) + for element in store: + self.assertIsInstance(element, tuple) + self.assertEqual(len(element), 3) + cert, enc, trust = element + self.assertIsInstance(cert, bytes) + self.assertIn(enc, {"x509_asn", "pkcs_7_asn"}) + self.assertIsInstance(trust, (set, bool)) + if isinstance(trust, set): + trust_oids.update(trust) + + serverAuth = "1.3.6.1.5.5.7.3.1" + self.assertIn(serverAuth, trust_oids) + + @unittest.skipUnless(sys.platform == "win32", "Windows specific") + def test_enum_crls(self): + self.assertTrue(ssl.enum_crls("CA")) + self.assertRaises(TypeError, ssl.enum_crls) + self.assertRaises(WindowsError, ssl.enum_crls, "") + + crls = ssl.enum_crls("CA") + self.assertIsInstance(crls, list) + for element in crls: + self.assertIsInstance(element, tuple) + self.assertEqual(len(element), 2) + self.assertIsInstance(element[0], bytes) + self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"}) + + + def test_asn1object(self): + expected = (129, 'serverAuth', 'TLS Web Server Authentication', + '1.3.6.1.5.5.7.3.1') + + val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1') + self.assertEqual(val, expected) + self.assertEqual(val.nid, 129) + self.assertEqual(val.shortname, 'serverAuth') + self.assertEqual(val.longname, 'TLS Web Server Authentication') + self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1') + self.assertIsInstance(val, ssl._ASN1Object) + 
self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth') + + val = ssl._ASN1Object.fromnid(129) + self.assertEqual(val, expected) + self.assertIsInstance(val, ssl._ASN1Object) + self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1) + with self.assertRaisesRegex(ValueError, "unknown NID 100000"): + ssl._ASN1Object.fromnid(100000) + for i in range(1000): + try: + obj = ssl._ASN1Object.fromnid(i) + except ValueError: + pass + else: + self.assertIsInstance(obj.nid, int) + self.assertIsInstance(obj.shortname, str) + self.assertIsInstance(obj.longname, str) + self.assertIsInstance(obj.oid, (str, type(None))) + + val = ssl._ASN1Object.fromname('TLS Web Server Authentication') + self.assertEqual(val, expected) + self.assertIsInstance(val, ssl._ASN1Object) + self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected) + self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'), + expected) + with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"): + ssl._ASN1Object.fromname('serverauth') + + def test_purpose_enum(self): + val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1') + self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object) + self.assertEqual(ssl.Purpose.SERVER_AUTH, val) + self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129) + self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth') + self.assertEqual(ssl.Purpose.SERVER_AUTH.oid, + '1.3.6.1.5.5.7.3.1') + + val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2') + self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object) + self.assertEqual(ssl.Purpose.CLIENT_AUTH, val) + self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130) + self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth') + self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid, + '1.3.6.1.5.5.7.3.2') + + def test_unsupported_dtls(self): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.addCleanup(s.close) + with self.assertRaises(NotImplementedError) as cx: + test_wrap_socket(s, cert_reqs=ssl.CERT_NONE) + 
self.assertEqual(str(cx.exception), "only stream sockets are supported") + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + with self.assertRaises(NotImplementedError) as cx: + ctx.wrap_socket(s) + self.assertEqual(str(cx.exception), "only stream sockets are supported") + + def cert_time_ok(self, timestring, timestamp): + self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp) + + def cert_time_fail(self, timestring): + with self.assertRaises(ValueError): + ssl.cert_time_to_seconds(timestring) + + @unittest.skipUnless(utc_offset(), + 'local time needs to be different from UTC') + def test_cert_time_to_seconds_timezone(self): + # Issue #19940: ssl.cert_time_to_seconds() returns wrong + # results if local timezone is not UTC + self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0) + self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0) + + def test_cert_time_to_seconds(self): + timestring = "Jan 5 09:34:43 2018 GMT" + ts = 1515144883.0 + self.cert_time_ok(timestring, ts) + # accept keyword parameter, assert its name + self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts) + # accept both %e and %d (space or zero generated by strftime) + self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts) + # case-insensitive + self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts) + self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds + self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT + self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone + self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day + self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month + self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour + self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute + + newyear_ts = 1230768000.0 + # leap seconds + self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts) + # same timestamp + self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts) + + self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899) + # allow 
60th second (even if it is not a leap second) + self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900) + # allow 2nd leap second for compatibility with time.strptime() + self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901) + self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds + + # no special treatment for the special value: + # 99991231235959Z (rfc 5280) + self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0) + + @support.run_with_locale('LC_ALL', '') + def test_cert_time_to_seconds_locale(self): + # `cert_time_to_seconds()` should be locale independent + + def local_february_name(): + return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0)) + + if local_february_name().lower() == 'feb': + self.skipTest("locale-specific month name needs to be " + "different from C locale") + + # locale-independent + self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0) + self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT") + + def test_connect_ex_error(self): + server = socket.socket(socket.AF_INET) + self.addCleanup(server.close) + port = support.bind_port(server) # Reserve port but don't listen + s = test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED) + self.addCleanup(s.close) + rc = s.connect_ex((HOST, port)) + # Issue #19919: Windows machines or VMs hosted on Windows + # machines sometimes return EWOULDBLOCK. 
+ errors = ( + errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT, + errno.EWOULDBLOCK, + ) + self.assertIn(rc, errors) + + +class ContextTests(unittest.TestCase): + + @skip_if_broken_ubuntu_ssl + def test_constructor(self): + for protocol in PROTOCOLS: + ssl.SSLContext(protocol) + ctx = ssl.SSLContext() + self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS) + self.assertRaises(ValueError, ssl.SSLContext, -1) + self.assertRaises(ValueError, ssl.SSLContext, 42) + + @skip_if_broken_ubuntu_ssl + def test_protocol(self): + for proto in PROTOCOLS: + ctx = ssl.SSLContext(proto) + self.assertEqual(ctx.protocol, proto) + + def test_ciphers(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.set_ciphers("ALL") + ctx.set_ciphers("DEFAULT") + with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): + ctx.set_ciphers("^$:,;?*'dorothyx") + + @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old') + def test_get_ciphers(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.set_ciphers('AESGCM') + names = set(d['name'] for d in ctx.get_ciphers()) + self.assertIn('AES256-GCM-SHA384', names) + self.assertIn('AES128-GCM-SHA256', names) + + @skip_if_broken_ubuntu_ssl + def test_options(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value + default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3) + # SSLContext also enables these by default + default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE | + OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE | + OP_ENABLE_MIDDLEBOX_COMPAT) + self.assertEqual(default, ctx.options) + ctx.options |= ssl.OP_NO_TLSv1 + self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options) + if can_clear_options(): + ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1) + self.assertEqual(default, ctx.options) + ctx.options = 0 + # Ubuntu has OP_NO_SSLv3 forced on by default + self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3) + else: + with 
self.assertRaises(ValueError): + ctx.options = 0 + + def test_verify_mode(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + # Default value + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + ctx.verify_mode = ssl.CERT_OPTIONAL + self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL) + ctx.verify_mode = ssl.CERT_REQUIRED + self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) + ctx.verify_mode = ssl.CERT_NONE + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + with self.assertRaises(TypeError): + ctx.verify_mode = None + with self.assertRaises(ValueError): + ctx.verify_mode = 42 + + @unittest.skipUnless(have_verify_flags(), + "verify_flags need OpenSSL > 0.9.8") + def test_verify_flags(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + # default value + tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0) + self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf) + ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF + self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF) + ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN + self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN) + ctx.verify_flags = ssl.VERIFY_DEFAULT + self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT) + # supports any value + ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT + self.assertEqual(ctx.verify_flags, + ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT) + with self.assertRaises(TypeError): + ctx.verify_flags = None + + def test_load_cert_chain(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + # Combined key and cert in a single file + ctx.load_cert_chain(CERTFILE, keyfile=None) + ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE) + self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE) + with self.assertRaises(OSError) as cm: + ctx.load_cert_chain(NONEXISTINGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(BADCERT) + with self.assertRaisesRegex(ssl.SSLError, 
"PEM lib"): + ctx.load_cert_chain(EMPTYCERT) + # Separate key and cert + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_cert_chain(ONLYCERT, ONLYKEY) + ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY) + ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(ONLYCERT) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(ONLYKEY) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT) + # Mismatching key and cert + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"): + ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY) + # Password protected key and cert + ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD) + ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode()) + ctx.load_cert_chain(CERTFILE_PROTECTED, + password=bytearray(KEY_PASSWORD.encode())) + ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD) + ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode()) + ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, + bytearray(KEY_PASSWORD.encode())) + with self.assertRaisesRegex(TypeError, "should be a string"): + ctx.load_cert_chain(CERTFILE_PROTECTED, password=True) + with self.assertRaises(ssl.SSLError): + ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass") + with self.assertRaisesRegex(ValueError, "cannot be longer"): + # openssl has a fixed limit on the password buffer. + # PEM_BUFSIZE is generally set to 1kb. + # Return a string larger than this. 
+ ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400) + # Password callback + def getpass_unicode(): + return KEY_PASSWORD + def getpass_bytes(): + return KEY_PASSWORD.encode() + def getpass_bytearray(): + return bytearray(KEY_PASSWORD.encode()) + def getpass_badpass(): + return "badpass" + def getpass_huge(): + return b'a' * (1024 * 1024) + def getpass_bad_type(): + return 9 + def getpass_exception(): + raise Exception('getpass error') + class GetPassCallable: + def __call__(self): + return KEY_PASSWORD + def getpass(self): + return KEY_PASSWORD + ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode) + ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes) + ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray) + ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable()) + ctx.load_cert_chain(CERTFILE_PROTECTED, + password=GetPassCallable().getpass) + with self.assertRaises(ssl.SSLError): + ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass) + with self.assertRaisesRegex(ValueError, "cannot be longer"): + ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge) + with self.assertRaisesRegex(TypeError, "must return a string"): + ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type) + with self.assertRaisesRegex(Exception, "getpass error"): + ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception) + # Make sure the password function isn't called if it isn't needed + ctx.load_cert_chain(CERTFILE, password=getpass_exception) + + def test_load_verify_locations(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_verify_locations(CERTFILE) + ctx.load_verify_locations(cafile=CERTFILE, capath=None) + ctx.load_verify_locations(BYTES_CERTFILE) + ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None) + self.assertRaises(TypeError, ctx.load_verify_locations) + self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None) + with 
self.assertRaises(OSError) as cm: + ctx.load_verify_locations(NONEXISTINGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_verify_locations(BADCERT) + ctx.load_verify_locations(CERTFILE, CAPATH) + ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH) + + # Issue #10989: crash if the second argument type is invalid + self.assertRaises(TypeError, ctx.load_verify_locations, None, True) + + def test_load_verify_cadata(self): + # test cadata + with open(CAFILE_CACERT) as f: + cacert_pem = f.read() + cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem) + with open(CAFILE_NEURONIO) as f: + neuronio_pem = f.read() + neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem) + + # test PEM + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0) + ctx.load_verify_locations(cadata=cacert_pem) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1) + ctx.load_verify_locations(cadata=neuronio_pem) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + # cert already in hash table + ctx.load_verify_locations(cadata=neuronio_pem) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + # combined + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + combined = "\n".join((cacert_pem, neuronio_pem)) + ctx.load_verify_locations(cadata=combined) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + # with junk around the certs + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + combined = ["head", cacert_pem, "other", neuronio_pem, "again", + neuronio_pem, "tail"] + ctx.load_verify_locations(cadata="\n".join(combined)) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + # test DER + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_verify_locations(cadata=cacert_der) + ctx.load_verify_locations(cadata=neuronio_der) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + # cert already in hash table + ctx.load_verify_locations(cadata=cacert_der) + 
self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + # combined + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + combined = b"".join((cacert_der, neuronio_der)) + ctx.load_verify_locations(cadata=combined) + self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) + + # error cases + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object) + + with self.assertRaisesRegex(ssl.SSLError, "no start line"): + ctx.load_verify_locations(cadata="broken") + with self.assertRaisesRegex(ssl.SSLError, "not enough data"): + ctx.load_verify_locations(cadata=b"broken") + + + def test_load_dh_params(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_dh_params(DHFILE) + if os.name != 'nt': + ctx.load_dh_params(BYTES_DHFILE) + self.assertRaises(TypeError, ctx.load_dh_params) + self.assertRaises(TypeError, ctx.load_dh_params, None) + with self.assertRaises(FileNotFoundError) as cm: + ctx.load_dh_params(NONEXISTINGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with self.assertRaises(ssl.SSLError) as cm: + ctx.load_dh_params(CERTFILE) + + @skip_if_broken_ubuntu_ssl + def test_session_stats(self): + for proto in PROTOCOLS: + ctx = ssl.SSLContext(proto) + self.assertEqual(ctx.session_stats(), { + 'number': 0, + 'connect': 0, + 'connect_good': 0, + 'connect_renegotiate': 0, + 'accept': 0, + 'accept_good': 0, + 'accept_renegotiate': 0, + 'hits': 0, + 'misses': 0, + 'timeouts': 0, + 'cache_full': 0, + }) + + def test_set_default_verify_paths(self): + # There's not much we can do to test that it acts as expected, + # so just check it doesn't crash or raise an exception. 
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.set_default_verify_paths() + + @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build") + def test_set_ecdh_curve(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.set_ecdh_curve("prime256v1") + ctx.set_ecdh_curve(b"prime256v1") + self.assertRaises(TypeError, ctx.set_ecdh_curve) + self.assertRaises(TypeError, ctx.set_ecdh_curve, None) + self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo") + self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo") + + @needs_sni + def test_sni_callback(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + + # set_servername_callback expects a callable, or None + self.assertRaises(TypeError, ctx.set_servername_callback) + self.assertRaises(TypeError, ctx.set_servername_callback, 4) + self.assertRaises(TypeError, ctx.set_servername_callback, "") + self.assertRaises(TypeError, ctx.set_servername_callback, ctx) + + def dummycallback(sock, servername, ctx): + pass + ctx.set_servername_callback(None) + ctx.set_servername_callback(dummycallback) + + @needs_sni + def test_sni_callback_refcycle(self): + # Reference cycles through the servername callback are detected + # and cleared. 
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + def dummycallback(sock, servername, ctx, cycle=ctx): + pass + ctx.set_servername_callback(dummycallback) + wr = weakref.ref(ctx) + del ctx, dummycallback + gc.collect() + self.assertIs(wr(), None) + + def test_cert_store_stats(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertEqual(ctx.cert_store_stats(), + {'x509_ca': 0, 'crl': 0, 'x509': 0}) + ctx.load_cert_chain(CERTFILE) + self.assertEqual(ctx.cert_store_stats(), + {'x509_ca': 0, 'crl': 0, 'x509': 0}) + ctx.load_verify_locations(CERTFILE) + self.assertEqual(ctx.cert_store_stats(), + {'x509_ca': 0, 'crl': 0, 'x509': 1}) + ctx.load_verify_locations(CAFILE_CACERT) + self.assertEqual(ctx.cert_store_stats(), + {'x509_ca': 1, 'crl': 0, 'x509': 2}) + + def test_get_ca_certs(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertEqual(ctx.get_ca_certs(), []) + # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE + ctx.load_verify_locations(CERTFILE) + self.assertEqual(ctx.get_ca_certs(), []) + # but CAFILE_CACERT is a CA cert + ctx.load_verify_locations(CAFILE_CACERT) + self.assertEqual(ctx.get_ca_certs(), + [{'issuer': ((('organizationName', 'Root CA'),), + (('organizationalUnitName', 'http://www.cacert.org'),), + (('commonName', 'CA Cert Signing Authority'),), + (('emailAddress', 'supp...@cacert.org'),)), + 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'), + 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'), + 'serialNumber': '00', + 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',), + 'subject': ((('organizationName', 'Root CA'),), + (('organizationalUnitName', 'http://www.cacert.org'),), + (('commonName', 'CA Cert Signing Authority'),), + (('emailAddress', 'supp...@cacert.org'),)), + 'version': 3}]) + + with open(CAFILE_CACERT) as f: + pem = f.read() + der = ssl.PEM_cert_to_DER_cert(pem) + self.assertEqual(ctx.get_ca_certs(True), [der]) + + def test_load_default_certs(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + 
ctx.load_default_certs() + + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_default_certs(ssl.Purpose.SERVER_AUTH) + ctx.load_default_certs() + + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH) + + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertRaises(TypeError, ctx.load_default_certs, None) + self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH') + + @unittest.skipIf(sys.platform == "win32", "not-Windows specific") + @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars") + def test_load_default_certs_env(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + with support.EnvironmentVarGuard() as env: + env["SSL_CERT_DIR"] = CAPATH + env["SSL_CERT_FILE"] = CERTFILE + ctx.load_default_certs() + self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0}) + + @unittest.skipUnless(sys.platform == "win32", "Windows specific") + def test_load_default_certs_env_windows(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_default_certs() + stats = ctx.cert_store_stats() + + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + with support.EnvironmentVarGuard() as env: + env["SSL_CERT_DIR"] = CAPATH + env["SSL_CERT_FILE"] = CERTFILE + ctx.load_default_certs() + stats["x509"] += 1 + self.assertEqual(ctx.cert_store_stats(), stats) + + def _assert_context_options(self, ctx): + self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) + if OP_NO_COMPRESSION != 0: + self.assertEqual(ctx.options & OP_NO_COMPRESSION, + OP_NO_COMPRESSION) + if OP_SINGLE_DH_USE != 0: + self.assertEqual(ctx.options & OP_SINGLE_DH_USE, + OP_SINGLE_DH_USE) + if OP_SINGLE_ECDH_USE != 0: + self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE, + OP_SINGLE_ECDH_USE) + if OP_CIPHER_SERVER_PREFERENCE != 0: + self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE, + OP_CIPHER_SERVER_PREFERENCE) + + def test_create_default_context(self): + ctx = ssl.create_default_context() + + self.assertEqual(ctx.protocol, 
ssl.PROTOCOL_SSLv23) + self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) + self.assertTrue(ctx.check_hostname) + self._assert_context_options(ctx) + + + with open(SIGNING_CA) as f: + cadata = f.read() + ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH, + cadata=cadata) + self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) + self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) + self._assert_context_options(ctx) + + ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + self._assert_context_options(ctx) + + def test__create_stdlib_context(self): + ctx = ssl._create_stdlib_context() + self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + self.assertFalse(ctx.check_hostname) + self._assert_context_options(ctx) + + ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1) + self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + self._assert_context_options(ctx) + + ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1, + cert_reqs=ssl.CERT_REQUIRED, + check_hostname=True) + self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) + self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) + self.assertTrue(ctx.check_hostname) + self._assert_context_options(ctx) + + ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH) + self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + self._assert_context_options(ctx) + + def test_check_hostname(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertFalse(ctx.check_hostname) + + # Requires CERT_REQUIRED or CERT_OPTIONAL + with self.assertRaises(ValueError): + ctx.check_hostname = True + ctx.verify_mode = ssl.CERT_REQUIRED + self.assertFalse(ctx.check_hostname) + ctx.check_hostname = True + self.assertTrue(ctx.check_hostname) + + ctx.verify_mode = 
ssl.CERT_OPTIONAL + ctx.check_hostname = True + self.assertTrue(ctx.check_hostname) + + # Cannot set CERT_NONE with check_hostname enabled + with self.assertRaises(ValueError): + ctx.verify_mode = ssl.CERT_NONE + ctx.check_hostname = False + self.assertFalse(ctx.check_hostname) + + def test_context_client_server(self): + # PROTOCOL_TLS_CLIENT has sane defaults + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + self.assertTrue(ctx.check_hostname) + self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) + + # PROTOCOL_TLS_SERVER has different but also sane defaults + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + self.assertFalse(ctx.check_hostname) + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + + +class SSLErrorTests(unittest.TestCase): + + def test_str(self): + # The str() of a SSLError doesn't include the errno + e = ssl.SSLError(1, "foo") + self.assertEqual(str(e), "foo") + self.assertEqual(e.errno, 1) + # Same for a subclass + e = ssl.SSLZeroReturnError(1, "foo") + self.assertEqual(str(e), "foo") + self.assertEqual(e.errno, 1) + + def test_lib_reason(self): + # Test the library and reason attributes + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + with self.assertRaises(ssl.SSLError) as cm: + ctx.load_dh_params(CERTFILE) + self.assertEqual(cm.exception.library, 'PEM') + self.assertEqual(cm.exception.reason, 'NO_START_LINE') + s = str(cm.exception) + self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s) + + def test_subclass(self): + # Check that the appropriate SSLError subclass is raised + # (this only tests one of them) + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + with socket.socket() as s: + s.bind(("127.0.0.1", 0)) + s.listen() + c = socket.socket() + c.connect(s.getsockname()) + c.setblocking(False) + with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c: + with self.assertRaises(ssl.SSLWantReadError) as cm: + c.do_handshake() + s = str(cm.exception) + self.assertTrue(s.startswith("The operation did not complete (read)"), s) + 
# For compatibility + self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ) + + +class MemoryBIOTests(unittest.TestCase): + + def test_read_write(self): + bio = ssl.MemoryBIO() + bio.write(b'foo') + self.assertEqual(bio.read(), b'foo') + self.assertEqual(bio.read(), b'') + bio.write(b'foo') + bio.write(b'bar') + self.assertEqual(bio.read(), b'foobar') + self.assertEqual(bio.read(), b'') + bio.write(b'baz') + self.assertEqual(bio.read(2), b'ba') + self.assertEqual(bio.read(1), b'z') + self.assertEqual(bio.read(1), b'') + + def test_eof(self): + bio = ssl.MemoryBIO() + self.assertFalse(bio.eof) + self.assertEqual(bio.read(), b'') + self.assertFalse(bio.eof) + bio.write(b'foo') + self.assertFalse(bio.eof) + bio.write_eof() + self.assertFalse(bio.eof) + self.assertEqual(bio.read(2), b'fo') + self.assertFalse(bio.eof) + self.assertEqual(bio.read(1), b'o') + self.assertTrue(bio.eof) + self.assertEqual(bio.read(), b'') + self.assertTrue(bio.eof) + + def test_pending(self): + bio = ssl.MemoryBIO() + self.assertEqual(bio.pending, 0) + bio.write(b'foo') + self.assertEqual(bio.pending, 3) + for i in range(3): + bio.read(1) + self.assertEqual(bio.pending, 3-i-1) + for i in range(3): + bio.write(b'x') + self.assertEqual(bio.pending, i+1) + bio.read() + self.assertEqual(bio.pending, 0) + + def test_buffer_types(self): + bio = ssl.MemoryBIO() + bio.write(b'foo') + self.assertEqual(bio.read(), b'foo') + bio.write(bytearray(b'bar')) + self.assertEqual(bio.read(), b'bar') + bio.write(memoryview(b'baz')) + self.assertEqual(bio.read(), b'baz') + + def test_error_types(self): + bio = ssl.MemoryBIO() + self.assertRaises(TypeError, bio.write, 'foo') + self.assertRaises(TypeError, bio.write, None) + self.assertRaises(TypeError, bio.write, True) + self.assertRaises(TypeError, bio.write, 1) + + +@unittest.skipUnless(_have_threads, "Needs threading module") +class SimpleBackgroundTests(unittest.TestCase): + + """Tests that connect to a simple server running in the background""" + + 
def setUp(self): + server = ThreadedEchoServer(SIGNED_CERTFILE) + self.server_addr = (HOST, server.port) + server.__enter__() + self.addCleanup(server.__exit__, None, None, None) + + def test_connect(self): + with test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE) as s: + s.connect(self.server_addr) + self.assertEqual({}, s.getpeercert()) + self.assertFalse(s.server_side) + + # this should succeed because we specify the root cert + with test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SIGNING_CA) as s: + s.connect(self.server_addr) + self.assertTrue(s.getpeercert()) + self.assertFalse(s.server_side) + + def test_connect_fail(self): + # This should fail because we have no verification certs. Connection + # failure crashes ThreadedEchoServer, so run this in an independent + # test method. + s = test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED) + self.addCleanup(s.close) + self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", + s.connect, self.server_addr) + + def test_connect_ex(self): + # Issue #11326: check connect_ex() implementation + s = test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SIGNING_CA) + self.addCleanup(s.close) + self.assertEqual(0, s.connect_ex(self.server_addr)) + self.assertTrue(s.getpeercert()) + + def test_non_blocking_connect_ex(self): + # Issue #11326: non-blocking connect_ex() should allow handshake + # to proceed after the socket gets ready. 
+ s = test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SIGNING_CA, + do_handshake_on_connect=False) + self.addCleanup(s.close) + s.setblocking(False) + rc = s.connect_ex(self.server_addr) + # EWOULDBLOCK under Windows, EINPROGRESS elsewhere + self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) + # Wait for connect to finish + select.select([], [s], [], 5.0) + # Non-blocking handshake + while True: + try: + s.do_handshake() + break + except ssl.SSLWantReadError: + select.select([s], [], [], 5.0) + except ssl.SSLWantWriteError: + select.select([], [s], [], 5.0) + # SSL established + self.assertTrue(s.getpeercert()) + + def test_connect_with_context(self): + # Same as test_connect, but with a separately created context + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + self.assertEqual({}, s.getpeercert()) + # Same with a server hostname + with ctx.wrap_socket(socket.socket(socket.AF_INET), + server_hostname="dummy") as s: + s.connect(self.server_addr) + ctx.verify_mode = ssl.CERT_REQUIRED + # This should succeed because we specify the root cert + ctx.load_verify_locations(SIGNING_CA) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) + + def test_connect_with_context_fail(self): + # This should fail because we have no verification certs. Connection + # failure crashes ThreadedEchoServer, so run this in an independent + # test method. 
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + self.addCleanup(s.close) + self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", + s.connect, self.server_addr) + + def test_connect_capath(self): + # Verify server certificates using the `capath` argument + # NOTE: the subject hashing algorithm has been changed between + # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must + # contain both versions of each certificate (same content, different + # filename) for this test to be portable across OpenSSL releases. + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) + # Same with a bytes `capath` argument + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=BYTES_CAPATH) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) + + def test_connect_cadata(self): + with open(SIGNING_CA) as f: + pem = f.read() + der = ssl.PEM_cert_to_DER_cert(pem) + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(cadata=pem) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) + + # same with DER + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(cadata=der) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) + + @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") + def test_makefile_close(self): + # Issue #5238: 
creating a file-like object with makefile() shouldn't + # delay closing the underlying "real socket" (here tested with its + # file descriptor, hence skipping the test under Windows). + ss = test_wrap_socket(socket.socket(socket.AF_INET)) + ss.connect(self.server_addr) + fd = ss.fileno() + f = ss.makefile() + f.close() + # The fd is still open + os.read(fd, 0) + # Closing the SSL socket should close the fd too + ss.close() + gc.collect() + with self.assertRaises(OSError) as e: + os.read(fd, 0) + self.assertEqual(e.exception.errno, errno.EBADF) + + def test_non_blocking_handshake(self): + s = socket.socket(socket.AF_INET) + s.connect(self.server_addr) + s.setblocking(False) + s = test_wrap_socket(s, + cert_reqs=ssl.CERT_NONE, + do_handshake_on_connect=False) + self.addCleanup(s.close) + count = 0 + while True: + try: + count += 1 + s.do_handshake() + break + except ssl.SSLWantReadError: + select.select([s], [], []) + except ssl.SSLWantWriteError: + select.select([], [s], []) + if support.verbose: + sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) + + def test_get_server_certificate(self): + _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA) + + def test_get_server_certificate_fail(self): + # Connection failure crashes ThreadedEchoServer, so run this in an + # independent test method + _test_get_server_certificate_fail(self, *self.server_addr) + + def test_ciphers(self): + with test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s: + s.connect(self.server_addr) + with test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s: + s.connect(self.server_addr) + # Error checking can happen at instantiation or when connecting + with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): + with socket.socket(socket.AF_INET) as sock: + s = test_wrap_socket(sock, + cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") + 
s.connect(self.server_addr) + + def test_get_ca_certs_capath(self): + # capath certs are loaded on request + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + self.assertEqual(ctx.get_ca_certs(), []) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) + self.assertEqual(len(ctx.get_ca_certs()), 1) + + @needs_sni + @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"), "needs TLS 1.2") + def test_context_setget(self): + # Check that the context of a connected socket can be replaced. + ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + s = socket.socket(socket.AF_INET) + with ctx1.wrap_socket(s) as ss: + ss.connect(self.server_addr) + self.assertIs(ss.context, ctx1) + self.assertIs(ss._sslobj.context, ctx1) + ss.context = ctx2 + self.assertIs(ss.context, ctx2) + self.assertIs(ss._sslobj.context, ctx2) + + def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs): + # A simple IO loop. Call func(*args) depending on the error we get + # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs. + timeout = kwargs.get('timeout', 10) + count = 0 + while True: + errno = None + count += 1 + try: + ret = func(*args) + except ssl.SSLError as e: + if e.errno not in (ssl.SSL_ERROR_WANT_READ, + ssl.SSL_ERROR_WANT_WRITE): + raise + errno = e.errno + # Get any data from the outgoing BIO irrespective of any error, and + # send it to the socket. + buf = outgoing.read() + sock.sendall(buf) + # If there's no error, we're done. For WANT_READ, we need to get + # data from the socket and put it in the incoming BIO. 
+ if errno is None: + break + elif errno == ssl.SSL_ERROR_WANT_READ: + buf = sock.recv(32768) + if buf: + incoming.write(buf) + else: + incoming.write_eof() + if support.verbose: + sys.stdout.write("Needed %d calls to complete %s().\n" + % (count, func.__name__)) + return ret + + def test_bio_handshake(self): + sock = socket.socket(socket.AF_INET) + self.addCleanup(sock.close) + sock.connect(self.server_addr) + incoming = ssl.MemoryBIO() + outgoing = ssl.MemoryBIO() + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(SIGNING_CA) + ctx.check_hostname = True + sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost') + self.assertIs(sslobj._sslobj.owner, sslobj) + self.assertIsNone(sslobj.cipher()) + # cypthon implementation detail + # self.assertIsNone(sslobj.version()) + self.assertIsNotNone(sslobj.shared_ciphers()) + self.assertRaises(ValueError, sslobj.getpeercert) + if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: + self.assertIsNone(sslobj.get_channel_binding('tls-unique')) + self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) + self.assertTrue(sslobj.cipher()) + self.assertIsNotNone(sslobj.shared_ciphers()) + self.assertIsNotNone(sslobj.version()) + self.assertTrue(sslobj.getpeercert()) + if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: + self.assertTrue(sslobj.get_channel_binding('tls-unique')) + try: + self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) + except ssl.SSLSyscallError: + # If the server shuts down the TCP connection without sending a + # secure shutdown message, this is reported as SSL_ERROR_SYSCALL + pass + self.assertRaises(ssl.SSLError, sslobj.write, b'foo') + + def test_bio_read_write_data(self): + sock = socket.socket(socket.AF_INET) + self.addCleanup(sock.close) + sock.connect(self.server_addr) + incoming = ssl.MemoryBIO() + outgoing = ssl.MemoryBIO() + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_NONE + sslobj = ctx.wrap_bio(incoming, 
outgoing, False) + self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) + req = b'FOO\n' + self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req) + buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024) + self.assertEqual(buf, b'foo\n') + self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) + + +class NetworkedTests(unittest.TestCase): + + def test_timeout_connect_ex(self): + # Issue #12065: on a timeout, connect_ex() should return the original + # errno (mimicking the behaviour of non-SSL sockets). + with support.transient_internet(REMOTE_HOST): + s = test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + do_handshake_on_connect=False) + self.addCleanup(s.close) + s.settimeout(0.0000001) + rc = s.connect_ex((REMOTE_HOST, 443)) + if rc == 0: + self.skipTest("REMOTE_HOST responded too quickly") + self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) + + @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6') + def test_get_server_certificate_ipv6(self): + with support.transient_internet('ipv6.google.com'): + _test_get_server_certificate(self, 'ipv6.google.com', 443) + _test_get_server_certificate_fail(self, 'ipv6.google.com', 443) + + +def _test_get_server_certificate(test, host, port, cert=None): + pem = ssl.get_server_certificate((host, port)) + if not pem: + test.fail("No server certificate on %s:%s!" % (host, port)) + + pem = ssl.get_server_certificate((host, port), ca_certs=cert) + if not pem: + test.fail("No server certificate on %s:%s!" % (host, port)) + if support.verbose: + sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem)) + +def _test_get_server_certificate_fail(test, host, port): + try: + pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE) + except ssl.SSLError as x: + #should fail + if support.verbose: + sys.stdout.write("%s\n" % x) + else: + test.fail("Got server certificate %s for %s:%s!" 
% (pem, host, port)) + + +if _have_threads: + from test.ssl_servers import make_https_server + + class ThreadedEchoServer(threading.Thread): + + class ConnectionHandler(threading.Thread): + + """A mildly complicated class, because we want it to work both + with and without the SSL wrapper around the socket connection, so + that we can test the STARTTLS functionality.""" + + def __init__(self, server, connsock, addr): + self.server = server + self.running = False + self.sock = connsock + self.addr = addr + self.sock.setblocking(1) + self.sslconn = None + threading.Thread.__init__(self) + self.daemon = True + + def wrap_conn(self): + try: + self.sslconn = self.server.context.wrap_socket( + self.sock, server_side=True) + self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol()) + self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol()) + except (ConnectionResetError, BrokenPipeError) as e: + # We treat ConnectionResetError as though it were an + # SSLError - OpenSSL on Ubuntu abruptly closes the + # connection when asked to use an unsupported protocol. + # + # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL + # tries to send session tickets after handshake. + # https://github.com/openssl/openssl/issues/6342 + self.server.conn_errors.append(str(e)) + if self.server.chatty: + handle_error( + "\n server: bad connection attempt from " + repr( + self.addr) + ":\n") + self.running = False + self.close() + return False + except (ssl.SSLError, OSError) as e: + # OSError may occur with wrong protocols, e.g. both + # sides use PROTOCOL_TLS_SERVER. + # + # XXX Various errors can have happened here, for example + # a mismatching protocol version, an invalid certificate, + # or a low-level bug. This should be made more discriminating. 
+ # + # bpo-31323: Store the exception as string to prevent + # a reference leak: server -> conn_errors -> exception + # -> traceback -> self (ConnectionHandler) -> server + self.server.conn_errors.append(str(e)) + if self.server.chatty: + handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n") + self.running = False + self.server.stop() + self.close() + return False + else: + self.server.shared_ciphers.append(self.sslconn.shared_ciphers()) + if self.server.context.verify_mode == ssl.CERT_REQUIRED: + cert = self.sslconn.getpeercert() + if support.verbose and self.server.chatty: + sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n") + cert_binary = self.sslconn.getpeercert(True) + if support.verbose and self.server.chatty: + sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n") + cipher = self.sslconn.cipher() + if support.verbose and self.server.chatty: + sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") + sys.stdout.write(" server: selected protocol is now " + + str(self.sslconn.selected_npn_protocol()) + "\n") + return True + + def read(self): + if self.sslconn: + return self.sslconn.read() + else: + return self.sock.recv(1024) + + def write(self, bytes): + if self.sslconn: + return self.sslconn.write(bytes) + else: + return self.sock.send(bytes) + + def close(self): + if self.sslconn: + self.sslconn.close() + else: _______________________________________________ pypy-commit mailing list pypy-commit@python.org https://mail.python.org/mailman/listinfo/pypy-commit