merlimat closed pull request #1348: adding max retries for assignment write wait
URL: https://github.com/apache/incubator-pulsar/pull/1348
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork's pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index c2c40911e..daaeae9a9 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -44,3 +44,4 @@ functionAssignmentTopicName: "assignments"
 failureCheckFreqMs: 30000
 rescheduleTimeoutMs: 60000
 initialBrokerReconnectMaxRetries: 60
+assignmentWriteMaxRetries: 60
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index de0228dc5..fd61161af 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -164,13 +164,19 @@ private void invokeScheduler() {
         }
 
         // wait for assignment update to go throw the pipeline
+        int retries = 0;
         while (this.functionRuntimeManager.getCurrentAssignmentVersion() < 
assignmentVersion) {
+            if (retries >= this.workerConfig.getAssignmentWriteMaxRetries()) {
+                log.warn("Max number of retries reached for waiting for 
assignment to propagate. Will continue now.");
+                break;
+            }
             log.info("Waiting for assignments to propagate...");
             try {
                 Thread.sleep(500);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
+            retries++;
         }
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 84f243852..579572a58 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -57,6 +57,7 @@
     private long failureCheckFreqMs;
     private long rescheduleTimeoutMs;
     private int initialBrokerReconnectMaxRetries;
+    private int assignmentWriteMaxRetries;
 
     @Data
     @Setter
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index f9a03dbcd..54a245f72 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -41,9 +41,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
@@ -74,6 +73,7 @@ public void setup() throws PulsarClientException {
         workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig()
                 
.setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName()));
         
workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
+        workerConfig.setAssignmentWriteMaxRetries(0);
 
         producer = mock(Producer.class);
         completableFuture = spy(new CompletableFuture<>());
@@ -94,7 +94,8 @@ public void setup() throws PulsarClientException {
     }
 
     @Test
-    public void testSchedule() throws PulsarClientException, 
NoSuchMethodException, InterruptedException {
+    public void testSchedule() throws PulsarClientException, 
NoSuchMethodException, InterruptedException,
+            TimeoutException, ExecutionException {
 
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
@@ -139,7 +140,7 @@ public void testSchedule() throws PulsarClientException, 
NoSuchMethodException,
 
     @Test
     public void testNothingNewToSchedule() throws InterruptedException, 
ExecutionException, NoSuchMethodException,
-            InvalidProtocolBufferException {
+            InvalidProtocolBufferException, TimeoutException {
 
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
@@ -191,7 +192,7 @@ public void testNothingNewToSchedule() throws 
InterruptedException, ExecutionExc
 
     @Test
     public void testAddingFunctions() throws NoSuchMethodException, 
InterruptedException,
-            InvalidProtocolBufferException {
+            InvalidProtocolBufferException, TimeoutException, 
ExecutionException {
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
@@ -255,7 +256,7 @@ public void testAddingFunctions() throws 
NoSuchMethodException, InterruptedExcep
 
     @Test
     public void testDeletingFunctions() throws NoSuchMethodException, 
InterruptedException,
-            InvalidProtocolBufferException {
+            InvalidProtocolBufferException, TimeoutException, 
ExecutionException {
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
@@ -321,7 +322,8 @@ public void testDeletingFunctions() throws 
NoSuchMethodException, InterruptedExc
     }
 
     @Test
-    public void testScalingUp() throws NoSuchMethodException, 
InterruptedException, InvalidProtocolBufferException, PulsarClientException {
+    public void testScalingUp() throws NoSuchMethodException, 
InterruptedException, InvalidProtocolBufferException,
+            PulsarClientException, TimeoutException, ExecutionException {
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
@@ -431,7 +433,7 @@ public void testScalingUp() throws NoSuchMethodException, 
InterruptedException,
 
     @Test
     public void testScalingDown() throws PulsarClientException, 
NoSuchMethodException, InterruptedException,
-            InvalidProtocolBufferException {
+            InvalidProtocolBufferException, TimeoutException, 
ExecutionException {
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
@@ -542,7 +544,7 @@ public void testScalingDown() throws PulsarClientException, 
NoSuchMethodExceptio
 
     @Test
     public void testUpdate() throws PulsarClientException, 
NoSuchMethodException, InterruptedException,
-            InvalidProtocolBufferException {
+            InvalidProtocolBufferException, TimeoutException, 
ExecutionException {
         List<Function.FunctionMetaData> functionMetaDataList = new 
LinkedList<>();
         long version = 5;
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
@@ -666,29 +668,13 @@ public void testUpdate() throws PulsarClientException, 
NoSuchMethodException, In
         );
     }
 
-    private void callSchedule() throws NoSuchMethodException, 
InterruptedException {
+    private void callSchedule() throws NoSuchMethodException, 
InterruptedException,
+            TimeoutException, ExecutionException {
         long intialVersion = 
functionRuntimeManager.getCurrentAssignmentVersion();
-        int initalCount = getMethodInvocationDetails(completableFuture,
-                CompletableFuture.class.getMethod("get")).size();
-        log.info("initalCount: {}", initalCount);
         Future<?> complete = schedulerManager.schedule();
-        int count = 0;
-        while (!complete.isDone()) {
 
-            int invocationCount = getMethodInvocationDetails(completableFuture,
-                    CompletableFuture.class.getMethod("get")).size();
-            log.info("invocationCount: {}", invocationCount);
-
-            if (invocationCount >= initalCount + 1) {
-                doReturn(intialVersion + 
1).when(functionRuntimeManager).getCurrentAssignmentVersion();
-            }
-
-            if (count > 100) {
-                Assert.fail("Scheduler failed to terminate!");
-            }
-            Thread.sleep(100);
-            count++;
-        }
+        complete.get(30, TimeUnit.SECONDS);
+        doReturn(intialVersion + 
1).when(functionRuntimeManager).getCurrentAssignmentVersion();
     }
 
     private List<Invocation> getMethodInvocationDetails(Object o, Method 
method) throws NoSuchMethodException {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to