rdhabalia closed pull request #2549: [Function] avoid creating assignment 
snapshot and publish individual assignment msg
URL: https://github.com/apache/incubator-pulsar/pull/2549
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 444b7fb9f6..0c7b8af695 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -44,6 +44,8 @@ rescheduleTimeoutMs: 60000
 initialBrokerReconnectMaxRetries: 60
 assignmentWriteMaxRetries: 60
 instanceLivenessCheckFreqMs: 30000
+# Frequency (in seconds) at which the worker performs compaction on function topics
+topicCompactionFrequencySec: 1800
 metricsSamplingPeriodSec: 60
 # Enforce authentication
 authenticationEnabled: false
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 425e04921b..612b33601b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -359,4 +359,4 @@ private void phaseTwoLoop(RawReader reader, MessageId to, 
Map<String, MessageId>
             this.latestForKey = latestForKey;
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 32c93b4626..8e7b292d6a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1268,4 +1268,4 @@ public void testEmptyCompactionLedger() throws Exception {
         }
     }
 
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
new file mode 100644
index 0000000000..fe28a5119e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+
+import jersey.repackaged.com.google.common.collect.Lists;
+
+/**
+ * Test function assignments handled by the Pulsar functions worker
+ *
+ */
+public class PulsarWorkerAssignmentTest {
+    LocalBookkeeperEnsemble bkEnsemble;
+
+    ServiceConfiguration config;
+    WorkerConfig workerConfig;
+    PulsarService pulsar;
+    PulsarAdmin admin;
+    PulsarClient pulsarClient;
+    BrokerStats brokerStatsClient;
+    WorkerService functionsWorkerService;
+    final String tenant = "external-repl-prop";
+    String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    String primaryHost;
+    String workerId;
+
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final int brokerWebServicePort = PortManager.nextFreePort();
+    private final int brokerServicePort = PortManager.nextFreePort();
+    private final int workerServicePort = PortManager.nextFreePort();
+
+    private static final Logger log = 
LoggerFactory.getLogger(PulsarWorkerAssignmentTest.class);
+
+    @BeforeMethod
+    void setup(Method method) throws Exception {
+
+        log.info("--- Setting up method {} ---", method.getName());
+
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager.nextFreePort());
+        bkEnsemble.start();
+
+        String brokerServiceUrl = "http://127.0.0.1:" + brokerServicePort;
+        String brokerWeServiceUrl = "http://127.0.0.1:" + brokerWebServicePort;
+
+        config = spy(new ServiceConfiguration());
+        config.setClusterName("use");
+        Set<String> superUsers = Sets.newHashSet("superUser");
+        config.setSuperUserRoles(superUsers);
+        config.setWebServicePort(brokerWebServicePort);
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config.setBrokerServicePort(brokerServicePort);
+        config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+
+        functionsWorkerService = createPulsarFunctionWorker(config);
+        Optional<WorkerService> functionWorkerService = 
Optional.of(functionsWorkerService);
+        pulsar = new PulsarService(config, functionWorkerService);
+        pulsar.start();
+
+        admin = 
spy(PulsarAdmin.builder().serviceHttpUrl(brokerWeServiceUrl).build());
+
+        brokerStatsClient = admin.brokerStats();
+        primaryHost = String.format("http://%s:%d", 
InetAddress.getLocalHost().getHostName(), brokerWebServicePort);
+
+        // update cluster metadata
+        ClusterData clusterData = new ClusterData(brokerServiceUrl);
+        admin.clusters().updateCluster(config.getClusterName(), clusterData);
+
+        ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
+        pulsarClient = clientBuilder.build();
+
+        TenantInfo propAdmin = new TenantInfo();
+        propAdmin.getAdminRoles().add("superUser");
+        
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+        admin.tenants().updateTenant(tenant, propAdmin);
+
+        Thread.sleep(100);
+    }
+
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        pulsarClient.close();
+        admin.close();
+        functionsWorkerService.stop();
+        pulsar.close();
+        bkEnsemble.stop();
+    }
+
+    private WorkerService createPulsarFunctionWorker(ServiceConfiguration 
config) {
+        workerConfig = new WorkerConfig();
+        workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+        workerConfig.setSchedulerClassName(
+                
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+        workerConfig.setThreadContainerFactory(new 
WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+        // worker talks to local broker
+        workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + 
config.getBrokerServicePort());
+        workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + 
config.getWebServicePort());
+        workerConfig.setFailureCheckFreqMs(100);
+        workerConfig.setNumFunctionPackageReplicas(1);
+        workerConfig.setClusterCoordinationTopicName("coordinate");
+        workerConfig.setFunctionAssignmentTopicName("assignment");
+        workerConfig.setFunctionMetadataTopicName("metadata");
+        workerConfig.setInstanceLivenessCheckFreqMs(100);
+        workerConfig.setWorkerPort(workerServicePort);
+        workerConfig.setPulsarFunctionsCluster(config.getClusterName());
+        String hostname = 
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + 
"-" + workerConfig.getWorkerPort();
+        workerConfig.setWorkerHostname(hostname);
+        workerConfig.setWorkerId(workerId);
+        workerConfig.setTopicCompactionFrequencySec(1);
+
+        return new WorkerService(workerConfig);
+    }
+
+    @Test
+    public void testFunctionAssignments() throws Exception {
+
+        final String namespacePortion = "assignment-test";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + 
"/my-topic1";
+        final String functionName = "assign";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        String jarFilePathUrl = Utils.FILE + ":"
+                + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        FunctionDetails.Builder functionDetailsBuilder = 
createFunctionDetails(jarFilePathUrl, tenant, namespacePortion,
+                functionName, "my.*", sinkTopic, subscriptionName);
+        functionDetailsBuilder.setParallelism(2);
+        FunctionDetails functionDetails = functionDetailsBuilder.build();
+
+        // (1) Create function with 2 instances
+        admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sinkTopic).subscriptions.size() 
== 1
+                        && 
admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers
+                                .size() == 2;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate 2 instances have been started
+        assertEquals(admin.topics().getStats(sinkTopic).subscriptions.size(), 
1);
+        
assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(),
 2);
+
+        // (2) Update function with 1 instance
+        functionDetailsBuilder.setParallelism(1);
+        functionDetails = functionDetailsBuilder.build();
+        // try to update function to test: update-function functionality
+        admin.functions().updateFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sinkTopic).subscriptions.size() 
== 1
+                        && 
admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers
+                                .size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate only 1 instance (consumer) remains after the update
+        
assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(),
 1);
