Author: Richard Plangger <planri...@gmail.com> Branch: py3.5-ssl Changeset: r88436:c22e66d77f66 Date: 2016-11-17 14:19 +0100 http://bitbucket.org/pypy/pypy/changeset/c22e66d77f66/
Log: remove the old pypy/module/_ssl module diff too long, truncating to 2000 out of 3517 lines diff --git a/pypy/module/_ssl/__init__.py b/pypy/module/_ssl/__init__.py deleted file mode 100644 --- a/pypy/module/_ssl/__init__.py +++ /dev/null @@ -1,67 +0,0 @@ -import sys -from rpython.rlib.rarithmetic import intmask -from pypy.interpreter.mixedmodule import MixedModule -from pypy.module._ssl import ssl_data - -class Module(MixedModule): - """Implementation module for SSL socket operations. - See the socket module for documentation.""" - - interpleveldefs = { - '_test_decode_cert': 'interp_ssl._test_decode_cert', - 'txt2obj': 'interp_ssl.txt2obj', - 'nid2obj': 'interp_ssl.nid2obj', - 'get_default_verify_paths': 'interp_ssl.get_default_verify_paths', - - 'SSLError': 'interp_ssl.get_error(space).w_error', - 'SSLZeroReturnError': 'interp_ssl.get_error(space).w_ZeroReturnError', - 'SSLWantReadError': 'interp_ssl.get_error(space).w_WantReadError', - 'SSLWantWriteError': 'interp_ssl.get_error(space).w_WantWriteError', - 'SSLEOFError': 'interp_ssl.get_error(space).w_EOFError', - 'SSLSyscallError': 'interp_ssl.get_error(space).w_SyscallError', - - '_SSLSocket': 'interp_ssl.SSLSocket', - '_SSLContext': 'interp_ssl.SSLContext', - 'MemoryBIO': 'interp_ssl.MemoryBIO', - } - - if sys.platform == 'win32': - interpleveldefs['enum_certificates'] = 'interp_win32.enum_certificates_w' - interpleveldefs['enum_crls'] = 'interp_win32.enum_crls_w' - - appleveldefs = { - } - - @classmethod - def buildloaders(cls): - # init the SSL module - from pypy.module._ssl.interp_ssl import constants, HAVE_OPENSSL_RAND - - for constant, value in constants.iteritems(): - if constant.startswith('OP_'): - value = intmask(value) # Convert to C long and wrap around. - Module.interpleveldefs[constant] = "space.wrap(%r)" % (value,) - - if HAVE_OPENSSL_RAND: - Module.interpleveldefs['RAND_add'] = "interp_ssl.RAND_add" - Module.interpleveldefs['RAND_bytes'] = "interp_ssl.RAND_bytes" - Module.interpleveldefs['RAND_pseudo_bytes'] = "interp_ssl.RAND_pseudo_bytes" - Module.interpleveldefs['RAND_status'] = "interp_ssl.RAND_status" - Module.interpleveldefs['RAND_egd'] = "interp_ssl.RAND_egd" - - for name, value in ssl_data.ALERT_DESCRIPTION_CODES.items(): - Module.interpleveldefs[name] = "space.wrap(%r)" % value - - super(Module, cls).buildloaders() - - def setup_after_space_initialization(self): - """NOT_RPYTHON""" - from pypy.module._ssl.interp_ssl import PWINFO_STORAGE - PWINFO_STORAGE.clear() - - def startup(self, space): - from rpython.rlib.ropenssl import init_ssl - init_ssl() - if space.config.objspace.usemodules.thread: - from pypy.module._ssl.thread_lock import setup_ssl_threads - setup_ssl_threads() diff --git a/pypy/module/_ssl/interp_ssl.py b/pypy/module/_ssl/interp_ssl.py deleted file mode 100644 --- a/pypy/module/_ssl/interp_ssl.py +++ /dev/null @@ -1,1913 +0,0 @@ -import weakref - -from rpython.rlib import rpoll, rsocket, rthread, rweakref, rgc -from rpython.rlib.rarithmetic import intmask, widen, r_uint -from rpython.rlib.ropenssl import * -from rpython.rlib._rsocket_rffi import MAX_FD_SIZE -from rpython.rlib.rposix import get_saved_errno -from rpython.rlib.rweakref import RWeakValueDictionary -from rpython.rlib.objectmodel import specialize, compute_unique_id -from rpython.rtyper.lltypesystem import lltype, rffi - -from pypy.interpreter.baseobjspace import W_Root -from pypy.interpreter.error import OperationError, oefmt, wrap_oserror -from pypy.interpreter.gateway import interp2app, unwrap_spec -from pypy.interpreter.typedef import TypeDef, GetSetProperty -from pypy.interpreter.unicodehelper import fsdecode -from pypy.module._ssl.ssl_data import ( - LIBRARY_CODES_TO_NAMES, ERROR_CODES_TO_NAMES) -from pypy.module._socket import interp_socket -from pypy.module.exceptions import interp_exceptions - - -# user defined constants -X509_NAME_MAXLEN = 256 -# these mirror ssl.h -PY_SSL_ERROR_NONE, PY_SSL_ERROR_SSL = 0, 1 -PY_SSL_ERROR_WANT_READ, PY_SSL_ERROR_WANT_WRITE = 2, 3 -PY_SSL_ERROR_WANT_X509_LOOKUP = 4 -PY_SSL_ERROR_SYSCALL = 5 # look at error stack/return value/errno -PY_SSL_ERROR_ZERO_RETURN, PY_SSL_ERROR_WANT_CONNECT = 6, 7 -# start of non ssl.h errorcodes -PY_SSL_ERROR_EOF = 8 # special case of SSL_ERROR_SYSCALL -PY_SSL_ERROR_INVALID_ERROR_CODE = 9 - -PY_SSL_CERT_NONE, PY_SSL_CERT_OPTIONAL, PY_SSL_CERT_REQUIRED = 0, 1, 2 - -PY_SSL_CLIENT, PY_SSL_SERVER = 0, 1 - -(PY_SSL_VERSION_SSL2, PY_SSL_VERSION_SSL3, - PY_SSL_VERSION_TLS, PY_SSL_VERSION_TLS1, PY_SSL_VERSION_TLS1_1, - PY_SSL_VERSION_TLS1_2) = range(6) - - -SOCKET_IS_NONBLOCKING, SOCKET_IS_BLOCKING = 0, 1 -SOCKET_HAS_TIMED_OUT, SOCKET_HAS_BEEN_CLOSED = 2, 3 -SOCKET_TOO_LARGE_FOR_SELECT, SOCKET_OPERATION_OK = 4, 5 - -HAVE_RPOLL = 'poll' in dir(rpoll) - -constants = {} -constants["SSL_ERROR_ZERO_RETURN"] = PY_SSL_ERROR_ZERO_RETURN -constants["SSL_ERROR_WANT_READ"] = PY_SSL_ERROR_WANT_READ -constants["SSL_ERROR_WANT_WRITE"] = PY_SSL_ERROR_WANT_WRITE -constants["SSL_ERROR_WANT_X509_LOOKUP"] = PY_SSL_ERROR_WANT_X509_LOOKUP -constants["SSL_ERROR_SYSCALL"] = PY_SSL_ERROR_SYSCALL -constants["SSL_ERROR_SSL"] = PY_SSL_ERROR_SSL -constants["SSL_ERROR_WANT_CONNECT"] = PY_SSL_ERROR_WANT_CONNECT -constants["SSL_ERROR_EOF"] = PY_SSL_ERROR_EOF -constants["SSL_ERROR_INVALID_ERROR_CODE"] = PY_SSL_ERROR_INVALID_ERROR_CODE - -constants["CERT_NONE"] = PY_SSL_CERT_NONE -constants["CERT_OPTIONAL"] = PY_SSL_CERT_OPTIONAL -constants["CERT_REQUIRED"] = PY_SSL_CERT_REQUIRED - -constants["VERIFY_DEFAULT"] = 0 -constants["VERIFY_CRL_CHECK_LEAF"] = X509_V_FLAG_CRL_CHECK -constants["VERIFY_CRL_CHECK_CHAIN"] = X509_V_FLAG_CRL_CHECK|X509_V_FLAG_CRL_CHECK_ALL -constants["VERIFY_X509_STRICT"] = X509_V_FLAG_X509_STRICT - -constants["HAS_SNI"] = HAS_SNI -constants["HAS_TLS_UNIQUE"] = HAVE_OPENSSL_FINISHED -constants["HAS_ECDH"] = not OPENSSL_NO_ECDH -constants["HAS_NPN"] = HAS_NPN -constants["HAS_ALPN"] = HAS_ALPN - -constants["PROTOCOL_TLS"] = PY_SSL_VERSION_TLS -constants["PROTOCOL_SSLv23"] = PY_SSL_VERSION_TLS # Legacy name -if not OPENSSL_NO_SSL2: - constants["PROTOCOL_SSLv2"] = PY_SSL_VERSION_SSL2 -if not OPENSSL_NO_SSL3: - constants["PROTOCOL_SSLv3"] = PY_SSL_VERSION_SSL3 -constants["PROTOCOL_TLSv1"] = PY_SSL_VERSION_TLS1 -if HAVE_TLSv1_2: - constants["PROTOCOL_TLSv1_1"] = PY_SSL_VERSION_TLS1_1 - constants["OP_NO_TLSv1_1"] = SSL_OP_NO_TLSv1_1 - constants["PROTOCOL_TLSv1_2"] = PY_SSL_VERSION_TLS1_2 - constants["OP_NO_TLSv1_2"] = SSL_OP_NO_TLSv1_2 - -# protocol options -constants["OP_ALL"] = SSL_OP_ALL & ~SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS -constants["OP_NO_SSLv2"] = SSL_OP_NO_SSLv2 -constants["OP_NO_SSLv3"] = SSL_OP_NO_SSLv3 -constants["OP_NO_TLSv1"] = SSL_OP_NO_TLSv1 -constants["OP_CIPHER_SERVER_PREFERENCE"] = SSL_OP_CIPHER_SERVER_PREFERENCE -constants["OP_SINGLE_DH_USE"] = SSL_OP_SINGLE_DH_USE -constants["OP_SINGLE_ECDH_USE"] = SSL_OP_SINGLE_ECDH_USE -if SSL_OP_NO_COMPRESSION is not None: - constants["OP_NO_COMPRESSION"] = SSL_OP_NO_COMPRESSION - -# OpenSSL version -constants["OPENSSL_VERSION_NUMBER"] = OPENSSL_VERSION_NUMBER -ver = OPENSSL_VERSION_NUMBER -ver, status = divmod(ver, 16) -ver, patch = divmod(ver, 256) -ver, fix = divmod(ver, 256) -ver, minor = divmod(ver, 256) -ver, major = divmod(ver, 256) -version_info = (major, minor, fix, patch, status) -constants["OPENSSL_VERSION_INFO"] = version_info -constants["_OPENSSL_API_VERSION"] = version_info -constants["OPENSSL_VERSION"] = SSLEAY_VERSION - - -def ssl_error(space, msg, errno=0, w_errtype=None, errcode=0): - reason_str = None - lib_str = None - if errcode: - err_lib = libssl_ERR_GET_LIB(errcode) - err_reason = libssl_ERR_GET_REASON(errcode) - reason_str = ERROR_CODES_TO_NAMES.get((err_lib, err_reason), None) - lib_str = LIBRARY_CODES_TO_NAMES.get(err_lib, None) - raw_msg = libssl_ERR_reason_error_string(errcode) - msg = None - if raw_msg: - msg = rffi.charp2str(raw_msg) - if not msg: - msg = "unknown error" - if reason_str and lib_str: - msg = "[%s: %s] %s" % (lib_str, reason_str, msg) - elif lib_str: - msg = "[%s] %s" % (lib_str, msg) - - w_exception_class = w_errtype or get_error(space).w_error - if errno or errcode: - w_exception = space.call_function(w_exception_class, - space.wrap(errno), space.wrap(msg)) - else: - w_exception = space.call_function(w_exception_class, space.wrap(msg)) - space.setattr(w_exception, space.wrap("reason"), - space.wrap(reason_str) if reason_str else space.w_None) - space.setattr(w_exception, space.wrap("library"), - space.wrap(lib_str) if lib_str else space.w_None) - return OperationError(w_exception_class, w_exception) - -def timeout_error(space, msg): - w_exc_class = interp_socket.get_error(space, 'timeout') - w_exc = space.call_function(w_exc_class, space.wrap(msg)) - return OperationError(w_exc_class, w_exc) - -class SSLNpnProtocols(object): - - def __init__(self, ctx, protos): - self.protos = protos - self.buf, self.bufflag = rffi.get_nonmovingbuffer(protos) - NPN_STORAGE.set(rffi.cast(lltype.Unsigned, self.buf), self) - - # set both server and client callbacks, because the context - # can be used to create both types of sockets - libssl_SSL_CTX_set_next_protos_advertised_cb( - ctx, self.advertiseNPN_cb, self.buf) - libssl_SSL_CTX_set_next_proto_select_cb( - ctx, self.selectNPN_cb, self.buf) - - def __del__(self): - rffi.free_nonmovingbuffer( - self.protos, self.buf, self.bufflag) - - @staticmethod - def advertiseNPN_cb(s, data_ptr, len_ptr, args): - npn = NPN_STORAGE.get(rffi.cast(lltype.Unsigned, args)) - if npn and npn.protos: - data_ptr[0] = npn.buf - len_ptr[0] = rffi.cast(rffi.UINT, len(npn.protos)) - else: - data_ptr[0] = lltype.nullptr(rffi.CCHARP.TO) - len_ptr[0] = rffi.cast(rffi.UINT, 0) - - return rffi.cast(rffi.INT, SSL_TLSEXT_ERR_OK) - - @staticmethod - def selectNPN_cb(s, out_ptr, outlen_ptr, server, server_len, args): - npn = NPN_STORAGE.get(rffi.cast(lltype.Unsigned, args)) - if npn and npn.protos: - client = npn.buf - client_len = len(npn.protos) - else: - client = lltype.nullptr(rffi.CCHARP.TO) - client_len = 0 - - libssl_SSL_select_next_proto(out_ptr, outlen_ptr, - server, server_len, - client, client_len) - return rffi.cast(rffi.INT, SSL_TLSEXT_ERR_OK) - - -class SSLAlpnProtocols(object): - - def __init__(self, ctx, protos): - self.protos = protos - self.buf, self.bufflag = rffi.get_nonmovingbuffer(protos) - ALPN_STORAGE.set(rffi.cast(lltype.Unsigned, self.buf), self) - - with rffi.scoped_str2charp(protos) as protos_buf: - if libssl_SSL_CTX_set_alpn_protos( - ctx, rffi.cast(rffi.UCHARP, protos_buf), len(protos)): - raise MemoryError - libssl_SSL_CTX_set_alpn_select_cb( - ctx, self.selectALPN_cb, self.buf) - - def __del__(self): - rffi.free_nonmovingbuffer( - self.protos, self.buf, self.bufflag) - - @staticmethod - def selectALPN_cb(s, out_ptr, outlen_ptr, client, client_len, args): - alpn = ALPN_STORAGE.get(rffi.cast(lltype.Unsigned, args)) - if alpn and alpn.protos: - server = alpn.buf - server_len = len(alpn.protos) - else: - server = lltype.nullptr(rffi.CCHARP.TO) - server_len = 0 - - ret = libssl_SSL_select_next_proto(out_ptr, outlen_ptr, - server, server_len, - client, client_len) - if ret != OPENSSL_NPN_NEGOTIATED: - return rffi.cast(rffi.INT, SSL_TLSEXT_ERR_NOACK) - return rffi.cast(rffi.INT, SSL_TLSEXT_ERR_OK) - - -NPN_STORAGE = RWeakValueDictionary(r_uint, SSLNpnProtocols) -ALPN_STORAGE = RWeakValueDictionary(r_uint, SSLAlpnProtocols) - -SOCKET_STORAGE = RWeakValueDictionary(int, W_Root) - - -if HAVE_OPENSSL_RAND: - # helper routines for seeding the SSL PRNG - @unwrap_spec(string=str, entropy=float) - def RAND_add(space, string, entropy): - """RAND_add(string, entropy) - - - Mix string into the OpenSSL PRNG state. entropy (a float) is a lower - bound on the entropy contained in string.""" - with rffi.scoped_nonmovingbuffer(string) as buf: - libssl_RAND_add(buf, len(string), entropy) - - def _RAND_bytes(space, n, pseudo): - if n < 0: - raise oefmt(space.w_ValueError, "num must be positive") - - with rffi.scoped_alloc_buffer(n) as buf: - if pseudo: - ok = libssl_RAND_pseudo_bytes( - rffi.cast(rffi.UCHARP, buf.raw), n) - if ok == 0 or ok == 1: - return space.newtuple([ - space.newbytes(buf.str(n)), - space.wrap(ok == 1), - ]) - else: - ok = libssl_RAND_bytes( - rffi.cast(rffi.UCHARP, buf.raw), n) - if ok == 1: - return space.newbytes(buf.str(n)) - - raise ssl_error(space, "", errcode=libssl_ERR_get_error()) - - @unwrap_spec(n=int) - def RAND_bytes(space, n): - """RAND_bytes(n) -> bytes - - Generate n cryptographically strong pseudo-random bytes.""" - return _RAND_bytes(space, n, pseudo=False) - - @unwrap_spec(n=int) - def RAND_pseudo_bytes(space, n): - """RAND_pseudo_bytes(n) -> (bytes, is_cryptographic) - - Generate n pseudo-random bytes. is_cryptographic is True if the bytes - generated are cryptographically strong.""" - return _RAND_bytes(space, n, pseudo=True) - - def RAND_status(space): - """RAND_status() -> 0 or 1 - - Returns 1 if the OpenSSL PRNG has been seeded with enough data - and 0 if not. It is necessary to seed the PRNG with RAND_add() - on some platforms before using the ssl() function.""" - - res = libssl_RAND_status() - return space.wrap(res) - - if HAVE_OPENSSL_RAND_EGD: - @unwrap_spec(path=str) - def RAND_egd(space, path): - """RAND_egd(path) -> bytes - - Queries the entropy gather daemon (EGD) on socket path. Returns number - of bytes read. Raises socket.sslerror if connection to EGD fails or - if it does provide enough data to seed PRNG.""" - with rffi.scoped_str2charp(path) as socket_path: - bytes = libssl_RAND_egd(socket_path) - if bytes == -1: - raise ssl_error(space, - "EGD connection failed or EGD did not return " - "enough data to seed the PRNG") - return space.wrap(bytes) - else: - # Dummy func for platforms missing RAND_egd(). Most likely LibreSSL. - @unwrap_spec(path=str) - def RAND_egd(space, path): - raise ssl_error(space, "RAND_egd unavailable") - - -class SSLSocket(W_Root): - def __init__(self, space, w_ctx): - self.w_ctx = w_ctx - self.w_socket = None - self.w_owner = None - self.ssl = lltype.nullptr(SSL.TO) - self.peer_cert = lltype.nullptr(X509.TO) - self.shutdown_seen_zero = False - self.handshake_done = False - self.register_finalizer(space) - - def _finalize_(self): - peer_cert = self.peer_cert - if peer_cert: - self.peer_cert = lltype.nullptr(X509.TO) - libssl_X509_free(peer_cert) - ssl = self.ssl - if ssl: - self.ssl = lltype.nullptr(SSL.TO) - libssl_SSL_free(ssl) - - @unwrap_spec(data='bufferstr') - def write(self, space, data): - """write(s) -> len - - Writes the string s into the SSL object. Returns the number - of bytes written.""" - w_socket = self._get_socket(space) - - sockstate = checkwait(space, w_socket, True) - if sockstate == SOCKET_HAS_TIMED_OUT: - raise timeout_error(space, "The write operation timed out") - elif sockstate == SOCKET_HAS_BEEN_CLOSED: - raise ssl_error(space, "Underlying socket has been closed.") - elif sockstate == SOCKET_TOO_LARGE_FOR_SELECT: - raise ssl_error(space, "Underlying socket too large for select().") - - num_bytes = 0 - while True: - err = 0 - - num_bytes = libssl_SSL_write(self.ssl, data, len(data)) - err = libssl_SSL_get_error(self.ssl, num_bytes) - - if err == SSL_ERROR_WANT_READ: - sockstate = checkwait(space, w_socket, False) - elif err == SSL_ERROR_WANT_WRITE: - sockstate = checkwait(space, w_socket, True) - else: - sockstate = SOCKET_OPERATION_OK - - if sockstate == SOCKET_HAS_TIMED_OUT: - raise timeout_error(space, "The write operation timed out") - elif sockstate == SOCKET_HAS_BEEN_CLOSED: - raise ssl_error(space, "Underlying socket has been closed.") - elif sockstate == SOCKET_IS_NONBLOCKING: - break - - if err == SSL_ERROR_WANT_READ or err == SSL_ERROR_WANT_WRITE: - continue - else: - break - - if num_bytes > 0: - return space.wrap(num_bytes) - else: - raise _ssl_seterror(space, self, num_bytes) - - def pending(self, space): - """pending() -> count - - Returns the number of already decrypted bytes available for read, - pending on the connection.""" - count = libssl_SSL_pending(self.ssl) - if count < 0: - raise _ssl_seterror(space, self, count) - return space.wrap(count) - - @unwrap_spec(num_bytes=int) - def read(self, space, num_bytes, w_buffer=None): - """read([len]) -> string - - Read up to len bytes from the SSL socket.""" - w_socket = self._get_socket(space) - - count = libssl_SSL_pending(self.ssl) - if not count: - sockstate = checkwait(space, w_socket, False) - if sockstate == SOCKET_HAS_TIMED_OUT: - raise timeout_error(space, "The read operation timed out") - elif sockstate == SOCKET_TOO_LARGE_FOR_SELECT: - raise ssl_error(space, - "Underlying socket too large for select().") - elif sockstate == SOCKET_HAS_BEEN_CLOSED: - if libssl_SSL_get_shutdown(self.ssl) == SSL_RECEIVED_SHUTDOWN: - if space.is_none(w_buffer): - return space.newbytes('') - else: - return space.wrap(0) - raise ssl_error(space, - "Socket closed without SSL shutdown handshake") - - if w_buffer: - rwbuffer = space.getarg_w('w*', w_buffer) - buflen = rwbuffer.getlength() - if not 0 < num_bytes <= buflen: - num_bytes = buflen - else: - if num_bytes < 0: - raise oefmt(space.w_ValueError, "size should not be negative") - rwbuffer = None - - with rffi.scoped_alloc_buffer(num_bytes) as buf: - while True: - err = 0 - - count = libssl_SSL_read(self.ssl, buf.raw, num_bytes) - err = libssl_SSL_get_error(self.ssl, count) - - if err == SSL_ERROR_WANT_READ: - sockstate = checkwait(space, w_socket, False) - elif err == SSL_ERROR_WANT_WRITE: - sockstate = checkwait(space, w_socket, True) - elif (err == SSL_ERROR_ZERO_RETURN and - libssl_SSL_get_shutdown(self.ssl) == SSL_RECEIVED_SHUTDOWN): - if space.is_none(w_buffer): - return space.newbytes('') - else: - return space.wrap(0) - else: - sockstate = SOCKET_OPERATION_OK - - if sockstate == SOCKET_HAS_TIMED_OUT: - raise timeout_error(space, "The read operation timed out") - elif sockstate == SOCKET_IS_NONBLOCKING: - break - - if err == SSL_ERROR_WANT_READ or err == SSL_ERROR_WANT_WRITE: - continue - else: - break - - if count <= 0: - raise _ssl_seterror(space, self, count) - - result = buf.str(count) - - if rwbuffer is not None: - rwbuffer.setslice(0, result) - return space.wrap(count) - else: - return space.newbytes(result) - - def _get_socket(self, space): - w_socket = self.w_socket() - if w_socket is None: - raise ssl_error(space, "Underlying socket connection gone") - - # just in case the blocking state of the socket has been changed - w_timeout = space.call_method(w_socket, "gettimeout") - nonblocking = not space.is_w(w_timeout, space.w_None) - libssl_BIO_set_nbio(libssl_SSL_get_rbio(self.ssl), nonblocking) - libssl_BIO_set_nbio(libssl_SSL_get_wbio(self.ssl), nonblocking) - - return w_socket - - def do_handshake(self, space): - w_socket = self._get_socket(space) - - # Actually negotiate SSL connection - # XXX If SSL_do_handshake() returns 0, it's also a failure. - while True: - ret = libssl_SSL_do_handshake(self.ssl) - err = libssl_SSL_get_error(self.ssl, ret) - # XXX PyErr_CheckSignals() - if err == SSL_ERROR_WANT_READ: - sockstate = checkwait(space, w_socket, False) - elif err == SSL_ERROR_WANT_WRITE: - sockstate = checkwait(space, w_socket, True) - else: - sockstate = SOCKET_OPERATION_OK - if sockstate == SOCKET_HAS_TIMED_OUT: - raise timeout_error(space, "The handshake operation timed out") - elif sockstate == SOCKET_HAS_BEEN_CLOSED: - raise ssl_error(space, "Underlying socket has been closed.") - elif sockstate == SOCKET_TOO_LARGE_FOR_SELECT: - raise ssl_error(space, - "Underlying socket too large for select().") - elif sockstate == SOCKET_IS_NONBLOCKING: - break - - if err == SSL_ERROR_WANT_READ or err == SSL_ERROR_WANT_WRITE: - continue - else: - break - - if ret <= 0: - raise _ssl_seterror(space, self, ret) - - if self.peer_cert: - libssl_X509_free(self.peer_cert) - self.peer_cert = libssl_SSL_get_peer_certificate(self.ssl) - self.handshake_done = True - - def shutdown(self, space): - w_socket = self._get_socket(space) - - # Guard against closed socket - w_fileno = space.call_method(w_socket, "fileno") - if space.int_w(w_fileno) < 0: - raise ssl_error(space, "Underlying socket has been closed") - - zeros = 0 - while True: - # Disable read-ahead so that unwrap can work correctly. - # Otherwise OpenSSL might read in too much data, - # eating clear text data that happens to be - # transmitted after the SSL shutdown. - # Should be safe to call repeatedly everytime this - # function is used and the shutdown_seen_zero != 0 - # condition is met. - if self.shutdown_seen_zero: - libssl_SSL_set_read_ahead(self.ssl, 0) - ret = libssl_SSL_shutdown(self.ssl) - - # if err == 1, a secure shutdown with SSL_shutdown() is complete - if ret > 0: - break - if ret == 0: - # Don't loop endlessly; instead preserve legacy - # behaviour of trying SSL_shutdown() only twice. - # This looks necessary for OpenSSL < 0.9.8m - zeros += 1 - if zeros > 1: - break - # Shutdown was sent, now try receiving - self.shutdown_seen_zero = True - continue - - # Possibly retry shutdown until timeout or failure - ssl_err = libssl_SSL_get_error(self.ssl, ret) - if ssl_err == SSL_ERROR_WANT_READ: - sockstate = checkwait(space, w_socket, False) - elif ssl_err == SSL_ERROR_WANT_WRITE: - sockstate = checkwait(space, w_socket, True) - else: - break - - if sockstate == SOCKET_HAS_TIMED_OUT: - if ssl_err == SSL_ERROR_WANT_READ: - raise timeout_error(space, "The read operation timed out") - else: - raise timeout_error(space, "The write operation timed out") - elif sockstate == SOCKET_TOO_LARGE_FOR_SELECT: - raise ssl_error(space, - "Underlying socket too large for select().") - elif sockstate != SOCKET_OPERATION_OK: - # Retain the SSL error code - break - - if ret < 0: - raise _ssl_seterror(space, self, ret) - - return w_socket - - def cipher(self, space): - if not self.ssl: - return space.w_None - current = libssl_SSL_get_current_cipher(self.ssl) - if not current: - return space.w_None - - name = libssl_SSL_CIPHER_get_name(current) - w_name = space.wrap(rffi.charp2str(name)) if name else space.w_None - - proto = libssl_SSL_CIPHER_get_version(current) - w_proto = space.wrap(rffi.charp2str(proto)) if proto else space.w_None - - bits = libssl_SSL_CIPHER_get_bits(current, - lltype.nullptr(rffi.INTP.TO)) - w_bits = space.newint(bits) - return space.newtuple([w_name, w_proto, w_bits]) - - @unwrap_spec(der=bool) - def peer_certificate(self, space, der=False): - """peer_certificate([der=False]) -> certificate - - Returns the certificate for the peer. If no certificate was - provided, returns None. If a certificate was provided, but not - validated, returns an empty dictionary. Otherwise returns a - dict containing information about the peer certificate. - - If the optional argument is True, returns a DER-encoded copy of - the peer certificate, or None if no certificate was provided. - This will return the certificate even if it wasn't validated. - """ - if not self.handshake_done: - raise oefmt(space.w_ValueError, "hanshake not done yet") - if not self.peer_cert: - return space.w_None - - if der: - # return cert in DER-encoded format - return _certificate_to_der(space, self.peer_cert) - else: - verification = libssl_SSL_CTX_get_verify_mode( - libssl_SSL_get_SSL_CTX(self.ssl)) - if not verification & SSL_VERIFY_PEER: - return space.newdict() - else: - return _decode_certificate(space, self.peer_cert) - - def selected_npn_protocol(self, space): - if not HAS_NPN: - raise oefmt(space.w_NotImplementedError, - "The NPN extension requires OpenSSL 1.0.1 or later.") - with lltype.scoped_alloc(rffi.CCHARPP.TO, 1) as out_ptr: - with lltype.scoped_alloc(rffi.UINTP.TO, 1) as len_ptr: - libssl_SSL_get0_next_proto_negotiated(self.ssl, - out_ptr, len_ptr) - if out_ptr[0]: - return space.wrap( - rffi.charpsize2str(out_ptr[0], intmask(len_ptr[0]))) - - def selected_alpn_protocol(self, space): - if not HAS_ALPN: - raise oefmt(space.w_NotImplementedError, - "The ALPN extension requires OpenSSL 1.0.2 or later.") - with lltype.scoped_alloc(rffi.CCHARPP.TO, 1) as out_ptr: - with lltype.scoped_alloc(rffi.UINTP.TO, 1) as len_ptr: - libssl_SSL_get0_alpn_selected(self.ssl, - out_ptr, len_ptr) - if out_ptr[0]: - return space.wrap( - rffi.charpsize2str(out_ptr[0], intmask(len_ptr[0]))) - - def compression_w(self, space): - if not self.ssl: - return space.w_None - comp_method = libssl_SSL_get_current_compression(self.ssl) - if not comp_method: - return space.w_None - method_type = intmask(libssl_COMP_get_type(comp_method)) - if method_type == NID_undef: - return space.w_None - short_name = libssl_COMP_get_name(comp_method) - if not short_name: - return space.w_None - return space.wrap(rffi.charp2str(short_name)) - - def version_w(self, space): - if not self.ssl: - return space.w_None - version = libssl_SSL_get_version(self.ssl) - if not version: - return space.w_None - return space.wrap(rffi.charp2str(version)) - - def tls_unique_cb_w(self, space): - """Returns the 'tls-unique' channel binding data, as defined by RFC 5929. - If the TLS handshake is not yet complete, None is returned""" - - # In case of 'tls-unique' it will be 12 bytes for TLS, 36 - # bytes for older SSL, but let's be safe - CB_MAXLEN = 128 - - with lltype.scoped_alloc(rffi.CCHARP.TO, CB_MAXLEN) as buf: - if (libssl_SSL_session_reused(self.ssl) ^ - (self.socket_type == PY_SSL_CLIENT)): - # if session is resumed XOR we are the client - length = libssl_SSL_get_finished(self.ssl, buf, CB_MAXLEN) - else: - # if a new session XOR we are the server - length = libssl_SSL_get_peer_finished(self.ssl, buf, CB_MAXLEN) - - if length > 0: - return space.newbytes(rffi.charpsize2str(buf, intmask(length))) - - def descr_get_context(self, space): - return self.w_ctx - - def descr_set_context(self, space, w_ctx): - ctx = space.interp_w(SSLContext, w_ctx) - if not HAS_SNI: - raise oefmt(space.w_NotImplementedError, - "setting a socket's context " - "is not supported by your OpenSSL library") - self.w_ctx = w_ctx - libssl_SSL_set_SSL_CTX(self.ssl, ctx.ctx) - - def descr_get_owner(self, space): - if self.w_owner is not None: - w_owner = self.w_owner() - if w_owner: - return w_owner - return space.w_None - - def descr_set_owner(self, space, w_owner): - assert w_owner is not None - self.w_owner = weakref.ref(w_owner) - - -SSLSocket.typedef = TypeDef("_ssl._SSLSocket", - write = interp2app(SSLSocket.write), - pending = interp2app(SSLSocket.pending), - read = interp2app(SSLSocket.read), - do_handshake = interp2app(SSLSocket.do_handshake), - shutdown = interp2app(SSLSocket.shutdown), - cipher = interp2app(SSLSocket.cipher), - peer_certificate = interp2app(SSLSocket.peer_certificate), - selected_npn_protocol = interp2app(SSLSocket.selected_npn_protocol), - selected_alpn_protocol = interp2app(SSLSocket.selected_alpn_protocol), - compression = interp2app(SSLSocket.compression_w), - version = interp2app(SSLSocket.version_w), - tls_unique_cb = interp2app(SSLSocket.tls_unique_cb_w), - context=GetSetProperty(SSLSocket.descr_get_context, - SSLSocket.descr_set_context), - owner=GetSetProperty(SSLSocket.descr_get_owner, - SSLSocket.descr_set_owner), -) - -def _certificate_to_der(space, certificate): - with lltype.scoped_alloc(rffi.CCHARPP.TO, 1) as buf_ptr: - buf_ptr[0] = lltype.nullptr(rffi.CCHARP.TO) - length = libssl_i2d_X509(certificate, buf_ptr) - if length < 0: - raise _ssl_seterror(space, None, 0) - try: - return space.newbytes(rffi.charpsize2str(buf_ptr[0], length)) - finally: - libssl_OPENSSL_free(buf_ptr[0]) - -def _decode_certificate(space, certificate): - w_retval = space.newdict() - - w_peer = _create_tuple_for_X509_NAME( - space, libssl_X509_get_subject_name(certificate)) - space.setitem(w_retval, space.wrap("subject"), w_peer) - - w_issuer = _create_tuple_for_X509_NAME( - space, libssl_X509_get_issuer_name(certificate)) - space.setitem(w_retval, space.wrap("issuer"), w_issuer) - - space.setitem(w_retval, space.wrap("version"), - space.wrap(libssl_X509_get_version(certificate) + 1)) - - biobuf = libssl_BIO_new(libssl_BIO_s_mem()) - try: - - libssl_BIO_reset(biobuf) - serialNumber = libssl_X509_get_serialNumber(certificate) - libssl_i2a_ASN1_INTEGER(biobuf, serialNumber) - # should not exceed 20 octets, 160 bits, so buf is big enough - with lltype.scoped_alloc(rffi.CCHARP.TO, 100) as buf: - length = libssl_BIO_gets(biobuf, buf, 99) - if length < 0: - raise _ssl_seterror(space, None, length) - - w_serial = space.wrap(rffi.charpsize2str(buf, length)) - space.setitem(w_retval, space.wrap("serialNumber"), w_serial) - - libssl_BIO_reset(biobuf) - notBefore = libssl_X509_get_notBefore(certificate) - libssl_ASN1_TIME_print(biobuf, notBefore) - with lltype.scoped_alloc(rffi.CCHARP.TO, 100) as buf: - length = libssl_BIO_gets(biobuf, buf, 99) - if length < 0: - raise _ssl_seterror(space, None, length) - w_date = space.wrap(rffi.charpsize2str(buf, length)) - space.setitem(w_retval, space.wrap("notBefore"), w_date) - - libssl_BIO_reset(biobuf) - notAfter = libssl_X509_get_notAfter(certificate) - libssl_ASN1_TIME_print(biobuf, notAfter) - with lltype.scoped_alloc(rffi.CCHARP.TO, 100) as buf: - length = libssl_BIO_gets(biobuf, buf, 99) - if length < 0: - raise _ssl_seterror(space, None, length) - w_date = space.wrap(rffi.charpsize2str(buf, length)) - space.setitem(w_retval, space.wrap("notAfter"), w_date) - finally: - libssl_BIO_free(biobuf) - - # Now look for subjectAltName - w_alt_names = _get_peer_alt_names(space, certificate) - if w_alt_names is not space.w_None: - space.setitem(w_retval, space.wrap("subjectAltName"), w_alt_names) - - # Authority Information Access: OCSP URIs - w_ocsp = _get_aia_uri(space, certificate, NID_ad_OCSP) - if not space.is_none(w_ocsp): - space.setitem(w_retval, space.wrap("OCSP"), w_ocsp) - w_issuers = _get_aia_uri(space, certificate, NID_ad_ca_issuers) - if not space.is_none(w_issuers): - space.setitem(w_retval, space.wrap("caIssuers"), w_issuers) - - # CDP (CRL distribution points) - w_cdp = _get_crl_dp(space, certificate) - if not space.is_none(w_cdp): - space.setitem(w_retval, space.wrap("crlDistributionPoints"), w_cdp) - - return w_retval - - -def _create_tuple_for_X509_NAME(space, xname): - entry_count = libssl_X509_NAME_entry_count(xname) - dn_w = [] - rdn_w = [] - rdn_level = -1 - for index in range(entry_count): - entry = libssl_X509_NAME_get_entry(xname, index) - # check to see if we've gotten to a new RDN - entry_level = intmask(libssl_X509_NAME_ENTRY_set(entry)) - if rdn_level >= 0: - if rdn_level != entry_level: - # yes, new RDN - # add old RDN to DN - dn_w.append(space.newtuple(list(rdn_w))) - rdn_w = [] - rdn_level = entry_level - - # Now add this attribute to the current RDN - name = libssl_X509_NAME_ENTRY_get_object(entry) - value = libssl_X509_NAME_ENTRY_get_data(entry) - attr = _create_tuple_for_attribute(space, name, value) - rdn_w.append(attr) - - # Now, there is typically a dangling RDN - if rdn_w: - dn_w.append(space.newtuple(list(rdn_w))) - return space.newtuple(list(dn_w)) - - -def _get_peer_alt_names(space, certificate): - # this code follows the procedure outlined in - # OpenSSL's crypto/x509v3/v3_prn.c:X509v3_EXT_print() - # function to extract the STACK_OF(GENERAL_NAME), - # then iterates through the stack to add the - # names. - - if not certificate: - return space.w_None - - # get a memory buffer - biobuf = libssl_BIO_new(libssl_BIO_s_mem()) - - try: - alt_names_w = [] - i = -1 - while True: - i = libssl_X509_get_ext_by_NID( - certificate, NID_subject_alt_name, i) - if i < 0: - break - - # now decode the altName - ext = libssl_X509_get_ext(certificate, i) - method = libssl_X509V3_EXT_get(ext) - if not method: - raise ssl_error(space, - "No method for internalizing subjectAltName!'") - - with lltype.scoped_alloc(rffi.CCHARPP.TO, 1) as p_ptr: - ext_value = libssl_X509_EXTENSION_get_data(ext) - p_ptr[0] = ext_value.c_data - length = intmask(ext_value.c_length) - null = lltype.nullptr(rffi.VOIDP.TO) - if method[0].c_it: - names = rffi.cast(GENERAL_NAMES, libssl_ASN1_item_d2i( - null, p_ptr, length, - libssl_ASN1_ITEM_ptr(method[0].c_it))) - else: - names = rffi.cast(GENERAL_NAMES, method[0].c_d2i( - null, p_ptr, length)) - - try: - for j in range(libssl_sk_GENERAL_NAME_num(names)): - # Get a rendering of each name in the set of names - - name = libssl_sk_GENERAL_NAME_value(names, j) - gntype = intmask(name.c_type) - if gntype == GEN_DIRNAME: - # we special-case DirName as a tuple of tuples of - # attributes - dirname = libssl_pypy_GENERAL_NAME_dirn(name) - w_t = space.newtuple([ - space.wrap("DirName"), - _create_tuple_for_X509_NAME(space, dirname) - ]) - elif gntype in (GEN_EMAIL, GEN_DNS, GEN_URI): - # GENERAL_NAME_print() doesn't handle NULL bytes in - # ASN1_string correctly, CVE-2013-4238 - if gntype == GEN_EMAIL: - v = space.wrap("email") - elif gntype == GEN_DNS: - v = space.wrap("DNS") - elif gntype == GEN_URI: - v = space.wrap("URI") - else: - assert False - as_ = libssl_pypy_GENERAL_NAME_dirn(name) - as_ = rffi.cast(ASN1_STRING, as_) - buf = libssl_ASN1_STRING_data(as_) - length = libssl_ASN1_STRING_length(as_) - w_t = space.newtuple([ - v, space.wrap(rffi.charpsize2str(buf, length))]) - else: - # for everything else, we use the OpenSSL print form - if gntype not in (GEN_OTHERNAME, GEN_X400, GEN_EDIPARTY, - GEN_IPADD, GEN_RID): - space.warn(space.wrap("Unknown general name type"), - space.w_RuntimeWarning) - libssl_BIO_reset(biobuf) - libssl_GENERAL_NAME_print(biobuf, name) - with lltype.scoped_alloc(rffi.CCHARP.TO, 2048) as buf: - length = libssl_BIO_gets(biobuf, buf, 2047) - if length < 0: - raise _ssl_seterror(space, None, 0) - - v = rffi.charpsize2str(buf, length) - v1, v2 = v.split(':', 1) - w_t = space.newtuple([space.wrap(v1), - space.wrap(v2)]) - - alt_names_w.append(w_t) - finally: - libssl_pypy_GENERAL_NAME_pop_free(names) - finally: - libssl_BIO_free(biobuf) - - if alt_names_w: - return space.newtuple(list(alt_names_w)) - else: - return space.w_None - - -def _create_tuple_for_attribute(space, name, value): - with lltype.scoped_alloc(rffi.CCHARP.TO, X509_NAME_MAXLEN) as buf: - length = libssl_OBJ_obj2txt(buf, X509_NAME_MAXLEN, name, 0) - if length < 0: - raise _ssl_seterror(space, None, 0) - w_name = space.wrap(rffi.charpsize2str(buf, length)) - - with lltype.scoped_alloc(rffi.CCHARPP.TO, 1) as buf_ptr: - length = libssl_ASN1_STRING_to_UTF8(buf_ptr, value) - if length < 0: - raise _ssl_seterror(space, None, 0) - try: - w_value = space.newbytes(rffi.charpsize2str(buf_ptr[0], length)) - w_value = space.call_method(w_value, "decode", space.wrap("utf-8")) - finally: - libssl_OPENSSL_free(buf_ptr[0]) - - return space.newtuple([w_name, w_value]) - - -def _get_aia_uri(space, certificate, nid): - info = rffi.cast(AUTHORITY_INFO_ACCESS, libssl_X509_get_ext_d2i( - certificate, NID_info_access, None, None)) - try: - if not info or libssl_sk_ACCESS_DESCRIPTION_num(info) == 0: - return - - result_w = [] - for i in range(libssl_sk_ACCESS_DESCRIPTION_num(info)): - ad = libssl_sk_ACCESS_DESCRIPTION_value(info, i) - if libssl_OBJ_obj2nid(ad[0].c_method) != nid: - continue - - name = ad[0].c_location - gntype = intmask(name.c_type) - if gntype != GEN_URI: - continue - uri = libssl_pypy_GENERAL_NAME_uri(name) - length = intmask(uri.c_length) - s_uri = rffi.charpsize2str(uri.c_data, length) - result_w.append(space.wrap(s_uri)) - return space.newtuple(result_w[:]) - finally: - libssl_AUTHORITY_INFO_ACCESS_free(info) - -def _get_crl_dp(space, certificate): - if OPENSSL_VERSION_NUMBER >= 0x10001000: - # Calls x509v3_cache_extensions and sets up crldp - libssl_X509_check_ca(certificate) - dps = rffi.cast(stack_st_DIST_POINT, libssl_X509_get_ext_d2i( - certificate, NID_crl_distribution_points, None, None)) - if not dps: - return None - - try: - cdp_w = [] - for i in range(libssl_sk_DIST_POINT_num(dps)): - dp = libssl_sk_DIST_POINT_value(dps, i) - gns = libssl_pypy_DIST_POINT_fullname(dp) - - for j in range(libssl_sk_GENERAL_NAME_num(gns)): - name = libssl_sk_GENERAL_NAME_value(gns, j) - gntype = intmask(name.c_type) - if gntype != GEN_URI: - continue - uri = libssl_pypy_GENERAL_NAME_uri(name) - length = intmask(uri.c_length) - s_uri = rffi.charpsize2str(uri.c_data, length) - cdp_w.append(space.wrap(s_uri)) - finally: - if OPENSSL_VERSION_NUMBER < 0x10001000: - libssl_sk_DIST_POINT_free(dps) - return space.newtuple(cdp_w[:]) - -def new_sslobject(space, w_ctx, w_sock, side, server_hostname): - ss = SSLSocket(space, w_ctx) - - sock_fd = space.int_w(space.call_method(w_sock, "fileno")) - w_timeout = space.call_method(w_sock, "gettimeout") - has_timeout = not space.is_none(w_timeout) - - ss.ssl = libssl_SSL_new(w_ctx.ctx) # new ssl struct - libssl_SSL_set_fd(ss.ssl, sock_fd) # set the socket for SSL - # The ACCEPT_MOVING_WRITE_BUFFER flag is necessary because the address - # of a str object may be changed by the garbage collector. - libssl_SSL_set_mode( - ss.ssl, SSL_MODE_AUTO_RETRY | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER) - - if server_hostname: - libssl_SSL_set_tlsext_host_name(ss.ssl, server_hostname); - - # If the socket is in non-blocking mode or timeout mode, set the BIO - # to non-blocking mode (blocking is the default) - if has_timeout: - # Set both the read and write BIO's to non-blocking mode - libssl_BIO_set_nbio(libssl_SSL_get_rbio(ss.ssl), 1) - libssl_BIO_set_nbio(libssl_SSL_get_wbio(ss.ssl), 1) - - if side == PY_SSL_CLIENT: - libssl_SSL_set_connect_state(ss.ssl) - else: - libssl_SSL_set_accept_state(ss.ssl) - ss.socket_type = side - - ss.w_socket = weakref.ref(w_sock) - return ss - -def checkwait(space, w_sock, writing): - """If the socket has a timeout, do a select()/poll() on the socket. - The argument writing indicates the direction. - Returns one of the possibilities in the timeout_state enum (above).""" - - w_timeout = space.call_method(w_sock, "gettimeout") - if space.is_w(w_timeout, space.w_None): - return SOCKET_IS_BLOCKING - elif space.float_w(w_timeout) == 0.0: - return SOCKET_IS_NONBLOCKING - sock_timeout = space.float_w(w_timeout) - - sock_fd = space.int_w(space.call_method(w_sock, "fileno")) - - # guard against closed socket - if sock_fd < 0: - return SOCKET_HAS_BEEN_CLOSED - - # see if the socket is ready - - # Prefer poll, if available, since you can poll() any fd - # which can't be done with select(). - if HAVE_RPOLL: - if writing: - fddict = {sock_fd: rpoll.POLLOUT} - else: - fddict = {sock_fd: rpoll.POLLIN} - - # socket's timeout is in seconds, poll's timeout in ms - timeout = int(sock_timeout * 1000 + 0.5) - try: - ready = rpoll.poll(fddict, timeout) - except rpoll.PollError as e: - message = e.get_msg() - raise ssl_error(space, message, e.errno) - else: - if MAX_FD_SIZE is not None and sock_fd >= MAX_FD_SIZE: - return SOCKET_TOO_LARGE_FOR_SELECT - - try: - if writing: - r, w, e = rpoll.select([], [sock_fd], [], sock_timeout) - ready = w - else: - r, w, e = rpoll.select([sock_fd], [], [], sock_timeout) - ready = r - except rpoll.SelectError as e: - message = e.get_msg() - raise ssl_error(space, message, e.errno) - if ready: - return SOCKET_OPERATION_OK - else: - return SOCKET_HAS_TIMED_OUT - - -def _ssl_seterror(space, ss, ret): - assert ret <= 0 - - errcode = libssl_ERR_peek_last_error() - - if ss is None: - return ssl_error(space, None, errcode=errcode) - elif ss.ssl: - err = libssl_SSL_get_error(ss.ssl, ret) - else: - err = SSL_ERROR_SSL - w_errtype = None - errstr = "" - errval = 0 - - if err == SSL_ERROR_ZERO_RETURN: - w_errtype = get_error(space).w_ZeroReturnError - errstr = "TLS/SSL connection has been closed" - errval = PY_SSL_ERROR_ZERO_RETURN - elif err == SSL_ERROR_WANT_READ: - w_errtype = get_error(space).w_WantReadError - errstr = "The operation did not complete (read)" - errval = PY_SSL_ERROR_WANT_READ - elif err == SSL_ERROR_WANT_WRITE: - w_errtype = get_error(space).w_WantWriteError - errstr = "The operation did not complete (write)" - errval = PY_SSL_ERROR_WANT_WRITE - elif err == SSL_ERROR_WANT_X509_LOOKUP: - errstr = "The operation did not complete (X509 lookup)" - errval = PY_SSL_ERROR_WANT_X509_LOOKUP - elif err == SSL_ERROR_WANT_CONNECT: - errstr = "The operation did not complete (connect)" - errval = PY_SSL_ERROR_WANT_CONNECT - elif err == SSL_ERROR_SYSCALL: - e = libssl_ERR_get_error() - if e == 0: - if ret == 0 or ss.w_socket() is None: - w_errtype = get_error(space).w_EOFError - errstr = "EOF occurred in violation of protocol" - errval = PY_SSL_ERROR_EOF - elif ret == -1: - # the underlying BIO reported an I/0 error - error = rsocket.last_error() - return interp_socket.converted_error(space, error) - else: - w_errtype = get_error(space).w_SyscallError - errstr = "Some I/O error occurred" - errval = PY_SSL_ERROR_SYSCALL - else: - errstr = rffi.charp2str(libssl_ERR_error_string(e, None)) - errval = PY_SSL_ERROR_SYSCALL - elif err == SSL_ERROR_SSL: - errval = PY_SSL_ERROR_SSL - if errcode != 0: - errstr = rffi.charp2str(libssl_ERR_error_string(errcode, None)) - else: - errstr = "A failure in the SSL library occurred" - else: - errstr = "Invalid error code" - errval = PY_SSL_ERROR_INVALID_ERROR_CODE - - return ssl_error(space, errstr, errval, w_errtype=w_errtype, - errcode=errcode) - -def SSLError_descr_str(space, w_exc): - w_strerror = space.getattr(w_exc, space.wrap("strerror")) - if not space.is_none(w_strerror): - return w_strerror - return space.str(space.getattr(w_exc, space.wrap("args"))) - - -class ErrorCache: - def __init__(self, space): - w_socketerror = interp_socket.get_error(space, "error") - self.w_error = space.new_exception_class( - "_ssl.SSLError", w_socketerror) - space.setattr(self.w_error, space.wrap('__str__'), - space.wrap(interp2app(SSLError_descr_str))) - self.w_ZeroReturnError = space.new_exception_class( - "ssl.SSLZeroReturnError", self.w_error) - self.w_WantReadError = space.new_exception_class( - "ssl.SSLWantReadError", self.w_error) - self.w_WantWriteError = space.new_exception_class( - "ssl.SSLWantWriteError", self.w_error) - self.w_EOFError = space.new_exception_class( - "ssl.SSLEOFError", self.w_error) - self.w_SyscallError = space.new_exception_class( - "ssl.SSLSyscallError", self.w_error) - -def get_error(space): - return space.fromcache(ErrorCache) - - -@unwrap_spec(filename=str) -def _test_decode_cert(space, filename): - cert = libssl_BIO_new(libssl_BIO_s_file()) - if not cert: - raise ssl_error(space, "Can't malloc memory to read file") - - try: - if libssl_BIO_read_filename(cert, filename) <= 0: - raise ssl_error(space, "Can't open file") - - x = libssl_PEM_read_bio_X509_AUX(cert, None, None, None) - if not x: - raise ssl_error(space, "Error decoding PEM-encoded file") - - try: - return _decode_certificate(space, x) - finally: - libssl_X509_free(x) - finally: - libssl_BIO_free(cert) - - -# Data structure for the password callbacks -class PasswordInfo(object): - w_callable = None - password = None - operationerror = None -PWINFO_STORAGE = {} - -def _password_callback(buf, size, rwflag, userdata): - index = rffi.cast(lltype.Signed, userdata) - pw_info = PWINFO_STORAGE.get(index, None) - if not pw_info: - return rffi.cast(rffi.INT, -1) - space = pw_info.space - password = "" - if pw_info.w_callable: - try: - w_result = space.call_function(pw_info.w_callable) - if space.isinstance_w(w_result, space.w_unicode): - password = space.str_w(w_result) - else: - try: - password = pw_info.space.bufferstr_w(w_result) - except OperationError as e: - if not e.match(space, space.w_TypeError): - raise - raise oefmt(space.w_TypeError, - "password callback must return a string") - except OperationError as e: - pw_info.operationerror = e - return rffi.cast(rffi.INT, -1) - else: - password = pw_info.password - size = widen(size) - if len(password) > size: - pw_info.operationerror = oefmt( - space.w_ValueError, - "password cannot be longer than %d bytes", size) - return rffi.cast(rffi.INT, -1) - for i, c in enumerate(password): - buf[i] = c - return rffi.cast(rffi.INT, len(password)) - -class ServernameCallback(object): - w_ctx = None - space = None -SERVERNAME_CALLBACKS = RWeakValueDictionary(int, ServernameCallback) - -def _servername_callback(ssl, ad, arg): - struct = SERVERNAME_CALLBACKS.get(rffi.cast(lltype.Signed, arg)) - w_ctx = struct.w_ctx - space = struct.space - w_callback = struct.w_set_hostname - if not w_ctx.servername_callback: - # Possible race condition. - return rffi.cast(rffi.INT, SSL_TLSEXT_ERR_OK) - # The high-level ssl.SSLSocket object - index = rffi.cast(lltype.Signed, libssl_SSL_get_app_data(ssl)) - w_ssl = SOCKET_STORAGE.get(index) - assert isinstance(w_ssl, SSLSocket) - # The servername callback expects an argument that represents the current - # SSL connection and that has a .context attribute that can be changed to - # identify the requested hostname. Since the official API is the Python - # level API we want to pass the callback a Python level object rather than - # a _ssl.SSLSocket instance. If there's an "owner" (typically an - # SSLObject) that will be passed. Otherwise if there's a socket then that - # will be passed. If both do not exist only then the C-level object is - # passed. - if w_ssl.w_owner is not None: - w_ssl_socket = w_ssl.w_owner() - elif w_ssl.w_socket is not None: - w_ssl_socket = w_ssl.w_socket() - else: - w_ssl_socket = w_ssl - if space.is_none(w_ssl_socket): - ad[0] = rffi.cast(rffi.INT, SSL_AD_INTERNAL_ERROR) - return rffi.cast(rffi.INT, SSL_TLSEXT_ERR_ALERT_FATAL) - - servername = libssl_SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name) - try: - if not servername: - w_result = space.call_function(w_callback, - w_ssl_socket, space.w_None, w_ctx) - - else: - w_servername = space.newbytes(rffi.charp2str(servername)) - try: - w_servername_idna = space.call_method( - w_servername, 'decode', space.wrap('idna')) - except OperationError as e: - e.write_unraisable(space, "undecodable server name") - ad[0] = rffi.cast(rffi.INT, SSL_AD_INTERNAL_ERROR) - return rffi.cast(rffi.INT, SSL_TLSEXT_ERR_ALERT_FATAL) - - w_result = space.call_function(w_callback, - w_ssl_socket, - w_servername_idna, w_ctx) - except OperationError as e: - e.write_unraisable(space, "in servername callback") - ad[0] = rffi.cast(rffi.INT, SSL_AD_HANDSHAKE_FAILURE) - return rffi.cast(rffi.INT, SSL_TLSEXT_ERR_ALERT_FATAL) - - if space.is_none(w_result): - return rffi.cast(rffi.INT, SSL_TLSEXT_ERR_OK) - else: - try: - ad[0] = rffi.cast(rffi.INT, space.int_w(w_result)) - except OperationError as e: - e.write_unraisable(space, "servername callback result") - ad[0] = rffi.cast(rffi.INT, SSL_AD_INTERNAL_ERROR) - return rffi.cast(rffi.INT, SSL_TLSEXT_ERR_ALERT_FATAL) - - -class SSLContext(W_Root): - ctx = lltype.nullptr(SSL_CTX.TO) - - def __init__(self, space, protocol): - if protocol == PY_SSL_VERSION_TLS: - method = libssl_TLS_method() - elif protocol == PY_SSL_VERSION_TLS1: - method = libssl_TLSv1_method() - elif protocol == PY_SSL_VERSION_SSL3 and not OPENSSL_NO_SSL3: - method = libssl_SSLv3_method() - elif protocol == PY_SSL_VERSION_SSL2 and not OPENSSL_NO_SSL2: - method = libssl_SSLv2_method() - elif protocol == PY_SSL_VERSION_TLS1_1 and HAVE_TLSv1_2: - method = libssl_TLSv1_1_method() - elif protocol == PY_SSL_VERSION_TLS1_2 and HAVE_TLSv1_2: - method = libssl_TLSv1_2_method() - else: - raise oefmt(space.w_ValueError, "invalid protocol version") - self.ctx = libssl_SSL_CTX_new(method) - if not self.ctx: - raise ssl_error(space, "failed to allocate SSL context") - - rgc.add_memory_pressure(10 * 1024 * 1024) - self.check_hostname = False - self.register_finalizer(space) - - # Defaults - libssl_SSL_CTX_set_verify(self.ctx, SSL_VERIFY_NONE, None) - options = SSL_OP_ALL & ~SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS - if protocol != PY_SSL_VERSION_SSL2: - options |= SSL_OP_NO_SSLv2 - if protocol != PY_SSL_VERSION_SSL3: - options |= SSL_OP_NO_SSLv3 - libssl_SSL_CTX_set_options(self.ctx, options) - libssl_SSL_CTX_set_session_id_context(self.ctx, "Python", len("Python")) - - if not OPENSSL_NO_ECDH: - # Allow automatic ECDH curve selection (on - # OpenSSL 1.0.2+), or use prime256v1 by default. - # This is Apache mod_ssl's initialization - # policy, so we should be safe. - # OpenSSL 1.1 has it enabled by default. - if libssl_SSL_CTX_set_ecdh_auto: - libssl_SSL_CTX_set_ecdh_auto(self.ctx, 1) - else: - key = libssl_EC_KEY_new_by_curve_name(NID_X9_62_prime256v1) - if not key: - raise _ssl_seterror(space, None, 0) - try: - libssl_SSL_CTX_set_tmp_ecdh(self.ctx, key) - finally: - libssl_EC_KEY_free(key) - - def _finalize_(self): - ctx = self.ctx - if ctx: - self.ctx = lltype.nullptr(SSL_CTX.TO) - libssl_SSL_CTX_free(ctx) - - @staticmethod - @unwrap_spec(protocol=int) - def descr_new(space, w_subtype, protocol=PY_SSL_VERSION_TLS): - self = space.allocate_instance(SSLContext, w_subtype) - self.__init__(space, protocol) - return space.wrap(self) - - @unwrap_spec(cipherlist=str) - def set_ciphers_w(self, space, cipherlist): - ret = libssl_SSL_CTX_set_cipher_list(self.ctx, cipherlist) - if ret == 0: - # Clearing the error queue is necessary on some OpenSSL - # versions, otherwise the error will be reported again - # when another SSL call is done. - libssl_ERR_clear_error() - raise ssl_error(space, "No cipher can be selected.") - - @unwrap_spec(server_side=int) - def wrap_socket_w(self, space, w_sock, server_side, - w_server_hostname=None): - assert w_sock is not None - # server_hostname is either None (or absent), or to be encoded - # using the idna encoding. - if space.is_none(w_server_hostname): - hostname = None - else: - hostname = space.bytes_w( - space.call_method(w_server_hostname, - "encode", space.wrap("idna"))) - - if hostname and not HAS_SNI: - raise oefmt(space.w_ValueError, - "server_hostname is not supported by your OpenSSL " - "library") - - return new_sslobject(space, self, w_sock, server_side, hostname) - - def session_stats_w(self, space): - w_stats = space.newdict() - for name, ssl_func in SSL_CTX_STATS: - w_value = space.wrap(ssl_func(self.ctx)) - space.setitem_str(w_stats, name, w_value) - return w_stats - - def descr_set_default_verify_paths(self, space): - if not libssl_SSL_CTX_set_default_verify_paths(self.ctx): - raise ssl_error(space, "") - - def descr_get_options(self, space): - return space.newlong(libssl_SSL_CTX_get_options(self.ctx)) - - def descr_set_options(self, space, w_new_opts): - new_opts = space.int_w(w_new_opts) - opts = libssl_SSL_CTX_get_options(self.ctx) - clear = opts & ~new_opts - set = ~opts & new_opts - if clear: - if HAVE_SSL_CTX_CLEAR_OPTIONS: - libssl_SSL_CTX_clear_options(self.ctx, clear) - else: - raise oefmt(space.w_ValueError, - "can't clear options before OpenSSL 0.9.8m") - if set: - libssl_SSL_CTX_set_options(self.ctx, set) - - def descr_get_verify_mode(self, space): - mode = libssl_SSL_CTX_get_verify_mode(self.ctx) - if mode == SSL_VERIFY_NONE: - return space.newlong(PY_SSL_CERT_NONE) - elif mode == SSL_VERIFY_PEER: - return space.newlong(PY_SSL_CERT_OPTIONAL) - elif mode == SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT: - return space.newlong(PY_SSL_CERT_REQUIRED) - raise ssl_error(space, "invalid return value from SSL_CTX_get_verify_mode") - - def descr_set_verify_mode(self, space, w_mode): - n = space.int_w(w_mode) - if n == PY_SSL_CERT_NONE: - mode = SSL_VERIFY_NONE - elif n == PY_SSL_CERT_OPTIONAL: - mode = SSL_VERIFY_PEER - elif n == PY_SSL_CERT_REQUIRED: - mode = SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT - else: - raise oefmt(space.w_ValueError, - "invalid value for verify_mode") - if mode == SSL_VERIFY_NONE and self.check_hostname: - raise oefmt(space.w_ValueError, - "Cannot set verify_mode to CERT_NONE when " - "check_hostname is enabled.") - libssl_SSL_CTX_set_verify(self.ctx, mode, None) - - def descr_get_verify_flags(self, space): - store = libssl_SSL_CTX_get_cert_store(self.ctx) - param = libssl_X509_STORE_get0_param(store) - flags = libssl_X509_VERIFY_PARAM_get_flags(param) - return space.wrap(flags) - - def descr_set_verify_flags(self, space, w_obj): - new_flags = space.int_w(w_obj) - store = libssl_SSL_CTX_get_cert_store(self.ctx) - param = libssl_X509_STORE_get0_param(store) - flags = libssl_X509_VERIFY_PARAM_get_flags(param) - flags_clear = flags & ~new_flags - flags_set = ~flags & new_flags - if flags_clear and not libssl_X509_VERIFY_PARAM_clear_flags( - param, flags_clear): - raise _ssl_seterror(space, None, 0) - if flags_set and not libssl_X509_VERIFY_PARAM_set_flags( - param, flags_set): - raise _ssl_seterror(space, None, 0) - - def descr_get_check_hostname(self, space): - return space.newbool(self.check_hostname) - - def descr_set_check_hostname(self, space, w_obj): - check_hostname = space.is_true(w_obj) - if check_hostname and libssl_SSL_CTX_get_verify_mode(self.ctx) == SSL_VERIFY_NONE: - raise oefmt(space.w_ValueError, - "check_hostname needs a SSL context with either " - "CERT_OPTIONAL or CERT_REQUIRED") - self.check_hostname = check_hostname - - def load_cert_chain_w(self, space, w_certfile, w_keyfile=None, - w_password=None): - if space.is_none(w_certfile): - certfile = None - else: - certfile = space.str_w(w_certfile) - if space.is_none(w_keyfile): - keyfile = certfile - else: - keyfile = space.str_w(w_keyfile) - pw_info = PasswordInfo() - pw_info.space = space - index = -1 - if not space.is_none(w_password): - index = rthread.get_ident() - PWINFO_STORAGE[index] = pw_info - - if space.is_true(space.callable(w_password)): - pw_info.w_callable = w_password - else: - if space.isinstance_w(w_password, space.w_unicode): - pw_info.password = space.str_w(w_password) - else: - try: - pw_info.password = space.bufferstr_w(w_password) - except OperationError as e: - if not e.match(space, space.w_TypeError): - raise - raise oefmt(space.w_TypeError, - "password should be a string or callable") - - libssl_SSL_CTX_set_default_passwd_cb( - self.ctx, _password_callback) - libssl_SSL_CTX_set_default_passwd_cb_userdata( - self.ctx, rffi.cast(rffi.VOIDP, index)) - - try: - ret = libssl_SSL_CTX_use_certificate_chain_file(self.ctx, certfile) - if ret != 1: - if pw_info.operationerror: - libssl_ERR_clear_error() - raise pw_info.operationerror - errno = get_saved_errno() - if errno: - libssl_ERR_clear_error() - raise wrap_oserror(space, OSError(errno, ''), - exception_name = 'w_IOError') - else: - raise _ssl_seterror(space, None, -1) - - ret = libssl_SSL_CTX_use_PrivateKey_file(self.ctx, keyfile, - SSL_FILETYPE_PEM) - if ret != 1: - if pw_info.operationerror: - libssl_ERR_clear_error() - raise pw_info.operationerror - errno = get_saved_errno() - if errno: - libssl_ERR_clear_error() - raise wrap_oserror(space, OSError(errno, ''), - exception_name = 'w_IOError') - else: - raise _ssl_seterror(space, None, -1) - - ret = libssl_SSL_CTX_check_private_key(self.ctx) - if ret != 1: - raise _ssl_seterror(space, None, -1) - finally: - if index >= 0: - del PWINFO_STORAGE[index] - libssl_SSL_CTX_set_default_passwd_cb( - self.ctx, lltype.nullptr(pem_password_cb.TO)) - libssl_SSL_CTX_set_default_passwd_cb_userdata( - self.ctx, None) - - @unwrap_spec(filepath='fsencode') - def load_dh_params_w(self, space, filepath): - bio = libssl_BIO_new_file(filepath, "r") - if not bio: - errno = get_saved_errno() - libssl_ERR_clear_error() - raise wrap_oserror(space, OSError(errno, ''), - exception_name = 'w_IOError') - try: - dh = libssl_PEM_read_bio_DHparams(bio, None, None, None) - finally: - libssl_BIO_free(bio) - if not dh: - errno = get_saved_errno() - if errno != 0: - libssl_ERR_clear_error() - raise wrap_oserror(space, OSError(errno, '')) - else: - raise _ssl_seterror(space, None, 0) - try: - if libssl_SSL_CTX_set_tmp_dh(self.ctx, dh) == 0: - raise _ssl_seterror(space, None, 0) - finally: - libssl_DH_free(dh) - - def load_verify_locations_w(self, space, w_cafile=None, w_capath=None, - w_cadata=None): - if space.is_none(w_cafile): - cafile = None - else: - cafile = space.str_w(w_cafile) - if space.is_none(w_capath): - capath = None - else: - capath = space.str_w(w_capath) - if space.is_none(w_cadata): - cadata = None - ca_file_type = -1 - else: - if not space.isinstance_w(w_cadata, space.w_unicode): - ca_file_type = SSL_FILETYPE_ASN1 - cadata = space.bufferstr_w(w_cadata) - else: - ca_file_type = SSL_FILETYPE_PEM - try: - cadata = space.unicode_w(w_cadata).encode('ascii') - except UnicodeEncodeError: - raise oefmt(space.w_TypeError, - "cadata should be a ASCII string or a " - "bytes-like object") - if cafile is None and capath is None and cadata is None: - raise oefmt(space.w_TypeError, - "cafile and capath cannot be both omitted") - # load from cadata - if cadata is not None: - with rffi.scoped_nonmovingbuffer(cadata) as buf: - self._add_ca_certs(space, buf, len(cadata), ca_file_type) - - # load cafile or capath - if cafile is not None or capath is not None: - ret = libssl_SSL_CTX_load_verify_locations( - self.ctx, cafile, capath) - if ret != 1: - errno = get_saved_errno() - if errno: - libssl_ERR_clear_error() - raise wrap_oserror(space, OSError(errno, ''), - exception_name = 'w_IOError') - else: - raise _ssl_seterror(space, None, -1) - - def _add_ca_certs(self, space, data, size, ca_file_type): - biobuf = libssl_BIO_new_mem_buf(data, size) - if not biobuf: - raise ssl_error(space, "Can't allocate buffer") - try: - store = libssl_SSL_CTX_get_cert_store(self.ctx) - loaded = 0 - while True: - if ca_file_type == SSL_FILETYPE_ASN1: - cert = libssl_d2i_X509_bio( - biobuf, None) - else: - cert = libssl_PEM_read_bio_X509( - biobuf, None, None, None) - if not cert: - break - try: - r = libssl_X509_STORE_add_cert(store, cert) - finally: - libssl_X509_free(cert) - if not r: - err = libssl_ERR_peek_last_error() - if (libssl_ERR_GET_LIB(err) == ERR_LIB_X509 and - libssl_ERR_GET_REASON(err) == - X509_R_CERT_ALREADY_IN_HASH_TABLE): - # cert already in hash table, not an error - libssl_ERR_clear_error() - else: - break - loaded += 1 - - err = libssl_ERR_peek_last_error() - if (ca_file_type == SSL_FILETYPE_ASN1 and - loaded > 0 and - libssl_ERR_GET_LIB(err) == ERR_LIB_ASN1 and - libssl_ERR_GET_REASON(err) == ASN1_R_HEADER_TOO_LONG): - # EOF ASN1 file, not an error - libssl_ERR_clear_error() - elif (ca_file_type == SSL_FILETYPE_PEM and - loaded > 0 and - libssl_ERR_GET_LIB(err) == ERR_LIB_PEM and - libssl_ERR_GET_REASON(err) == PEM_R_NO_START_LINE): - # EOF PEM file, not an error - libssl_ERR_clear_error() - else: - raise _ssl_seterror(space, None, 0) - finally: - libssl_BIO_free(biobuf) - - def cert_store_stats_w(self, space): - store = libssl_SSL_CTX_get_cert_store(self.ctx) - x509 = 0 - x509_ca = 0 - crl = 0 - objs = libssl_X509_STORE_get0_objects(store) - for i in range(libssl_sk_X509_OBJECT_num(objs)): - obj = libssl_sk_X509_OBJECT_value(objs, i) - obj_type = intmask(libssl_X509_OBJECT_get_type(obj)) - if obj_type == X509_LU_X509: - x509 += 1 - if libssl_X509_check_ca( - libssl_X509_OBJECT_get0_X509(obj)): - x509_ca += 1 - elif obj_type == X509_LU_CRL: - crl += 1 - else: - # Ignore X509_LU_FAIL, X509_LU_RETRY, X509_LU_PKEY. - # As far as I can tell they are internal states and never - # stored in a cert store - pass - w_result = space.newdict() - space.setitem(w_result, - space.wrap('x509'), space.wrap(x509)) - space.setitem(w_result, - space.wrap('x509_ca'), space.wrap(x509_ca)) - space.setitem(w_result, - space.wrap('crl'), space.wrap(crl)) - return w_result - - @unwrap_spec(protos='bufferstr') - def set_npn_protocols_w(self, space, protos): - if not HAS_NPN: - raise oefmt(space.w_NotImplementedError, - "The NPN extension requires OpenSSL 1.0.1 or later.") - - self.npn_protocols = SSLNpnProtocols(self.ctx, protos) - - @unwrap_spec(protos='bufferstr') - def set_alpn_protocols_w(self, space, protos): - if not HAS_ALPN: - raise oefmt(space.w_NotImplementedError, - "The ALPN extension requires OpenSSL 1.0.2 or later.") - - self.alpn_protocols = SSLAlpnProtocols(self.ctx, protos) - - def get_ca_certs_w(self, space, w_binary_form=None): - if w_binary_form and space.is_true(w_binary_form): - binary_mode = True - else: - binary_mode = False - rlist = [] - store = libssl_SSL_CTX_get_cert_store(self.ctx) - objs = libssl_X509_STORE_get0_objects(store) - for i in range(libssl_sk_X509_OBJECT_num(objs)): - obj = libssl_sk_X509_OBJECT_value(objs, i) - if intmask(libssl_X509_OBJECT_get_type(obj)) != X509_LU_X509: - # not a x509 cert - continue - # CA for any purpose - cert = libssl_X509_OBJECT_get0_X509(obj) - if not libssl_X509_check_ca(cert): - continue - if binary_mode: - rlist.append(_certificate_to_der(space, cert)) - else: - rlist.append(_decode_certificate(space, cert)) - return space.newlist(rlist) - - @unwrap_spec(name=str) - def set_ecdh_curve_w(self, space, name): - nid = libssl_OBJ_sn2nid(name) - if nid == 0: - raise oefmt(space.w_ValueError, - "unknown elliptic curve name '%s'", name) - key = libssl_EC_KEY_new_by_curve_name(nid) - if not key: - raise _ssl_seterror(space, None, 0) - try: - libssl_SSL_CTX_set_tmp_ecdh(self.ctx, key) - finally: - libssl_EC_KEY_free(key) - - def set_servername_callback_w(self, space, w_callback): - if space.is_none(w_callback): - libssl_SSL_CTX_set_tlsext_servername_callback( - self.ctx, lltype.nullptr(servername_cb.TO)) - self.servername_callback = None - return - if not space.is_true(space.callable(w_callback)): - raise oefmt(space.w_TypeError, "not a callable object") - callback_struct = ServernameCallback() - callback_struct.space = space - callback_struct.w_ctx = self - callback_struct.w_set_hostname = w_callback - self.servername_callback = callback_struct - index = compute_unique_id(self) - SERVERNAME_CALLBACKS.set(index, callback_struct) - libssl_SSL_CTX_set_tlsext_servername_callback( - self.ctx, _servername_callback) - libssl_SSL_CTX_set_tlsext_servername_arg(self.ctx, - rffi.cast(rffi.VOIDP, index)) - -SSLContext.typedef = TypeDef( - "_ssl._SSLContext", - __new__ = interp2app(SSLContext.descr_new), - _wrap_socket = interp2app(SSLContext.wrap_socket_w), - set_ciphers = interp2app(SSLContext.set_ciphers_w), - load_cert_chain = interp2app(SSLContext.load_cert_chain_w), - load_verify_locations = interp2app(SSLContext.load_verify_locations_w), - session_stats = interp2app(SSLContext.session_stats_w), - cert_store_stats=interp2app(SSLContext.cert_store_stats_w), - load_dh_params=interp2app(SSLContext.load_dh_params_w), - set_default_verify_paths=interp2app(SSLContext.descr_set_default_verify_paths), - _set_npn_protocols=interp2app(SSLContext.set_npn_protocols_w), - _set_alpn_protocols=interp2app(SSLContext.set_alpn_protocols_w), - get_ca_certs=interp2app(SSLContext.get_ca_certs_w), - set_ecdh_curve=interp2app(SSLContext.set_ecdh_curve_w), - set_servername_callback=interp2app(SSLContext.set_servername_callback_w), - - options=GetSetProperty(SSLContext.descr_get_options, - SSLContext.descr_set_options), - verify_mode=GetSetProperty(SSLContext.descr_get_verify_mode, - SSLContext.descr_set_verify_mode), - verify_flags=GetSetProperty(SSLContext.descr_get_verify_flags, - SSLContext.descr_set_verify_flags), - check_hostname=GetSetProperty(SSLContext.descr_get_check_hostname, - SSLContext.descr_set_check_hostname), -) - - -def _asn1obj2py(space, obj): - nid = libssl_OBJ_obj2nid(obj) - if nid == NID_undef: - raise oefmt(space.w_ValueError, "Unknown object") - with rffi.scoped_alloc_buffer(100) as buf: - buflen = libssl_OBJ_obj2txt(buf.raw, 100, obj, 1) - if buflen < 0: - raise _ssl_seterror(space, None, 0) - if buflen: - w_buf = space.wrap(buf.str(buflen)) - else: - w_buf = space.w_None - w_sn = space.wrap(rffi.charp2str(libssl_OBJ_nid2sn(nid))) - w_ln = space.wrap(rffi.charp2str(libssl_OBJ_nid2ln(nid))) - return space.newtuple([space.wrap(nid), w_sn, w_ln, w_buf]) - - -@unwrap_spec(txt=str, name=bool) -def txt2obj(space, txt, name=False): - obj = libssl_OBJ_txt2obj(txt, not name) - if not obj: - raise oefmt(space.w_ValueError, "unknown object '%s'", txt) - try: - w_result = _asn1obj2py(space, obj) - finally: - libssl_ASN1_OBJECT_free(obj) - return w_result - - -@unwrap_spec(nid=int) -def nid2obj(space, nid): - if nid < NID_undef: - raise oefmt(space.w_ValueError, "NID must be positive") - obj = libssl_OBJ_nid2obj(nid) - if not obj: - raise oefmt(space.w_ValueError, "unknown NID %d", nid) - try: - w_result = _asn1obj2py(space, obj) - finally: - libssl_ASN1_OBJECT_free(obj) - return w_result - - -def w_convert_path(space, path): - if not path: - return space.w_None - else: - return fsdecode(space, space.newbytes(rffi.charp2str(path))) - -def get_default_verify_paths(space): - return space.newtuple([ - w_convert_path(space, libssl_X509_get_default_cert_file_env()), - w_convert_path(space, libssl_X509_get_default_cert_file()), - w_convert_path(space, libssl_X509_get_default_cert_dir_env()), - w_convert_path(space, libssl_X509_get_default_cert_dir()), - ]) - - -class MemoryBIO(W_Root): - pass -MemoryBIO.typedef = TypeDef( - "_ssl.MemoryBIO", -) diff --git a/pypy/module/_ssl/interp_win32.py b/pypy/module/_ssl/interp_win32.py deleted file mode 100644 --- a/pypy/module/_ssl/interp_win32.py +++ /dev/null @@ -1,180 +0,0 @@ -from rpython.rlib import rwin32 -from rpython.rtyper.lltypesystem import rffi, lltype -from rpython.rtyper.tool import rffi_platform -from rpython.translator.tool.cbuild import ExternalCompilationInfo -from pypy.interpreter.gateway import unwrap_spec _______________________________________________ pypy-commit mailing list pypy-commit@python.org https://mail.python.org/mailman/listinfo/pypy-commit