Author: Amaury Forgeot d'Arc <[email protected]>
Branch: stdlib-2.7.9
Changeset: r75789:13c3e6256e35
Date: 2015-02-09 23:34 +0100
http://bitbucket.org/pypy/pypy/changeset/13c3e6256e35/

Log:    ssl: add win32-specific functions. Completely blind translation,
        need to test on buildbot.

diff --git a/pypy/module/_ssl/__init__.py b/pypy/module/_ssl/__init__.py
--- a/pypy/module/_ssl/__init__.py
+++ b/pypy/module/_ssl/__init__.py
@@ -1,3 +1,4 @@
+import sys
 from rpython.rlib.rarithmetic import intmask
 from pypy.interpreter.mixedmodule import MixedModule
 from pypy.module._ssl import ssl_data
@@ -23,6 +24,10 @@
         '_SSLContext': 'interp_ssl._SSLContext',
     }
 
+    if sys.platform == 'win32':
+        interpleveldefs['enum_certificates'] = 
'interp_win32.enum_certificates_w'
+        interpleveldefs['enum_crls'] = 'interp_win32.enum_crls_w'
+
     appleveldefs = {
     }
 
diff --git a/pypy/module/_ssl/interp_win32.py b/pypy/module/_ssl/interp_win32.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_ssl/interp_win32.py
@@ -0,0 +1,166 @@
+from rpython.rlib import rwin32
+from rpython.rtyper.lltypesystem import rffi
+from rpython.rtyper.tool import rffi_platform
+from rpython.translator.tool.cbuild import ExternalCompilationInfo
+from pypy.interpreter.gateway import unwrap_spec
+from pypy.interpreter.error import wrap_windowserror
+
+eci = ExternalCompilationInfo(
+    includes = ['windows.h', 'stdio.h', 'stdlib.h'],
+    libraries = ['crypt32'],
+)
+
+class CConfig:
+    _compilation_info_ = eci
+
+    X509_ASN_ENCODING = rffi_platform.ConstantInteger('X509_ASN_ENCODING')
+    PKCS_7_ASN_ENCODING = rffi_platform.ConstantInteger('PKCS_7_ASN_ENCODING')
+
+    CERT_ENHKEY_USAGE = rffi_platform.Struct(
+        'CERT_ENHKEY_USAGE', [('cUsageIdentifier', rwin32.DWORD),
+                              ('rgpszUsageIdentifier', rffi.CCHARPP)])
+    CERT_CONTEXT = rffi_platform.Struct(
+        'CERT_CONTEXT', [('pbCertEncoded', rffi.CCHARP),
+                         ('cbCertEncoded', rwin32.DWORD),
+                         ('dwCertEncodingType', rwin32.DWORD)])
+    CRL_CONTEXT = rffi_platform.Struct(
+        'CRL_CONTEXT', [('pbCertEncoded', rffi.CCHARP),
+                        ('cbCertEncoded', rwin32.DWORD),
+                        ('dwCertEncodingType', rwin32.DWORD)])
+
+for k, v in rffi_platform.configure(CConfig).items():
+    globals()[k] = v
+
+PCERT_ENHKEY_USAGE = lltype.Ptr(CERT_ENHKEY_USAGE)
+PCCERT_CONTEXT = lltype.Ptr(CERT_CONTEXT)
+PCCRL_CONTEXT = lltype.Ptr(CRL_CONTEXT)
+
+def external(name, argtypes, restype, **kw):
+    kw['compilation_info'] = eci
+    kw['calling_conv'] = 'win'
+    return rffi.llexternal(
+        name, argtypes, restype, **kw)
+
+CertOpenSystemStore = external(
+    'CertOpenSystemStoreA', [rffi.VOIDP, rffi.CCHARP], rwin32.HANDLE)
+CertCloseStore = external(
+    'CertCloseStore', [rwin32.HANDLE, rwin32.DWORD], rwin32.BOOL)
+CertFreeCertificateContext = external(
+    'CertFreeCertificateContext', [PCCERT_CONTEXT], rwin32.BOOL)
+CertGetEnhancedKeyUsage = external(
+    'CertGetEnhancedKeyUsage', [PCCERT_CONTEXT, rwin32.DWORD,
+                                PCERT_ENHKEY_USAGE, rwin32.LPDWORD], rffi.BOOL)
+CertEnumCertificatesInStore = external(
+    'CertEnumCertificatesInStore',
+    [rffi.HANDLE, PCCERT_CONTEXT], PCCERT_CONTEXT)
+CertEnumCRLsInStore = external(
+    'CertEnumCRLsInStore',
+    [rffi.HANDLE, PCCRLT_CONTEXT], PCCRL_CONTEXT)
+
+def w_certEncodingType(space, encodingType):
+    if encodingType == X509_ASN_ENCODING:
+        return space.wrap("x509_asn")
+    elif encodingType == PKCS_7_ASN_ENCODING:
+        return space.wrap("pkcs_7_asn")
+    else:
+        return space.wrap(encodingType)
+
+def w_parseKeyUsage(space, pCertCtx, flags):
+    with rffi.scoped_alloc(rwin32.LPDWORD.TO, 1) as size_ptr:
+        if not CertGetEnhancedKeyUsage(pCertCtx, flags, None, size_ptr):
+            last_error = rwin32.lastSavedWindowsError()
+            if last_error == CRYPT_E_NOT_FOUND:
+                return space.w_True
+            raise wrap_windowserror(WindowsError(last_error))
+
+        size = rffi.widen(size_ptr[0])
+        with rffi.scoped_alloc(rffi.CHARP, size) as buf:
+            usage = rffi.cast(PCERT_ENHKEY_USAGE, buf)
+            # Now get the actual enhanced usage property
+            if not CertGetEnhancedKeyUsage(pCertCtx, flags, usage, size_ptr):
+                last_error = rwin32.lastSavedWindowsError()
+                if last_error == CRYPT_E_NOT_FOUND:
+                    return space.w_True
+                raise wrap_windowserror(WindowsError(last_error))
+
+            result_w = []
+            for i in range(usage.c_cUsageIdentifier):
+                if not usage.c_rgpszUsageIdentifier[i]:
+                    continue
+                result_w.append(
+                    space.wrap(rffi.charp2str(
+                        usage.c_rgpszUsageIdentifier[i])))
+            return space.newset(result_w)
+
+@unwrap_spec(store_name=str)
+def enum_certificates_w(space, store_name):
+    """enum_certificates(store_name) -> []
+
+    Retrieve certificates from Windows' cert store. store_name may be one of
+    'CA', 'ROOT' or 'MY'. The system may provide more cert storages, too.
+    The function returns a list of (bytes, encoding_type, trust) tuples. The
+    encoding_type flag can be interpreted with X509_ASN_ENCODING or
+    PKCS_7_ASN_ENCODING. The trust setting is either a set of OIDs or the
+    boolean True."""
+
+    result_w = []
+    pCertCtx = lltype.nullptr(PCCERT_CONTEXT)
+    hStore = CertOpenSystemStore(None, store_name)
+    if not hStore:
+        raise wrap_windowserror(rwin32.lastSavedWindowsError())
+    try:
+        while True:
+            pCertCtx = CertEnumCertificatesInStore(hStore, pCertCtx)
+            if not pCertCtx:
+                break
+            w_cert = space.wrapbytes(
+                rffi.charpsize2str(pCertCtx.c_pbCertEncoded,
+                                   pCertCtx.c_cbCertEncoded))
+            w_enc = w_certEncodingType(space, pCertCtx.c_dwCertEncodingType)
+            w_keyusage = w_parseKeyUsage(
+                space, pCertCtx, CERT_FIND_PROP_ONLY_ENHKEY_USAGE_FLAG)
+            if space.is_w(w_keyusage, space.w_True):
+                w_keyusage = w_parseKeyUsage(
+                    space, pCertCtx, CERT_FIND_EXT_ONLY_ENHKEY_USAGE_FLAG)
+            result_w.append(space.newtuple([w_cert, w_enc, w_keyusage]))
+    finally:
+        if pCertCtx:
+            # loop ended with an error, need to clean up context manually
+            CertFreeCertificateContext(pCertCtx)
+        if not CertCloseStore(hStore, 0):
+            # This error case might shadow another exception.
+            raise wrap_windowserror(rwin32.lastSavedWindowsError())
+
+    return space.newlist(result_w)
+
+@unwrap_spec(store_name=str)
+def enum_crls_w(space, store_name):
+    """enum_crls(store_name) -> []
+
+    Retrieve CRLs from Windows' cert store. store_name may be one of
+    'CA', 'ROOT' or 'MY'. The system may provide more cert storages, too.
+    The function returns a list of (bytes, encoding_type) tuples. The
+    encoding_type flag can be interpreted with X509_ASN_ENCODING or
+    PKCS_7_ASN_ENCODING."""
+    result_w = []
+    pCrlCtx = lltype.nullptr(PCCRL_CONTEXT)
+    hStore = CertOpenSystemStore(None, store_name)
+    if not hStore:
+        raise wrap_windowserror(rwin32.lastSavedWindowsError())
+    try:
+        while True:
+            pCrlCtx = CertEnumCRLsInStore(hStore, pCrlCtx)
+            if not pCrlCtx:
+                break
+            w_crl = space.wrapbytes(
+                rffi.charpsize2str(pCrlCtx.c_pbCrlEncoded,
+                                   pCrlCtx.c_cbCrlEncoded))
+            w_enc = w_certEncodingType(space, pCrlCtx.c_dwCertEncodingType)
+            result_w.append(space.newtuple([w_cert, w_enc]))
+    finally:
+        if pCrlCtx:
+            # loop ended with an error, need to clean up context manually
+            CertFreeCRLContext(pCrlCtx)
+        if not CertCloseStore(hStore, 0):
+            # This error case might shadow another exception.
+            raise wrap_windowserror(rwin32.lastSavedWindowsError())
diff --git a/pypy/module/_ssl/test/test_win32.py 
b/pypy/module/_ssl/test/test_win32.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_ssl/test/test_win32.py
@@ -0,0 +1,53 @@
+import sys
+
+if sys.platform != 'win32':
+    from py.test import skip
+    skip("ssl.enum_certificates is only available on Windows")
+
+class AppTestWin32:
+    spaceconfig = dict(usemodules=('_ssl',))
+
+    def setup_class(cls):
+        if sys.platform != 'win32':
+            py.test.skip("pexpect not found")
+
+    def test_enum_certificates(self):
+        import _ssl
+        assert _ssl.enum_certificates("CA")
+        assert _ssl.enum_certificates("ROOT")
+
+        raises(TypeError, _ssl.enum_certificates)
+        raises(WindowsError, _ssl.enum_certificates, "")
+
+        trust_oids = set()
+        for storename in ("CA", "ROOT"):
+            store = _ssl.enum_certificates(storename)
+            assert isinstance(store, list)
+            for element in store:
+                assert isinstance(element, tuple)
+                assert len(element) == 3
+                cert, enc, trust = element
+                assert isinstance(cert, bytes)
+                assert enc in {"x509_asn", "pkcs_7_asn"}
+                assert isinstance(trust, (set, bool))
+                if isinstance(trust, set):
+                    trust_oids.update(trust)
+
+        serverAuth = "1.3.6.1.5.5.7.3.1"
+        assert serverAuth in trust_oids
+
+    def test_enum_crls(self):
+        import _ssl
+        assert _ssl.enum_crls("CA")
+        raises(TypeError, _ssl.enum_crls)
+        raises(WindowsError, _ssl.enum_crls, "")
+
+        crls = _ssl.enum_crls("CA")
+        assert isinstance(crls, list)
+        for element in crls:
+            assert isinstance(element, tuple)
+            assert len(element) == 2
+            assert isinstance(element[0], bytes)
+            assert element[1] in {"x509_asn", "pkcs_7_asn"}
+
+
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to