Repository: samza
Updated Branches:
  refs/heads/master f7b0d3834 -> 89dc18e96
SAMZA-1607: Handle ZkNoNodeExistsException in zkUtils.readProcessorData Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Xinyu Liu <xinyuliu...@gmail.com> Closes #437 from shanthoosh/fix_zkutils_get_processor_data Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/89dc18e9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/89dc18e9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/89dc18e9 Branch: refs/heads/master Commit: 89dc18e9691adb02eb735d55c121d169ea6c99b5 Parents: f7b0d38 Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Wed Mar 7 13:24:54 2018 -0800 Committer: xiliu <xi...@linkedin.com> Committed: Wed Mar 7 13:24:54 2018 -0800 ---------------------------------------------------------------------- .../org/apache/samza/zk/ZkJobCoordinator.java | 5 +-- .../main/java/org/apache/samza/zk/ZkUtils.java | 32 +++++++++++--------- .../java/org/apache/samza/zk/TestZkUtils.java | 17 ++++++++++- 3 files changed, 37 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/89dc18e9/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 00eeeae..407291a 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -195,11 +195,12 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { Set<String> uniqueProcessorIds = new HashSet<String>(currentProcessorIds); if (currentProcessorIds.size() != uniqueProcessorIds.size()) { - LOG.info("Processors: {} has duplicates. 
Not generating job model.", currentProcessorIds); + LOG.info("Processors: {} has duplicates. Not generating JobModel.", currentProcessorIds); return; } // Generate the JobModel + LOG.info("Generating new JobModel with processors: {}.", currentProcessorIds); JobModel jobModel = generateNewJobModel(currentProcessorIds); // Create checkpoint and changelog streams if they don't exist @@ -217,7 +218,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { // Assign the next version of JobModel String currentJMVersion = zkUtils.getJobModelVersion(); String nextJMVersion = zkUtils.getNextJobModelVersion(currentJMVersion); - LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion); + LOG.info("pid=" + processorId + "Generated new JobModel with version: " + nextJMVersion + " and processors: " + currentProcessorIds); // Publish the new job model zkUtils.publishJobModel(nextJMVersion, jobModel); http://git-wip-us.apache.org/repos/asf/samza/blob/89dc18e9/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 300fff6..43f7d9c 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -189,15 +189,18 @@ public class ZkUtils { * Fetches all the ephemeral processor nodes of a standalone job from zookeeper. * @return a list of {@link ProcessorNode}, where each ProcessorNode represents a registered stream processor. 
*/ - private List<ProcessorNode> getAllProcessorNodes() { + List<ProcessorNode> getAllProcessorNodes() { List<String> processorZNodes = getSortedActiveProcessorsZnodes(); LOG.debug("Active ProcessorZNodes in zookeeper: {}.", processorZNodes); - return processorZNodes.stream() - .map(processorZNode -> { - String ephemeralProcessorPath = String.format("%s/%s", keyBuilder.getProcessorsPath(), processorZNode); - String data = readProcessorData(ephemeralProcessorPath); - return new ProcessorNode(new ProcessorData(data), ephemeralProcessorPath); - }).collect(Collectors.toList()); + List<ProcessorNode> processorNodes = new ArrayList<>(); + for (String processorZNode: processorZNodes) { + String ephemeralProcessorPath = String.format("%s/%s", keyBuilder.getProcessorsPath(), processorZNode); + String data = readProcessorData(ephemeralProcessorPath); + if (data != null) { + processorNodes.add(new ProcessorNode(new ProcessorData(data), ephemeralProcessorPath)); + } + } + return processorNodes; } /** @@ -221,12 +224,10 @@ public class ZkUtils { * @throws SamzaException when fullPath doesn't exist in zookeeper * or problems with connecting to zookeeper. 
*/ - String readProcessorData(String fullPath) { + private String readProcessorData(String fullPath) { try { - String data = zkClient.readData(fullPath, false); - if (metrics != null) { - metrics.reads.inc(); - } + String data = zkClient.readData(fullPath, true); + metrics.reads.inc(); return data; } catch (Exception e) { throw new SamzaException(String.format("Cannot read ZK node: %s", fullPath), e); @@ -252,7 +253,10 @@ public class ZkUtils { if (znodeIds.size() > 0) { for (String child : znodeIds) { String fullPath = String.format("%s/%s", processorPath, child); - processorIds.add(new ProcessorData(readProcessorData(fullPath)).getProcessorId()); + String processorData = readProcessorData(fullPath); + if (processorData != null) { + processorIds.add(new ProcessorData(processorData).getProcessorId()); + } } Collections.sort(processorIds); LOG.info("Found these children - " + znodeIds); @@ -589,7 +593,7 @@ public class ZkUtils { /** * Represents zookeeper processor node. */ - private static class ProcessorNode { + static class ProcessorNode { private final ProcessorData processorData; // Ex: /test/processors/0000000000 http://git-wip-us.apache.org/repos/asf/samza/blob/89dc18e9/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index ec04949..1dfb414 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.BooleanSupplier; +import com.google.common.collect.ImmutableList; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; @@ -49,7 +50,6 @@ import org.junit.Test; import 
org.junit.rules.ExpectedException; import org.mockito.Mockito; - public class TestZkUtils { private static EmbeddedZookeeper zkServer = null; private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test"); @@ -122,6 +122,21 @@ public class TestZkUtils { } @Test + public void testGetActiveProcessorIdShouldReturnEmptyForNonExistingZookeeperNodes() { + List<String> processorsIDs = zkUtils.getActiveProcessorsIDs(ImmutableList.of("node1", "node2")); + + Assert.assertEquals(0, processorsIDs.size()); + } + + + @Test + public void testGetAllProcessorNodesShouldReturnEmptyForNonExistingZookeeperNodes() { + List<ZkUtils.ProcessorNode> processorsIDs = zkUtils.getAllProcessorNodes(); + + Assert.assertEquals(0, processorsIDs.size()); + } + + @Test public void testZKProtocolVersion() { // first time connect, version should be set to ZkUtils.ZK_PROTOCOL_VERSION ZkLeaderElector le = new ZkLeaderElector("1", zkUtils);