The branch, master has been updated via 6bb89aa s4-tests: Added a speedtest for LDAP search operations with different accounts. via aab37c3 s4-tests: Added tests for LDAP add/delete/modify using anonymous login. via a53f09b s4-dsdb: Fixed incorrect LDAP return code when anonymous login is used. from b3630b4 Fix bug 7866 - "net" in v3-6-test broken.
http://gitweb.samba.org/?p=samba.git;a=shortlog;h=master - Log ----------------------------------------------------------------- commit 6bb89aaa0d38d59ce4f0d9362822ba1c525eb203 Author: Nadezhda Ivanova <nivan...@samba.org> Date: Wed Dec 15 21:29:53 2010 +0200 s4-tests: Added a speedtest for LDAP search operations with different accounts. Autobuild-User: Nadezhda Ivanova <nivan...@samba.org> Autobuild-Date: Wed Dec 15 21:32:09 CET 2010 on sn-devel-104 commit aab37c314671f9ad712ab03b1b1c2e6688df772d Author: Nadezhda Ivanova <nivan...@samba.org> Date: Wed Dec 15 21:28:59 2010 +0200 s4-tests: Added tests for LDAP add/delete/modify using anonymous login. commit a53f09b9312fc08d4cdb2d94ec9119ee29b1bf84 Author: Nadezhda Ivanova <nivan...@samba.org> Date: Wed Dec 15 21:28:12 2010 +0200 s4-dsdb: Fixed incorrect LDAP return code when anonymous login is used. ----------------------------------------------------------------------- Summary of changes: source4/dsdb/samdb/ldb_modules/rootdse.c | 2 +- source4/dsdb/tests/python/acl.py | 58 +++++++++++++++++++--- source4/scripting/devel/speedtest.py | 78 +++++++++++++++++++++++++---- 3 files changed, 118 insertions(+), 20 deletions(-) Changeset truncated at 500 lines: diff --git a/source4/dsdb/samdb/ldb_modules/rootdse.c b/source4/dsdb/samdb/ldb_modules/rootdse.c index e7ea765..2571bc3 100644 --- a/source4/dsdb/samdb/ldb_modules/rootdse.c +++ b/source4/dsdb/samdb/ldb_modules/rootdse.c @@ -641,7 +641,7 @@ static int rootdse_filter_operations(struct ldb_module *module, struct ldb_reque } } ldb_set_errstring(ldb_module_get_ctx(module), "Operation unavailable without authentication"); - return LDB_ERR_STRONG_AUTH_REQUIRED; + return LDB_ERR_OPERATIONS_ERROR; } static int rootdse_search(struct ldb_module *module, struct ldb_request *req) diff --git a/source4/dsdb/tests/python/acl.py b/source4/dsdb/tests/python/acl.py index 85018b0..12f653b 100755 --- a/source4/dsdb/tests/python/acl.py +++ b/source4/dsdb/tests/python/acl.py @@ -6,7 +6,6 @@ import optparse import sys import base64 import re - sys.path.append("bin/python") import samba samba.ensure_external_module("testtools", "testtools") @@ -20,7 +19,7 @@ from ldb import ( from ldb import ERR_CONSTRAINT_VIOLATION from ldb import ERR_OPERATIONS_ERROR from ldb import Message, MessageElement, Dn -from ldb import FLAG_MOD_REPLACE, FLAG_MOD_DELETE +from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE from samba.ndr import ndr_pack, ndr_unpack from samba.dcerpc import security @@ -67,6 +66,13 @@ class AclTests(samba.tests.TestCase): self.user_pass = "samba123@" self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized() self.sd_utils = sd_utils.SDUtils(ldb) + #used for anonymous login + self.creds_tmp = Credentials() + self.creds_tmp.set_username("") + self.creds_tmp.set_password("") + self.creds_tmp.set_domain(creds.get_domain()) + self.creds_tmp.set_realm(creds.get_realm()) + self.creds_tmp.set_workstation(creds.get_workstation()) print "baseDN: %s" % self.base_dn def get_user_dn(self, name): @@ -134,6 +140,7 @@ class AclAddTests(AclTests): delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner)) delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner)) delete_force(self.ldb_admin, self.get_user_dn(self.regular_user)) + delete_force(self.ldb_admin, self.get_user_dn("test_add_anonymous")) # Make sure top OU is deleted (and so everything under it) def assert_top_ou_deleted(self): @@ -229,6 +236,16 @@ class AclAddTests(AclTests): expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn)) self.assertTrue(len(res) > 0) + def test_add_anonymous(self): + """Test add operation with anonymous user""" + anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp) + try: + anonymous.newuser("test_add_anonymous", self.user_pass) + except LdbError, (num, _): + self.assertEquals(num, ERR_OPERATIONS_ERROR) + else: + self.fail() + #tests on ldap modify operations class AclModifyTests(AclTests): @@ -259,6 +276,7 @@ class AclModifyTests(AclTests): delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm)) delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm)) delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2")) + delete_force(self.ldb_admin, self.get_user_dn("test_anonymous")) def test_modify_u1(self): """5 Modify one attribute if you have DS_WRITE_PROPERTY for it""" @@ -554,6 +572,23 @@ Member: CN=test_modify_user2,CN=Users,""" + self.base_dn % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"]) self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn) + def test_modify_anonymous(self): + """Test add operation with anonymous user""" + anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp) + self.ldb_admin.newuser("test_anonymous", "samba123@") + m = Message() + m.dn = Dn(anonymous, self.get_user_dn("test_anonymous")) + + m["description"] = MessageElement("sambauser2", + FLAG_MOD_ADD, + "description") + try: + anonymous.modify(m) + except LdbError, (num, _): + self.assertEquals(num, ERR_OPERATIONS_ERROR) + else: + self.fail() + #enable these when we have search implemented class AclSearchTests(AclTests): @@ -563,12 +598,6 @@ class AclSearchTests(AclTests): self.u2 = "search_u2" self.u3 = "search_u3" self.group1 = "group1" - self.creds_tmp = Credentials() - self.creds_tmp.set_username("") - self.creds_tmp.set_password("") - self.creds_tmp.set_domain(creds.get_domain()) - self.creds_tmp.set_realm(creds.get_realm()) - self.creds_tmp.set_workstation(creds.get_workstation()) self.ldb_admin.newuser(self.u1, self.user_pass) self.ldb_admin.newuser(self.u2, self.user_pass) self.ldb_admin.newuser(self.u3, self.user_pass) @@ -926,6 +955,7 @@ class AclDeleteTests(AclTests): super(AclDeleteTests, self).tearDown() delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1")) delete_force(self.ldb_admin, self.get_user_dn(self.regular_user)) + delete_force(self.ldb_admin, self.get_user_dn("test_anonymous")) def test_delete_u1(self): """User is prohibited by default to delete another User object""" @@ -965,6 +995,18 @@ class AclDeleteTests(AclTests): expression="(distinguishedName=%s)" % user_dn) self.assertEqual(res, []) + def test_delete_anonymous(self): + """Test add operation with anonymous user""" + anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp) + self.ldb_admin.newuser("test_anonymous", "samba123@") + + try: + anonymous.delete(self.get_user_dn("test_anonymous")) + except LdbError, (num, _): + self.assertEquals(num, ERR_OPERATIONS_ERROR) + else: + self.fail() + #tests on ldap rename operations class AclRenameTests(AclTests): diff --git a/source4/scripting/devel/speedtest.py b/source4/scripting/devel/speedtest.py index 891a741..a7adfba 100755 --- a/source4/scripting/devel/speedtest.py +++ b/source4/scripting/devel/speedtest.py @@ -42,7 +42,7 @@ from samba.ndr import ndr_pack, ndr_unpack from samba.dcerpc import security from samba.auth import system_session -from samba import gensec +from samba import gensec, sd_utils from samba.samdb import SamDB from samba.credentials import Credentials import samba.tests @@ -77,12 +77,6 @@ creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) class SpeedTest(samba.tests.TestCase): - def find_basedn(self, ldb): - res = ldb.search(base="", expression="", scope=SCOPE_BASE, - attrs=["defaultNamingContext"]) - self.assertEquals(len(res), 1) - return res[0]["defaultNamingContext"][0] - def find_domain_sid(self, ldb): res = ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE) return ndr_unpack(security.dom_sid,res[0]["objectSid"][0]) @@ -90,8 +84,8 @@ class SpeedTest(samba.tests.TestCase): def setUp(self): super(SpeedTest, self).setUp() self.ldb_admin = ldb - self.base_dn = self.find_basedn(self.ldb_admin) - self.domain_sid = self.find_domain_sid(self.ldb_admin) + self.base_dn = ldb.domain_dn() + self.domain_sid = security.dom_sid(ldb.get_domain_sid()) self.user_pass = "samba123@" print "baseDN: %s" % self.base_dn @@ -129,6 +123,11 @@ url: www.example.com for dn in dn_list: delete_force(self.ldb_admin, dn) +class SpeedTestAddDel(SpeedTest): + + def setUp(self): + super(SpeedTestAddDel, self).setUp() + def run_bundle(self, num): print "\n=== Test ADD/DEL %s user objects ===\n" % num avg_add = Decimal("0.0") @@ -169,6 +168,62 @@ url: www.example.com """ self.run_bundle(10000) +class AclSearchSpeedTest(SpeedTest): + + def setUp(self): + super(AclSearchSpeedTest, self).setUp() + self.ldb_admin.newuser("acltestuser", "samba123@") + self.sd_utils = sd_utils.SDUtils(self.ldb_admin) + self.ldb_user = self.get_ldb_connection("acltestuser", "samba123@") + self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn("acltestuser")) + + def tearDown(self): + super(AclSearchSpeedTest, self).tearDown() + delete_force(self.ldb_admin, self.get_user_dn("acltestuser")) + + def run_search_bundle(self, num, _ldb): + print "\n=== Creating %s user objects ===\n" % num + self.create_bundle(num) + mod = "(A;;LC;;;%s)(D;;RP;;;%s)" % (str(self.user_sid), str(self.user_sid)) + for i in range(num): + self.sd_utils.dacl_add_ace("cn=speedtestuser%d,cn=Users,%s" % + (i+1, self.base_dn), mod) + print "\n=== %s user objects created ===\n" % num + print "\n=== Test search on %s user objects ===\n" % num + avg_search = Decimal("0.0") + for x in [1, 2, 3]: + start = time.time() + res = _ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE) + res_search = Decimal( str(time.time() - start) ) + avg_search += res_search + print " Attempt %s SEARCH: %.3fs" % ( x, float(res_search) ) + print "Average Search: %.3fs" % float( Decimal(avg_search) / Decimal("3.0") ) + self.remove_bundle(num) + + def get_user_dn(self, name): + return "CN=%s,CN=Users,%s" % (name, self.base_dn) + + def get_ldb_connection(self, target_username, target_password): + creds_tmp = Credentials() + creds_tmp.set_username(target_username) + creds_tmp.set_password(target_password) + creds_tmp.set_domain(creds.get_domain()) + creds_tmp.set_realm(creds.get_realm()) + creds_tmp.set_workstation(creds.get_workstation()) + creds_tmp.set_gensec_features(creds_tmp.get_gensec_features() + | gensec.FEATURE_SEAL) + ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp) + return ldb_target + + def test_search_01000(self): + self.run_search_bundle(1000, self.ldb_admin) + + def test_search2_01000(self): + # allow the user to see objects but not attributes, all attributes will be filtered out + mod = "(A;;LC;;;%s)(D;;RP;;;%s)" % (str(self.user_sid), str(self.user_sid)) + self.sd_utils.dacl_add_ace("CN=Users,%s" % self.base_dn, mod) + self.run_search_bundle(1000, self.ldb_user) + # Important unit running information if not "://" in host: @@ -179,7 +234,8 @@ ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp, optio runner = SubunitTestRunner() rc = 0 -if not runner.run(unittest.makeSuite(SpeedTest)).wasSuccessful(): +if not runner.run(unittest.makeSuite(SpeedTestAddDel)).wasSuccessful(): + rc = 1 +if not runner.run(unittest.makeSuite(AclSearchSpeedTest)).wasSuccessful(): rc = 1 - sys.exit(rc) -- Samba Shared Repository