The branch, master has been updated via 2dba2a31c27 python:tests/krb5: let create_trust() take {ingress,egress}_claims_tf_rules via b1348ad2885 python:tests/krb5: let create_trust() take forest_info via 322827f792b python:tests/krb5: let modified_ticket() to take modify_{tkt,enc}_fn via fabf0d1565a python:tests/krb5: add remove_pac_buffers() via bcd11579833 python:tests/krb5: set_pac_claims with claims=[] should be an empty blob via b78af644464 python:tests/krb5: let set_pac_sids() replace the requester_sid via a7349dd363f python:tests/krb5: add set_pac_names() to modify the names in a pac via 5ab87a840fc python:tests/krb5: give KerberosTicketCreds a basic __str__() function via 559bcd0c33d python:tests/krb5: let create_ccache[_with_ticket] use the correct crealm via ab8473dd418 python:tests/krb5: allow get_service_ticket() to fail with expected_status via 188da466378 python:tests/krb5: add KerberosTicketCreds.set_srealm() from 154875244c5 s3:testparm: make it clear that 'client use krb5 netlogon' is experimental
https://git.samba.org/?p=samba.git;a=shortlog;h=master - Log ----------------------------------------------------------------- commit 2dba2a31c27fc8779bc27d4095eca1948831b04d Author: Stefan Metzmacher <me...@samba.org> Date: Wed Feb 5 09:15:47 2025 +0100 python:tests/krb5: let create_trust() take {ingress,egress}_claims_tf_rules Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> Autobuild-User(master): Ralph Böhme <s...@samba.org> Autobuild-Date(master): Mon Feb 24 10:28:02 UTC 2025 on atb-devel-224 commit b1348ad2885f40a7e7aa4000027bb267a18c981d Author: Stefan Metzmacher <me...@samba.org> Date: Tue Jan 7 20:14:49 2025 +0100 python:tests/krb5: let create_trust() take forest_info Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 322827f792b71fb66768fef222d5381e0436e133 Author: Stefan Metzmacher <me...@samba.org> Date: Tue Dec 3 16:46:31 2024 +0100 python:tests/krb5: let modified_ticket() to take modify_{tkt,enc}_fn This makes it possible modify the public ticket part well as the enc part. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit fabf0d1565ab7c9ab89f70ad98616c0161c4c4ff Author: Stefan Metzmacher <me...@samba.org> Date: Thu Feb 6 16:47:30 2025 +0100 python:tests/krb5: add remove_pac_buffers() Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit bcd115798337b70c83e5df64c1653da1cdbbc483 Author: Stefan Metzmacher <me...@samba.org> Date: Thu Feb 6 16:46:44 2025 +0100 python:tests/krb5: set_pac_claims with claims=[] should be an empty blob Review with: git show -w Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit b78af644464f283096a78f1ae0e747a03d700f1b Author: Stefan Metzmacher <me...@samba.org> Date: Tue Dec 3 12:50:18 2024 +0100 python:tests/krb5: let set_pac_sids() replace the requester_sid Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit a7349dd363f392d7f6b9b90da634d2bf9880b9f7 Author: Stefan Metzmacher <me...@samba.org> Date: Tue Dec 3 12:03:21 2024 +0100 python:tests/krb5: add set_pac_names() to modify the names in a pac Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 5ab87a840fcbb15b95503bf89dda9148ea3d7efc Author: Stefan Metzmacher <me...@samba.org> Date: Tue Feb 18 12:13:26 2025 +0100 python:tests/krb5: give KerberosTicketCreds a basic __str__() function This makes debugging easier... Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 559bcd0c33d25973e0155b5a34811db6ea632a99 Author: Stefan Metzmacher <me...@samba.org> Date: Tue Feb 18 12:11:58 2025 +0100 python:tests/krb5: let create_ccache[_with_ticket] use the correct crealm It can be different from the servers realm. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit ab8473dd418ec6db6a09b7823d3806707b4c164f Author: Stefan Metzmacher <me...@samba.org> Date: Mon Dec 2 20:05:22 2024 +0100 python:tests/krb5: allow get_service_ticket() to fail with expected_status Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 188da46637898ea3dabebb241680bad7780eada4 Author: Stefan Metzmacher <me...@samba.org> Date: Mon Dec 2 19:59:57 2024 +0100 python:tests/krb5: add KerberosTicketCreds.set_srealm() Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> ----------------------------------------------------------------------- Summary of changes: python/samba/tests/krb5/kdc_base_test.py | 248 ++++++++++++++++++++++++++++--- python/samba/tests/krb5/raw_testcase.py | 49 ++++-- 2 files changed, 265 insertions(+), 32 deletions(-) Changeset truncated at 500 lines: diff --git a/python/samba/tests/krb5/kdc_base_test.py b/python/samba/tests/krb5/kdc_base_test.py index dee6ef83071..20a004579a4 100644 --- a/python/samba/tests/krb5/kdc_base_test.py +++ b/python/samba/tests/krb5/kdc_base_test.py @@ -103,6 +103,10 @@ from samba.join import DCJoinContext from samba.ndr import ndr_pack, ndr_unpack from samba.param import LoadParm from samba.samdb import SamDB, dsdb_Dn +from samba.security import ( + claims_tf_policy_parse_rules, + claims_tf_policy_wrap_xml, +) rc4_bit = security.KERB_ENCTYPE_RC4_HMAC_MD5 aes256_sk_bit = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK @@ -120,6 +124,7 @@ from samba.tests.krb5.rfc4120_constants import ( AD_WIN2K_PAC, AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5, + KDC_ERR_POLICY, KDC_ERR_PREAUTH_REQUIRED, KDC_ERR_TGT_REVOKED, KRB_AS_REP, @@ -205,6 +210,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): cls.ldb_cleanups = [] cls._claim_types_dn = None + cls._claim_tf_policies_dn = None cls._authn_policy_config_dn = None cls._authn_policies_dn = None cls._authn_silos_dn = None @@ -249,6 +255,46 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): # Return a copy of the DN. return ldb.Dn(samdb, str(self._claim_types_dn)) + def get_claim_tf_policies_dn(self): + samdb = self.get_samdb() + + if self._claim_tf_policies_dn is None: + claim_config_dn = samdb.get_config_basedn() + + claim_config_dn.add_child('CN=Claims Configuration,CN=Services') + details = { + 'dn': claim_config_dn, + 'objectClass': 'container', + } + try: + samdb.add(details) + except ldb.LdbError as err: + num, _ = err.args + if num != ldb.ERR_ENTRY_ALREADY_EXISTS: + raise + else: + self.accounts.append(str(claim_config_dn)) + + claim_tf_policies_dn = claim_config_dn + claim_tf_policies_dn.add_child('CN=Claims Transformation Policies') + details = { + 'dn': claim_tf_policies_dn, + 'objectClass': 'msDS-ClaimsTransformationPolicies', + } + try: + samdb.add(details) + except ldb.LdbError as err: + num, _ = err.args + if num != ldb.ERR_ENTRY_ALREADY_EXISTS: + raise + else: + self.accounts.append(str(claim_tf_policies_dn)) + + type(self)._claim_tf_policies_dn = claim_tf_policies_dn + + # Return a copy of the DN. + return ldb.Dn(samdb, str(self._claim_tf_policies_dn)) + def get_authn_policy_config_dn(self): samdb = self.get_samdb() @@ -908,8 +954,11 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): def create_trust(self, trust_info, trust_enc_types=None, + forest_info=None, trust_incoming_password=None, trust_outgoing_password=None, + ingress_claims_tf_rules=None, + egress_claims_tf_rules=None, expect_error=None, preserve=True): """Create an trust account for testing. @@ -990,9 +1039,61 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): lsa_conn.SetInformationTrustedDomain(tdo_handle, lsa.LSA_TRUSTED_DOMAIN_SUPPORTED_ENCRYPTION_TYPES, trust_enc_types) + trust_dns_nameL = lsa.StringLarge() + trust_dns_nameL.string = trust_info.domain_name.string + if isinstance(forest_info, lsa.ForestTrustInformation2): + local_forest_collision = \ + lsa_conn.lsaRSetForestTrustInformation2(lsa_policy, + trust_dns_nameL, + lsa.LSA_FOREST_TRUST_RECORD2_TYPE_LAST, + forest_info, + 0) + elif isinstance(forest_info, lsa.ForestTrustInformation): + local_forest_collision = \ + lsa_conn.lsaRSetForestTrustInformation(lsa_policy, + trust_dns_nameL, + lsa.LSA_FOREST_TRUST_RECORD_TYPE_LAST, + forest_info, + 0) samdb = self.get_samdb() + def claims_tf_policy_dn(name, rules): + xml_rules = claims_tf_policy_wrap_xml(rules) + rs = claims_tf_policy_parse_rules(xml_rules, strip_xml=True) + policy_dn = self.get_claim_tf_policies_dn() + policy_dn.add_child('CN=%s' % name) + details = { + 'dn': policy_dn, + 'objectClass': 'msDS-ClaimsTransformationPolicyType', + 'msDS-TransformationRules': xml_rules, + } + try: + samdb.add(details) + except ldb.LdbError as err: + num, _ = err.args + if num != ldb.ERR_ENTRY_ALREADY_EXISTS: + raise + raise + else: + self.accounts.append(str(policy_dn)) + + return policy_dn + + if ingress_claims_tf_rules is not None: + ingress_policy_dn = claims_tf_policy_dn("%s-Ingress" % + trust_dns_name.string, + ingress_claims_tf_rules) + else: + ingress_policy_dn = None + + if egress_claims_tf_rules is not None: + egress_policy_dn = claims_tf_policy_dn("%s-Egress" % + trust_dns_name.string, + egress_claims_tf_rules) + else: + egress_policy_dn = None + incoming_account_name = trust_info.netbios_name.string incoming_account_name += '$' incoming_nbt_domain = local_info.name.string @@ -1011,6 +1112,22 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): self.assertEqual(len(tdo_res), 1) tdo_dn = tdo_res[0].dn + if ingress_policy_dn or egress_policy_dn: + msg = ldb.Message() + msg.dn = tdo_dn + if ingress_policy_dn: + msg['ingress'] = ldb.MessageElement( + str(ingress_policy_dn), + ldb.FLAG_MOD_REPLACE, + 'msDS-IngressClaimsTransformationPolicy') + if egress_policy_dn: + msg['egress'] = ldb.MessageElement( + str(egress_policy_dn), + ldb.FLAG_MOD_REPLACE, + 'msDS-EgressClaimsTransformationPolicy') + + samdb.modify(msg) + acct_search_filter = "(&(objectClass=user)(sAMAccountName=%s))" % ( incoming_account_name) acct_res = samdb.search(scope=ldb.SCOPE_SUBTREE, @@ -1905,6 +2022,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): new_sids, domain_sid=None, user_rid=None, + requester_sid=None, set_user_flags=0, reset_user_flags=0): if domain_sid is None: @@ -2032,8 +2150,58 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): upn_dns_info_ex.objectsid = security.dom_sid( f'{domain_sid}-{user_rid}') - # But don't replace the user's SID in the Requester SID buffer, or - # we'll get a SID mismatch. + elif pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID: + if requester_sid is not None: + pac_buffer.info.sid = requester_sid + + self.assertTrue(found_logon_info, 'no LOGON_INFO PAC buffer') + + pac.buffers = pac_buffers + + return pac + + # Replace the Names in a PAC. + def set_pac_names(self, + pac, + *, + account_name=False, + logon_server=False, + logon_domain=False, + logon_name=False, + upn_name=False, + logon_dns_domain=False, + samaccountname=False): + + found_logon_info = False + + pac_buffers = pac.buffers + for pac_buffer in pac_buffers: + # Find the LOGON_INFO PAC buffer. + if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO: + logon_info = pac_buffer.info.info + + if account_name is not False: + logon_info.info3.base.account_name.string = account_name + if logon_server is not False: + logon_info.info3.base.logon_server.string = logon_server + if logon_domain is not False: + logon_info.info3.base.logon_domain.string = logon_domain + + found_logon_info = True + + elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME: + if logon_name is not False: + pac_buffer.info.account_name = logon_name + + elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO: + upn_dns_info = pac_buffer.info + + if upn_name is not False: + upn_dns_info.upn_name = upn_name + if logon_dns_domain is not False: + upn_dns_info.dns_domain_name = logon_dns_domain + if samaccountname is not False: + upn_dns_info.ex.samaccountname = samaccountname self.assertTrue(found_logon_info, 'no LOGON_INFO PAC buffer') @@ -2243,26 +2411,31 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): claims_arrays.append(claims_array) - claims_set = claims.CLAIMS_SET() - claims_set.claims_arrays = claims_arrays - claims_set.claims_array_count = len(claims_arrays) + if len(claims_arrays) != 0: + claims_set = claims.CLAIMS_SET() + claims_set.claims_arrays = claims_arrays + claims_set.claims_array_count = len(claims_arrays) - claims_ctr = claims.CLAIMS_SET_CTR() - claims_ctr.claims = claims_set + claims_ctr = claims.CLAIMS_SET_CTR() + claims_ctr.claims = claims_set - claims_ndr = claims.CLAIMS_SET_NDR() - claims_ndr.claims = claims_ctr + claims_ndr = claims.CLAIMS_SET_NDR() + claims_ndr.claims = claims_ctr - metadata = claims.CLAIMS_SET_METADATA() - metadata.claims_set = claims_ndr - metadata.compression_format = ( - claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF) + metadata = claims.CLAIMS_SET_METADATA() + metadata.claims_set = claims_ndr + metadata.compression_format = ( + claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF) - metadata_ctr = claims.CLAIMS_SET_METADATA_CTR() - metadata_ctr.metadata = metadata + metadata_ctr = claims.CLAIMS_SET_METADATA_CTR() + metadata_ctr.metadata = metadata - metadata_ndr = claims.CLAIMS_SET_METADATA_NDR() - metadata_ndr.claims = metadata_ctr + metadata_ndr = claims.CLAIMS_SET_METADATA_NDR() + metadata_ndr.claims = metadata_ctr + + claims_buffer = ndr_pack(metadata_ndr) + else: + claims_buffer = b'' pac_buffers = pac.buffers for pac_buffer in pac_buffers: @@ -2275,7 +2448,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): pac_buffers.append(pac_buffer) - pac_buffer.info.remaining = ndr_pack(metadata_ndr) + pac_buffer.info.remaining = claims_buffer pac.buffers = pac_buffers pac.num_buffers = len(pac_buffers) @@ -2305,6 +2478,19 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): return pac + def remove_pac_buffers(self, pac, *, pac_buffer_type=None): + + old_buffers = pac.buffers + new_buffers = [] + for pac_buffer in old_buffers: + if pac_buffer.type != pac_buffer_type: + new_buffers.append(pac_buffer) + + pac.buffers = new_buffers + pac.num_buffers = len(new_buffers) + + return pac + def get_cached_creds(self, *, account_type: AccountType, opts: Optional[dict]=None, @@ -3249,6 +3435,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): sname=None, target_name=None, till=None, rc4_support=True, to_rodc=False, kdc_options=None, + expected_status=None, expected_flags=None, unexpected_flags=None, expected_groups=None, unexpected_groups=None, @@ -3287,7 +3474,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): expect_pac_attrs, expect_pac_attrs_pac_request) - if not fresh: + if not fresh and expected_status is None: ticket = self.tkt_cache.get(cache_key) if ticket is not None: @@ -3309,6 +3496,15 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): decryption_key = self.TicketDecryptionKey_from_creds(target_creds) + if expected_status is not None: + check_rep_fn = None + check_error_fn = self.generic_check_kdc_error + expected_error_mode = KDC_ERR_POLICY + else: + check_rep_fn = self.generic_check_kdc_rep + check_error_fn = None + expected_error_mode = 0 + kdc_exchange_dict = self.tgs_exchange_dict( expected_crealm=tgt.crealm, expected_cname=tgt.cname, @@ -3327,7 +3523,10 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): unexpected_device_claims=unexpected_device_claims, ticket_decryption_key=decryption_key, expect_ticket_kvno=(not expect_krbtgt_referral), - check_rep_fn=self.generic_check_kdc_rep, + check_rep_fn=check_rep_fn, + check_error_fn=check_error_fn, + expected_error_mode=expected_error_mode, + expected_status=expected_status, check_kdc_private_fn=self.generic_check_kdc_private, tgt=tgt, authenticator_subkey=authenticator_subkey, @@ -3346,6 +3545,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): sname=sname, till_time=till, etypes=etype) + if expected_status is not None: + self.check_error_rep(rep, expected_error_mode) + return None self.check_tgs_reply(rep) service_ticket_creds = kdc_exchange_dict['rep_ticket_creds'] @@ -3766,7 +3968,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): msg[name] = ldb.MessageElement([], flag, name) samdb.modify(msg) - def create_ccache(self, cname, ticket, enc_part): + def create_ccache(self, crealm, cname, ticket, enc_part): """ Lay out a version 4 on-disk credentials cache, to be read using the FILE: protocol. """ @@ -3791,7 +3993,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): cprincipal = krb5ccache.PRINCIPAL() cprincipal.name_type = cname['name-type'] cprincipal.component_count = len(cname_string) - cprincipal.realm = ticket['realm'] + cprincipal.realm = crealm cprincipal.components = cname_string sname = ticket['sname'] @@ -3884,7 +4086,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): # Write the ticket into a credentials cache file that can be ingested # by the main credentials code. - cachefile = self.create_ccache(cname, ticket.ticket, + cachefile = self.create_ccache(realm, cname, ticket.ticket, ticket.encpart_private) # Create a credentials object to reference the credentials cache. diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py index c014c18c846..df49652a725 100644 --- a/python/samba/tests/krb5/raw_testcase.py +++ b/python/samba/tests/krb5/raw_testcase.py @@ -47,6 +47,7 @@ import pyasn1.type.univ from pyasn1.error import PyAsn1Error from samba import unix2nttime +from samba.common import get_string from samba.credentials import Credentials from samba.dcerpc import claims, krb5pac, netlogon, samr, security, krb5ccache from samba.gensec import FEATURE_SEAL @@ -701,6 +702,13 @@ class KerberosTicketCreds: self.ticket['sname'] = sname self.sname = sname + def set_srealm(self, srealm): + self.ticket['realm'] = srealm + self.srealm = srealm + + def __str__(self): + return "KerberosTicketCreds(crealm=%s, cname=%s, srealm=%s, sname=%s)" %( + self.crealm, self.cname, self.srealm, self.sname) class PkInit(Enum): NOT_USED = object() @@ -5857,13 +5865,20 @@ class RawKerberosTest(TestCase): def modified_ticket(self, ticket, *, new_ticket_key=None, - modify_fn=None, + modify_fn=None, # legacy version of modify_enc_fn + modify_tkt_fn=None, + modify_enc_fn=None, modify_pac_fn=None, exclude_pac=False, allow_empty_authdata=False, update_pac_checksums=None, checksum_keys=None, include_checksums=None): + if modify_enc_fn is not None: + self.assertIsNone(modify_fn) + else: + modify_enc_fn = modify_fn + if checksum_keys is None: # A dict containing a key for each checksum type to be created in # the PAC. @@ -5937,10 +5952,10 @@ class RawKerberosTest(TestCase): enc_part, asn1Spec=krb5_asn1.EncTicketPart()) # Modify the ticket here. - if callable(modify_fn): - enc_part = modify_fn(enc_part) - elif modify_fn: - for fn in modify_fn: + if callable(modify_enc_fn): + enc_part = modify_enc_fn(enc_part) + elif modify_enc_fn: + for fn in modify_enc_fn: enc_part = fn(enc_part) auth_data = enc_part.get('authorization-data') @@ -6012,13 +6027,29 @@ class RawKerberosTest(TestCase): new_ticket = ticket.ticket.copy() new_ticket['enc-part'] = enc_part_new + # Modify the ticket here. + if callable(modify_tkt_fn): + new_ticket = modify_tkt_fn(new_ticket) + elif modify_tkt_fn: + for fn in modify_tkt_fn: + new_ticket = fn(new_ticket) + + crealm = get_string(enc_part['crealm']) + cname = dict(enc_part['cname']) + for i in range(0, len(cname['name-string'])): + cname['name-string'][i] = get_string(cname['name-string'][i]) + srealm = get_string(new_ticket['realm']) + sname = dict(new_ticket['sname']) + for i in range(0, len(sname['name-string'])): + sname['name-string'][i] = get_string(sname['name-string'][i]) + new_ticket_creds = KerberosTicketCreds( new_ticket, session_key=ticket.session_key, - crealm=ticket.crealm, - cname=ticket.cname, - srealm=ticket.srealm, - sname=ticket.sname, + crealm=crealm, + cname=cname, + srealm=srealm, + sname=sname, decryption_key=new_ticket_key, ticket_private=enc_part, encpart_private=ticket.encpart_private) -- Samba Shared Repository