rdhabalia closed pull request #2549: [Function] avoid creating assignment snapshot and publish individual assignment msg URL: https://github.com/apache/incubator-pulsar/pull/2549
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 444b7fb9f6..0c7b8af695 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -44,6 +44,8 @@ rescheduleTimeoutMs: 60000 initialBrokerReconnectMaxRetries: 60 assignmentWriteMaxRetries: 60 instanceLivenessCheckFreqMs: 30000 +# Frequency how often worker performs compaction on function-topics +topicCompactionFrequencySec: 1800 metricsSamplingPeriodSec: 60 # Enforce authentication authenticationEnabled: false diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 425e04921b..612b33601b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -359,4 +359,4 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> this.latestForKey = latestForKey; } } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 32c93b4626..8e7b292d6a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -1268,4 +1268,4 @@ public void testEmptyCompactionLedger() throws Exception { } } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java new file mode 100644 index 0000000000..fe28a5119e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + +import java.io.File; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import org.apache.bookkeeper.test.PortManager; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import org.apache.pulsar.client.admin.BrokerStats; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.functions.api.utils.IdentityFunction; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.Assignment; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.proto.Function.SourceSpec; +import org.apache.pulsar.functions.sink.PulsarSink; +import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.gson.Gson; + +import jersey.repackaged.com.google.common.collect.Lists; + +/** + * Test Pulsar sink on function + * + */ +public class PulsarWorkerAssignmentTest { + LocalBookkeeperEnsemble bkEnsemble; + + ServiceConfiguration config; + WorkerConfig workerConfig; + PulsarService pulsar; + PulsarAdmin admin; + PulsarClient pulsarClient; + BrokerStats brokerStatsClient; + WorkerService functionsWorkerService; + final String tenant = "external-repl-prop"; + String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; + String primaryHost; + String workerId; + + private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); + private final int brokerWebServicePort = PortManager.nextFreePort(); + private final int brokerServicePort = PortManager.nextFreePort(); + private final int workerServicePort = PortManager.nextFreePort(); + + private static final Logger log = LoggerFactory.getLogger(PulsarWorkerAssignmentTest.class); + + @BeforeMethod + void setup(Method method) throws Exception { + + log.info("--- Setting up method {} ---", method.getName()); + + // Start local bookkeeper ensemble + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); + bkEnsemble.start(); + + String brokerServiceUrl = "http://127.0.0.1:" + brokerServicePort; + String brokerWeServiceUrl = "http://127.0.0.1:" + brokerWebServicePort; + + config = spy(new ServiceConfiguration()); + config.setClusterName("use"); + Set<String> superUsers = Sets.newHashSet("superUser"); + config.setSuperUserRoles(superUsers); + config.setWebServicePort(brokerWebServicePort); + config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); + config.setBrokerServicePort(brokerServicePort); + config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); + + functionsWorkerService = createPulsarFunctionWorker(config); + Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService); + pulsar = new PulsarService(config, functionWorkerService); + pulsar.start(); + + admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerWeServiceUrl).build()); + + brokerStatsClient = admin.brokerStats(); + primaryHost = String.format("http://%s:%d", InetAddress.getLocalHost().getHostName(), brokerWebServicePort); + + // update cluster metadata + ClusterData clusterData = new ClusterData(brokerServiceUrl); + admin.clusters().updateCluster(config.getClusterName(), clusterData); + + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl()); + pulsarClient = clientBuilder.build(); + + TenantInfo propAdmin = new TenantInfo(); + propAdmin.getAdminRoles().add("superUser"); + propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); + admin.tenants().updateTenant(tenant, propAdmin); + + Thread.sleep(100); + } + + @AfterMethod + void shutdown() throws Exception { + log.info("--- Shutting down ---"); + pulsarClient.close(); + admin.close(); + functionsWorkerService.stop(); + pulsar.close(); + bkEnsemble.stop(); + } + + private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) { + workerConfig = new WorkerConfig(); + workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace); + workerConfig.setSchedulerClassName( + org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName()); + workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use")); + // worker talks to local broker + workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort()); + workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort()); + workerConfig.setFailureCheckFreqMs(100); + workerConfig.setNumFunctionPackageReplicas(1); + workerConfig.setClusterCoordinationTopicName("coordinate"); + workerConfig.setFunctionAssignmentTopicName("assignment"); + workerConfig.setFunctionMetadataTopicName("metadata"); + workerConfig.setInstanceLivenessCheckFreqMs(100); + workerConfig.setWorkerPort(workerServicePort); + workerConfig.setPulsarFunctionsCluster(config.getClusterName()); + String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress()); + this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort(); + workerConfig.setWorkerHostname(hostname); + workerConfig.setWorkerId(workerId); + workerConfig.setTopicCompactionFrequencySec(1); + + return new WorkerService(workerConfig); + } + + @Test + public void testFunctionAssignments() throws Exception { + + final String namespacePortion = "assignment-test"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sinkTopic = "persistent://" + replNamespace + "/my-topic1"; + final String functionName = "assign"; + final String subscriptionName = "test-sub"; + admin.namespaces().createNamespace(replNamespace); + Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + String jarFilePathUrl = Utils.FILE + ":" + + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + FunctionDetails.Builder functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, tenant, namespacePortion, + functionName, "my.*", sinkTopic, subscriptionName); + functionDetailsBuilder.setParallelism(2); + FunctionDetails functionDetails = functionDetailsBuilder.build(); + + // (1) Create function with 2 instance + admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); + retryStrategically((test) -> { + try { + return admin.topics().getStats(sinkTopic).subscriptions.size() == 1 + && admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers + .size() == 2; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + // validate 2 instances have been started + assertEquals(admin.topics().getStats(sinkTopic).subscriptions.size(), 1); + assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(), 2); + + // (2) Update function with 1 instance + functionDetailsBuilder.setParallelism(1); + functionDetails = functionDetailsBuilder.build(); + // try to update function to test: update-function functionality + admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl); + retryStrategically((test) -> { + try { + return admin.topics().getStats(sinkTopic).subscriptions.size() == 1 + && admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers + .size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + // validate pulsar sink consumer has started on the topic + assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(), 1); + } + + @Test(timeOut=20000) + public void testFunctionAssignmentsWithRestart() throws Exception { + + final String namespacePortion = "assignment-test"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sinkTopic = "persistent://" + replNamespace + "/my-topic1"; + final String baseFunctionName = "assign-restart"; + final String subscriptionName = "test-sub"; + final int totalFunctions = 5; + final int parallelism = 2; + admin.namespaces().createNamespace(replNamespace); + Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + final FunctionRuntimeManager runtimeManager = functionsWorkerService.getFunctionRuntimeManager(); + + String jarFilePathUrl = Utils.FILE + ":" + + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + FunctionDetails.Builder functionDetailsBuilder = null; + // (1) Register functions with 2 instances + for (int i = 0; i < totalFunctions; i++) { + String functionName = baseFunctionName + i; + functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, tenant, namespacePortion, functionName, + "my.*", sinkTopic, subscriptionName); + functionDetailsBuilder.setParallelism(parallelism); + // set-auto-ack prop =true + functionDetailsBuilder.setAutoAck(true); + FunctionDetails functionDetails = functionDetailsBuilder.build(); + admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); + } + retryStrategically((test) -> { + try { + Map<String, Assignment> assgn = runtimeManager.getCurrentAssignments().values().iterator().next(); + return assgn.size() == (totalFunctions * parallelism); + } catch (Exception e) { + return false; + } + }, 5, 150); + + // Validate registered assignments + Map<String, Assignment> assignments = runtimeManager.getCurrentAssignments().values().iterator().next(); + assertEquals(assignments.size(), (totalFunctions * parallelism)); + + // (2) Update function with prop=auto-ack and Delete 2 functions + for (int i = 0; i < totalFunctions; i++) { + String functionName = baseFunctionName + i; + functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, tenant, namespacePortion, functionName, + "my.*", sinkTopic, subscriptionName); + functionDetailsBuilder.setParallelism(parallelism); + // set-auto-ack prop =false + functionDetailsBuilder.setAutoAck(false); + FunctionDetails functionDetails = functionDetailsBuilder.build(); + admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl); + } + + int totalDeletedFunction = 2; + for (int i = (totalFunctions - 1); i >= (totalFunctions - totalDeletedFunction); i--) { + String functionName = baseFunctionName + i; + admin.functions().deleteFunction(tenant, namespacePortion, functionName); + } + retryStrategically((test) -> { + try { + Map<String, Assignment> assgn = runtimeManager.getCurrentAssignments().values().iterator().next(); + return assgn.size() == ((totalFunctions - totalDeletedFunction) * parallelism); + } catch (Exception e) { + return false; + } + }, 5, 150); + + // Validate registered assignments + assignments = runtimeManager.getCurrentAssignments().values().iterator().next(); + assertEquals(assignments.size(), ((totalFunctions - totalDeletedFunction) * parallelism)); + + // (3) Restart worker service and check registered functions + URI dlUri = functionsWorkerService.getDlogUri(); + functionsWorkerService.stop(); + functionsWorkerService = new WorkerService(workerConfig); + functionsWorkerService.start(dlUri); + FunctionRuntimeManager runtimeManager2 = functionsWorkerService.getFunctionRuntimeManager(); + retryStrategically((test) -> { + try { + Map<String, Assignment> assgn = runtimeManager2.getCurrentAssignments().values().iterator().next(); + return assgn.size() == ((totalFunctions - totalDeletedFunction) * parallelism); + } catch (Exception e) { + return false; + } + }, 5, 150); + + // Validate registered assignments + assignments = runtimeManager2.getCurrentAssignments().values().iterator().next(); + assertEquals(assignments.size(), ((totalFunctions - totalDeletedFunction) * parallelism)); + + // validate updated function prop = auto-ack=false and instnaceid + for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) { + String functionName = baseFunctionName + i; + assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getAutoAck()); + } + } + + protected static FunctionDetails.Builder createFunctionDetails(String jarFile, String tenant, String namespace, + String functionName, String sourceTopic, String sinkTopic, String subscriptionName) { + + File file = new File(jarFile); + try { + Reflections.loadJar(file); + } catch (MalformedURLException e) { + throw new RuntimeException("Failed to load user jar " + file, e); + } + String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic); + Class<?> typeArg = byte[].class; + + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + functionDetailsBuilder.setTenant(tenant); + functionDetailsBuilder.setNamespace(namespace); + functionDetailsBuilder.setName(functionName); + functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); + functionDetailsBuilder.setParallelism(1); + functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); + + // set source spec + // source spec classname should be empty so that the default pulsar source will be used + SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); + sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); + sourceSpecBuilder.setTypeClassName(typeArg.getName()); + sourceSpecBuilder.setTopicsPattern(sourceTopicPattern); + sourceSpecBuilder.setSubscriptionName(subscriptionName); + sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, ""); + functionDetailsBuilder.setAutoAck(true); + functionDetailsBuilder.setSource(sourceSpecBuilder); + + // set up sink spec + SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); + // sinkSpecBuilder.setClassName(PulsarSink.class.getName()); + sinkSpecBuilder.setTopic(sinkTopic); + Map<String, Object> sinkConfigMap = Maps.newHashMap(); + sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap)); + sinkSpecBuilder.setTypeClassName(typeArg.getName()); + functionDetailsBuilder.setSink(sinkSpecBuilder); + + return functionDetailsBuilder; + } + +} \ No newline at end of file diff --git a/pulsar-functions/proto/src/main/proto/Request.proto b/pulsar-functions/proto/src/main/proto/Request.proto index c8e31d75ed..574346911c 100644 --- a/pulsar-functions/proto/src/main/proto/Request.proto +++ b/pulsar-functions/proto/src/main/proto/Request.proto @@ -37,8 +37,3 @@ message ServiceRequest { FunctionMetaData functionMetaData = 3; string workerId = 4; } - -message AssignmentsUpdate { - repeated Assignment assignments = 1; - uint64 version = 2; -} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java index 366eaba6a0..3ad6c7c6ee 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java @@ -18,14 +18,16 @@ */ package org.apache.pulsar.functions.worker; -import lombok.extern.slf4j.Slf4j; +import java.io.IOException; +import java.util.function.Function; + import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.functions.proto.Request; +import org.apache.pulsar.functions.proto.Function.Assignment; -import java.io.IOException; -import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; @Slf4j public class FunctionAssignmentTailer @@ -34,15 +36,16 @@ private final FunctionRuntimeManager functionRuntimeManager; private final Reader<byte[]> reader; - public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, - Reader<byte[]> reader) + public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager) throws PulsarClientException { - this.functionRuntimeManager = functionRuntimeManager; - this.reader = reader; - } + this.functionRuntimeManager = functionRuntimeManager; - public void start() { + this.reader = functionRuntimeManager.getWorkerService().getClient().newReader() + .topic(functionRuntimeManager.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true) + .startMessageId(MessageId.earliest).create(); + } + public void start() { receiveOne(); } @@ -65,29 +68,21 @@ public void close() { @Override public void accept(Message<byte[]> msg) { - - // check if latest - boolean hasMessageAvailable; - try { - hasMessageAvailable = this.reader.hasMessageAvailable(); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - if (!hasMessageAvailable) { - Request.AssignmentsUpdate assignmentsUpdate; + if(msg.getData()==null || (msg.getData().length==0)) { + log.info("Received assignment delete: {}", msg.getKey()); + this.functionRuntimeManager.deleteAssignment(msg.getKey()); + } else { + Assignment assignment; try { - assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(msg.getData()); + assignment = Assignment.parseFrom(msg.getData()); } catch (IOException e) { log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), e); // TODO: find a better way to handle bad request throw new RuntimeException(e); } - if (log.isDebugEnabled()) { - log.debug("Received assignment update: {}", assignmentsUpdate); - } - - this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), assignmentsUpdate); + log.info("Received assignment update: {}", assignment); + this.functionRuntimeManager.processAssignment(assignment); } // receive next request receiveOne(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 43cd27bcb5..e634130f5e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -18,48 +18,45 @@ */ package org.apache.pulsar.functions.worker; -import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.UriBuilder; -import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.common.policies.data.ErrorData; -import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.instance.AuthenticationConfig; +import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.proto.Request.AssignmentsUpdate; -import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; import org.apache.pulsar.functions.runtime.Runtime; -import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; +import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeSpawner; +import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Entity; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.Response.Status; +import com.google.common.annotations.VisibleForTesting; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.stream.Collectors; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; /** * This class managers all aspects of functions assignments and running of function assignments for this worker @@ -78,13 +75,12 @@ Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<>(); @VisibleForTesting + @Getter final WorkerConfig workerConfig; @VisibleForTesting LinkedBlockingQueue<FunctionAction> actionQueue; - private long currentAssignmentVersion = 0; - private final FunctionAssignmentTailer functionAssignmentTailer; private FunctionActioner functionActioner; @@ -92,22 +88,19 @@ private RuntimeFactory runtimeFactory; private MembershipManager membershipManager; - private final ConnectorsManager connectorsManager; private final PulsarAdmin functionAdmin; + + @Getter + private WorkerService workerService; public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception { this.workerConfig = workerConfig; - this.connectorsManager = connectorsManager; + this.workerService = workerService; this.functionAdmin = workerService.getFunctionAdmin(); - Reader<byte[]> reader = workerService.getClient().newReader() - .topic(this.workerConfig.getFunctionAssignmentTopic()) - .startMessageId(MessageId.earliest) - .create(); - - this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader); + this.functionAssignmentTailer = new FunctionAssignmentTailer(this); AuthenticationConfig authConfig = AuthenticationConfig.builder() .clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin()) @@ -222,13 +215,6 @@ public synchronized Assignment findFunctionAssignment(String tenant, String name return assignments; } - /** - * get the current version number of assignments - * @return assignments version number - */ - public synchronized long getCurrentAssignmentVersion() { - return new Long(this.currentAssignmentVersion); - } /** * Removes a collection of assignments @@ -461,105 +447,108 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro /** * Process an assignment update from the assignment topic * @param messageId the message id of the update assignment - * @param assignmentsUpdate the assignment update + * @param newAssignment the assignment */ - public synchronized void processAssignmentUpdate(MessageId messageId, AssignmentsUpdate assignmentsUpdate) { + public synchronized void processAssignment(Assignment newAssignment) { - if (assignmentsUpdate.getVersion() > this.currentAssignmentVersion) { + Map<String, Assignment> existingAssignmentMap = new HashMap<>(); + for (Map<String, Assignment> entry : this.workerIdToAssignments.values()) { + existingAssignmentMap.putAll(entry); + } - Map<String, Assignment> assignmentMap = new HashMap<>(); - for (Assignment assignment : assignmentsUpdate.getAssignmentsList()) { - assignmentMap.put( - Utils.getFullyQualifiedInstanceId(assignment.getInstance()), - assignment); + if (existingAssignmentMap.containsKey(Utils.getFullyQualifiedInstanceId(newAssignment.getInstance()))) { + updateAssignment(newAssignment); + } else { + addAssignment(newAssignment); + } + } + + private void updateAssignment(Assignment assignment) { + String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + Assignment existingAssignment = this.findAssignment(assignment); + // potential updates need to happen + if (!existingAssignment.equals(assignment)) { + FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); + //stop function + if (functionRuntimeInfo != null) { + this.insertStopAction(functionRuntimeInfo); } - Map<String, Assignment> existingAssignmentMap = new HashMap<>(); - for (Map<String, Assignment> entry : this.workerIdToAssignments.values()) { - existingAssignmentMap.putAll(entry); + // still assigned to me, need to restart + if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) { + //start again + FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo(); + newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance()); + this.insertStartAction(newFunctionRuntimeInfo); + this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, newFunctionRuntimeInfo); } - Map<String, Assignment> assignmentsToAdd = diff(assignmentMap, existingAssignmentMap); - - Map<String, Assignment> assignmentsToDelete = diff(existingAssignmentMap, assignmentMap); - - Map<String, Assignment> existingAssignments = inCommon(assignmentMap, existingAssignmentMap); - - // functions to add - for (Map.Entry<String, Assignment> assignmentEntry : assignmentsToAdd.entrySet()) { - String fullyQualifiedInstanceId = assignmentEntry.getKey(); - Assignment assignment = assignmentEntry.getValue(); - - //add new function - this.setAssignment(assignment); - - //Assigned to me - if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) { - if (!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) { - this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new FunctionRuntimeInfo() - .setFunctionInstance(assignment.getInstance())); - - } else { - //Somehow this function is already started - log.warn("Function {} already running. Going to restart function.", - this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId)); - this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId)); - } - FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); - this.insertStartAction(functionRuntimeInfo); - } + // find existing assignment + Assignment existing_assignment = this.findAssignment(assignment); + if (existing_assignment != null) { + // delete old assignment that could have old data + this.deleteAssignment(existing_assignment); } + // set to newest assignment + this.setAssignment(assignment); + } + } + + public synchronized void deleteAssignment(String fullyQualifiedInstanceId) { + FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); + if (functionRuntimeInfo != null) { + this.insertStopAction(functionRuntimeInfo); + this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId); + } + + String workerId = null; + for(Entry<String, Map<String, Assignment>> workerAssignments : workerIdToAssignments.entrySet()) { + if(workerAssignments.getValue().remove(fullyQualifiedInstanceId)!=null) { + workerId = workerAssignments.getKey(); + break; + } + } + Map<String, Assignment> worker; + if (workerId != null && ((worker = workerIdToAssignments.get(workerId)) != null && worker.isEmpty())) { + this.workerIdToAssignments.remove(workerId); + } + } - // functions to delete - for (Map.Entry<String, Assignment> assignmentEntry : assignmentsToDelete.entrySet()) { - String fullyQualifiedInstanceId = assignmentEntry.getKey(); - Assignment assignment = assignmentEntry.getValue(); - - FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); - if (functionRuntimeInfo != null) { - this.insertStopAction(functionRuntimeInfo); - this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId); - } - this.deleteAssignment(assignment); + @VisibleForTesting + void deleteAssignment(Assignment assignment) { + String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + Map<String, Assignment> assignmentMap = this.workerIdToAssignments.get(assignment.getWorkerId()); + if (assignmentMap != null) { + if (assignmentMap.containsKey(fullyQualifiedInstanceId)) { + assignmentMap.remove(fullyQualifiedInstanceId); } + if (assignmentMap.isEmpty()) { + this.workerIdToAssignments.remove(assignment.getWorkerId()); + } + } + } - // functions to update - for (Map.Entry<String, Assignment> assignmentEntry : existingAssignments.entrySet()) { - String fullyQualifiedInstanceId = assignmentEntry.getKey(); - Assignment assignment = assignmentEntry.getValue(); - Assignment existingAssignment = this.findAssignment(assignment); - // potential updates need to happen - if (!existingAssignment.equals(assignment)) { - FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); - //stop function - if (functionRuntimeInfo != null) { - this.insertStopAction(functionRuntimeInfo); - } - // still assigned to me, need to restart - if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) { - //start again - FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo(); - newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance()); - this.insertStartAction(newFunctionRuntimeInfo); - this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, newFunctionRuntimeInfo); - } + private void addAssignment(Assignment assignment) { + String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); - // find existing assignment - Assignment existing_assignment = this.findAssignment(assignment); - if (existing_assignment != null) { - // delete old assignment that could have old data - this.deleteAssignment(existing_assignment); - } - // set to newest assignment - this.setAssignment(assignment); - } - } + //add new function + this.setAssignment(assignment); - // set as current assignment - this.currentAssignmentVersion = assignmentsUpdate.getVersion(); + //Assigned to me + if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) { + if (!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) { + this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new FunctionRuntimeInfo() + .setFunctionInstance(assignment.getInstance())); - } else { - log.debug("Received out of date assignment update: {}", assignmentsUpdate); + } else { + //Somehow this function is already started + log.warn("Function {} already running. Going to restart function.", + this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId)); + this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId)); + } + FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); + this.insertStartAction(functionRuntimeInfo); } + } public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() { @@ -642,20 +631,6 @@ void setAssignment(Assignment assignment) { assignment); } - @VisibleForTesting - void deleteAssignment(Assignment assignment) { - Map<String, Assignment> assignmentMap = this.workerIdToAssignments.get(assignment.getWorkerId()); - if (assignmentMap != null) { - String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); - if (assignmentMap.containsKey(fullyQualifiedInstanceId)) { - assignmentMap.remove(fullyQualifiedInstanceId); - } - if (assignmentMap.isEmpty()) { - this.workerIdToAssignments.remove(assignment.getWorkerId()); - } - } - } - private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) { this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); } @@ -674,28 +649,8 @@ public void close() throws Exception { } } - private Map<String, Assignment> diff(Map<String, Assignment> assignmentMap1, Map<String, Assignment> assignmentMap2) { - Map<String, Assignment> result = new HashMap<>(); - for (Map.Entry<String, Assignment> entry : assignmentMap1.entrySet()) { - if (!assignmentMap2.containsKey(entry.getKey())) { - result.put(entry.getKey(), entry.getValue()); - } - } - return result; - } - - private Map<String, Assignment> inCommon(Map<String, Assignment> assignmentMap1, Map<String, Assignment> assignmentMap2) { - - Map<String, Assignment> result = new HashMap<>(); - for (Map.Entry<String, Assignment> entry : assignmentMap1.entrySet()) { - if (assignmentMap2.containsKey(entry.getKey())) { - result.put(entry.getKey(), entry.getValue()); - } - } - return result; - } - private FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) { return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); } + } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index ed00958d7e..db7785a55b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -23,30 +23,29 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; - +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.CompressionType; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; -import org.apache.pulsar.functions.proto.Request; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.worker.scheduler.IScheduler; +import com.google.common.annotations.VisibleForTesting; + +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + @Slf4j public class SchedulerManager implements AutoCloseable { @@ -65,16 +64,22 @@ private final Producer<byte[]> producer; - private final ExecutorService executorService; + private final ScheduledExecutorService executorService; + + private final PulsarAdmin admin; + + AtomicBoolean isCompactionNeeded = new AtomicBoolean(false); + private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60; - public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient) { + public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, PulsarAdmin admin, ScheduledExecutorService executor) { this.workerConfig = workerConfig; + this.admin = admin; this.scheduler = Reflections.createInstance(workerConfig.getSchedulerClassName(), IScheduler.class, Thread.currentThread().getContextClassLoader()); try { this.producer = pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic()) - .enableBatching(true).blockIfQueueFull(true).compressionType(CompressionType.LZ4). + .enableBatching(false).blockIfQueueFull(true).compressionType(CompressionType.LZ4). sendTimeout(0, TimeUnit.MILLISECONDS).create(); } catch (PulsarClientException e) { log.error("Failed to create producer to function assignment topic " @@ -82,9 +87,9 @@ public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient) { throw new RuntimeException(e); } - this.executorService = - new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>()); + this.executorService = executor; + + scheduleCompaction(executor, workerConfig.getTopicCompactionFrequencySec()); } public Future<?> schedule() { @@ -92,13 +97,31 @@ public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient) { synchronized (SchedulerManager.this) { boolean isLeader = membershipManager.isLeader(); if (isLeader) { - invokeScheduler(); + try { + invokeScheduler(); + } catch (Exception e) { + log.warn("Failed to invoke scheduler", e); + schedule(); + } } } }); } - private void invokeScheduler() { + private void scheduleCompaction(ScheduledExecutorService executor, long scheduleFrequencySec) { + if (executor != null) { + executor.scheduleWithFixedDelay(() -> { + if (membershipManager.isLeader() && isCompactionNeeded.get()) { + compactAssignmentTopic(); + isCompactionNeeded.set(false); + } + }, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS); + } + } + + @VisibleForTesting + public void invokeScheduler() { + List<String> currentMembership = this.membershipManager.getCurrentMembership() .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toList()); @@ -111,12 +134,16 @@ private void invokeScheduler() { while (it.hasNext()) { Map.Entry<String, Map<String, Assignment>> workerIdToAssignmentEntry = it.next(); Map<String, Assignment> functionMap = workerIdToAssignmentEntry.getValue(); + // remove instances that don't exist anymore - functionMap.entrySet().removeIf( - entry -> { - String fullyQualifiedInstanceId = entry.getKey(); - return !allInstances.containsKey(fullyQualifiedInstanceId); - }); + functionMap.entrySet().removeIf(entry -> { + String fullyQualifiedInstanceId = entry.getKey(); + boolean deleted = !allInstances.containsKey(fullyQualifiedInstanceId); + if (deleted) { + publishNewAssignment(entry.getValue().toBuilder().build(), true); + } + return deleted; + }); // update assignment instances in case attributes of a function gets updated for (Map.Entry<String, Assignment> entry : functionMap.entrySet()) { @@ -143,36 +170,40 @@ private void invokeScheduler() { List<Assignment> assignments = this.scheduler.schedule( needsAssignment, currentAssignments, currentMembership); - log.debug("New assignments computed: {}", assignments); + if (log.isDebugEnabled()) { + log.debug("New assignments computed: {}", assignments); + } - long assignmentVersion = this.functionRuntimeManager.getCurrentAssignmentVersion() + 1; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() - .setVersion(assignmentVersion) - .addAllAssignments(assignments) - .build(); + isCompactionNeeded.set(!assignments.isEmpty()); - CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(assignmentsUpdate.toByteArray()); - try { - messageIdCompletableFuture.get(); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to send assignment update", e); - throw new RuntimeException(e); + for(Assignment assignment : assignments) { + publishNewAssignment(assignment, false); } + + } - // wait for assignment update to go throw the pipeline - int retries = 0; - while (this.functionRuntimeManager.getCurrentAssignmentVersion() < assignmentVersion) { - if (retries >= this.workerConfig.getAssignmentWriteMaxRetries()) { - log.warn("Max number of retries reached for waiting for assignment to propagate. Will continue now."); - break; - } - log.info("Waiting for assignments to propagate..."); + public void compactAssignmentTopic() { + if (this.admin != null) { try { - Thread.sleep(500); - } catch (InterruptedException e) { - throw new RuntimeException(e); + this.admin.topics().triggerCompaction(workerConfig.getFunctionAssignmentTopic()); + } catch (PulsarAdminException e) { + log.error("Failed to trigger compaction {}", e); + executorService.schedule(() -> compactAssignmentTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC, + TimeUnit.SECONDS); } - retries++; + } + } + + private void publishNewAssignment(Assignment assignment, boolean deleted) { + try { + String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + // publish empty message with instance-id key so, compactor can delete and skip delivery of this instance-id + // message + producer.newMessage().key(fullyQualifiedInstanceId) + .value(deleted ? "".getBytes() : assignment.toByteArray()).sendAsync().get(); + } catch (Exception e) { + log.error("Failed to {} assignment update {}", assignment, deleted ? "send" : "deleted", e); + throw new RuntimeException(e); } } @@ -224,6 +255,5 @@ public void close() { } catch (PulsarClientException e) { log.warn("Failed to shutdown scheduler manager assignment producer", e); } - this.executorService.shutdown(); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 0f695a974f..a2524c6d97 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -74,6 +74,9 @@ private long instanceLivenessCheckFreqMs; private String clientAuthenticationPlugin; private String clientAuthenticationParameters; + // Frequency how often worker performs compaction on function-topics + private long topicCompactionFrequencySec = 30 * 60; // 30 minutes + private int metricsSamplingPeriodSec = 60; /***** --- TLS --- ****/ // Enable TLS private boolean tlsEnabled = false; @@ -88,8 +91,6 @@ private boolean tlsRequireTrustedClientCertOnConnect = false; private boolean useTls = false; private boolean tlsHostnameVerificationEnable = false; - - private int metricsSamplingPeriodSec = 60; // Enforce authentication private boolean authenticationEnabled = false; // Autentication provider name list, which is a list of class names diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 7fc0cc94d7..488bcd775d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import io.netty.util.concurrent.DefaultThreadFactory; @@ -68,11 +69,15 @@ private PulsarAdmin brokerAdmin; private PulsarAdmin functionAdmin; private final MetricsGenerator metricsGenerator; + private final ScheduledExecutorService executor; + @VisibleForTesting + private URI dlogUri; public WorkerService(WorkerConfig workerConfig) { this.workerConfig = workerConfig; this.statsUpdater = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater")); + this.executor = Executors.newScheduledThreadPool(10, new DefaultThreadFactory("pulsar-worker")); this.metricsGenerator = new MetricsGenerator(this.statsUpdater); } @@ -98,12 +103,13 @@ public void start(URI dlogUri) throws InterruptedException { } // create the dlog namespace for storing function packages + this.dlogUri = dlogUri; DistributedLogConfiguration dlogConf = Utils.getDlogConf(workerConfig); try { this.dlogNamespace = NamespaceBuilder.newBuilder() .conf(dlogConf) .clientId("function-worker-" + workerConfig.getWorkerId()) - .uri(dlogUri) + .uri(this.dlogUri) .build(); } catch (Exception e) { log.error("Failed to initialize dlog namespace {} for storing function packages", @@ -128,7 +134,8 @@ public void start(URI dlogUri) throws InterruptedException { log.info("Created Pulsar client"); //create scheduler manager - this.schedulerManager = new SchedulerManager(this.workerConfig, this.client); + this.schedulerManager = new SchedulerManager(this.workerConfig, this.client, this.brokerAdmin, + this.executor); //create function meta data manager this.functionMetaDataManager = new FunctionMetaDataManager( @@ -232,6 +239,10 @@ public void stop() { if (null != this.functionAdmin) { this.functionAdmin.close(); } + + if(this.executor != null) { + this.executor.shutdown(); + } } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java index 58c1a9a513..817962d4f1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java @@ -22,6 +22,8 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.Instance; +import com.google.common.collect.Lists; + import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -47,19 +49,17 @@ workerIdToAssignment.get(existingAssignment.getWorkerId()).add(existingAssignment); } + List<Assignment> newAssignments = Lists.newArrayList(); for (Instance unassignedFunctionInstance : unassignedFunctionInstances) { String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance); String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : findNextWorker(workerIdToAssignment); Assignment newAssignment = Assignment.newBuilder().setInstance(unassignedFunctionInstance) .setWorkerId(workerId).build(); workerIdToAssignment.get(workerId).add(newAssignment); + newAssignments.add(newAssignment); } - List<Assignment> assignments - = workerIdToAssignment.entrySet().stream() - .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList()); - - return assignments; + return newAssignments; } private static String checkHeartBeatFunction(Instance funInstance) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 85a2122dd0..5e1ed023aa 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -37,6 +37,7 @@ import java.util.Map; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doReturn; @@ -86,6 +87,8 @@ public void testProcessAssignmentUpdateAddFunctions() throws Exception { doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); doReturn(pulsarClient).when(workerService).getClient(); @@ -123,12 +126,8 @@ public void testProcessAssignmentUpdateAddFunctions() throws Exception { assignments.add(assignment1); assignments.add(assignment2); - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() - .addAllAssignments(assignments) - .setVersion(1) - .build(); - - functionRuntimeManager.processAssignmentUpdate(MessageId.earliest, assignmentsUpdate); + functionRuntimeManager.processAssignment(assignment1); + functionRuntimeManager.processAssignment(assignment2); verify(functionRuntimeManager, times(2)).setAssignment(any(Function.Assignment.class)); verify(functionRuntimeManager, times(0)).deleteAssignment(any(Function.Assignment.class)); @@ -183,6 +182,7 @@ public void testProcessAssignmentUpdateDeleteFunctions() throws Exception { doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); doReturn(pulsarClient).when(workerService).getClient(); @@ -205,6 +205,7 @@ public void testProcessAssignmentUpdateDeleteFunctions() throws Exception { Function.FunctionDetails.newBuilder() .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build(); + // Delete this assignment Function.Assignment assignment1 = Function.Assignment.newBuilder() .setWorkerId("worker-1") .setInstance(Function.Instance.newBuilder() @@ -221,23 +222,18 @@ public void testProcessAssignmentUpdateDeleteFunctions() throws Exception { functionRuntimeManager.setAssignment(assignment2); reset(functionRuntimeManager); - List<Function.Assignment> assignments = new LinkedList<>(); - assignments.add(assignment2); - - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() - .addAllAssignments(assignments) - .setVersion(1) - .build(); - functionRuntimeManager.functionRuntimeInfoMap.put( "test-tenant/test-namespace/func-1:0", new FunctionRuntimeInfo().setFunctionInstance( Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) .build())); - functionRuntimeManager.processAssignmentUpdate(MessageId.earliest, assignmentsUpdate); + functionRuntimeManager.processAssignment(assignment1); + functionRuntimeManager.processAssignment(assignment2); + functionRuntimeManager.deleteAssignment(Utils.getFullyQualifiedInstanceId(assignment1.getInstance())); + verify(functionRuntimeManager, times(0)).setAssignment(any(Function.Assignment.class)); - verify(functionRuntimeManager, times(1)).deleteAssignment(any(Function.Assignment.class)); + verify(functionRuntimeManager, times(1)).deleteAssignment(any(String.class)); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments @@ -284,6 +280,7 @@ public void testProcessAssignmentUpdateModifyFunctions() throws Exception { doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); doReturn(pulsarClient).when(workerService).getClient(); @@ -327,14 +324,6 @@ public void testProcessAssignmentUpdateModifyFunctions() throws Exception { .setInstance(Function.Instance.newBuilder() .setFunctionMetaData(function2).setInstanceId(0).build()) .build(); - List<Function.Assignment> assignments = new LinkedList<>(); - assignments.add(assignment1); - assignments.add(assignment3); - - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() - .addAllAssignments(assignments) - .setVersion(1) - .build(); functionRuntimeManager.functionRuntimeInfoMap.put( "test-tenant/test-namespace/func-1:0", new FunctionRuntimeInfo().setFunctionInstance( @@ -345,7 +334,8 @@ public void testProcessAssignmentUpdateModifyFunctions() throws Exception { Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0) .build())); - functionRuntimeManager.processAssignmentUpdate(MessageId.earliest, assignmentsUpdate); + functionRuntimeManager.processAssignment(assignment1); + functionRuntimeManager.processAssignment(assignment3); verify(functionRuntimeManager, times(1)).insertStopAction(any(FunctionRuntimeInfo.class)); verify(functionRuntimeManager).insertStopAction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 2753bf196a..c849fd36de 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -131,6 +131,7 @@ public void testCheckFailuresNoFailures() throws Exception { ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(true); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); @@ -202,6 +203,7 @@ public void testCheckFailuresSomeFailures() throws Exception { doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(true); doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); doReturn(pulsarClient).when(workerService).getClient(); @@ -296,6 +298,7 @@ public void testCheckFailuresSomeUnassigned() throws Exception { ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(true); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 19977bd812..00e4b6e0e1 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -28,15 +28,19 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertTrue; import java.lang.reflect.Method; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -46,6 +50,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.Request; @@ -53,9 +58,14 @@ import org.mockito.Mockito; import org.mockito.invocation.Invocation; import org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.collect.Sets; +import com.google.protobuf.InvalidProtocolBufferException; + +import io.netty.util.concurrent.DefaultThreadFactory; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -67,6 +77,8 @@ private MembershipManager membershipManager; private CompletableFuture<MessageId> completableFuture; private Producer producer; + private TypedMessageBuilder<byte[]> message; + private ScheduledExecutorService executor; private static PulsarClient mockPulsarClient() throws PulsarClientException { ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class); @@ -94,8 +106,12 @@ public void setup() throws PulsarClientException { producer = mock(Producer.class); completableFuture = spy(new CompletableFuture<>()); completableFuture.complete(MessageId.earliest); - byte[] bytes = any(); - when(producer.sendAsync(bytes)).thenReturn(completableFuture); + //byte[] bytes = any(); + message = mock(TypedMessageBuilder.class); + when(producer.newMessage()).thenReturn(message); + when(message.key(anyString())).thenReturn(message); + when(message.value(any())).thenReturn(message); + when(message.sendAsync()).thenReturn(completableFuture); ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class); when(builder.topic(anyString())).thenReturn(builder); @@ -109,7 +125,9 @@ public void setup() throws PulsarClientException { PulsarClient pulsarClient = mock(PulsarClient.class); when(pulsarClient.newProducer()).thenReturn(builder); - schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient)); + this.executor = Executors + .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-test")); + schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient, null, executor)); functionRuntimeManager = mock(FunctionRuntimeManager.class); functionMetaDataManager = mock(FunctionMetaDataManager.class); membershipManager = mock(MembershipManager.class); @@ -118,6 +136,11 @@ public void setup() throws PulsarClientException { schedulerManager.setMembershipManager(membershipManager); } + @AfterMethod + public void stop() { + this.executor.shutdown(); + } + @Test public void testSchedule() throws Exception { @@ -143,9 +166,6 @@ public void testSchedule() throws Exception { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List<WorkerInfo> workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -159,7 +179,9 @@ public void testSchedule() throws Exception { // i am leader doReturn(true).when(membershipManager).isLeader(); callSchedule(); - verify(producer, times(1)).sendAsync(any(byte[].class)); + List<Invocation> invocations = getMethodInvocationDetails(schedulerManager, + SchedulerManager.class.getMethod("invokeScheduler")); + Assert.assertEquals(invocations.size(), 1); } @Test @@ -187,9 +209,6 @@ public void testNothingNewToSchedule() throws Exception { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List<WorkerInfo> workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -200,17 +219,8 @@ public void testNothingNewToSchedule() throws Exception { callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", - Object.class)); - Assert.assertEquals(invocations.size(), 1); - - byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); - log.info("assignmentsUpdate: {}", assignmentsUpdate); - Assert.assertEquals( - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) - .addAssignments(assignment1).build(), - assignmentsUpdate); + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 0); } @Test @@ -243,9 +253,6 @@ public void testAddingFunctions() throws Exception { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List<WorkerInfo> workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -256,23 +263,20 @@ public void testAddingFunctions() throws Exception { callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", - Object.class)); + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); Assert.assertEquals(invocations.size(), 1); - + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", + Object.class)); byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assignment assignments = Assignment.parseFrom(send); - log.info("assignmentsUpdate: {}", assignmentsUpdate); + log.info("assignments: {}", assignments); Function.Assignment assignment2 = Function.Assignment.newBuilder() .setWorkerId("worker-1") .setInstance(Function.Instance.newBuilder() .setFunctionMetaData(function2).setInstanceId(0).build()) .build(); - Assert.assertEquals( - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) - .addAssignments(assignment1).addAssignments(assignment2).build(), - assignmentsUpdate); + Assert.assertEquals(assignment2, assignments); } @@ -288,7 +292,7 @@ public void testDeletingFunctions() throws Exception { // simulate function2 got removed Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder() .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2") - .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)) .build(); functionMetaDataList.add(function1); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); @@ -300,6 +304,7 @@ public void testDeletingFunctions() throws Exception { .setFunctionMetaData(function1).setInstanceId(0).build()) .build(); + // Delete this assignment Function.Assignment assignment2 = Function.Assignment.newBuilder() .setWorkerId("worker-1") .setInstance(Function.Instance.newBuilder() @@ -309,14 +314,12 @@ public void testDeletingFunctions() throws Exception { Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>(); Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>(); assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + //TODO: delete this assignment assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2); currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List<WorkerInfo> workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -327,19 +330,16 @@ public void testDeletingFunctions() throws Exception { callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", - Object.class)); + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); Assert.assertEquals(invocations.size(), 1); - + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", + Object.class)); byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assignment assignments = Assignment.parseFrom(send); - log.info("assignmentsUpdate: {}", assignmentsUpdate); + log.info("assignments: {}", assignments); - Assert.assertEquals( - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) - .addAssignments(assignment1).build(), - assignmentsUpdate); + Assert.assertEquals(0, send.length); } @Test @@ -373,9 +373,6 @@ public void testScalingUp() throws Exception { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List<WorkerInfo> workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -386,25 +383,21 @@ public void testScalingUp() throws Exception { callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", - Object.class)); + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); Assert.assertEquals(invocations.size(), 1); - + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", + Object.class)); byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assignment assignments = Assignment.parseFrom(send); - log.info("assignmentsUpdate: {}", assignmentsUpdate); + log.info("assignments: {}", assignments); Function.Assignment assignment2 = Function.Assignment.newBuilder() .setWorkerId("worker-1") .setInstance(Function.Instance.newBuilder() .setFunctionMetaData(function2).setInstanceId(0).build()) .build(); - Assert.assertEquals( - assignmentsUpdate, - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) - .addAssignments(assignment1).addAssignments(assignment2).build() - ); + Assert.assertEquals(assignments, assignment2); // scale up @@ -435,17 +428,23 @@ public void testScalingUp() throws Exception { callSchedule(); - invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 4); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Assert.assertEquals(invocations.size(), 2); + + Set<Assignment> allAssignments = Sets.newHashSet(); + invocations.forEach(invocation -> { + try { + allAssignments.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0])); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); - send = (byte[]) invocations.get(1).getRawArguments()[0]; - assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); - Assert.assertEquals(assignmentsUpdate, - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 + 1) - .addAssignments(assignment1).addAssignments(assignment2Scaled1) - .addAssignments(assignment2Scaled2).addAssignments(assignment2Scaled3).build() - ); + assertTrue(allAssignments.contains(assignment2Scaled1)); + assertTrue(allAssignments.contains(assignment2Scaled2)); + assertTrue(allAssignments.contains(assignment2Scaled3)); } @Test @@ -479,9 +478,6 @@ public void testScalingDown() throws Exception { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List<WorkerInfo> workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -492,14 +488,24 @@ public void testScalingDown() throws Exception { callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 3); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Assert.assertEquals(invocations.size(), 1); - byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assignment assignments = Assignment.parseFrom(send); + + log.info("assignments: {}", assignments); + + Set<Assignment> allAssignments = Sets.newHashSet(); + invocations.forEach(invocation -> { + try { + allAssignments.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0])); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); - log.info("assignmentsUpdate: {}", assignmentsUpdate); Function.Assignment assignment2_1 = Function.Assignment.newBuilder() .setWorkerId("worker-1") @@ -516,13 +522,11 @@ public void testScalingDown() throws Exception { .setInstance(Function.Instance.newBuilder() .setFunctionMetaData(function2).setInstanceId(2).build()) .build(); - Assert.assertEquals( - assignmentsUpdate, - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) - .addAssignments(assignment1).addAssignments(assignment2_1) - .addAssignments(assignment2_2).addAssignments(assignment2_3).build() - ); - + + assertTrue(allAssignments.contains(assignment2_1)); + assertTrue(allAssignments.contains(assignment2_2)); + assertTrue(allAssignments.contains(assignment2_3)); + // scale down Function.FunctionMetaData function2Scaled = Function.FunctionMetaData.newBuilder() @@ -542,17 +546,23 @@ public void testScalingDown() throws Exception { callSchedule(); - invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 4); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Assert.assertEquals(invocations.size(), 2); + send = (byte[]) invocations.get(0).getRawArguments()[0]; + assignments = Assignment.parseFrom(send); + + Set<Assignment> allAssignments2 = Sets.newHashSet(); + invocations.forEach(invocation -> { + try { + allAssignments2.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0])); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); - send = (byte[]) invocations.get(1).getRawArguments()[0]; - assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); - Assert.assertEquals(assignmentsUpdate, - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 + 1) - .addAssignments(assignment1).addAssignments(assignment2Scaled) - .build() - ); + assertTrue(allAssignments2.contains(assignment2Scaled)); } @Test @@ -582,9 +592,6 @@ public void testHeartbeatFunction() throws Exception { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - // set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - List<WorkerInfo> workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of(workerId1, "workerHostname-1", 5000)); workerInfoList.add(WorkerInfo.of(workerId2, "workerHostname-1", 6000)); @@ -595,20 +602,20 @@ public void testHeartbeatFunction() throws Exception { callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(producer, - Producer.class.getMethod("sendAsync", Object.class)); - Assert.assertEquals(invocations.size(), 1); - - byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); - - List<Assignment> assignmentList = assignmentsUpdate.getAssignmentsList(); - Assert.assertEquals(assignmentList.size(), 2); - for (Assignment assignment : assignmentList) { - String functionName = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(); - String assignedWorkerId = assignment.getWorkerId(); - Assert.assertEquals(functionName, assignedWorkerId); - } + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 2); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", + Object.class)); + invocations.forEach(invocation -> { + try { + Assignment assignment = Assignment.parseFrom((byte[])invocation.getRawArguments()[0]); + String functionName = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(); + String assignedWorkerId = assignment.getWorkerId(); + Assert.assertEquals(functionName, assignedWorkerId); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); } @Test @@ -644,9 +651,6 @@ public void testUpdate() throws Exception { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List<WorkerInfo> workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -657,14 +661,14 @@ public void testUpdate() throws Exception { callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 3); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Assert.assertEquals(invocations.size(), 1); - byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assignment assignments = Assignment.parseFrom(send); - log.info("assignmentsUpdate: {}", assignmentsUpdate); + log.info("assignmentsUpdate: {}", assignments); Function.Assignment assignment2_1 = Function.Assignment.newBuilder() .setWorkerId("worker-1") @@ -681,13 +685,27 @@ public void testUpdate() throws Exception { .setInstance(Function.Instance.newBuilder() .setFunctionMetaData(function2).setInstanceId(2).build()) .build(); - Assert.assertEquals( - assignmentsUpdate, - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) - .addAssignments(assignment1).addAssignments(assignment2_1) - .addAssignments(assignment2_2).addAssignments(assignment2_3).build() - ); - + + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 3); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", + Object.class)); + send = (byte[]) invocations.get(0).getRawArguments()[0]; + assignments = Assignment.parseFrom(send); + + Set<Assignment> allAssignments = Sets.newHashSet(); + invocations.forEach(invocation -> { + try { + allAssignments.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0])); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); + + assertTrue(allAssignments.contains(assignment2_1)); + assertTrue(allAssignments.contains(assignment2_2)); + assertTrue(allAssignments.contains(assignment2_3)); + // scale down Function.FunctionMetaData function2Updated = Function.FunctionMetaData.newBuilder() @@ -718,28 +736,32 @@ public void testUpdate() throws Exception { callSchedule(); - invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 6); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Assert.assertEquals(invocations.size(), 2); - - send = (byte[]) invocations.get(1).getRawArguments()[0]; - assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); - Assert.assertEquals(assignmentsUpdate, - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 + 1) - .addAssignments(assignment1).addAssignments(assignment2Updated1) - .addAssignments(assignment2Updated2) - .addAssignments(assignment2Updated3) - .build() - ); + send = (byte[]) invocations.get(0).getRawArguments()[0]; + assignments = Assignment.parseFrom(send); + + Set<Assignment> allAssignments2 = Sets.newHashSet(); + invocations.forEach(invocation -> { + try { + allAssignments2.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0])); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); + + assertTrue(allAssignments2.contains(assignment2Updated1)); + assertTrue(allAssignments2.contains(assignment2Updated2)); + assertTrue(allAssignments2.contains(assignment2Updated3)); } private void callSchedule() throws NoSuchMethodException, InterruptedException, TimeoutException, ExecutionException { - long intialVersion = functionRuntimeManager.getCurrentAssignmentVersion(); Future<?> complete = schedulerManager.schedule(); complete.get(30, TimeUnit.SECONDS); - doReturn(intialVersion + 1).when(functionRuntimeManager).getCurrentAssignmentVersion(); } private List<Invocation> getMethodInvocationDetails(Object o, Method method) throws NoSuchMethodException { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services