This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch retry_creation in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9f62a190b577243c1044e718de7f99ea3792693e
Author: Sanjeev Kulkarni <sanj...@streaml.io>
AuthorDate: Thu Feb 28 16:05:33 2019 -0800

    Retry creation of assignment topic a few times before giving up
---
 .../pulsar/functions/worker/SchedulerManager.java | 23 +++++-----------------
 1 file changed, 5 insertions(+), 18 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 2a93494..f50acc3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -105,25 +105,12 @@ public class SchedulerManager implements AutoCloseable {
                         .compressionType(CompressionType.LZ4)
                         .sendTimeout(0, TimeUnit.MILLISECONDS)
                         .createAsync().get(10, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                log.error("Exception while at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
+            }
+            try {
+                Thread.sleep(10000);
             } catch (InterruptedException e) {
-                log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
-                Thread.currentThread().interrupt();
-                throw new RuntimeException(e);
-            } catch (ExecutionException e) {
-                log.error("Encountered exceptions at creating producer for topic {}",
-                        config.getFunctionAssignmentTopic(), e);
-                throw new RuntimeException(e);
-            } catch (TimeoutException e) {
-                try {
-                    log.info("Can't create a producer on assignment topic {} in {} seconds, retry in 10 seconds ...",
-                            stopwatch.elapsed(TimeUnit.SECONDS));
-                    TimeUnit.SECONDS.sleep(10);
-                } catch (InterruptedException e1) {
-                    log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException(e);
-                }
-                continue;
             }
         }
         throw new RuntimeException("Can't create a producer on assignment topic "