This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 98eeaf5 function-instance lookup: retrieve function instance owner's workerId (#2201) 98eeaf5 is described below commit 98eeaf5fb8344e021e1ff3a20c6c36759a6644bd Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Jul 19 09:50:46 2018 -0700 function-instance lookup: retrieve function instance owner's workerId (#2201) ### Motivation for administrative purpose, we need to know worker-owner of the function-instance. so, adding workerId along with function-stats to find out owner of the instance. --- .../test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java | 7 +++++-- .../proto/src/main/proto/InstanceCommunication.proto | 2 ++ .../pulsar/functions/worker/FunctionRuntimeManager.java | 14 ++++++++++---- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 78fd2e2..a7b34ae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -104,6 +104,7 @@ public class PulsarSinkE2ETest { final String tenant = "external-repl-prop"; String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; String primaryHost; + String workerId; private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); private final int brokerWebServicePort = PortManager.nextFreePort(); @@ -220,9 +221,9 @@ public class PulsarSinkE2ETest { workerConfig.setWorkerPort(workerServicePort); workerConfig.setPulsarFunctionsCluster(config.getClusterName()); String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress()); + this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort(); workerConfig.setWorkerHostname(hostname); - workerConfig - .setWorkerId("c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort()); + workerConfig.setWorkerId(workerId); workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName()); workerConfig.setClientAuthenticationParameters( @@ -368,8 +369,10 @@ public class PulsarSinkE2ETest { double count = metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount(); double success = metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount(); + String ownerWorkerId = stats.getWorkerId(); Assert.assertEquals((int) count, totalMsgs); Assert.assertEquals((int) success, totalMsgs); + Assert.assertEquals(ownerWorkerId, workerId); } protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic, String subscriptionName) { diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto index 1a80f5e..0078bc2 100644 --- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto +++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto @@ -50,6 +50,8 @@ message FunctionStatus { int64 lastInvocationTime = 13; string instanceId = 14; MetricsData metrics = 15; + // owner of function-instance + string workerId = 16; } message FunctionStatusList { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index bc6dbe3..05a79d8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -239,9 +239,10 @@ public class FunctionRuntimeManager implements AutoCloseable{ */ public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace, String functionName, int instanceId) { - String workerId = this.workerConfig.getWorkerId(); - Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId); + final String assignedWorkerId = assignment.getWorkerId(); + final String workerId = this.workerConfig.getWorkerId(); + if (assignment == null) { InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(); @@ -252,13 +253,16 @@ public class FunctionRuntimeManager implements AutoCloseable{ InstanceCommunication.FunctionStatus functionStatus = null; // If I am running worker - if (assignment.getWorkerId().equals(workerId)) { + if (assignedWorkerId.equals(workerId)) { FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo( Utils.getFullyQualifiedInstanceId(assignment.getInstance())); RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); if (runtimeSpawner != null) { try { - functionStatus = functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get(); + InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus + .newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get()); + functionStatusBuilder.setWorkerId(assignedWorkerId); + functionStatus = functionStatusBuilder.build(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -270,6 +274,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ if (functionRuntimeInfo.getStartupException() != null) { functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage()); } + functionStatusBuilder.setWorkerId(assignedWorkerId); functionStatus = functionStatusBuilder.build(); } } else { @@ -306,6 +311,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ log.warn("Got invalid function status response from {}", workerInfo, e); throw new RuntimeException(e); } + functionStatusBuilder.setWorkerId(assignedWorkerId); functionStatus = functionStatusBuilder.build(); }