This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 98eeaf5  function-instance lookup: retrieve function instance owner's 
workerId (#2201)
98eeaf5 is described below

commit 98eeaf5fb8344e021e1ff3a20c6c36759a6644bd
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Thu Jul 19 09:50:46 2018 -0700

    function-instance lookup: retrieve function instance owner's workerId 
(#2201)
    
    ### Motivation
    
    for administrative purpose, we need to know worker-owner of the 
function-instance. so, adding workerId along with function-stats to find out 
owner of the instance.
---
 .../test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java  |  7 +++++--
 .../proto/src/main/proto/InstanceCommunication.proto       |  2 ++
 .../pulsar/functions/worker/FunctionRuntimeManager.java    | 14 ++++++++++----
 3 files changed, 17 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 78fd2e2..a7b34ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -104,6 +104,7 @@ public class PulsarSinkE2ETest {
     final String tenant = "external-repl-prop";
     String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
     String primaryHost;
+    String workerId;
 
     private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
     private final int brokerWebServicePort = PortManager.nextFreePort();
@@ -220,9 +221,9 @@ public class PulsarSinkE2ETest {
         workerConfig.setWorkerPort(workerServicePort);
         workerConfig.setPulsarFunctionsCluster(config.getClusterName());
         String hostname = 
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + 
"-" + workerConfig.getWorkerPort();
         workerConfig.setWorkerHostname(hostname);
-        workerConfig
-                .setWorkerId("c-" + config.getClusterName() + "-fw-" + 
hostname + "-" + workerConfig.getWorkerPort());
+        workerConfig.setWorkerId(workerId);
 
         
workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
         workerConfig.setClientAuthenticationParameters(
@@ -368,8 +369,10 @@ public class PulsarSinkE2ETest {
 
         double count = 
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount();
         double success = 
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount();
+        String ownerWorkerId = stats.getWorkerId();
         Assert.assertEquals((int) count, totalMsgs);
         Assert.assertEquals((int) success, totalMsgs);
+        Assert.assertEquals(ownerWorkerId, workerId);
     }
 
     protected FunctionDetails createSinkConfig(String jarFile, String tenant, 
String namespace, String functionName, String sinkTopic, String 
subscriptionName) {
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto 
b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 1a80f5e..0078bc2 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -50,6 +50,8 @@ message FunctionStatus {
     int64 lastInvocationTime = 13;
     string instanceId = 14;
     MetricsData metrics = 15;
+    // owner of function-instance
+    string workerId = 16;
 }
 
 message FunctionStatusList {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index bc6dbe3..05a79d8 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -239,9 +239,10 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
      */
     public InstanceCommunication.FunctionStatus 
getFunctionInstanceStatus(String tenant, String namespace,
                                                                           
String functionName, int instanceId) {
-        String workerId = this.workerConfig.getWorkerId();
-
         Assignment assignment = this.findAssignment(tenant, namespace, 
functionName, instanceId);
+        final String assignedWorkerId = assignment.getWorkerId();
+        final String workerId = this.workerConfig.getWorkerId();
+        
         if (assignment == null) {
             InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
                     = InstanceCommunication.FunctionStatus.newBuilder();
@@ -252,13 +253,16 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
         InstanceCommunication.FunctionStatus functionStatus = null;
         // If I am running worker
-        if (assignment.getWorkerId().equals(workerId)) {
+        if (assignedWorkerId.equals(workerId)) {
             FunctionRuntimeInfo functionRuntimeInfo = 
this.getFunctionRuntimeInfo(
                     
Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
             RuntimeSpawner runtimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
             if (runtimeSpawner != null) {
                 try {
-                    functionStatus = 
functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get();
+                    InstanceCommunication.FunctionStatus.Builder 
functionStatusBuilder = InstanceCommunication.FunctionStatus
+                            
.newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get());
+                    functionStatusBuilder.setWorkerId(assignedWorkerId);
+                    functionStatus = functionStatusBuilder.build();
                 } catch (InterruptedException | ExecutionException e) {
                     throw new RuntimeException(e);
                 }
@@ -270,6 +274,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                 if (functionRuntimeInfo.getStartupException() != null) {
                     
functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage());
                 }
+                functionStatusBuilder.setWorkerId(assignedWorkerId);
                 functionStatus = functionStatusBuilder.build();
             }
         } else {
@@ -306,6 +311,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                 log.warn("Got invalid function status response from {}", 
workerInfo, e);
                 throw new RuntimeException(e);
             }
+            functionStatusBuilder.setWorkerId(assignedWorkerId);
             functionStatus = functionStatusBuilder.build();
         }
 

Reply via email to