+    }
+
+    @Test(timeOut=20000)
+    public void testFunctionAssignmentsWithRestart() throws Exception {
+
+        final String namespacePortion = "assignment-test";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + 
"/my-topic1";
+        final String baseFunctionName = "assign-restart";
+        final String subscriptionName = "test-sub";
+        final int totalFunctions = 5;
+        final int parallelism = 2;
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+        final FunctionRuntimeManager runtimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
+
+        String jarFilePathUrl = Utils.FILE + ":"
+                + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        FunctionDetails.Builder functionDetailsBuilder = null;
+        // (1) Register functions with 2 instances
+        for (int i = 0; i < totalFunctions; i++) {
+            String functionName = baseFunctionName + i;
+            functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, 
tenant, namespacePortion, functionName,
+                    "my.*", sinkTopic, subscriptionName);
+            functionDetailsBuilder.setParallelism(parallelism);
+            // set-auto-ack prop =true
+            functionDetailsBuilder.setAutoAck(true);
+            FunctionDetails functionDetails = functionDetailsBuilder.build();
+            admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+        }
+        retryStrategically((test) -> {
+            try {
+                Map<String, Assignment> assgn = 
runtimeManager.getCurrentAssignments().values().iterator().next();
+                return assgn.size() == (totalFunctions * parallelism);
+            } catch (Exception e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // Validate registered assignments
+        Map<String, Assignment> assignments = 
runtimeManager.getCurrentAssignments().values().iterator().next();
+        assertEquals(assignments.size(), (totalFunctions * parallelism));
+
+        // (2) Update functions with auto-ack=false and delete 2 functions
+        for (int i = 0; i < totalFunctions; i++) {
+            String functionName = baseFunctionName + i;
+            functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, 
tenant, namespacePortion, functionName,
+                    "my.*", sinkTopic, subscriptionName);
+            functionDetailsBuilder.setParallelism(parallelism);
+            // set-auto-ack prop =false
+            functionDetailsBuilder.setAutoAck(false);
+            FunctionDetails functionDetails = functionDetailsBuilder.build();
+            admin.functions().updateFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+        }
+
+        int totalDeletedFunction = 2;
+        for (int i = (totalFunctions - 1); i >= (totalFunctions - 
totalDeletedFunction); i--) {
+            String functionName = baseFunctionName + i;
+            admin.functions().deleteFunction(tenant, namespacePortion, 
functionName);
+        }
+        retryStrategically((test) -> {
+            try {
+                Map<String, Assignment> assgn = 
runtimeManager.getCurrentAssignments().values().iterator().next();
+                return assgn.size() == ((totalFunctions - 
totalDeletedFunction) * parallelism);
+            } catch (Exception e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // Validate registered assignments
+        assignments = 
runtimeManager.getCurrentAssignments().values().iterator().next();
+        assertEquals(assignments.size(), ((totalFunctions - 
totalDeletedFunction) * parallelism));
+
+        // (3) Restart worker service and check registered functions
+        URI dlUri = functionsWorkerService.getDlogUri();
+        functionsWorkerService.stop();
+        functionsWorkerService = new WorkerService(workerConfig);
+        functionsWorkerService.start(dlUri);
+        FunctionRuntimeManager runtimeManager2 = 
functionsWorkerService.getFunctionRuntimeManager();
+        retryStrategically((test) -> {
+            try {
+                Map<String, Assignment> assgn = 
runtimeManager2.getCurrentAssignments().values().iterator().next();
+                return assgn.size() == ((totalFunctions - 
totalDeletedFunction) * parallelism);
+            } catch (Exception e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // Validate registered assignments
+        assignments = 
runtimeManager2.getCurrentAssignments().values().iterator().next();
+        assertEquals(assignments.size(), ((totalFunctions - 
totalDeletedFunction) * parallelism));
+
+        // validate updated function prop = auto-ack=false and instance id
+        for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) {
+            String functionName = baseFunctionName + i;
+            assertFalse(admin.functions().getFunction(tenant, 
namespacePortion, functionName).getAutoAck());
+        }
+    }
+
+    protected static FunctionDetails.Builder createFunctionDetails(String 
jarFile, String tenant, String namespace,
+            String functionName, String sourceTopic, String sinkTopic, String 
subscriptionName) {
+
+        File file = new File(jarFile);
+        try {
+            Reflections.loadJar(file);
+        } catch (MalformedURLException e) {
+            throw new RuntimeException("Failed to load user jar " + file, e);
+        }
+        String sourceTopicPattern = String.format("persistent://%s/%s/%s", 
tenant, namespace, sourceTopic);
+        Class<?> typeArg = byte[].class;
+
+        FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
+        functionDetailsBuilder.setTenant(tenant);
+        functionDetailsBuilder.setNamespace(namespace);
+        functionDetailsBuilder.setName(functionName);
+        functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+        functionDetailsBuilder.setParallelism(1);
+        functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+
+        // set source spec
+        // source spec classname should be empty so that the default pulsar 
source will be used
+        SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
+        
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+        sourceSpecBuilder.setTypeClassName(typeArg.getName());
+        sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
+        sourceSpecBuilder.setSubscriptionName(subscriptionName);
+        sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, "");
+        functionDetailsBuilder.setAutoAck(true);
+        functionDetailsBuilder.setSource(sourceSpecBuilder);
+
+        // set up sink spec
+        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
+        // sinkSpecBuilder.setClassName(PulsarSink.class.getName());
+        sinkSpecBuilder.setTopic(sinkTopic);
+        Map<String, Object> sinkConfigMap = Maps.newHashMap();
+        sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
+        sinkSpecBuilder.setTypeClassName(typeArg.getName());
+        functionDetailsBuilder.setSink(sinkSpecBuilder);
+
+        return functionDetailsBuilder;
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-functions/proto/src/main/proto/Request.proto 
b/pulsar-functions/proto/src/main/proto/Request.proto
index c8e31d75ed..574346911c 100644
--- a/pulsar-functions/proto/src/main/proto/Request.proto
+++ b/pulsar-functions/proto/src/main/proto/Request.proto
@@ -37,8 +37,3 @@ message ServiceRequest {
     FunctionMetaData functionMetaData = 3;
     string workerId = 4;
 }
-
-message AssignmentsUpdate {
-    repeated Assignment assignments = 1;
-    uint64 version = 2;
-}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 366eaba6a0..3ad6c7c6ee 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -18,14 +18,16 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
+import java.util.function.Function;
+
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import java.io.IOException;
-import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class FunctionAssignmentTailer
@@ -34,15 +36,16 @@
         private final FunctionRuntimeManager functionRuntimeManager;
         private final Reader<byte[]> reader;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager 
functionRuntimeManager,
-                Reader<byte[]> reader)
+    public FunctionAssignmentTailer(FunctionRuntimeManager 
functionRuntimeManager)
             throws PulsarClientException {
-            this.functionRuntimeManager = functionRuntimeManager;
-            this.reader = reader;
-        }
+        this.functionRuntimeManager = functionRuntimeManager;
 
-    public void start() {
+        this.reader = 
functionRuntimeManager.getWorkerService().getClient().newReader()
+                
.topic(functionRuntimeManager.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
+                .startMessageId(MessageId.earliest).create();
+    }
 
+    public void start() {
         receiveOne();
     }
 
@@ -65,29 +68,21 @@ public void close() {
 
     @Override
     public void accept(Message<byte[]> msg) {
-
-        // check if latest
-        boolean hasMessageAvailable;
-        try {
-            hasMessageAvailable = this.reader.hasMessageAvailable();
-        } catch (PulsarClientException e) {
-            throw new RuntimeException(e);
-        }
-        if (!hasMessageAvailable) {
-            Request.AssignmentsUpdate assignmentsUpdate;
+        if(msg.getData()==null || (msg.getData().length==0)) {
+            log.info("Received assignment delete: {}", msg.getKey());
+            this.functionRuntimeManager.deleteAssignment(msg.getKey());
+        } else {
+            Assignment assignment;
             try {
-                assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(msg.getData());
+                assignment = Assignment.parseFrom(msg.getData());
             } catch (IOException e) {
                 log.error("[{}] Received bad assignment update at message {}", 
reader.getTopic(), msg.getMessageId(),
                         e);
                 // TODO: find a better way to handle bad request
                 throw new RuntimeException(e);
             }
-            if (log.isDebugEnabled()) {
-                log.debug("Received assignment update: {}", assignmentsUpdate);
-            }
-
-            
this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), 
assignmentsUpdate);
+            log.info("Received assignment update: {}", assignment);
+            this.functionRuntimeManager.processAssignment(assignment);    
         }
         // receive next request
         receiveOne();
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 43cd27bcb5..e634130f5e 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -18,48 +18,45 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.proto.Request.AssignmentsUpdate;
-import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.Response.Status;
+import com.google.common.annotations.VisibleForTesting;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * This class managers all aspects of functions assignments and running of 
function assignments for this worker
@@ -78,13 +75,12 @@
     Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new 
ConcurrentHashMap<>();
 
     @VisibleForTesting
+    @Getter
     final WorkerConfig workerConfig;
 
     @VisibleForTesting
     LinkedBlockingQueue<FunctionAction> actionQueue;
 
-    private long currentAssignmentVersion = 0;
-
     private final FunctionAssignmentTailer functionAssignmentTailer;
 
     private FunctionActioner functionActioner;
@@ -92,22 +88,19 @@
     private RuntimeFactory runtimeFactory;
 
     private MembershipManager membershipManager;
-    private final ConnectorsManager connectorsManager;
     
     private final PulsarAdmin functionAdmin;
+    
+    @Getter
+    private WorkerService workerService;
 
     public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService 
workerService, Namespace dlogNamespace,
             MembershipManager membershipManager, ConnectorsManager 
connectorsManager) throws Exception {
         this.workerConfig = workerConfig;
-        this.connectorsManager = connectorsManager;
+        this.workerService = workerService;
         this.functionAdmin = workerService.getFunctionAdmin();
 
-        Reader<byte[]> reader = workerService.getClient().newReader()
-                .topic(this.workerConfig.getFunctionAssignmentTopic())
-                .startMessageId(MessageId.earliest)
-                .create();
-
-        this.functionAssignmentTailer = new FunctionAssignmentTailer(this, 
reader);
+        this.functionAssignmentTailer = new FunctionAssignmentTailer(this);
 
         AuthenticationConfig authConfig = AuthenticationConfig.builder()
                 
.clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
@@ -222,13 +215,6 @@ public synchronized Assignment 
findFunctionAssignment(String tenant, String name
         return assignments;
     }
 
-    /**
-     * get the current version number of assignments
-     * @return assignments version number
-     */
-    public synchronized long getCurrentAssignmentVersion() {
-        return new Long(this.currentAssignmentVersion);
-    }
 
     /**
      * Removes a collection of assignments
@@ -461,105 +447,108 @@ private void stopFunction(String 
fullyQualifiedInstanceId, boolean restart) thro
     /**
      * Process an assignment update from the assignment topic
      * @param messageId the message id of the update assignment
-     * @param assignmentsUpdate the assignment update
+     * @param newAssignment the assignment
      */
-    public synchronized void processAssignmentUpdate(MessageId messageId, 
AssignmentsUpdate assignmentsUpdate) {
+    public synchronized void processAssignment(Assignment newAssignment) {
 
-        if (assignmentsUpdate.getVersion() > this.currentAssignmentVersion) {
+        Map<String, Assignment> existingAssignmentMap = new HashMap<>();
+        for (Map<String, Assignment> entry : 
this.workerIdToAssignments.values()) {
+            existingAssignmentMap.putAll(entry);
+        }
 
-            Map<String, Assignment> assignmentMap = new HashMap<>();
-            for (Assignment assignment : 
assignmentsUpdate.getAssignmentsList()) {
-                assignmentMap.put(
-                        
Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
-                        assignment);
+        if 
(existingAssignmentMap.containsKey(Utils.getFullyQualifiedInstanceId(newAssignment.getInstance())))
 {
+            updateAssignment(newAssignment);
+        } else {
+            addAssignment(newAssignment);
+        }
+    }
+
+    private void updateAssignment(Assignment assignment) {
+        String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+        Assignment existingAssignment = this.findAssignment(assignment);
+        // potential updates need to happen
+        if (!existingAssignment.equals(assignment)) {
+            FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+            //stop function
+            if (functionRuntimeInfo != null) {
+                this.insertStopAction(functionRuntimeInfo);
             }
-            Map<String, Assignment> existingAssignmentMap = new HashMap<>();
-            for (Map<String, Assignment> entry : 
this.workerIdToAssignments.values()) {
-                existingAssignmentMap.putAll(entry);
+            // still assigned to me, need to restart
+            if 
(assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
+                //start again
+                FunctionRuntimeInfo newFunctionRuntimeInfo = new 
FunctionRuntimeInfo();
+                
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
+                this.insertStartAction(newFunctionRuntimeInfo);
+                this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, 
newFunctionRuntimeInfo);
             }
 
-            Map<String, Assignment> assignmentsToAdd = diff(assignmentMap, 
existingAssignmentMap);
-
-            Map<String, Assignment> assignmentsToDelete = 
diff(existingAssignmentMap, assignmentMap);
-
-            Map<String, Assignment> existingAssignments = 
inCommon(assignmentMap, existingAssignmentMap);
-
-            // functions to add
-            for (Map.Entry<String, Assignment> assignmentEntry : 
assignmentsToAdd.entrySet()) {
-                String fullyQualifiedInstanceId = assignmentEntry.getKey();
-                Assignment assignment = assignmentEntry.getValue();
-
-                //add new function
-                this.setAssignment(assignment);
-
-                //Assigned to me
-                if 
(assignment.getWorkerId().equals(workerConfig.getWorkerId())) {
-                    if 
(!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) {
-                        this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, 
new FunctionRuntimeInfo()
-                                
.setFunctionInstance(assignment.getInstance()));
-
-                    } else {
-                        //Somehow this function is already started
-                        log.warn("Function {} already running. Going to 
restart function.",
-                                
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
-                        
this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
-                    }
-                    FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
-                    this.insertStartAction(functionRuntimeInfo);
-                }
+            // find existing assignment
+            Assignment existing_assignment = this.findAssignment(assignment);
+            if (existing_assignment != null) {
+                // delete old assignment that could have old data
+                this.deleteAssignment(existing_assignment);
             }
+            // set to newest assignment
+            this.setAssignment(assignment);
+        }
+    }
+   
+    /**
+     * Removes the assignment of the given instance from whichever worker currently holds it.
+     * If runtime info is registered for the instance, a stop action is inserted for it and the
+     * runtime info is dropped first.
+     *
+     * @param fullyQualifiedInstanceId fully qualified id of the instance whose assignment is removed
+     */
+    public synchronized void deleteAssignment(String fullyQualifiedInstanceId) {
+        FunctionRuntimeInfo runtimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+        if (runtimeInfo != null) {
+            this.insertStopAction(runtimeInfo);
+            this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
+        }
+
+        // the owning worker is not known here, so scan each worker's assignments until one matches
+        String ownerWorkerId = null;
+        Map<String, Assignment> ownerAssignments = null;
+        for (Entry<String, Map<String, Assignment>> perWorker : workerIdToAssignments.entrySet()) {
+            if (perWorker.getValue().remove(fullyQualifiedInstanceId) != null) {
+                ownerWorkerId = perWorker.getKey();
+                ownerAssignments = perWorker.getValue();
+                break;
+            }
+        }
+        // drop the worker entry once its last assignment is gone
+        if (ownerAssignments != null && ownerAssignments.isEmpty()) {
+            this.workerIdToAssignments.remove(ownerWorkerId);
+        }
+    }
 
-            // functions to delete
-            for (Map.Entry<String, Assignment> assignmentEntry : 
assignmentsToDelete.entrySet()) {
-                String fullyQualifiedInstanceId = assignmentEntry.getKey();
-                Assignment assignment = assignmentEntry.getValue();
-
-                FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
-                if (functionRuntimeInfo != null) {
-                    this.insertStopAction(functionRuntimeInfo);
-                    this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
-                }
-                this.deleteAssignment(assignment);
+    /**
+     * Removes the given assignment from the assignments of the worker named in it, and removes
+     * the worker's entry altogether when no assignments remain for that worker.
+     *
+     * @param assignment assignment to remove; its worker id selects the per-worker map
+     */
+    @VisibleForTesting
+    void deleteAssignment(Assignment assignment) {
+        String instanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+        String workerId = assignment.getWorkerId();
+        Map<String, Assignment> perWorker = this.workerIdToAssignments.get(workerId);
+        if (perWorker != null) {
+            // remove is a no-op when the instance is not assigned to this worker
+            perWorker.remove(instanceId);
+            if (perWorker.isEmpty()) {
+                this.workerIdToAssignments.remove(workerId);
             }
+        }
+    }
 
-            // functions to update
-            for (Map.Entry<String, Assignment> assignmentEntry : 
existingAssignments.entrySet()) {
-                String fullyQualifiedInstanceId = assignmentEntry.getKey();
-                Assignment assignment = assignmentEntry.getValue();
-                Assignment existingAssignment = 
this.findAssignment(assignment);
-                // potential updates need to happen
-                if (!existingAssignment.equals(assignment)) {
-                    FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
-                    //stop function
-                    if (functionRuntimeInfo != null) {
-                        this.insertStopAction(functionRuntimeInfo);
-                    }
-                    // still assigned to me, need to restart
-                    if 
(assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
-                        //start again
-                        FunctionRuntimeInfo newFunctionRuntimeInfo = new 
FunctionRuntimeInfo();
-                        
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
-                        this.insertStartAction(newFunctionRuntimeInfo);
-                        this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, 
newFunctionRuntimeInfo);
-                    }
+    private void addAssignment(Assignment assignment) {
+        String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
 
-                    // find existing assignment
-                    Assignment existing_assignment = 
this.findAssignment(assignment);
-                    if (existing_assignment != null) {
-                        // delete old assignment that could have old data
-                        this.deleteAssignment(existing_assignment);
-                    }
-                    // set to newest assignment
-                    this.setAssignment(assignment);
-                }
-            }
+        //add new function
+        this.setAssignment(assignment);
 
-            // set as current assignment
-            this.currentAssignmentVersion = assignmentsUpdate.getVersion();
+        //Assigned to me
+        if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) {
+            if 
(!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) {
+                this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new 
FunctionRuntimeInfo()
+                        .setFunctionInstance(assignment.getInstance()));
 
-        } else {
-            log.debug("Received out of date assignment update: {}", 
assignmentsUpdate);
+            } else {
+                //Somehow this function is already started
+                log.warn("Function {} already running. Going to restart 
function.",
+                        
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
+                
this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
+            }
+            FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+            this.insertStartAction(functionRuntimeInfo);
         }
+        
     }
 
     public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
@@ -642,20 +631,6 @@ void setAssignment(Assignment assignment) {
                 assignment);
     }
 
-    @VisibleForTesting
-    void deleteAssignment(Assignment assignment) {
-        Map<String, Assignment> assignmentMap = 
this.workerIdToAssignments.get(assignment.getWorkerId());
-        if (assignmentMap != null) {
-            String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
-            if (assignmentMap.containsKey(fullyQualifiedInstanceId)) {
-                assignmentMap.remove(fullyQualifiedInstanceId);
-            }
-            if (assignmentMap.isEmpty()) {
-                this.workerIdToAssignments.remove(assignment.getWorkerId());
-            }
-        }
-    }
-
     private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
         this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
     }
@@ -674,28 +649,8 @@ public void close() throws Exception {
         }
     }
 
-    private Map<String, Assignment> diff(Map<String, Assignment> 
assignmentMap1, Map<String, Assignment> assignmentMap2) {
-        Map<String, Assignment> result = new HashMap<>();
-        for (Map.Entry<String, Assignment> entry : assignmentMap1.entrySet()) {
-            if (!assignmentMap2.containsKey(entry.getKey())) {
-                result.put(entry.getKey(), entry.getValue());
-            }
-        }
-        return result;
-    }
-
-    private Map<String, Assignment> inCommon(Map<String, Assignment> 
assignmentMap1, Map<String, Assignment> assignmentMap2) {
-
-        Map<String, Assignment> result = new HashMap<>();
-        for (Map.Entry<String, Assignment> entry : assignmentMap1.entrySet()) {
-            if (assignmentMap2.containsKey(entry.getKey())) {
-                result.put(entry.getKey(), entry.getValue());
-            }
-        }
-        return result;
-    }
-
     private FunctionRuntimeInfo getFunctionRuntimeInfo(String 
fullyQualifiedInstanceId) {
         return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
     }
+
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index ed00958d7e..db7785a55b 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -23,30 +23,29 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.proto.Request;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
 @Slf4j
 public class SchedulerManager implements AutoCloseable {
 
@@ -65,16 +64,22 @@
 
     private final Producer<byte[]> producer;
 
-    private final ExecutorService executorService;
+    private final ScheduledExecutorService executorService;
+    
+    private final PulsarAdmin admin;
+    
+    AtomicBoolean isCompactionNeeded = new AtomicBoolean(false);
+    private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60; 
 
-    public SchedulerManager(WorkerConfig workerConfig, PulsarClient 
pulsarClient) {
+    public SchedulerManager(WorkerConfig workerConfig, PulsarClient 
pulsarClient, PulsarAdmin admin, ScheduledExecutorService executor) {
         this.workerConfig = workerConfig;
+        this.admin = admin;
         this.scheduler = 
Reflections.createInstance(workerConfig.getSchedulerClassName(), 
IScheduler.class,
                 Thread.currentThread().getContextClassLoader());
 
         try {
             this.producer = 
pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic())
-                    
.enableBatching(true).blockIfQueueFull(true).compressionType(CompressionType.LZ4).
+                    
.enableBatching(false).blockIfQueueFull(true).compressionType(CompressionType.LZ4).
                     sendTimeout(0, TimeUnit.MILLISECONDS).create();
         } catch (PulsarClientException e) {
             log.error("Failed to create producer to function assignment topic "
@@ -82,9 +87,9 @@ public SchedulerManager(WorkerConfig workerConfig, 
PulsarClient pulsarClient) {
             throw new RuntimeException(e);
         }
 
-        this.executorService =
-                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
-                        new LinkedBlockingQueue<>());
+        this.executorService = executor;
+        
+        scheduleCompaction(executor, 
workerConfig.getTopicCompactionFrequencySec());
     }
 
     public Future<?> schedule() {
@@ -92,13 +97,31 @@ public SchedulerManager(WorkerConfig workerConfig, 
PulsarClient pulsarClient) {
             synchronized (SchedulerManager.this) {
                 boolean isLeader = membershipManager.isLeader();
                 if (isLeader) {
-                    invokeScheduler();
+                    try {
+                        invokeScheduler();
+                    } catch (Exception e) {
+                        log.warn("Failed to invoke scheduler", e);
+                        schedule();
+                    }
                 }
             }
         });
     }
 
-    private void invokeScheduler() {
+    /**
+     * Registers a periodic task that triggers compaction of the assignment topic.
+     * The task compacts only when this worker is the leader and {@code isCompactionNeeded} is set.
+     *
+     * @param executor executor on which the periodic check runs; nothing is scheduled when {@code null}
+     * @param scheduleFrequencySec initial delay and delay between consecutive checks, in seconds
+     */
+    private void scheduleCompaction(ScheduledExecutorService executor, long scheduleFrequencySec) {
+        if (executor != null) {
+            executor.scheduleWithFixedDelay(() -> {
+                try {
+                    // compareAndSet clears the flag before compacting, so a request raised while the
+                    // compaction is in flight is not erased; the short-circuit keeps the flag set on
+                    // workers that are not the leader
+                    if (membershipManager.isLeader() && isCompactionNeeded.compareAndSet(true, false)) {
+                        compactAssignmentTopic();
+                    }
+                } catch (RuntimeException e) {
+                    // an exception escaping a scheduleWithFixedDelay task suppresses all later runs
+                    log.warn("Failed to run periodic assignment topic compaction", e);
+                }
+            }, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS);
+        }
+    }
+    
+    @VisibleForTesting
+    public void invokeScheduler() {
+        
         List<String> currentMembership = 
this.membershipManager.getCurrentMembership()
                 .stream().map(workerInfo -> 
workerInfo.getWorkerId()).collect(Collectors.toList());
 
@@ -111,12 +134,16 @@ private void invokeScheduler() {
         while (it.hasNext()) {
             Map.Entry<String, Map<String, Assignment>> 
workerIdToAssignmentEntry = it.next();
             Map<String, Assignment> functionMap = 
workerIdToAssignmentEntry.getValue();
+
             // remove instances that don't exist anymore
-            functionMap.entrySet().removeIf(
-                    entry -> {
-                        String fullyQualifiedInstanceId = entry.getKey();
-                        return 
!allInstances.containsKey(fullyQualifiedInstanceId);
-                    });
+            functionMap.entrySet().removeIf(entry -> {
+                String fullyQualifiedInstanceId = entry.getKey();
+                boolean deleted = 
!allInstances.containsKey(fullyQualifiedInstanceId);
+                if (deleted) {
+                    publishNewAssignment(entry.getValue().toBuilder().build(), 
true);
+                }
+                return deleted;
+            });
 
             // update assignment instances in case attributes of a function 
gets updated
             for (Map.Entry<String, Assignment> entry : functionMap.entrySet()) 
{
@@ -143,36 +170,40 @@ private void invokeScheduler() {
         List<Assignment> assignments = this.scheduler.schedule(
                 needsAssignment, currentAssignments, currentMembership);
 
-        log.debug("New assignments computed: {}", assignments);
+        if (log.isDebugEnabled()) {
+            log.debug("New assignments computed: {}", assignments);
+        }
 
-        long assignmentVersion = 
this.functionRuntimeManager.getCurrentAssignmentVersion() + 1;
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.newBuilder()
-                .setVersion(assignmentVersion)
-                .addAllAssignments(assignments)
-                .build();
+        isCompactionNeeded.set(!assignments.isEmpty());
 
-        CompletableFuture<MessageId> messageIdCompletableFuture = 
producer.sendAsync(assignmentsUpdate.toByteArray());
-        try {
-            messageIdCompletableFuture.get();
-        } catch (InterruptedException | ExecutionException e) {
-            log.error("Failed to send assignment update", e);
-            throw new RuntimeException(e);
+        for(Assignment assignment : assignments) {
+            publishNewAssignment(assignment, false);
         }
+        
+    }
 
-        // wait for assignment update to go throw the pipeline
-        int retries = 0;
-        while (this.functionRuntimeManager.getCurrentAssignmentVersion() < 
assignmentVersion) {
-            if (retries >= this.workerConfig.getAssignmentWriteMaxRetries()) {
-                log.warn("Max number of retries reached for waiting for 
assignment to propagate. Will continue now.");
-                break;
-            }
-            log.info("Waiting for assignments to propagate...");
+    public void compactAssignmentTopic() {
+        if (this.admin != null) {
             try {
-                Thread.sleep(500);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+                
this.admin.topics().triggerCompaction(workerConfig.getFunctionAssignmentTopic());
+            } catch (PulsarAdminException e) {
+                log.error("Failed to trigger compaction {}", e);
+                executorService.schedule(() -> compactAssignmentTopic(), 
DEFAULT_ADMIN_API_BACKOFF_SEC,
+                        TimeUnit.SECONDS);
             }
-            retries++;
+        }
+    }
+
+    /**
+     * Publishes a single assignment to the assignment topic, keyed by its fully qualified
+     * instance id, and blocks until the send completes.
+     *
+     * @param assignment assignment to publish
+     * @param deleted when {@code true}, an empty payload is published for the instance-id key
+     *                instead of the serialized assignment
+     * @throws RuntimeException if the message cannot be sent
+     */
+    private void publishNewAssignment(Assignment assignment, boolean deleted) {
+        try {
+            String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+            // an empty message for the instance-id key lets the compactor delete the key and skip
+            // delivery of this instance's assignment
+            producer.newMessage().key(fullyQualifiedInstanceId)
+                    .value(deleted ? new byte[0] : assignment.toByteArray()).sendAsync().get();
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // keep the interrupt visible to the caller's thread
+                Thread.currentThread().interrupt();
+            }
+            log.error("Failed to {} assignment update {}", deleted ? "delete" : "send", assignment, e);
+            throw new RuntimeException(e);
         }
     }
 
@@ -224,6 +255,5 @@ public void close() {
         } catch (PulsarClientException e) {
             log.warn("Failed to shutdown scheduler manager assignment 
producer", e);
         }
-        this.executorService.shutdown();
     }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 0f695a974f..a2524c6d97 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -74,6 +74,9 @@
     private long instanceLivenessCheckFreqMs;
     private String clientAuthenticationPlugin;
     private String clientAuthenticationParameters;
+    // How often, in seconds, the worker triggers compaction on function topics
+    private long topicCompactionFrequencySec = 30 * 60; // 30 minutes
+    private int metricsSamplingPeriodSec = 60;
     /***** --- TLS --- ****/
     // Enable TLS
     private boolean tlsEnabled = false;
@@ -88,8 +91,6 @@
     private boolean tlsRequireTrustedClientCertOnConnect = false;
     private boolean useTls = false;
     private boolean tlsHostnameVerificationEnable = false;
-    
-    private int metricsSamplingPeriodSec = 60;
     // Enforce authentication
     private boolean authenticationEnabled = false;
     // Autentication provider name list, which is a list of class names
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 7fc0cc94d7..488bcd775d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -20,6 +20,7 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 
@@ -68,11 +69,15 @@
     private PulsarAdmin brokerAdmin;
     private PulsarAdmin functionAdmin;
     private final MetricsGenerator metricsGenerator;
+    private final ScheduledExecutorService executor;
+    @VisibleForTesting
+    private URI dlogUri;
 
     public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
         this.statsUpdater = Executors
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("worker-stats-updater"));
+        this.executor = Executors.newScheduledThreadPool(10, new 
DefaultThreadFactory("pulsar-worker"));
         this.metricsGenerator = new MetricsGenerator(this.statsUpdater);
     }
 
@@ -98,12 +103,13 @@ public void start(URI dlogUri) throws InterruptedException 
{
         }
 
         // create the dlog namespace for storing function packages
+        this.dlogUri = dlogUri;
         DistributedLogConfiguration dlogConf = Utils.getDlogConf(workerConfig);
         try {
             this.dlogNamespace = NamespaceBuilder.newBuilder()
                     .conf(dlogConf)
                     .clientId("function-worker-" + workerConfig.getWorkerId())
-                    .uri(dlogUri)
+                    .uri(this.dlogUri)
                     .build();
         } catch (Exception e) {
             log.error("Failed to initialize dlog namespace {} for storing 
function packages",
@@ -128,7 +134,8 @@ public void start(URI dlogUri) throws InterruptedException {
             log.info("Created Pulsar client");
 
             //create scheduler manager
-            this.schedulerManager = new SchedulerManager(this.workerConfig, 
this.client);
+            this.schedulerManager = new SchedulerManager(this.workerConfig, 
this.client, this.brokerAdmin,
+                    this.executor);
 
             //create function meta data manager
             this.functionMetaDataManager = new FunctionMetaDataManager(
@@ -232,6 +239,10 @@ public void stop() {
         if (null != this.functionAdmin) {
             this.functionAdmin.close();
         }
+        
+        if(this.executor != null) {
+            this.executor.shutdown();
+        }
     }
 
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
index 58c1a9a513..817962d4f1 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
@@ -22,6 +22,8 @@
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Instance;
 
+import com.google.common.collect.Lists;
+
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -47,19 +49,17 @@
             
workerIdToAssignment.get(existingAssignment.getWorkerId()).add(existingAssignment);
         }
 
+        List<Assignment> newAssignments = Lists.newArrayList();
         for (Instance unassignedFunctionInstance : 
unassignedFunctionInstances) {
             String heartBeatWorkerId = 
checkHeartBeatFunction(unassignedFunctionInstance);
             String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : 
findNextWorker(workerIdToAssignment);
             Assignment newAssignment = 
Assignment.newBuilder().setInstance(unassignedFunctionInstance)
                     .setWorkerId(workerId).build();
             workerIdToAssignment.get(workerId).add(newAssignment);
+            newAssignments.add(newAssignment);
         }
 
-        List<Assignment> assignments
-                = workerIdToAssignment.entrySet().stream()
-                .flatMap(entry -> 
entry.getValue().stream()).collect(Collectors.toList());
-
-        return assignments;
+        return newAssignments;
     }
 
     private static String checkHeartBeatFunction(Instance funInstance) {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 85a2122dd0..5e1ed023aa 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -37,6 +37,7 @@
 import java.util.Map;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doReturn;
@@ -86,6 +87,8 @@ public void testProcessAssignmentUpdateAddFunctions() throws 
Exception {
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
@@ -123,12 +126,8 @@ public void testProcessAssignmentUpdateAddFunctions() 
throws Exception {
         assignments.add(assignment1);
         assignments.add(assignment2);
 
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.newBuilder()
-                .addAllAssignments(assignments)
-                .setVersion(1)
-                .build();
-
-        functionRuntimeManager.processAssignmentUpdate(MessageId.earliest, 
assignmentsUpdate);
+        functionRuntimeManager.processAssignment(assignment1);
+        functionRuntimeManager.processAssignment(assignment2);
 
         verify(functionRuntimeManager, 
times(2)).setAssignment(any(Function.Assignment.class));
         verify(functionRuntimeManager, 
times(0)).deleteAssignment(any(Function.Assignment.class));
@@ -183,6 +182,7 @@ public void testProcessAssignmentUpdateDeleteFunctions() 
throws Exception {
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
@@ -205,6 +205,7 @@ public void testProcessAssignmentUpdateDeleteFunctions() 
throws Exception {
                 Function.FunctionDetails.newBuilder()
                         
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
 
+        // Delete this assignment
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
                 .setInstance(Function.Instance.newBuilder()
@@ -221,23 +222,18 @@ public void testProcessAssignmentUpdateDeleteFunctions() 
throws Exception {
         functionRuntimeManager.setAssignment(assignment2);
         reset(functionRuntimeManager);
 
-        List<Function.Assignment> assignments = new LinkedList<>();
-        assignments.add(assignment2);
-
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.newBuilder()
-                .addAllAssignments(assignments)
-                .setVersion(1)
-                .build();
-
         functionRuntimeManager.functionRuntimeInfoMap.put(
                 "test-tenant/test-namespace/func-1:0", new 
FunctionRuntimeInfo().setFunctionInstance(
                         
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
                                 .build()));
 
-        functionRuntimeManager.processAssignmentUpdate(MessageId.earliest, 
assignmentsUpdate);
+        functionRuntimeManager.processAssignment(assignment1);
+        functionRuntimeManager.processAssignment(assignment2);
 
+        
functionRuntimeManager.deleteAssignment(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()));
+        
         verify(functionRuntimeManager, 
times(0)).setAssignment(any(Function.Assignment.class));
-        verify(functionRuntimeManager, 
times(1)).deleteAssignment(any(Function.Assignment.class));
+        verify(functionRuntimeManager, 
times(1)).deleteAssignment(any(String.class));
 
         
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
@@ -284,6 +280,7 @@ public void testProcessAssignmentUpdateModifyFunctions() 
throws Exception {
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
@@ -327,14 +324,6 @@ public void testProcessAssignmentUpdateModifyFunctions() 
throws Exception {
                 .setInstance(Function.Instance.newBuilder()
                         
.setFunctionMetaData(function2).setInstanceId(0).build())
                 .build();
-        List<Function.Assignment> assignments = new LinkedList<>();
-        assignments.add(assignment1);
-        assignments.add(assignment3);
-
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.newBuilder()
-                .addAllAssignments(assignments)
-                .setVersion(1)
-                .build();
 
         functionRuntimeManager.functionRuntimeInfoMap.put(
                 "test-tenant/test-namespace/func-1:0", new 
FunctionRuntimeInfo().setFunctionInstance(
@@ -345,7 +334,8 @@ public void testProcessAssignmentUpdateModifyFunctions() 
throws Exception {
                         
Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0)
                                 .build()));
 
-        functionRuntimeManager.processAssignmentUpdate(MessageId.earliest, 
assignmentsUpdate);
+        functionRuntimeManager.processAssignment(assignment1);
+        functionRuntimeManager.processAssignment(assignment3);
 
         verify(functionRuntimeManager, 
times(1)).insertStopAction(any(FunctionRuntimeInfo.class));
         verify(functionRuntimeManager).insertStopAction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 2753bf196a..c849fd36de 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -131,6 +131,7 @@ public void testCheckFailuresNoFailures() throws Exception {
         ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(true);
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
@@ -202,6 +203,7 @@ public void testCheckFailuresSomeFailures() throws 
Exception {
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(true);
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
@@ -296,6 +298,7 @@ public void testCheckFailuresSomeUnassigned() throws 
Exception {
         ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(true);
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 19977bd812..00e4b6e0e1 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -28,15 +28,19 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
 
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -46,6 +50,7 @@
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.Request;
@@ -53,9 +58,14 @@
 import org.mockito.Mockito;
 import org.mockito.invocation.Invocation;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -67,6 +77,8 @@
     private MembershipManager membershipManager;
     private CompletableFuture<MessageId> completableFuture;
     private Producer producer;
+    private TypedMessageBuilder<byte[]> message;
+    private ScheduledExecutorService executor;
 
     private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
         ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
@@ -94,8 +106,12 @@ public void setup() throws PulsarClientException {
         producer = mock(Producer.class);
         completableFuture = spy(new CompletableFuture<>());
         completableFuture.complete(MessageId.earliest);
-        byte[] bytes = any();
-        when(producer.sendAsync(bytes)).thenReturn(completableFuture);
+        //byte[] bytes = any();
+        message = mock(TypedMessageBuilder.class);
+        when(producer.newMessage()).thenReturn(message);
+        when(message.key(anyString())).thenReturn(message);
+        when(message.value(any())).thenReturn(message);
+        when(message.sendAsync()).thenReturn(completableFuture);
 
         ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
         when(builder.topic(anyString())).thenReturn(builder);
@@ -109,7 +125,9 @@ public void setup() throws PulsarClientException {
         PulsarClient pulsarClient = mock(PulsarClient.class);
         when(pulsarClient.newProducer()).thenReturn(builder);
 
-        schedulerManager = spy(new SchedulerManager(workerConfig, 
pulsarClient));
+        this.executor = Executors
+                .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("worker-test"));
+        schedulerManager = spy(new SchedulerManager(workerConfig, 
pulsarClient, null, executor));
         functionRuntimeManager = mock(FunctionRuntimeManager.class);
         functionMetaDataManager = mock(FunctionMetaDataManager.class);
         membershipManager = mock(MembershipManager.class);
@@ -118,6 +136,11 @@ public void setup() throws PulsarClientException {
         schedulerManager.setMembershipManager(membershipManager);
     }
 
+    @AfterMethod
+    public void stop() {
+        this.executor.shutdown(); // shut down the single-thread scheduler that setup() created and passed to SchedulerManager
+    }
+    
     @Test
     public void testSchedule() throws Exception {
 
@@ -143,9 +166,6 @@ public void testSchedule() throws Exception {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -159,7 +179,9 @@ public void testSchedule() throws Exception {
         // i am leader
         doReturn(true).when(membershipManager).isLeader();
         callSchedule();
-        verify(producer, times(1)).sendAsync(any(byte[].class));
+        List<Invocation> invocations = 
getMethodInvocationDetails(schedulerManager,
+                SchedulerManager.class.getMethod("invokeScheduler"));
+        Assert.assertEquals(invocations.size(), 1);
     }
 
     @Test
@@ -187,9 +209,6 @@ public void testNothingNewToSchedule() throws Exception {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -200,17 +219,8 @@ public void testNothingNewToSchedule() throws Exception {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
-                Object.class));
-        Assert.assertEquals(invocations.size(), 1);
-
-        byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
-        log.info("assignmentsUpdate: {}", assignmentsUpdate);
-        Assert.assertEquals(
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1)
-                        .addAssignments(assignment1).build(),
-                assignmentsUpdate);
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 0);
     }
 
     @Test
@@ -243,9 +253,6 @@ public void testAddingFunctions() throws Exception {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -256,23 +263,20 @@ public void testAddingFunctions() throws Exception {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
-                Object.class));
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
         Assert.assertEquals(invocations.size(), 1);
-
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
+                Object.class));
         byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
+        Assignment assignments = Assignment.parseFrom(send);
 
-        log.info("assignmentsUpdate: {}", assignmentsUpdate);
+        log.info("assignments: {}", assignments);
         Function.Assignment assignment2 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
                 .setInstance(Function.Instance.newBuilder()
                         
.setFunctionMetaData(function2).setInstanceId(0).build())
                 .build();
-        Assert.assertEquals(
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2).build(),
-                assignmentsUpdate);
+        Assert.assertEquals(assignment2, assignments);
 
     }
 
@@ -288,7 +292,7 @@ public void testDeletingFunctions() throws Exception {
         // simulate function2 got removed
         Function.FunctionMetaData function2 = 
Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
-                        
.setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version)
+                        
.setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1))
                 .build();
         functionMetaDataList.add(function1);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
@@ -300,6 +304,7 @@ public void testDeletingFunctions() throws Exception {
                         
.setFunctionMetaData(function1).setInstanceId(0).build())
                 .build();
 
+        // Delete this assignment
         Function.Assignment assignment2 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
                 .setInstance(Function.Instance.newBuilder()
@@ -309,14 +314,12 @@ public void testDeletingFunctions() throws Exception {
         Map<String, Map<String, Function.Assignment>> currentAssignments = new 
HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
         
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
+        //TODO: delete this assignment
         
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()),
 assignment2);
 
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -327,19 +330,16 @@ public void testDeletingFunctions() throws Exception {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
-                Object.class));
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
         Assert.assertEquals(invocations.size(), 1);
-
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
+                Object.class));
         byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
+        Assignment assignments = Assignment.parseFrom(send);
 
-        log.info("assignmentsUpdate: {}", assignmentsUpdate);
+        log.info("assignments: {}", assignments);
 
-        Assert.assertEquals(
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1)
-                        .addAssignments(assignment1).build(),
-                assignmentsUpdate);
+        Assert.assertEquals(0, send.length);
     }
 
     @Test
@@ -373,9 +373,6 @@ public void testScalingUp() throws Exception {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -386,25 +383,21 @@ public void testScalingUp() throws Exception {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
-                Object.class));
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
         Assert.assertEquals(invocations.size(), 1);
-
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
+                Object.class));
         byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
+        Assignment assignments = Assignment.parseFrom(send);
 
-        log.info("assignmentsUpdate: {}", assignmentsUpdate);
+        log.info("assignments: {}", assignments);
 
         Function.Assignment assignment2 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
                 .setInstance(Function.Instance.newBuilder()
                         
.setFunctionMetaData(function2).setInstanceId(0).build())
                 .build();
-        Assert.assertEquals(
-                assignmentsUpdate,
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2).build()
-                );
+        Assert.assertEquals(assignments, assignment2);
 
         // scale up
 
@@ -435,17 +428,23 @@ public void testScalingUp() throws Exception {
 
         callSchedule();
 
-        invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 4);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        Assert.assertEquals(invocations.size(), 2);
+        
+        Set<Assignment> allAssignments = Sets.newHashSet();
+        invocations.forEach(invocation -> {
+            try {
+                
allAssignments.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0]));
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        });
 
-        send = (byte[]) invocations.get(1).getRawArguments()[0];
-        assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send);
-        Assert.assertEquals(assignmentsUpdate,
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 
+ 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2Scaled1)
-                        
.addAssignments(assignment2Scaled2).addAssignments(assignment2Scaled3).build()
-                );
+        assertTrue(allAssignments.contains(assignment2Scaled1));
+        assertTrue(allAssignments.contains(assignment2Scaled2));
+        assertTrue(allAssignments.contains(assignment2Scaled3));
     }
 
     @Test
@@ -479,9 +478,6 @@ public void testScalingDown() throws Exception {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -492,14 +488,24 @@ public void testScalingDown() throws Exception {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 3);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        Assert.assertEquals(invocations.size(), 1);
-
         byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
+        Assignment assignments = Assignment.parseFrom(send);
+
+        log.info("assignments: {}", assignments);
+        
+        Set<Assignment> allAssignments = Sets.newHashSet();
+        invocations.forEach(invocation -> {
+            try {
+                
allAssignments.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0]));
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        });
 
-        log.info("assignmentsUpdate: {}", assignmentsUpdate);
 
         Function.Assignment assignment2_1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -516,13 +522,11 @@ public void testScalingDown() throws Exception {
                 .setInstance(Function.Instance.newBuilder()
                         
.setFunctionMetaData(function2).setInstanceId(2).build())
                 .build();
-        Assert.assertEquals(
-                assignmentsUpdate,
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2_1)
-                        
.addAssignments(assignment2_2).addAssignments(assignment2_3).build()
-        );
-
+        
+        assertTrue(allAssignments.contains(assignment2_1));
+        assertTrue(allAssignments.contains(assignment2_2));
+        assertTrue(allAssignments.contains(assignment2_3));
+        
         // scale down
 
         Function.FunctionMetaData function2Scaled = 
Function.FunctionMetaData.newBuilder()
@@ -542,17 +546,23 @@ public void testScalingDown() throws Exception {
 
         callSchedule();
 
-        invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 4);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        Assert.assertEquals(invocations.size(), 2);
+        send = (byte[]) invocations.get(0).getRawArguments()[0];
+        assignments = Assignment.parseFrom(send);
+        
+        Set<Assignment> allAssignments2 = Sets.newHashSet();
+        invocations.forEach(invocation -> {
+            try {
+                
allAssignments2.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0]));
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        });
 
-        send = (byte[]) invocations.get(1).getRawArguments()[0];
-        assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send);
-        Assert.assertEquals(assignmentsUpdate,
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 
+ 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2Scaled)
-                        .build()
-        );
+        assertTrue(allAssignments2.contains(assignment2Scaled));
     }
 
     @Test
@@ -582,9 +592,6 @@ public void testHeartbeatFunction() throws Exception {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        // set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of(workerId1, "workerHostname-1", 5000));
         workerInfoList.add(WorkerInfo.of(workerId2, "workerHostname-1", 6000));
@@ -595,20 +602,20 @@ public void testHeartbeatFunction() throws Exception {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer,
-                Producer.class.getMethod("sendAsync", Object.class));
-        Assert.assertEquals(invocations.size(), 1);
-
-        byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
-
-        List<Assignment> assignmentList = 
assignmentsUpdate.getAssignmentsList();
-        Assert.assertEquals(assignmentList.size(), 2);
-        for (Assignment assignment : assignmentList) {
-            String functionName = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName();
-            String assignedWorkerId = assignment.getWorkerId();
-            Assert.assertEquals(functionName, assignedWorkerId);
-        }
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 2);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
+                Object.class));
+        invocations.forEach(invocation -> {
+            try {
+                Assignment assignment = 
Assignment.parseFrom((byte[])invocation.getRawArguments()[0]);
+                String functionName = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName();
+                String assignedWorkerId = assignment.getWorkerId();
+                Assert.assertEquals(functionName, assignedWorkerId);
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        });
     }
     
     @Test
@@ -644,9 +651,6 @@ public void testUpdate() throws Exception {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -657,14 +661,14 @@ public void testUpdate() throws Exception {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 3);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        Assert.assertEquals(invocations.size(), 1);
-
         byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
+        Assignment assignments = Assignment.parseFrom(send);
 
-        log.info("assignmentsUpdate: {}", assignmentsUpdate);
+        log.info("assignmentsUpdate: {}", assignments);
 
         Function.Assignment assignment2_1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -681,13 +685,27 @@ public void testUpdate() throws Exception {
                 .setInstance(Function.Instance.newBuilder()
                         
.setFunctionMetaData(function2).setInstanceId(2).build())
                 .build();
-        Assert.assertEquals(
-                assignmentsUpdate,
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2_1)
-                        
.addAssignments(assignment2_2).addAssignments(assignment2_3).build()
-        );
-
+        
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 3);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
+                Object.class));
+        send = (byte[]) invocations.get(0).getRawArguments()[0];
+        assignments = Assignment.parseFrom(send);
+        
+        Set<Assignment> allAssignments = Sets.newHashSet();
+        invocations.forEach(invocation -> {
+            try {
+                
allAssignments.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0]));
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        
+        assertTrue(allAssignments.contains(assignment2_1));
+        assertTrue(allAssignments.contains(assignment2_2));
+        assertTrue(allAssignments.contains(assignment2_3));
+        
         // scale down
 
         Function.FunctionMetaData function2Updated = 
