sijie closed pull request #2201: function-instance lookup: retrieve function 
instance owner's workerId
URL: https://github.com/apache/incubator-pulsar/pull/2201
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 78fd2e2e40..a7b34ae857 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -104,6 +104,7 @@
     final String tenant = "external-repl-prop";
     String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
     String primaryHost;
+    String workerId;
 
     private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
     private final int brokerWebServicePort = PortManager.nextFreePort();
@@ -220,9 +221,9 @@ private WorkerService 
createPulsarFunctionWorker(ServiceConfiguration config) {
         workerConfig.setWorkerPort(workerServicePort);
         workerConfig.setPulsarFunctionsCluster(config.getClusterName());
         String hostname = 
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + 
"-" + workerConfig.getWorkerPort();
         workerConfig.setWorkerHostname(hostname);
-        workerConfig
-                .setWorkerId("c-" + config.getClusterName() + "-fw-" + 
hostname + "-" + workerConfig.getWorkerPort());
+        workerConfig.setWorkerId(workerId);
 
         
workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
         workerConfig.setClientAuthenticationParameters(
@@ -368,8 +369,10 @@ public void testPulsarSinkStats() throws Exception {
 
         double count = 
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount();
         double success = 
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount();
+        String ownerWorkerId = stats.getWorkerId();
         Assert.assertEquals((int) count, totalMsgs);
         Assert.assertEquals((int) success, totalMsgs);
+        Assert.assertEquals(ownerWorkerId, workerId);
     }
 
     protected FunctionDetails createSinkConfig(String jarFile, String tenant, 
String namespace, String functionName, String sinkTopic, String 
subscriptionName) {
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto 
b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 1a80f5ecf6..0078bc2544 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -50,6 +50,8 @@ message FunctionStatus {
     int64 lastInvocationTime = 13;
     string instanceId = 14;
     MetricsData metrics = 15;
+    // owner of function-instance
+    string workerId = 16;
 }
 
 message FunctionStatusList {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index bc6dbe3686..05a79d8da5 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -239,9 +239,10 @@ public synchronized void 
removeAssignments(Collection<Assignment> assignments) {
      */
     public InstanceCommunication.FunctionStatus 
getFunctionInstanceStatus(String tenant, String namespace,
                                                                           
String functionName, int instanceId) {
-        String workerId = this.workerConfig.getWorkerId();
-
         Assignment assignment = this.findAssignment(tenant, namespace, 
functionName, instanceId);
+        final String assignedWorkerId = assignment.getWorkerId();
+        final String workerId = this.workerConfig.getWorkerId();
+        
         if (assignment == null) {
             InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
                     = InstanceCommunication.FunctionStatus.newBuilder();
@@ -252,13 +253,16 @@ public synchronized void 
removeAssignments(Collection<Assignment> assignments) {
 
         InstanceCommunication.FunctionStatus functionStatus = null;
         // If I am running worker
-        if (assignment.getWorkerId().equals(workerId)) {
+        if (assignedWorkerId.equals(workerId)) {
             FunctionRuntimeInfo functionRuntimeInfo = 
this.getFunctionRuntimeInfo(
                     
Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
             RuntimeSpawner runtimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
             if (runtimeSpawner != null) {
                 try {
-                    functionStatus = 
functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get();
+                    InstanceCommunication.FunctionStatus.Builder 
functionStatusBuilder = InstanceCommunication.FunctionStatus
+                            
.newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get());
+                    functionStatusBuilder.setWorkerId(assignedWorkerId);
+                    functionStatus = functionStatusBuilder.build();
                 } catch (InterruptedException | ExecutionException e) {
                     throw new RuntimeException(e);
                 }
@@ -270,6 +274,7 @@ public synchronized void 
removeAssignments(Collection<Assignment> assignments) {
                 if (functionRuntimeInfo.getStartupException() != null) {
                     
functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage());
                 }
+                functionStatusBuilder.setWorkerId(assignedWorkerId);
                 functionStatus = functionStatusBuilder.build();
             }
         } else {
@@ -306,6 +311,7 @@ public synchronized void 
removeAssignments(Collection<Assignment> assignments) {
                 log.warn("Got invalid function status response from {}", 
workerInfo, e);
                 throw new RuntimeException(e);
             }
+            functionStatusBuilder.setWorkerId(assignedWorkerId);
             functionStatus = functionStatusBuilder.build();
         }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to