Author: Amaury Forgeot d'Arc <[email protected]>
Branch: stdlib-2.7.9
Changeset: r75789:13c3e6256e35
Date: 2015-02-09 23:34 +0100
http://bitbucket.org/pypy/pypy/changeset/13c3e6256e35/
Log: ssl: add win32-specific functions. Completely blind translation,
need to test on buildbot.
diff --git a/pypy/module/_ssl/__init__.py b/pypy/module/_ssl/__init__.py
--- a/pypy/module/_ssl/__init__.py
+++ b/pypy/module/_ssl/__init__.py
@@ -1,3 +1,4 @@
+import sys
from rpython.rlib.rarithmetic import intmask
from pypy.interpreter.mixedmodule import MixedModule
from pypy.module._ssl import ssl_data
@@ -23,6 +24,10 @@
'_SSLContext': 'interp_ssl._SSLContext',
}
+ if sys.platform == 'win32':
+ interpleveldefs['enum_certificates'] =
'interp_win32.enum_certificates_w'
+ interpleveldefs['enum_crls'] = 'interp_win32.enum_crls_w'
+
appleveldefs = {
}
diff --git a/pypy/module/_ssl/interp_win32.py b/pypy/module/_ssl/interp_win32.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_ssl/interp_win32.py
@@ -0,0 +1,166 @@
+from rpython.rlib import rwin32
+from rpython.rtyper.lltypesystem import rffi
+from rpython.rtyper.tool import rffi_platform
+from rpython.translator.tool.cbuild import ExternalCompilationInfo
+from pypy.interpreter.gateway import unwrap_spec
+from pypy.interpreter.error import wrap_windowserror
+
+eci = ExternalCompilationInfo(
+ includes = ['windows.h', 'stdio.h', 'stdlib.h'],
+ libraries = ['crypt32'],
+)
+
+class CConfig:
+ _compilation_info_ = eci
+
+ X509_ASN_ENCODING = rffi_platform.ConstantInteger('X509_ASN_ENCODING')
+ PKCS_7_ASN_ENCODING = rffi_platform.ConstantInteger('PKCS_7_ASN_ENCODING')
+
+ CERT_ENHKEY_USAGE = rffi_platform.Struct(
+ 'CERT_ENHKEY_USAGE', [('cUsageIdentifier', rwin32.DWORD),
+ ('rgpszUsageIdentifier', rffi.CCHARPP)])
+ CERT_CONTEXT = rffi_platform.Struct(
+ 'CERT_CONTEXT', [('pbCertEncoded', rffi.CCHARP),
+ ('cbCertEncoded', rwin32.DWORD),
+ ('dwCertEncodingType', rwin32.DWORD)])
+ CRL_CONTEXT = rffi_platform.Struct(
+ 'CRL_CONTEXT', [('pbCertEncoded', rffi.CCHARP),
+ ('cbCertEncoded', rwin32.DWORD),
+ ('dwCertEncodingType', rwin32.DWORD)])
+
+for k, v in rffi_platform.configure(CConfig).items():
+ globals()[k] = v
+
+PCERT_ENHKEY_USAGE = lltype.Ptr(CERT_ENHKEY_USAGE)
+PCCERT_CONTEXT = lltype.Ptr(CERT_CONTEXT)
+PCCRL_CONTEXT = lltype.Ptr(CRL_CONTEXT)
+
+def external(name, argtypes, restype, **kw):
+ kw['compilation_info'] = eci
+ kw['calling_conv'] = 'win'
+ return rffi.llexternal(
+ name, argtypes, restype, **kw)
+
+CertOpenSystemStore = external(
+ 'CertOpenSystemStoreA', [rffi.VOIDP, rffi.CCHARP], rwin32.HANDLE)
+CertCloseStore = external(
+ 'CertCloseStore', [rwin32.HANDLE, rwin32.DWORD], rwin32.BOOL)
+CertFreeCertificateContext = external(
+ 'CertFreeCertificateContext', [PCCERT_CONTEXT], rwin32.BOOL)
+CertGetEnhancedKeyUsage = external(
+ 'CertGetEnhancedKeyUsage', [PCCERT_CONTEXT, rwin32.DWORD,
+ PCERT_ENHKEY_USAGE, rwin32.LPDWORD], rffi.BOOL)
+CertEnumCertificatesInStore = external(
+ 'CertEnumCertificatesInStore',
+ [rffi.HANDLE, PCCERT_CONTEXT], PCCERT_CONTEXT)
+CertEnumCRLsInStore = external(
+ 'CertEnumCRLsInStore',
+ [rffi.HANDLE, PCCRLT_CONTEXT], PCCRL_CONTEXT)
+
+def w_certEncodingType(space, encodingType):
+ if encodingType == X509_ASN_ENCODING:
+ return space.wrap("x509_asn")
+ elif encodingType == PKCS_7_ASN_ENCODING:
+ return space.wrap("pkcs_7_asn")
+ else:
+ return space.wrap(encodingType)
+
+def w_parseKeyUsage(space, pCertCtx, flags):
+ with rffi.scoped_alloc(rwin32.LPDWORD.TO, 1) as size_ptr:
+ if not CertGetEnhancedKeyUsage(pCertCtx, flags, None, size_ptr):
+ last_error = rwin32.lastSavedWindowsError()
+ if last_error == CRYPT_E_NOT_FOUND:
+ return space.w_True
+ raise wrap_windowserror(WindowsError(last_error))
+
+ size = rffi.widen(size_ptr[0])
+ with rffi.scoped_alloc(rffi.CHARP, size) as buf:
+ usage = rffi.cast(PCERT_ENHKEY_USAGE, buf)
+ # Now get the actual enhanced usage property
+ if not CertGetEnhancedKeyUsage(pCertCtx, flags, usage, size_ptr):
+ last_error = rwin32.lastSavedWindowsError()
+ if last_error == CRYPT_E_NOT_FOUND:
+ return space.w_True
+ raise wrap_windowserror(WindowsError(last_error))
+
+ result_w = []
+ for i in range(usage.c_cUsageIdentifier):
+ if not usage.c_rgpszUsageIdentifier[i]:
+ continue
+ result_w.append(
+ space.wrap(rffi.charp2str(
+ usage.c_rgpszUsageIdentifier[i])))
+ return space.newset(result_w)
+
+@unwrap_spec(store_name=str)
+def enum_certificates_w(space, store_name):
+ """enum_certificates(store_name) -> []
+
+ Retrieve certificates from Windows' cert store. store_name may be one of
+ 'CA', 'ROOT' or 'MY'. The system may provide more cert storages, too.
+ The function returns a list of (bytes, encoding_type, trust) tuples. The
+ encoding_type flag can be interpreted with X509_ASN_ENCODING or
+ PKCS_7_ASN_ENCODING. The trust setting is either a set of OIDs or the
+ boolean True."""
+
+ result_w = []
+ pCertCtx = lltype.nullptr(PCCERT_CONTEXT)
+ hStore = CertOpenSystemStore(None, store_name)
+ if not hStore:
+ raise wrap_windowserror(rwin32.lastSavedWindowsError())
+ try:
+ while True:
+ pCertCtx = CertEnumCertificatesInStore(hStore, pCertCtx)
+ if not pCertCtx:
+ break
+ w_cert = space.wrapbytes(
+ rffi.charpsize2str(pCertCtx.c_pbCertEncoded,
+ pCertCtx.c_cbCertEncoded))
+ w_enc = w_certEncodingType(space, pCertCtx.c_dwCertEncodingType)
+ w_keyusage = w_parseKeyUsage(
+ space, pCertCtx, CERT_FIND_PROP_ONLY_ENHKEY_USAGE_FLAG)
+ if space.is_w(w_keyusage, space.w_True):
+ w_keyusage = w_parseKeyUsage(
+ space, pCertCtx, CERT_FIND_EXT_ONLY_ENHKEY_USAGE_FLAG)
+ result_w.append(space.newtuple([w_cert, w_enc, w_keyusage]))
+ finally:
+ if pCertCtx:
+ # loop ended with an error, need to clean up context manually
+ CertFreeCertificateContext(pCertCtx)
+ if not CertCloseStore(hStore, 0):
+ # This error case might shadow another exception.
+ raise wrap_windowserror(rwin32.lastSavedWindowsError())
+
+ return space.newlist(result_w)
+
+@unwrap_spec(store_name=str)
+def enum_crls_w(space, store_name):
+ """enum_crls(store_name) -> []
+
+ Retrieve CRLs from Windows' cert store. store_name may be one of
+ 'CA', 'ROOT' or 'MY'. The system may provide more cert storages, too.
+ The function returns a list of (bytes, encoding_type) tuples. The
+ encoding_type flag can be interpreted with X509_ASN_ENCODING or
+ PKCS_7_ASN_ENCODING."""
+ result_w = []
+ pCrlCtx = lltype.nullptr(PCCRL_CONTEXT)
+ hStore = CertOpenSystemStore(None, store_name)
+ if not hStore:
+ raise wrap_windowserror(rwin32.lastSavedWindowsError())
+ try:
+ while True:
+ pCrlCtx = CertEnumCRLsInStore(hStore, pCrlCtx)
+ if not pCrlCtx:
+ break
+ w_crl = space.wrapbytes(
+ rffi.charpsize2str(pCrlCtx.c_pbCrlEncoded,
+ pCrlCtx.c_cbCrlEncoded))
+ w_enc = w_certEncodingType(space, pCrlCtx.c_dwCertEncodingType)
+ result_w.append(space.newtuple([w_cert, w_enc]))
+ finally:
+ if pCrlCtx:
+ # loop ended with an error, need to clean up context manually
+ CertFreeCRLContext(pCrlCtx)
+ if not CertCloseStore(hStore, 0):
+ # This error case might shadow another exception.
+ raise wrap_windowserror(rwin32.lastSavedWindowsError())
diff --git a/pypy/module/_ssl/test/test_win32.py
b/pypy/module/_ssl/test/test_win32.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_ssl/test/test_win32.py
@@ -0,0 +1,53 @@
+import sys
+
+if sys.platform != 'win32':
+ from py.test import skip
+ skip("ssl.enum_certificates is only available on Windows")
+
+class AppTestWin32:
+ spaceconfig = dict(usemodules=('_ssl',))
+
+ def setup_class(cls):
+ if sys.platform != 'win32':
+ py.test.skip("pexpect not found")
+
+ def test_enum_certificates(self):
+ import _ssl
+ assert _ssl.enum_certificates("CA")
+ assert _ssl.enum_certificates("ROOT")
+
+ raises(TypeError, _ssl.enum_certificates)
+ raises(WindowsError, _ssl.enum_certificates, "")
+
+ trust_oids = set()
+ for storename in ("CA", "ROOT"):
+ store = _ssl.enum_certificates(storename)
+ assert isinstance(store, list)
+ for element in store:
+ assert isinstance(element, tuple)
+ assert len(element) == 3
+ cert, enc, trust = element
+ assert isinstance(cert, bytes)
+ assert enc in {"x509_asn", "pkcs_7_asn"}
+ assert isinstance(trust, (set, bool))
+ if isinstance(trust, set):
+ trust_oids.update(trust)
+
+ serverAuth = "1.3.6.1.5.5.7.3.1"
+ assert serverAuth in trust_oids
+
+ def test_enum_crls(self):
+ import _ssl
+ assert _ssl.enum_crls("CA")
+ raises(TypeError, _ssl.enum_crls)
+ raises(WindowsError, _ssl.enum_crls, "")
+
+ crls = _ssl.enum_crls("CA")
+ assert isinstance(crls, list)
+ for element in crls:
+ assert isinstance(element, tuple)
+ assert len(element) == 2
+ assert isinstance(element[0], bytes)
+ assert element[1] in {"x509_asn", "pkcs_7_asn"}
+
+
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit