This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 61ba4e7 [bookkeeper] allow configuring region aware placement related settings (#5100) 61ba4e7 is described below commit 61ba4e746f414fd43c301bbaa824d61f51096244 Author: Sijie Guo <si...@apache.org> AuthorDate: Mon Sep 16 03:55:19 2019 -0700 [bookkeeper] allow configuring region aware placement related settings (#5100) *Motivation* We introduced settings to allow users to configure whether to enable rack-aware or region-aware placement policies. However, the settings for the region-aware placement policy are not configurable. This pull request enables configuring region-aware placement related settings. --- .../pulsar/broker/BookKeeperClientFactoryImpl.java | 44 +++++- .../broker/BookKeeperClientFactoryImplTest.java | 150 +++++++++++++++++++++ 2 files changed, 187 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index 3ba65c3..31c95b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -18,6 +18,12 @@ */ package org.apache.pulsar.broker; +import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS; +import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE; +import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.REPP_ENABLE_VALIDATION; +import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.REPP_MINIMUM_REGIONS_FOR_DURABILITY; +import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.REPP_REGIONS_TO_WRITE; + import java.io.IOException; import java.util.Map; import java.util.Optional; @@ -54,7 +60,7 @@ public class 
BookKeeperClientFactoryImpl implements BookKeeperClientFactory { if (ensemblePlacementPolicyClass.isPresent()) { setEnsemblePlacementPolicy(bkConf, conf, zkClient, ensemblePlacementPolicyClass.get()); } else { - setDefaultEnsemblePlacementPolicy(bkConf, conf, zkClient); + setDefaultEnsemblePlacementPolicy(rackawarePolicyZkCache, clientIsolationZkCache, bkConf, conf, zkClient); } try { return BookKeeper.forConfig(bkConf) @@ -110,16 +116,40 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory { return bkConf; } - private void setDefaultEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfiguration conf, - ZooKeeper zkClient) { + public static void setDefaultEnsemblePlacementPolicy( + AtomicReference<ZooKeeperCache> rackawarePolicyZkCache, + AtomicReference<ZooKeeperCache> clientIsolationZkCache, + ClientConfiguration bkConf, + ServiceConfiguration conf, + ZooKeeper zkClient + ) { if (conf.isBookkeeperClientRackawarePolicyEnabled() || conf.isBookkeeperClientRegionawarePolicyEnabled()) { if (conf.isBookkeeperClientRegionawarePolicyEnabled()) { bkConf.setEnsemblePlacementPolicy(RegionAwareEnsemblePlacementPolicy.class); + + bkConf.setProperty( + REPP_ENABLE_VALIDATION, + conf.getProperties().getProperty(REPP_ENABLE_VALIDATION, "true") + ); + bkConf.setProperty( + REPP_REGIONS_TO_WRITE, + conf.getProperties().getProperty(REPP_REGIONS_TO_WRITE, null) + ); + bkConf.setProperty( + REPP_MINIMUM_REGIONS_FOR_DURABILITY, + conf.getProperties().getProperty(REPP_MINIMUM_REGIONS_FOR_DURABILITY, "2") + ); + bkConf.setProperty( + REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE, + conf.getProperties().getProperty(REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE, "true") + ); } else { bkConf.setEnsemblePlacementPolicy(RackawareEnsemblePlacementPolicy.class); } - bkConf.setProperty(RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS, - ZkBookieRackAffinityMapping.class.getName()); + bkConf.setProperty(REPP_DNS_RESOLVER_CLASS, + 
conf.getProperties().getProperty( + REPP_DNS_RESOLVER_CLASS, + ZkBookieRackAffinityMapping.class.getName())); ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) { }; @@ -127,7 +157,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory { zkc.stop(); } - bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache.get()); + bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, rackawarePolicyZkCache.get()); } if (conf.getBookkeeperClientIsolationGroups() != null && !conf.getBookkeeperClientIsolationGroups().isEmpty()) { @@ -143,7 +173,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory { if (!clientIsolationZkCache.compareAndSet(null, zkc)) { zkc.stop(); } - bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.clientIsolationZkCache.get()); + bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, clientIsolationZkCache.get()); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java new file mode 100644 index 0000000..cb216bf --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS; +import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE; +import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.REPP_ENABLE_VALIDATION; +import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.REPP_MINIMUM_REGIONS_FOR_DURABILITY; +import static org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.REPP_REGIONS_TO_WRITE; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import java.util.concurrent.atomic.AtomicReference; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.CachedDNSToSwitchMapping; +import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping; +import org.apache.pulsar.zookeeper.ZooKeeperCache; +import org.apache.zookeeper.ZooKeeper; +import org.testng.annotations.Test; + +/** + * Unit test {@link BookKeeperClientFactoryImpl}. 
+ */ +public class BookKeeperClientFactoryImplTest { + + @Test + public void testSetDefaultEnsemblePlacementPolicyRackAwareDisabled() { + AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>(); + AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>(); + ClientConfiguration bkConf = new ClientConfiguration(); + ServiceConfiguration conf = new ServiceConfiguration(); + ZooKeeper zkClient = mock(ZooKeeper.class); + + assertNull(bkConf.getProperty(REPP_ENABLE_VALIDATION)); + assertNull(bkConf.getProperty(REPP_REGIONS_TO_WRITE)); + assertNull(bkConf.getProperty(REPP_MINIMUM_REGIONS_FOR_DURABILITY)); + assertNull(bkConf.getProperty(REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE)); + assertNull(bkConf.getProperty(REPP_DNS_RESOLVER_CLASS)); + + BookKeeperClientFactoryImpl.setDefaultEnsemblePlacementPolicy( + rackawarePolicyZkCache, + clientIsolationZkCache, + bkConf, + conf, + zkClient + ); + + assertNull(bkConf.getProperty(REPP_ENABLE_VALIDATION)); + assertNull(bkConf.getProperty(REPP_REGIONS_TO_WRITE)); + assertNull(bkConf.getProperty(REPP_MINIMUM_REGIONS_FOR_DURABILITY)); + assertNull(bkConf.getProperty(REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE)); + assertEquals( + bkConf.getProperty(REPP_DNS_RESOLVER_CLASS), + ZkBookieRackAffinityMapping.class.getName()); + + } + + @Test + public void testSetDefaultEnsemblePlacementPolicyRackAwareEnabled() { + AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>(); + AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>(); + ClientConfiguration bkConf = new ClientConfiguration(); + ServiceConfiguration conf = new ServiceConfiguration(); + ZooKeeper zkClient = mock(ZooKeeper.class); + + assertNull(bkConf.getProperty(REPP_ENABLE_VALIDATION)); + assertNull(bkConf.getProperty(REPP_REGIONS_TO_WRITE)); + assertNull(bkConf.getProperty(REPP_MINIMUM_REGIONS_FOR_DURABILITY)); + 
assertNull(bkConf.getProperty(REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE)); + assertNull(bkConf.getProperty(REPP_DNS_RESOLVER_CLASS)); + + conf.setBookkeeperClientRegionawarePolicyEnabled(true); + + BookKeeperClientFactoryImpl.setDefaultEnsemblePlacementPolicy( + rackawarePolicyZkCache, + clientIsolationZkCache, + bkConf, + conf, + zkClient + ); + + assertTrue(bkConf.getBoolean(REPP_ENABLE_VALIDATION)); + assertNull(bkConf.getString(REPP_REGIONS_TO_WRITE)); + assertEquals(2, bkConf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY)); + assertTrue(bkConf.getBoolean(REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE)); + assertEquals( + bkConf.getProperty(REPP_DNS_RESOLVER_CLASS), + ZkBookieRackAffinityMapping.class.getName()); + + } + + @Test + public void testSetDefaultEnsemblePlacementPolicyRackAwareEnabledChangedValues() { + AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>(); + AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>(); + ClientConfiguration bkConf = new ClientConfiguration(); + ServiceConfiguration conf = new ServiceConfiguration(); + ZooKeeper zkClient = mock(ZooKeeper.class); + + assertNull(bkConf.getProperty(REPP_ENABLE_VALIDATION)); + assertNull(bkConf.getProperty(REPP_REGIONS_TO_WRITE)); + assertNull(bkConf.getProperty(REPP_MINIMUM_REGIONS_FOR_DURABILITY)); + assertNull(bkConf.getProperty(REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE)); + assertNull(bkConf.getProperty(REPP_DNS_RESOLVER_CLASS)); + + conf.setBookkeeperClientRegionawarePolicyEnabled(true); + conf.getProperties().setProperty(REPP_ENABLE_VALIDATION, "false"); + conf.getProperties().setProperty(REPP_REGIONS_TO_WRITE, "region1;region2"); + conf.getProperties().setProperty(REPP_MINIMUM_REGIONS_FOR_DURABILITY, "4"); + conf.getProperties().setProperty(REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE, "false"); + conf.getProperties().setProperty(REPP_DNS_RESOLVER_CLASS, CachedDNSToSwitchMapping.class.getName()); + + 
BookKeeperClientFactoryImpl.setDefaultEnsemblePlacementPolicy( + rackawarePolicyZkCache, + clientIsolationZkCache, + bkConf, + conf, + zkClient + ); + + assertFalse(bkConf.getBoolean(REPP_ENABLE_VALIDATION)); + assertEquals("region1;region2", bkConf.getString(REPP_REGIONS_TO_WRITE)); + assertEquals(4, bkConf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY)); + assertFalse(bkConf.getBoolean(REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE)); + assertEquals( + bkConf.getProperty(REPP_DNS_RESOLVER_CLASS), + CachedDNSToSwitchMapping.class.getName()); + + } + +}