Function.FunctionMetaData.newBuilder()
@@ -718,28 +736,32 @@ public void testUpdate() throws Exception {
 
         callSchedule();
 
-        invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 6);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        Assert.assertEquals(invocations.size(), 2);
-
-        send = (byte[]) invocations.get(1).getRawArguments()[0];
-        assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send);
-        Assert.assertEquals(assignmentsUpdate,
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 
+ 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2Updated1)
-                        .addAssignments(assignment2Updated2)
-                        .addAssignments(assignment2Updated3)
-                        .build()
-        );
+        send = (byte[]) invocations.get(0).getRawArguments()[0];
+        assignments = Assignment.parseFrom(send);
+        
+        Set<Assignment> allAssignments2 = Sets.newHashSet();
+        invocations.forEach(invocation -> {
+            try {
+                
allAssignments2.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0]));
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        
+        assertTrue(allAssignments2.contains(assignment2Updated1));
+        assertTrue(allAssignments2.contains(assignment2Updated2));
+        assertTrue(allAssignments2.contains(assignment2Updated3));
     }
 
     private void callSchedule() throws NoSuchMethodException, 
InterruptedException,
             TimeoutException, ExecutionException {
-        long intialVersion = 
functionRuntimeManager.getCurrentAssignmentVersion();
         Future<?> complete = schedulerManager.schedule();
 
         complete.get(30, TimeUnit.SECONDS);
-        doReturn(intialVersion + 
1).when(functionRuntimeManager).getCurrentAssignmentVersion();
     }
 
     private List<Invocation> getMethodInvocationDetails(Object o, Method 
method) throws NoSuchMethodException {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to