This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fc0ef90  [functions][worker] timeout creating producer for worker 
(#2738)
fc0ef90 is described below

commit fc0ef9012f8e5d18985c6f897d35b9b55f7b7479
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Fri Oct 5 16:31:51 2018 -0700

    [functions][worker] timeout creating producer for worker (#2738)
    
    
    
    *Motivation*
    
    Sometimes when we run the worker service as part of the broker, some pods can hang while creating producers to the assignment topics.
    It is unknown whether it is a k8s problem or not. But in general, we should time out and fail fast to allow k8s to reschedule the pods.
    
    *Changes*
    
    Add timeout logic when creating producers.
---
 .../pulsar/functions/worker/SchedulerManager.java  | 53 ++++++++++++++++------
 .../functions/worker/SchedulerManagerTest.java     | 15 +-----
 2 files changed, 41 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 69725ad..24217cf 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -18,19 +18,19 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import static 
org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction;
-
+import com.google.common.base.Stopwatch;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -88,21 +88,48 @@ public class SchedulerManager implements AutoCloseable {
         this.scheduler = 
Reflections.createInstance(workerConfig.getSchedulerClassName(), 
IScheduler.class,
                 Thread.currentThread().getContextClassLoader());
 
-        try {
-            this.producer = 
pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic())
-                    
.enableBatching(false).blockIfQueueFull(true).compressionType(CompressionType.LZ4).
-                    sendTimeout(0, TimeUnit.MILLISECONDS).create();
-        } catch (PulsarClientException e) {
-            log.error("Failed to create producer to function assignment topic "
-                    + this.workerConfig.getFunctionAssignmentTopic(), e);
-            throw new RuntimeException(e);
-        }
-
+        this.producer = createProducer(pulsarClient, workerConfig);
         this.executorService = executor;
         
         scheduleCompaction(executor, 
workerConfig.getTopicCompactionFrequencySec());
     }
 
+    private static Producer<byte[]> createProducer(PulsarClient client, 
WorkerConfig config) {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        for (int i = 0; i < 6; i++) {
+            try {
+                return 
client.newProducer().topic(config.getFunctionAssignmentTopic())
+                    .enableBatching(false)
+                    .blockIfQueueFull(true)
+                    .compressionType(CompressionType.LZ4)
+                    .sendTimeout(0, TimeUnit.MILLISECONDS)
+                    .createAsync().get(10, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                log.error("Interrupted at creating producer to topic {}", 
config.getFunctionAssignmentTopic(), e);
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                log.error("Encountered exceptions at creating producer for 
topic {}",
+                    config.getFunctionAssignmentTopic(), e);
+                throw new RuntimeException(e);
+            } catch (TimeoutException e) {
+                try {
+                    log.info("Can't create a producer on assignment topic {} 
in {} seconds, retry in 10 seconds ...",
+                        stopwatch.elapsed(TimeUnit.SECONDS));
+                    TimeUnit.SECONDS.sleep(10);
+                } catch (InterruptedException e1) {
+                    log.error("Interrupted at creating producer to topic {}", 
config.getFunctionAssignmentTopic(), e);
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+                continue;
+            }
+        }
+        throw new RuntimeException("Can't create a producer on assignment 
topic "
+            + config.getFunctionAssignmentTopic() + " in " + 
stopwatch.elapsed(TimeUnit.SECONDS)
+            + " seconds, fail fast ...");
+    }
+
     public Future<?> schedule() {
         return executorService.submit(() -> {
             synchronized (SchedulerManager.this) {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index d3e0118..6dde08c 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -53,7 +53,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.proto.Request;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.Mockito;
@@ -81,18 +80,6 @@ public class SchedulerManagerTest {
     private TypedMessageBuilder<byte[]> message;
     private ScheduledExecutorService executor;
 
-    private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
-        ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
-        when(builder.topic(anyString())).thenReturn(builder);
-
-        when(builder.create()).thenReturn(mock(Producer.class));
-
-        PulsarClient client = mock(PulsarClient.class);
-        when(client.newProducer()).thenReturn(builder);
-
-        return client;
-    }
-
     @BeforeMethod
     public void setup() throws PulsarClientException {
         WorkerConfig workerConfig = new WorkerConfig();
@@ -121,7 +108,7 @@ public class SchedulerManagerTest {
         
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
         when(builder.sendTimeout(anyInt(), 
any(TimeUnit.class))).thenReturn(builder);
 
-        when(builder.create()).thenReturn(producer);
+        
when(builder.createAsync()).thenReturn(CompletableFuture.completedFuture(producer));
 
         PulsarClient pulsarClient = mock(PulsarClient.class);
         when(pulsarClient.newProducer()).thenReturn(builder);

Reply via email to