Repository: samza
Updated Branches:
  refs/heads/master f7b0d3834 -> 89dc18e96


SAMZA-1607: Handle ZkNoNodeExistsException in zkUtils.readProcessorData

Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>

Reviewers: Xinyu Liu <xinyuliu...@gmail.com>

Closes #437 from shanthoosh/fix_zkutils_get_processor_data


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/89dc18e9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/89dc18e9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/89dc18e9

Branch: refs/heads/master
Commit: 89dc18e9691adb02eb735d55c121d169ea6c99b5
Parents: f7b0d38
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Authored: Wed Mar 7 13:24:54 2018 -0800
Committer: xiliu <xi...@linkedin.com>
Committed: Wed Mar 7 13:24:54 2018 -0800

----------------------------------------------------------------------
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  5 +--
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 32 +++++++++++---------
 .../java/org/apache/samza/zk/TestZkUtils.java   | 17 ++++++++++-
 3 files changed, 37 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/89dc18e9/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 00eeeae..407291a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -195,11 +195,12 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     Set<String> uniqueProcessorIds = new HashSet<String>(currentProcessorIds);
 
     if (currentProcessorIds.size() != uniqueProcessorIds.size()) {
-      LOG.info("Processors: {} has duplicates. Not generating job model.", currentProcessorIds);
+      LOG.info("Processors: {} has duplicates. Not generating JobModel.", currentProcessorIds);
       return;
     }
 
     // Generate the JobModel
+    LOG.info("Generating new JobModel with processors: {}.", currentProcessorIds);
     JobModel jobModel = generateNewJobModel(currentProcessorIds);
 
     // Create checkpoint and changelog streams if they don't exist
@@ -217,7 +218,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     // Assign the next version of JobModel
     String currentJMVersion = zkUtils.getJobModelVersion();
     String nextJMVersion = zkUtils.getNextJobModelVersion(currentJMVersion);
-    LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
+    LOG.info("pid=" + processorId + "Generated new JobModel with version: " + nextJMVersion + " and processors: " + currentProcessorIds);
 
     // Publish the new job model
     zkUtils.publishJobModel(nextJMVersion, jobModel);

http://git-wip-us.apache.org/repos/asf/samza/blob/89dc18e9/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 300fff6..43f7d9c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -189,15 +189,18 @@ public class ZkUtils {
    * Fetches all the ephemeral processor nodes of a standalone job from zookeeper.
    * @return a list of {@link ProcessorNode}, where each ProcessorNode represents a registered stream processor.
    */
-  private List<ProcessorNode> getAllProcessorNodes() {
+  List<ProcessorNode> getAllProcessorNodes() {
     List<String> processorZNodes = getSortedActiveProcessorsZnodes();
     LOG.debug("Active ProcessorZNodes in zookeeper: {}.", processorZNodes);
-    return processorZNodes.stream()
-                          .map(processorZNode -> {
-                              String ephemeralProcessorPath = String.format("%s/%s", keyBuilder.getProcessorsPath(), processorZNode);
-                              String data = readProcessorData(ephemeralProcessorPath);
-                              return new ProcessorNode(new ProcessorData(data), ephemeralProcessorPath);
-                            }).collect(Collectors.toList());
+    List<ProcessorNode> processorNodes = new ArrayList<>();
+    for (String processorZNode: processorZNodes) {
+      String ephemeralProcessorPath = String.format("%s/%s", keyBuilder.getProcessorsPath(), processorZNode);
+      String data = readProcessorData(ephemeralProcessorPath);
+      if (data != null) {
+        processorNodes.add(new ProcessorNode(new ProcessorData(data), ephemeralProcessorPath));
+      }
+    }
+    return processorNodes;
   }
 
   /**
@@ -221,12 +224,10 @@ public class ZkUtils {
    * @throws SamzaException when fullPath doesn't exist in zookeeper
    * or problems with connecting to zookeeper.
    */
-  String readProcessorData(String fullPath) {
+  private String readProcessorData(String fullPath) {
     try {
-      String data = zkClient.readData(fullPath, false);
-      if (metrics != null) {
-        metrics.reads.inc();
-      }
+      String data = zkClient.readData(fullPath, true);
+      metrics.reads.inc();
       return data;
     } catch (Exception e) {
       throw new SamzaException(String.format("Cannot read ZK node: %s", fullPath), e);
@@ -252,7 +253,10 @@ public class ZkUtils {
     if (znodeIds.size() > 0) {
       for (String child : znodeIds) {
         String fullPath = String.format("%s/%s", processorPath, child);
-        processorIds.add(new ProcessorData(readProcessorData(fullPath)).getProcessorId());
+        String processorData = readProcessorData(fullPath);
+        if (processorData != null) {
+          processorIds.add(new ProcessorData(processorData).getProcessorId());
+        }
       }
       Collections.sort(processorIds);
       LOG.info("Found these children - " + znodeIds);
@@ -589,7 +593,7 @@ public class ZkUtils {
   /**
    * Represents zookeeper processor node.
    */
-  private static class ProcessorNode {
+  static class ProcessorNode {
     private final ProcessorData processorData;
 
     // Ex: /test/processors/0000000000

http://git-wip-us.apache.org/repos/asf/samza/blob/89dc18e9/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index ec04949..1dfb414 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BooleanSupplier;
+import com.google.common.collect.ImmutableList;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
@@ -49,7 +50,6 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.mockito.Mockito;
 
-
 public class TestZkUtils {
   private static EmbeddedZookeeper zkServer = null;
   private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -122,6 +122,21 @@ public class TestZkUtils {
   }
 
   @Test
+  public void testGetActiveProcessorIdShouldReturnEmptyForNonExistingZookeeperNodes() {
+    List<String> processorsIDs = zkUtils.getActiveProcessorsIDs(ImmutableList.of("node1", "node2"));
+
+    Assert.assertEquals(0, processorsIDs.size());
+  }
+
+
+  @Test
+  public void testGetAllProcessorNodesShouldReturnEmptyForNonExistingZookeeperNodes() {
+    List<ZkUtils.ProcessorNode> processorsIDs = zkUtils.getAllProcessorNodes();
+
+    Assert.assertEquals(0, processorsIDs.size());
+  }
+
+  @Test
   public void testZKProtocolVersion() {
     // first time connect, version should be set to ZkUtils.ZK_PROTOCOL_VERSION
     ZkLeaderElector le = new ZkLeaderElector("1", zkUtils);

Reply via email to