Author: Amaury Forgeot d'Arc <[email protected]>
Branch: py3k
Changeset: r75909:3896a15828da
Date: 2015-02-16 01:05 +0100
http://bitbucket.org/pypy/pypy/changeset/3896a15828da/
Log: Fix ssl module
diff --git a/pypy/module/_ssl/interp_ssl.py b/pypy/module/_ssl/interp_ssl.py
--- a/pypy/module/_ssl/interp_ssl.py
+++ b/pypy/module/_ssl/interp_ssl.py
@@ -1,4 +1,3 @@
-<<<<<<< local
import weakref
from rpython.rlib import rpoll, rsocket
@@ -14,6 +13,7 @@
from pypy.interpreter.error import OperationError, oefmt, wrap_oserror
from pypy.interpreter.gateway import interp2app, unwrap_spec
from pypy.interpreter.typedef import TypeDef, GetSetProperty
+from pypy.interpreter.unicodehelper import fsdecode
from pypy.module._ssl.ssl_data import (
LIBRARY_CODES_TO_NAMES, ERROR_CODES_TO_NAMES)
from pypy.module._socket import interp_socket
@@ -177,202 +177,6 @@
SOCKET_STORAGE = RWeakValueDictionary(int, W_Root)
-class SSLContext(W_Root):
- ctx = lltype.nullptr(SSL_CTX.TO)
-
- def __init__(self, space, protocol):
- if protocol == PY_SSL_VERSION_TLS1:
- method = libssl_TLSv1_method()
- elif protocol == PY_SSL_VERSION_SSL3 and not OPENSSL_NO_SSL3:
- method = libssl_SSLv3_method()
- elif protocol == PY_SSL_VERSION_SSL2 and not OPENSSL_NO_SSL2:
- method = libssl_SSLv2_method()
- elif protocol == PY_SSL_VERSION_SSL23:
- method = libssl_SSLv23_method()
- else:
- raise oefmt(space.w_ValueError, "invalid protocol version")
- self.ctx = libssl_SSL_CTX_new(method)
-
- # Defaults
- libssl_SSL_CTX_set_verify(self.ctx, SSL_VERIFY_NONE, None)
- options = SSL_OP_ALL & ~SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS
- if protocol != PY_SSL_VERSION_SSL2:
- options |= SSL_OP_NO_SSLv2
- libssl_SSL_CTX_set_options(self.ctx, options)
- libssl_SSL_CTX_set_session_id_context(self.ctx, "Python", len("Python"))
-
- def __del__(self):
- if self.ctx:
- libssl_SSL_CTX_free(self.ctx)
-
- @unwrap_spec(protocol=int)
- def descr_new(space, w_subtype, protocol=PY_SSL_VERSION_SSL23):
- self = space.allocate_instance(SSLContext, w_subtype)
- self.__init__(space, protocol)
- if not self.ctx:
- raise ssl_error(space, "failed to allocate SSL context")
- return space.wrap(self)
-
- @unwrap_spec(cipherlist=str)
- def set_ciphers_w(self, space, cipherlist):
- ret = libssl_SSL_CTX_set_cipher_list(self.ctx, cipherlist)
- if ret == 0:
- # Clearing the error queue is necessary on some OpenSSL
- # versions, otherwise the error will be reported again
- # when another SSL call is done.
- libssl_ERR_clear_error()
- raise ssl_error(space, "No cipher can be selected.")
-
- def get_verify_mode_w(self, space):
- verify_mode = libssl_SSL_CTX_get_verify_mode(self.ctx)
- if verify_mode == SSL_VERIFY_NONE:
- return space.wrap(PY_SSL_CERT_NONE)
- elif verify_mode == SSL_VERIFY_PEER:
- return space.wrap(PY_SSL_CERT_OPTIONAL)
- elif verify_mode == (SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT):
- return space.wrap(PY_SSL_CERT_REQUIRED)
- else:
- raise ssl_error(
- space, "invalid return value from SSL_CTX_get_verify_mode")
-
- def set_verify_mode_w(self, space, w_mode):
- mode = space.int_w(w_mode)
- if mode == PY_SSL_CERT_NONE:
- verify_mode = SSL_VERIFY_NONE
- elif mode == PY_SSL_CERT_OPTIONAL:
- verify_mode = SSL_VERIFY_PEER
- elif mode == PY_SSL_CERT_REQUIRED:
- verify_mode = SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT
- else:
- raise OperationError(space.w_ValueError, space.wrap(
- "invalid value for verify_mode"))
- libssl_SSL_CTX_set_verify(self.ctx, verify_mode, None)
-
- def get_options_w(self, space):
- return space.wrap(libssl_SSL_CTX_get_options(self.ctx))
-
- def set_options_w(self, space, w_value):
- value = space.int_w(w_value)
- opts = libssl_SSL_CTX_get_options(self.ctx)
- clear = opts & ~value
- set = ~opts & value
- if clear:
- if HAVE_SSL_CTX_CLEAR_OPTIONS:
- libssl_SSL_CTX_clear_options(self.ctx, clear)
- else:
- raise OperationError(space.w_ValueError, space.wrap(
- "can't clear options before OpenSSL 0.9.8m"))
- if set:
- libssl_SSL_CTX_set_options(self.ctx, set)
-
- def load_cert_chain_w(self, space, w_certfile, w_keyfile=None):
- if space.is_none(w_certfile):
- certfile = None
- else:
- certfile = space.str_w(w_certfile)
- if space.is_none(w_keyfile):
- keyfile = certfile
- else:
- keyfile = space.str_w(w_keyfile)
-
- ret = libssl_SSL_CTX_use_certificate_chain_file(self.ctx, certfile)
- if ret != 1:
- errno = get_saved_errno()
- if errno:
- libssl_ERR_clear_error()
- raise wrap_oserror(space, OSError(errno, ''),
- exception_name = 'w_IOError')
- else:
- raise _ssl_seterror(space, None, -1)
-
- ret = libssl_SSL_CTX_use_PrivateKey_file(self.ctx, keyfile,
- SSL_FILETYPE_PEM)
- if ret != 1:
- errno = get_saved_errno()
- if errno:
- libssl_ERR_clear_error()
- raise wrap_oserror(space, OSError(errno, ''),
- exception_name = 'w_IOError')
- else:
- raise _ssl_seterror(space, None, -1)
-
- ret = libssl_SSL_CTX_check_private_key(self.ctx)
- if ret != 1:
- raise _ssl_seterror(space, None, -1)
-
- def load_verify_locations_w(self, space, w_cafile=None, w_capath=None):
- if space.is_none(w_cafile):
- cafile = None
- else:
- cafile = space.str_w(w_cafile)
- if space.is_none(w_capath):
- capath = None
- else:
- capath = space.str_w(w_capath)
- if cafile is None and capath is None:
- raise OperationError(space.w_TypeError, space.wrap(
- "cafile and capath cannot be both omitted"))
- ret = libssl_SSL_CTX_load_verify_locations(
- self.ctx, cafile, capath)
- if ret != 1:
- errno = get_saved_errno()
- if errno:
- libssl_ERR_clear_error()
- raise wrap_oserror(space, OSError(errno, ''),
- exception_name = 'w_IOError')
- else:
- raise _ssl_seterror(space, None, -1)
-
- @unwrap_spec(server_side=int)
- def wrap_socket_w(self, space, w_sock, server_side,
- w_server_hostname=None):
- assert w_sock is not None
- # server_hostname is either None (or absent), or to be encoded
- # using the idna encoding.
- if space.is_none(w_server_hostname):
- hostname = None
- else:
- hostname = space.bytes_w(
- space.call_method(w_server_hostname,
- "encode", space.wrap("idna")))
-
- if hostname and not HAS_SNI:
- raise OperationError(space.w_ValueError,
- space.wrap("server_hostname is not supported "
- "by your OpenSSL library"))
-
- return new_sslobject(space, self.ctx, w_sock, server_side, hostname)
-
- def session_stats_w(self, space):
- w_stats = space.newdict()
- for name, ssl_func in SSL_CTX_STATS:
- w_value = space.wrap(ssl_func(self.ctx))
- space.setitem_str(w_stats, name, w_value)
- return w_stats
-
- def set_default_verify_paths_w(self, space):
- ret = libssl_SSL_CTX_set_default_verify_paths(self.ctx)
- if ret != 1:
- raise _ssl_seterror(space, None, -1)
-
-
-SSLContext.typedef = TypeDef(
- "_SSLContext",
- __new__ = interp2app(SSLContext.descr_new.im_func),
- options = GetSetProperty(SSLContext.get_options_w,
- SSLContext.set_options_w),
- verify_mode = GetSetProperty(SSLContext.get_verify_mode_w,
- SSLContext.set_verify_mode_w),
- _wrap_socket = interp2app(SSLContext.wrap_socket_w),
- set_ciphers = interp2app(SSLContext.set_ciphers_w),
- load_cert_chain = interp2app(SSLContext.load_cert_chain_w),
- load_verify_locations = interp2app(SSLContext.load_verify_locations_w),
- session_stats = interp2app(SSLContext.session_stats_w),
- set_default_verify_paths=interp2app(SSLContext.set_default_verify_paths_w),
-)
-
-
-
if HAVE_OPENSSL_RAND:
# helper routines for seeding the SSL PRNG
@unwrap_spec(string=str, entropy=float)
@@ -499,7 +303,7 @@
"Underlying socket too large for select().")
elif sockstate == SOCKET_HAS_BEEN_CLOSED:
if libssl_SSL_get_shutdown(self.ssl) == SSL_RECEIVED_SHUTDOWN:
- if space.is_none(w_buf):
+ if space.is_none(w_buffer):
return space.wrapbytes('')
else:
return space.wrap(0)
@@ -525,7 +329,7 @@
sockstate = checkwait(space, w_socket, True)
elif (err == SSL_ERROR_ZERO_RETURN and
libssl_SSL_get_shutdown(self.ssl) == SSL_RECEIVED_SHUTDOWN):
- if space.is_none(w_buf):
+ if space.is_none(w_buffer):
return space.wrapbytes('')
else:
return space.wrap(0)
@@ -761,13 +565,13 @@
length = libssl_SSL_get_peer_finished(self.ssl, buf, CB_MAXLEN)
if length > 0:
- return space.wrap(rffi.charpsize2str(buf, intmask(length)))
+ return space.wrapbytes(rffi.charpsize2str(buf, intmask(length)))
def descr_get_context(self, space):
return self.w_ctx
def descr_set_context(self, space, w_ctx):
- ctx = space.interp_w(_SSLContext, w_ctx)
+ ctx = space.interp_w(SSLContext, w_ctx)
if not HAS_SNI:
raise oefmt(space.w_NotImplementedError,
"setting a socket's context "
@@ -776,22 +580,20 @@
libssl_SSL_set_SSL_CTX(self.ssl, ctx.ctx)
-_SSLSocket.typedef = TypeDef(
- "_ssl._SSLSocket",
-
- do_handshake=interp2app(_SSLSocket.do_handshake),
- write=interp2app(_SSLSocket.write),
- read=interp2app(_SSLSocket.read),
- pending=interp2app(_SSLSocket.pending),
- peer_certificate=interp2app(_SSLSocket.peer_certificate),
- cipher=interp2app(_SSLSocket.cipher),
- shutdown=interp2app(_SSLSocket.shutdown),
- selected_npn_protocol = interp2app(_SSLSocket.selected_npn_protocol),
- compression = interp2app(_SSLSocket.compression_w),
- version = interp2app(_SSLSocket.version_w),
- tls_unique_cb = interp2app(_SSLSocket.tls_unique_cb_w),
- context=GetSetProperty(_SSLSocket.descr_get_context,
- _SSLSocket.descr_set_context),
+SSLSocket.typedef = TypeDef("_ssl._SSLSocket",
+ write = interp2app(SSLSocket.write),
+ pending = interp2app(SSLSocket.pending),
+ read = interp2app(SSLSocket.read),
+ do_handshake = interp2app(SSLSocket.do_handshake),
+ shutdown = interp2app(SSLSocket.shutdown),
+ cipher = interp2app(SSLSocket.cipher),
+ peer_certificate = interp2app(SSLSocket.peer_certificate),
+ selected_npn_protocol = interp2app(SSLSocket.selected_npn_protocol),
+ compression = interp2app(SSLSocket.compression_w),
+ version = interp2app(SSLSocket.version_w),
+ tls_unique_cb = interp2app(SSLSocket.tls_unique_cb_w),
+ context=GetSetProperty(SSLSocket.descr_get_context,
+ SSLSocket.descr_set_context),
)
def _certificate_to_der(space, certificate):
@@ -801,7 +603,7 @@
if length < 0:
raise _ssl_seterror(space, None, 0)
try:
- return space.wrap(rffi.charpsize2str(buf_ptr[0], length))
+ return space.wrapbytes(rffi.charpsize2str(buf_ptr[0], length))
finally:
libssl_OPENSSL_free(buf_ptr[0])
@@ -1021,16 +823,6 @@
return space.newtuple([w_name, w_value])
-SSLSocket.typedef = TypeDef("_SSLSocket",
- write = interp2app(SSLSocket.write),
- pending = interp2app(SSLSocket.pending),
- read = interp2app(SSLSocket.read),
- do_handshake = interp2app(SSLSocket.do_handshake),
- shutdown = interp2app(SSLSocket.shutdown),
- cipher = interp2app(SSLSocket.cipher),
- peer_certificate = interp2app(SSLSocket.peer_certificate),
-)
-
def _get_aia_uri(space, certificate, nid):
info = rffi.cast(AUTHORITY_INFO_ACCESS, libssl_X509_get_ext_d2i(
certificate, NID_info_access, None, None))
@@ -1109,6 +901,7 @@
libssl_SSL_set_connect_state(ss.ssl)
else:
libssl_SSL_set_accept_state(ss.ssl)
+ ss.socket_type = side
ss.w_socket = weakref.ref(w_sock)
return ss
@@ -1339,11 +1132,8 @@
# The high-level ssl.SSLSocket object
index = rffi.cast(lltype.Signed, libssl_SSL_get_app_data(ssl))
w_ssl = SOCKET_STORAGE.get(index)
- assert isinstance(w_ssl, _SSLSocket)
- if w_ssl.ssl_sock_weakref_w is not None:
- w_ssl_socket = w_ssl.ssl_sock_weakref_w()
- else:
- w_ssl_socket = space.w_None
+ assert isinstance(w_ssl, SSLSocket)
+ w_ssl_socket = w_ssl # So far. Need to change in 3.3.
if space.is_none(w_ssl_socket):
ad[0] = rffi.cast(rffi.INT, SSL_AD_INTERNAL_ERROR)
return rffi.cast(rffi.INT, SSL_TLSEXT_ERR_ALERT_FATAL)
@@ -1383,10 +1173,10 @@
return rffi.cast(rffi.INT, SSL_TLSEXT_ERR_ALERT_FATAL)
-class _SSLContext(W_Root):
- @staticmethod
- @unwrap_spec(protocol=int)
- def descr_new(space, w_subtype, protocol):
+class SSLContext(W_Root):
+ ctx = lltype.nullptr(SSL_CTX.TO)
+
+ def __init__(self, space, protocol):
if protocol == PY_SSL_VERSION_TLS1:
method = libssl_TLSv1_method()
elif protocol == PY_SSL_VERSION_SSL3 and not OPENSSL_NO_SSL3:
@@ -1397,17 +1187,19 @@
method = libssl_SSLv23_method()
else:
raise oefmt(space.w_ValueError, "invalid protocol version")
- ctx = libssl_SSL_CTX_new(method)
- if not ctx:
+ self.ctx = libssl_SSL_CTX_new(method)
+ if not self.ctx:
raise ssl_error(space, "failed to allocate SSL context")
- self = space.allocate_instance(_SSLContext, w_subtype)
- self.ctx = ctx
self.check_hostname = False
+
+ # Defaults
+ libssl_SSL_CTX_set_verify(self.ctx, SSL_VERIFY_NONE, None)
options = SSL_OP_ALL & ~SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS
if protocol != PY_SSL_VERSION_SSL2:
options |= SSL_OP_NO_SSLv2
- libssl_SSL_CTX_set_options(ctx, options)
+ libssl_SSL_CTX_set_options(self.ctx, options)
+ libssl_SSL_CTX_set_session_id_context(self.ctx, "Python", len("Python"))
if not OPENSSL_NO_ECDH:
# Allow automatic ECDH curve selection (on
@@ -1425,18 +1217,46 @@
finally:
libssl_EC_KEY_free(key)
- return self
+ def __del__(self):
+ if self.ctx:
+ libssl_SSL_CTX_free(self.ctx)
+
+ @staticmethod
+ @unwrap_spec(protocol=int)
+ def descr_new(space, w_subtype, protocol=PY_SSL_VERSION_SSL23):
+ self = space.allocate_instance(SSLContext, w_subtype)
+ self.__init__(space, protocol)
+ return space.wrap(self)
+
+ @unwrap_spec(cipherlist=str)
+ def set_ciphers_w(self, space, cipherlist):
+ ret = libssl_SSL_CTX_set_cipher_list(self.ctx, cipherlist)
+ if ret == 0:
+ # Clearing the error queue is necessary on some OpenSSL
+ # versions, otherwise the error will be reported again
+ # when another SSL call is done.
+ libssl_ERR_clear_error()
+ raise ssl_error(space, "No cipher can be selected.")
@unwrap_spec(server_side=int)
- def descr_wrap_socket(self, space, w_sock, server_side, w_server_hostname=None, w_ssl_sock=None):
- return _SSLSocket.descr_new(space, self, w_sock, server_side, w_server_hostname, w_ssl_sock)
+ def wrap_socket_w(self, space, w_sock, server_side,
+ w_server_hostname=None):
+ assert w_sock is not None
+ # server_hostname is either None (or absent), or to be encoded
+ # using the idna encoding.
+ if space.is_none(w_server_hostname):
+ hostname = None
+ else:
+ hostname = space.bytes_w(
+ space.call_method(w_server_hostname,
+ "encode", space.wrap("idna")))
- @unwrap_spec(cipherlist=str)
- def descr_set_ciphers(self, space, cipherlist):
- ret = libssl_SSL_CTX_set_cipher_list(self.ctx, cipherlist)
- if ret == 0:
- libssl_ERR_clear_error()
- raise ssl_error(space, "No cipher can be selected.")
+ if hostname and not HAS_SNI:
+ raise OperationError(space.w_ValueError,
+ space.wrap("server_hostname is not supported "
+ "by your OpenSSL library"))
+
+ return new_sslobject(space, self.ctx, w_sock, server_side, hostname)
def session_stats_w(self, space):
w_stats = space.newdict()
@@ -1806,30 +1626,30 @@
libssl_SSL_CTX_set_tlsext_servername_arg(self.ctx,
rffi.cast(rffi.VOIDP, index))
-_SSLContext.typedef = TypeDef(
+SSLContext.typedef = TypeDef(
"_ssl._SSLContext",
- __new__=interp2app(_SSLContext.descr_new),
- _wrap_socket=interp2app(_SSLContext.descr_wrap_socket),
- set_ciphers=interp2app(_SSLContext.descr_set_ciphers),
- cert_store_stats=interp2app(_SSLContext.cert_store_stats_w),
- load_cert_chain=interp2app(_SSLContext.load_cert_chain_w),
- load_dh_params=interp2app(_SSLContext.load_dh_params_w),
- load_verify_locations=interp2app(_SSLContext.load_verify_locations_w),
- session_stats = interp2app(_SSLContext.session_stats_w),
- set_default_verify_paths=interp2app(_SSLContext.descr_set_default_verify_paths),
- _set_npn_protocols=interp2app(_SSLContext.set_npn_protocols_w),
- get_ca_certs=interp2app(_SSLContext.get_ca_certs_w),
- set_ecdh_curve=interp2app(_SSLContext.set_ecdh_curve_w),
- set_servername_callback=interp2app(_SSLContext.set_servername_callback_w),
+ __new__ = interp2app(SSLContext.descr_new),
+ _wrap_socket = interp2app(SSLContext.wrap_socket_w),
+ set_ciphers = interp2app(SSLContext.set_ciphers_w),
+ load_cert_chain = interp2app(SSLContext.load_cert_chain_w),
+ load_verify_locations = interp2app(SSLContext.load_verify_locations_w),
+ session_stats = interp2app(SSLContext.session_stats_w),
+ cert_store_stats=interp2app(SSLContext.cert_store_stats_w),
+ load_dh_params=interp2app(SSLContext.load_dh_params_w),
+ set_default_verify_paths=interp2app(SSLContext.descr_set_default_verify_paths),
+ _set_npn_protocols=interp2app(SSLContext.set_npn_protocols_w),
+ get_ca_certs=interp2app(SSLContext.get_ca_certs_w),
+ set_ecdh_curve=interp2app(SSLContext.set_ecdh_curve_w),
+ set_servername_callback=interp2app(SSLContext.set_servername_callback_w),
- options=GetSetProperty(_SSLContext.descr_get_options,
- _SSLContext.descr_set_options),
- verify_mode=GetSetProperty(_SSLContext.descr_get_verify_mode,
- _SSLContext.descr_set_verify_mode),
- verify_flags=GetSetProperty(_SSLContext.descr_get_verify_flags,
- _SSLContext.descr_set_verify_flags),
- check_hostname=GetSetProperty(_SSLContext.descr_get_check_hostname,
- _SSLContext.descr_set_check_hostname),
+ options=GetSetProperty(SSLContext.descr_get_options,
+ SSLContext.descr_set_options),
+ verify_mode=GetSetProperty(SSLContext.descr_get_verify_mode,
+ SSLContext.descr_set_verify_mode),
+ verify_flags=GetSetProperty(SSLContext.descr_get_verify_flags,
+ SSLContext.descr_set_verify_flags),
+ check_hostname=GetSetProperty(SSLContext.descr_get_check_hostname,
+ SSLContext.descr_set_check_hostname),
)
@@ -1880,7 +1700,7 @@
if not path:
return space.w_None
else:
- return space.wrapbytes(rffi.charp2str(path))
+ return fsdecode(space, space.wrapbytes(rffi.charp2str(path)))
def get_default_verify_paths(space):
return space.newtuple([
diff --git a/pypy/module/_ssl/test/test_ssl.py b/pypy/module/_ssl/test/test_ssl.py
--- a/pypy/module/_ssl/test/test_ssl.py
+++ b/pypy/module/_ssl/test/test_ssl.py
@@ -73,28 +73,6 @@
s = _socket.socket()
ss = ssl.wrap_socket(s)
- exc = raises(_socket.error, ss.do_handshake)
- if sys.platform == 'win32':
- assert exc.value.errno == 10057 # WSAENOTCONN
- else:
- assert exc.value.errno == 32 # Broken pipe
- del exc, ss, s
- gc.collect() # force the destructor() to be called now
-
- def test_async_closed(self):
- import _ssl, _socket, sys, gc
- s = _socket.socket()
- s.settimeout(3)
- if sys.version_info < (2, 7, 9):
- ss = _ssl.sslwrap(s, 0)
- else:
- ss = _ssl._SSLContext(_ssl.PROTOCOL_TLSv1)._wrap_socket(s, 0)
- s.close()
- exc = raises(_ssl.SSLError, ss.write, "data")
- assert exc.value.message == 'Underlying socket has been closed.'
- del exc, ss, s
- gc.collect() # force the destructor() to be called now
-
def test_test_decode_nullbytecert(self):
import _ssl
p = _ssl._test_decode_cert(self.nullbytecert)
@@ -119,7 +97,7 @@
s = _ssl._SSLContext(_ssl.PROTOCOL_TLSv1)
raises(ValueError, _ssl._SSLContext, -1)
- assert type(s.options) is long
+ assert type(s.options) is int
assert s.options & _ssl.OP_NO_SSLv2
s.options &= ~_ssl.OP_NO_SSLv2
assert not s.options & _ssl.OP_NO_SSLv2
@@ -136,7 +114,7 @@
exc = raises(ValueError, "s.verify_mode = 1234")
assert str(exc.value) == "invalid value for verify_mode"
- assert type(s.verify_flags) is long
+ assert type(s.verify_flags) is int
assert s.verify_flags == _ssl.VERIFY_DEFAULT
s.verify_flags = _ssl.VERIFY_CRL_CHECK_LEAF
assert s.verify_flags == _ssl.VERIFY_CRL_CHECK_LEAF
@@ -260,7 +238,7 @@
import socket, _ssl, gc
ctx = _ssl._SSLContext(_ssl.PROTOCOL_TLSv1)
ctx._set_npn_protocols(b'\x08http/1.1\x06spdy/2')
- ss = ctx._wrap_socket(self.s._sock, True,
+ ss = ctx._wrap_socket(self.s, True,
server_hostname="svn.python.org")
self.s.close()
del ss; gc.collect()
@@ -269,7 +247,7 @@
import ssl, sys, gc
ss = ssl.wrap_socket(self.s)
ss.do_handshake()
- assert isinstance(ss.get_channel_binding(), bytes)
+ assert isinstance(ss._sslobj.tls_unique_cb(), bytes)
self.s.close()
del ss; gc.collect()
@@ -277,7 +255,7 @@
import ssl, sys, gc
ss = ssl.wrap_socket(self.s)
ss.do_handshake()
- assert ss.compression() in [None, 'ZLIB', 'RLE']
+ assert ss._sslobj.compression() in [None, 'ZLIB', 'RLE']
self.s.close()
del ss; gc.collect()
@@ -351,7 +329,7 @@
ctx = _ssl._SSLContext(_ssl.PROTOCOL_TLSv1)
with open(self.keycert) as f:
- cacert_pem = f.read().decode('ascii')
+ cacert_pem = f.read()
ctx.load_verify_locations(cadata=cacert_pem)
assert ctx.cert_store_stats()["x509_ca"] == 0
@@ -450,32 +428,6 @@
s = str(exc.value)
assert s.startswith("[PEM: NO_START_LINE] no start line")
- def test_subclass(self):
- # Check that the appropriate SSLError subclass is raised
- # (this only tests one of them)
- import _ssl, _socket
- ctx = _ssl._SSLContext(_ssl.PROTOCOL_TLSv1)
- s = _socket.socket()
- try:
- s.bind(("127.0.0.1", 0))
- s.listen(5)
- c = _socket.socket()
- c.connect(s.getsockname())
- c.setblocking(False)
-
- c = ctx._wrap_socket(c, False)
- try:
- exc = raises(_ssl.SSLWantReadError, c.do_handshake)
- msg= str(exc.value)
- assert msg.startswith("The operation did not complete (read)")
- # For compatibility
- assert exc.value.errno == _ssl.SSL_ERROR_WANT_READ
- finally:
- c.shutdown()
- finally:
- s.close()
->>>>>>> other
-
SSL_CERTIFICATE = """
-----BEGIN CERTIFICATE-----
MIICVDCCAb2gAwIBAgIJANfHOBkZr8JOMA0GCSqGSIb3DQEBBQUAMF8xCzAJBgNV
diff --git a/pypy/objspace/std/objspace.py b/pypy/objspace/std/objspace.py
--- a/pypy/objspace/std/objspace.py
+++ b/pypy/objspace/std/objspace.py
@@ -155,7 +155,7 @@
# poor man's x.decode('ascii', 'replace'), since it's not
# supported by RPython
if not we_are_translated():
- print 'WARNING: space.str() called on a non-ascii byte string: %r' % x
+ print 'WARNING: space.wrap() called on a non-ascii byte string: %r' % x
lst = []
for ch in x:
ch = ord(ch)
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit