Repository: samza
Updated Branches:
  refs/heads/master e63d27c0c -> c89841366


SAMZA-1686: Set finite operation timeout when creating zkClient.

Currently zkClient is created with operationRetryTimeOut of -1. This causes 
zkClient to retry indefinitely in case of irrecoverable exceptions thereby 
delaying the StreamProcessor shutdown.

Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>

Reviewers: Jagadish Venkatraman <vjagadish1...@apache.org>

Closes #487 from shanthoosh/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c8984136
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c8984136
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c8984136

Branch: refs/heads/master
Commit: c89841366d53b9e2733dc72bbf91dea9382099a0
Parents: e63d27c
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Authored: Tue Apr 24 15:56:05 2018 -0700
Committer: Prateek Maheshwari <pmahe...@linkedin.com>
Committed: Tue Apr 24 15:56:05 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c8984136/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
index 8dd42c1..072a2f5 100644
--- 
a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
@@ -20,6 +20,7 @@ package org.apache.samza.zk;
 
 import com.google.common.base.Strings;
 import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.serialize.SerializableSerializer;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ZkConfig;
@@ -54,7 +55,7 @@ public class ZkCoordinationUtilsFactory implements 
CoordinationUtilsFactory {
   public static ZkClient createZkClient(String connectString, int 
sessionTimeoutMS, int connectionTimeoutMs) {
     ZkClient zkClient;
     try {
-      zkClient = new ZkClient(connectString, sessionTimeoutMS, 
connectionTimeoutMs);
+      zkClient = new ZkClient(connectString, sessionTimeoutMS, 
connectionTimeoutMs, new SerializableSerializer(), connectionTimeoutMs);
     } catch (Exception e) {
       // ZkClient constructor may throw a variety of different exceptions, not 
all of them Zk based.
       throw new SamzaException("zkClient failed to connect to ZK at :" + 
connectString, e);

Reply via email to