This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new be5e847 [functions] change instance id from string to int and expose number of instances in context (#2411) be5e847 is described below commit be5e847a18c12453c6a98b93eb1bb3da6eeb70b8 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Wed Sep 26 10:11:33 2018 -0700 [functions] change instance id from string to int and expose number of instances in context (#2411) * [functions] change instance id from string to int and expose number of instances in context ### Motivation When writing a connector reading from a list of sources, it is hard for the connector implementation to decide how to distribute the list of sources across the function instances, because there is no way to tell how many function instances are running. ### Changes - change instance id from string to integer (since the implementation already assumes instance id is an integer) - add getNumInstances to the context - expose both interfaces in source and sink connector context * Fix compilation --- .../java/org/apache/pulsar/admin/cli/CmdFunctions.java | 2 +- .../java/org/apache/pulsar/functions/api/Context.java | 9 ++++++++- .../org/apache/pulsar/functions/instance/ContextImpl.java | 9 +++++++-- .../apache/pulsar/functions/instance/InstanceConfig.java | 11 ++++++++++- .../pulsar/functions/instance/JavaInstanceRunnable.java | 15 +++++++-------- .../apache/pulsar/functions/windowing/WindowContext.java | 2 +- .../pulsar/functions/windowing/WindowContextImpl.java | 2 +- .../apache/pulsar/functions/runtime/JavaInstanceMain.java | 2 +- .../apache/pulsar/functions/runtime/ProcessRuntime.java | 2 +- .../apache/pulsar/functions/runtime/RuntimeSpawner.java | 4 ++-- .../pulsar/functions/runtime/ProcessRuntimeTest.java | 2 +- .../apache/pulsar/functions/worker/FunctionActioner.java | 2 +- .../main/java/org/apache/pulsar/io/core/SinkContext.java | 15 +++++++++++++++ .../java/org/apache/pulsar/io/core/SourceContext.java | 15 +++++++++++++++ 
14 files changed, 71 insertions(+), 21 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index b176fe1..395e881 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -1258,7 +1258,7 @@ public class CmdFunctions extends CmdBase { // TODO: correctly implement function version and id instanceConfig.setFunctionVersion(UUID.randomUUID().toString()); instanceConfig.setFunctionId(UUID.randomUUID().toString()); - instanceConfig.setInstanceId(Integer.toString(i + instanceIdOffset)); + instanceConfig.setInstanceId(i + instanceIdOffset); instanceConfig.setMaxBufferedTuples(1024); instanceConfig.setPort(Utils.findAvailablePort()); RuntimeSpawner runtimeSpawner = new RuntimeSpawner( diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index 2856b7c..c66ea6e 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -86,7 +86,14 @@ public interface Context { * * @return the instance id */ - String getInstanceId(); + int getInstanceId(); + + /** + * Get the number of instances that invoke this function. + * + * @return the number of instances that invoke this function. 
+ */ + int getNumInstances(); /** * The version of the function that we are executing diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index c4099f4..4d47433 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -177,8 +177,13 @@ class ContextImpl implements Context, SinkContext, SourceContext { } @Override - public String getInstanceId() { - return config.getInstanceId().toString(); + public int getInstanceId() { + return config.getInstanceId(); + } + + @Override + public int getNumInstances() { + return config.getFunctionDetails().getParallelism(); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java index 9f9da79..040af91 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java @@ -35,10 +35,19 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; @EqualsAndHashCode @ToString public class InstanceConfig { - private String instanceId; + private int instanceId; private String functionId; private String functionVersion; private FunctionDetails functionDetails; private int maxBufferedTuples; private int port; + + /** + * Get the string representation of {@link #getInstanceId()}. + * + * @return the string representation of {@link #getInstanceId()}. 
+ */ + public String getInstanceName() { + return "" + instanceId; + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 32e878d..0f8f6ca 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -51,7 +51,6 @@ import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -65,7 +64,6 @@ import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Build import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.sink.PulsarSinkConfig; import org.apache.pulsar.functions.sink.PulsarSinkDisable; -import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.source.PulsarSourceConfig; import org.apache.pulsar.functions.utils.ConsumerConfig; @@ -144,7 +142,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { // initialize the thread context ThreadContext.put("function", FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())); ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName()); - ThreadContext.put("instance", instanceConfig.getInstanceId()); + ThreadContext.put("instance", instanceConfig.getInstanceName()); log.info("Starting Java Instance {} : \n Details = {}", 
instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails()); @@ -239,17 +237,18 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } private void loadJars() throws Exception { - try { // Let's first try to treat it as a nar archive - fnCache.registerFunctionInstanceWithArchive(instanceConfig.getFunctionId(), instanceConfig.getInstanceId(), - jarFile); + fnCache.registerFunctionInstanceWithArchive( + instanceConfig.getFunctionId(), + instanceConfig.getInstanceName(), + jarFile); } catch (FileNotFoundException e) { log.info("For Function {} Loading as NAR failed with {}; treating it as Jar instead", instanceConfig, e); // create the function class loader fnCache.registerFunctionInstance( instanceConfig.getFunctionId(), - instanceConfig.getInstanceId(), + instanceConfig.getInstanceName(), Arrays.asList(jarFile), Collections.emptyList()); } @@ -393,7 +392,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { // once the thread quits, clean up the instance fnCache.unregisterFunctionInstance( instanceConfig.getFunctionId(), - instanceConfig.getInstanceId()); + instanceConfig.getInstanceName()); log.info("Unloading JAR files for function {}", instanceConfig); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java index 1bb54cd..63e395c 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java @@ -40,7 +40,7 @@ public interface WindowContext { * * @return the instance id */ - String getInstanceId(); + int getInstanceId(); /** * The version of the function that we are executing diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java index c7b3919..e03ed97 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java @@ -42,7 +42,7 @@ public class WindowContextImpl implements WindowContext { } @Override - public String getInstanceId() { + public int getInstanceId() { return this.context.getInstanceId(); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 38a4c28..c18eff5 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -54,7 +54,7 @@ public class JavaInstanceMain implements AutoCloseable { protected String jarFile; @Parameter(names = "--instance_id", description = "Instance Id\n", required = true) - protected String instanceId; + protected int instanceId; @Parameter(names = "--function_id", description = "Function Id\n", required = true) protected String functionId; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 044f636..63f6f3f 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -128,7 +128,7 @@ class ProcessRuntime implements Runtime { // TODO:- Find a platform independent way of controlling memory for a python application } args.add("--instance_id"); - 
args.add(instanceConfig.getInstanceId()); + args.add(instanceConfig.getInstanceName()); args.add("--function_id"); args.add(instanceConfig.getFunctionId()); args.add("--function_version"); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java index 7049300..030a5a7 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java @@ -106,8 +106,8 @@ public class RuntimeSpawner implements AutoCloseable { public CompletableFuture<FunctionStatus> getFunctionStatus() { return runtime.getFunctionStatus().thenApply(f -> { - FunctionStatus.Builder builder = FunctionStatus.newBuilder(); - builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(instanceConfig.getInstanceId()); + FunctionStatus.Builder builder = FunctionStatus.newBuilder(); + builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(instanceConfig.getInstanceName()); if (!f.getRunning() && runtimeDeathException != null) { builder.setFailureException(runtimeDeathException.getMessage()); } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index 97881a4..2345783 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -104,7 +104,7 @@ public class ProcessRuntimeTest { config.setFunctionDetails(createFunctionDetails(runtime)); config.setFunctionId(java.util.UUID.randomUUID().toString()); config.setFunctionVersion("1.0"); - config.setInstanceId(java.util.UUID.randomUUID().toString()); + 
config.setInstanceId(0); config.setMaxBufferedTuples(1024); return config; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 724d80c..a3355b0 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -165,7 +165,7 @@ public class FunctionActioner implements AutoCloseable { // TODO: set correct function id and version when features implemented instanceConfig.setFunctionId(UUID.randomUUID().toString()); instanceConfig.setFunctionVersion(UUID.randomUUID().toString()); - instanceConfig.setInstanceId(String.valueOf(instanceId)); + instanceConfig.setInstanceId(instanceId); instanceConfig.setMaxBufferedTuples(1024); instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort()); diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java index 2d58e0c..bf8678b 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java @@ -19,6 +19,21 @@ package org.apache.pulsar.io.core; public interface SinkContext { + + /** + * The id of the instance that invokes this function. + * + * @return the instance id + */ + int getInstanceId(); + + /** + * Get the number of instances that invoke this function. + * + * @return the number of instances that invoke this function. 
+ */ + int getNumInstances(); + /** * Record a user defined metric * @param metricName The name of the metric diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java index 3ea707e..b557f53 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java @@ -19,6 +19,21 @@ package org.apache.pulsar.io.core; public interface SourceContext { + + /** + * The id of the instance that invokes this function. + * + * @return the instance id + */ + int getInstanceId(); + + /** + * Get the number of instances that invoke this function. + * + * @return the number of instances that invoke this function. + */ + int getNumInstances(); + /** * Record a user defined metric * @param metricName The name of the metric