SAMZA-1324: Fix NullPointerException in ZkUtils api's. Problem: Read/Write api methods in ZkUtils updates counters/timers in `metrics` field. In a ZkUtils constructor this fields is not initialized properly. Java default for uninitialized field is null resulting in NPE.
Fix: Initialize private fields of ZkUtils class with appropriate defaults. Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Navina Ramesh <nav...@apache.org> Closes #235 from shanthoosh/fix_zkutils_api Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ebb1b7fe Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ebb1b7fe Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ebb1b7fe Branch: refs/heads/0.14.0 Commit: ebb1b7fea2518e827e589fe8523089322c68c9bf Parents: be98993 Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Wed Jul 12 11:13:09 2017 -0700 Committer: navina <nav...@apache.org> Committed: Wed Jul 12 11:13:09 2017 -0700 ---------------------------------------------------------------------- .../versioned/container/metrics-table.html | 8 ++- .../samza/zk/ZkCoordinationServiceFactory.java | 5 +- .../org/apache/samza/zk/ZkJobCoordinator.java | 2 +- .../samza/zk/ZkJobCoordinatorMetrics.java | 9 ---- .../main/java/org/apache/samza/zk/ZkUtils.java | 17 +++--- .../org/apache/samza/zk/ZkUtilsMetrics.java | 56 ++++++++++++++++++++ .../zk/TestZkBarrierForVersionUpgrade.java | 4 +- .../apache/samza/zk/TestZkLeaderElector.java | 2 +- .../apache/samza/zk/TestZkProcessorLatch.java | 4 +- .../java/org/apache/samza/zk/TestZkUtils.java | 4 +- .../processor/TestZkLocalApplicationRunner.java | 5 +- 11 files changed, 82 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/docs/learn/documentation/versioned/container/metrics-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html index 7fbbc40..e504fa3 100644 --- a/docs/learn/documentation/versioned/container/metrics-table.html +++ b/docs/learn/documentation/versioned/container/metrics-table.html @@ -142,6 +142,7 @@ <li><a href="#bootstrapping-chooser-metrics">BootstrappingChooserMetrics</a></li> <li><a href="#hdfs-system-producer-metrics">HdfsSystemProducerMetrics</a></li> <li><a href="#elasticsearch-system-producer-metrics">ElasticsearchSystemProducerMetrics</a></li> + <li><a href="#zookeeper-client-metrics">ZookeeperClientMetrics</a></li> <li><a href="#zookeeper-job-coordinator-metrics">ZkJobCoordinatorMetrics</a></li> </ul> <p>Words highlighted like <span class="system">this</span> are placeholders for your own variable names defined in configuration file or system variables defined while starting the job.</p> @@ -894,7 +895,7 @@ </tr> <tr> - <th colspan="2" class="section" id="zookeeper-job-coordinator-metrics">org.apache.samza.zk.ZkJobCoordinatorMetrics</th> + <th colspan="2" class="section" id="zookeeper-client-metrics">org.apache.samza.zk.ZkUtilsMetrics</th> </tr> <tr> <td>reads</td> @@ -909,10 +910,13 @@ <td>Number of subscriptions to znodes in Zookeeper</td> </tr> <tr> - <td>zk-connection-error</td> + <td>zk-connection-errors</td> <td>Number of Zookeeper connection errors</td> </tr> <tr> + <th colspan="2" class="section" id="zookeeper-job-coordinator-metrics">org.apache.samza.zk.ZkJobCoordinatorMetrics</th> + </tr> + <tr> <td>is-leader</td> <td>Denotes if the processor is a leader or not</td> </tr> http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java index 20fcfa4..d0633a8 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java @@ -25,13 +25,14 @@ import org.apache.samza.config.Config; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.CoordinationServiceFactory; import org.apache.samza.coordinator.CoordinationUtils; +import org.apache.samza.util.NoOpMetricsRegistry; import org.apache.zookeeper.client.ConnectStringParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ZkCoordinationServiceFactory implements CoordinationServiceFactory { - private final static Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class); + private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class); public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) { ZkConfig zkConfig = new ZkConfig(config); @@ -39,7 +40,7 @@ public class ZkCoordinationServiceFactory implements CoordinationServiceFactory ZkClient zkClient = createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); - ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs()); + ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), new NoOpMetricsRegistry()); return new ZkCoordinationUtils(participantId, zkConfig, zkUtils); } http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index f2fc3de..94c3054 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -80,7 +80,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()), - zkConfig.getZkConnectionTimeoutMs(), metrics); + zkConfig.getZkConnectionTimeoutMs(), metricsRegistry); this.processorId = createProcessorId(config); LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils); http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java index 3437602..3d00897 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java @@ -31,11 +31,6 @@ public class ZkJobCoordinatorMetrics extends MetricsBase { private final MetricsRegistry metricsRegistry; - public final Counter reads; - public final Counter writes; - public final Counter subscriptions; - public final Counter zkConnectionError; - /** * Denotes if the processor is a leader or not */ @@ -65,10 +60,6 @@ public class ZkJobCoordinatorMetrics extends MetricsBase { public ZkJobCoordinatorMetrics(MetricsRegistry metricsRegistry) { super(metricsRegistry); this.metricsRegistry = metricsRegistry; - this.reads = newCounter("reads"); - this.writes = newCounter("writes"); - this.subscriptions = newCounter("subscriptions"); - this.zkConnectionError = newCounter("zk-connection-error"); this.isLeader = newGauge("is-leader", false); this.barrierCreation = newCounter("barrier-creation"); this.barrierStateChange = newCounter("barrier-state-change"); http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index aa55ff7..ecf118b 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.samza.SamzaException; import org.apache.samza.job.model.JobModel; +import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.serializers.model.SamzaObjectMapper; import org.apache.zookeeper.data.Stat; import org.codehaus.jackson.map.ObjectMapper; @@ -65,19 +66,13 @@ public class ZkUtils { private volatile String ephemeralPath = null; private final ZkKeyBuilder keyBuilder; private final int connectionTimeoutMs; - private ZkJobCoordinatorMetrics metrics; + private final ZkUtilsMetrics metrics; - public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) { + public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, MetricsRegistry metricsRegistry) { this.keyBuilder = zkKeyBuilder; this.connectionTimeoutMs = connectionTimeoutMs; this.zkClient = zkClient; - } - - public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, ZkJobCoordinatorMetrics metrics) { - this.keyBuilder = zkKeyBuilder; - this.connectionTimeoutMs = connectionTimeoutMs; - this.zkClient = zkClient; - this.metrics = metrics; + this.metrics = new ZkUtilsMetrics(metricsRegistry); } public void connect() throws ZkInterruptedException { @@ -269,7 +264,9 @@ public class ZkUtils { * @return jobmodel version as a string */ public String getJobModelVersion() { - return zkClient.<String>readData(keyBuilder.getJobModelVersionPath()); + String jobModelVersion = zkClient.readData(keyBuilder.getJobModelVersionPath()); + metrics.reads.inc(); + return jobModelVersion; } /** http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java new file mode 100644 index 0000000..b9f4aa8 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.MetricsBase; +import org.apache.samza.metrics.MetricsRegistry; + +/** + * Contains all the metrics published by {@link ZkUtils}. + */ +public class ZkUtilsMetrics extends MetricsBase { + /** + * Number of data reads from zookeeper. + */ + public final Counter reads; + + /** + * Number of data writes into zookeeper. + */ + public final Counter writes; + + /** + * Number of subscriptions created with zookeeper. + */ + public final Counter subscriptions; + + /** + * Number of zookeeper connection errors in ZkClient. + */ + public final Counter zkConnectionError; + + public ZkUtilsMetrics(MetricsRegistry metricsRegistry) { + super(metricsRegistry); + this.reads = newCounter("reads"); + this.writes = newCounter("writes"); + this.subscriptions = newCounter("subscriptions"); + this.zkConnectionError = newCounter("zk-connection-errors"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java index 49cd280..3dd1bd5 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java @@ -53,9 +53,9 @@ public class TestZkBarrierForVersionUpgrade { @Before public void testSetup() { ZkClient zkClient = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); - this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry())); + this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry()); ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); - this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry())); + this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry()); } @After http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index 993297b..3ff9175 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -437,6 +437,6 @@ public class TestZkLeaderElector { return new ZkUtils( KEY_BUILDER, zkClient, - CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry())); + CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry()); } } http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java index 9f089a0..b2a5533 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException; import org.I0Itec.zkclient.ZkClient; import org.apache.samza.coordinator.Latch; import org.apache.samza.testUtils.EmbeddedZookeeper; +import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -219,6 +220,7 @@ public class TestZkProcessorLatch { return new ZkUtils( KEY_BUILDER, zkClient, - CONNECTION_TIMEOUT_MS); + CONNECTION_TIMEOUT_MS, + new NoOpMetricsRegistry()); } } http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index 9e33484..a33bf03 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -71,7 +71,7 @@ public class TestZkUtils { zkUtils = new ZkUtils( KEY_BUILDER, zkClient, - SESSION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry())); + SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); zkUtils.connect(); } @@ -110,7 +110,7 @@ public class TestZkUtils { zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1")); List<String> l = zkUtils.getSortedActiveProcessorsIDs(); Assert.assertEquals(1, l.size()); - new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS).registerProcessorAndGetId(new ProcessorData("host2", "2")); + new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId(new ProcessorData("host2", "2")); l = zkUtils.getSortedActiveProcessorsIDs(); Assert.assertEquals(2, l.size()); http://git-wip-us.apache.org/repos/asf/samza/blob/ebb1b7fe/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index 4865647..2d5da2b 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -54,7 +54,6 @@ import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.test.StandaloneIntegrationTestHarness; import org.apache.samza.test.StandaloneTestUtils; import org.apache.samza.util.NoOpMetricsRegistry; -import org.apache.samza.zk.ZkJobCoordinatorMetrics; import org.apache.samza.zk.ZkKeyBuilder; import org.apache.samza.zk.ZkUtils; import org.junit.Rule; @@ -94,7 +93,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne private LocalApplicationRunner applicationRunner1; private LocalApplicationRunner applicationRunner2; private LocalApplicationRunner applicationRunner3; - private ZkJobCoordinatorMetrics zkJobCoordinatorMetrics; // Set 90 seconds as max execution time for each test. @Rule @@ -110,8 +108,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId); ZkClient zkClient = new ZkClient(zkConnect()); ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(String.format("app-%s-%s", testStreamAppName, testStreamAppId)); - zkJobCoordinatorMetrics = new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()); - zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, zkJobCoordinatorMetrics); + zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry()); zkUtils.connect(); // Set up stream application configs with different processorIds and same testStreamAppName, testStreamAppId.