sijie closed pull request #2201: function-instance lookup: retrieve function instance owner's workerId URL: https://github.com/apache/incubator-pulsar/pull/2201
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 78fd2e2e40..a7b34ae857 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -104,6 +104,7 @@ final String tenant = "external-repl-prop"; String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; String primaryHost; + String workerId; private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); private final int brokerWebServicePort = PortManager.nextFreePort(); @@ -220,9 +221,9 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) { workerConfig.setWorkerPort(workerServicePort); workerConfig.setPulsarFunctionsCluster(config.getClusterName()); String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress()); + this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort(); workerConfig.setWorkerHostname(hostname); - workerConfig - .setWorkerId("c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort()); + workerConfig.setWorkerId(workerId); workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName()); workerConfig.setClientAuthenticationParameters( @@ -368,8 +369,10 @@ public void testPulsarSinkStats() throws Exception { double count = metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount(); double success = metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount(); + String ownerWorkerId = stats.getWorkerId(); Assert.assertEquals((int) count, totalMsgs); Assert.assertEquals((int) success, totalMsgs); + Assert.assertEquals(ownerWorkerId, workerId); } protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic, String subscriptionName) { diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto index 1a80f5ecf6..0078bc2544 100644 --- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto +++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto @@ -50,6 +50,8 @@ message FunctionStatus { int64 lastInvocationTime = 13; string instanceId = 14; MetricsData metrics = 15; + // owner of function-instance + string workerId = 16; } message FunctionStatusList { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index bc6dbe3686..05a79d8da5 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -239,9 +239,10 @@ public synchronized void removeAssignments(Collection<Assignment> assignments) { */ public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace, String functionName, int instanceId) { - String workerId = this.workerConfig.getWorkerId(); - Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId); + final String assignedWorkerId = assignment.getWorkerId(); + final String workerId = this.workerConfig.getWorkerId(); + if (assignment == null) { InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(); @@ -252,13 +253,16 @@ public synchronized void removeAssignments(Collection<Assignment> assignments) { InstanceCommunication.FunctionStatus functionStatus = null; // If I am running worker - if (assignment.getWorkerId().equals(workerId)) { + if (assignedWorkerId.equals(workerId)) { FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo( Utils.getFullyQualifiedInstanceId(assignment.getInstance())); RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); if (runtimeSpawner != null) { try { - functionStatus = functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get(); + InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus + .newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get()); + functionStatusBuilder.setWorkerId(assignedWorkerId); + functionStatus = functionStatusBuilder.build(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -270,6 +274,7 @@ public synchronized void removeAssignments(Collection<Assignment> assignments) { if (functionRuntimeInfo.getStartupException() != null) { functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage()); } + functionStatusBuilder.setWorkerId(assignedWorkerId); functionStatus = functionStatusBuilder.build(); } } else { @@ -306,6 +311,7 @@ public synchronized void removeAssignments(Collection<Assignment> assignments) { log.warn("Got invalid function status response from {}", workerInfo, e); throw new RuntimeException(e); } + functionStatusBuilder.setWorkerId(assignedWorkerId); functionStatus = functionStatusBuilder.build(); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services