This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a1a1abe add initialize routine to FunctionRuntimeManager (#2784) a1a1abe is described below commit a1a1abed57f108a1e4be3c76abe69e61da8fd619 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Fri Oct 12 09:41:41 2018 -0500 add initialize routine to FunctionRuntimeManager (#2784) --- .../functions/worker/FunctionAssignmentTailer.java | 16 +-- .../functions/worker/FunctionMetaDataManager.java | 4 +- .../functions/worker/FunctionRuntimeManager.java | 120 +++++++++++++------ .../pulsar/functions/worker/WorkerService.java | 3 + .../worker/FunctionRuntimeManagerTest.java | 132 ++++++++++++++++++++- 5 files changed, 229 insertions(+), 46 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java index 3ad6c7c..3f8bec3 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java @@ -36,13 +36,11 @@ public class FunctionAssignmentTailer private final FunctionRuntimeManager functionRuntimeManager; private final Reader<byte[]> reader; - public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager) + public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) throws PulsarClientException { this.functionRuntimeManager = functionRuntimeManager; - this.reader = functionRuntimeManager.getWorkerService().getClient().newReader() - .topic(functionRuntimeManager.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true) - .startMessageId(MessageId.earliest).create(); + this.reader = reader; } public void start() { @@ -66,8 +64,7 @@ public class FunctionAssignmentTailer log.info("Stopped function 
state consumer"); } - @Override - public void accept(Message<byte[]> msg) { + public void processAssignment(Message<byte[]> msg) { if(msg.getData()==null || (msg.getData().length==0)) { log.info("Received assignment delete: {}", msg.getKey()); this.functionRuntimeManager.deleteAssignment(msg.getKey()); @@ -82,8 +79,13 @@ public class FunctionAssignmentTailer throw new RuntimeException(e); } log.info("Received assignment update: {}", assignment); - this.functionRuntimeManager.processAssignment(assignment); + this.functionRuntimeManager.processAssignment(assignment); } + } + + @Override + public void accept(Message<byte[]> msg) { + processAssignment(msg); // receive next request receiveOne(); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index 44ff807..4faed11 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -82,13 +82,11 @@ public class FunctionMetaDataManager implements AutoCloseable { /** * Initializes the FunctionMetaDataManager. Does the following: - * 1. Restores from snapshot if one exists - * 2. Sends out initialize marker to FMT and consume messages until the initialize marker is consumed + * 1. 
Consume all existing function meta data upon start to establish existing state */ public void initialize() { log.info("/** Initializing Function Metadata Manager **/"); try { - Reader<byte[]> reader = pulsarClient.newReader() .topic(this.workerConfig.getFunctionMetadataTopic()) .startMessageId(MessageId.earliest) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index aa843ca..ee57659 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -36,10 +36,13 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriBuilder; +import lombok.Setter; import org.apache.commons.lang3.StringUtils; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.Function.Assignment; @@ -65,6 +68,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ // All the runtime info related to functions executed by this worker // Fully Qualified InstanceId - > FunctionRuntimeInfo + // NOTE: please use setFunctionRuntimeInfo and deleteFunctionRuntimeInfo methods to modify this data structure + // Since during initialization phase nothing should be modified @VisibleForTesting Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<>(); @@ -75,7 +80,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ 
@VisibleForTesting LinkedBlockingQueue<FunctionAction> actionQueue; - private final FunctionAssignmentTailer functionAssignmentTailer; + private FunctionAssignmentTailer functionAssignmentTailer; private FunctionActioner functionActioner; @@ -89,14 +94,16 @@ public class FunctionRuntimeManager implements AutoCloseable{ @Getter private WorkerService workerService; + @Setter + @Getter + boolean isInitializePhase = false; + public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception { this.workerConfig = workerConfig; this.workerService = workerService; this.functionAdmin = workerService.getFunctionAdmin(); - this.functionAssignmentTailer = new FunctionAssignmentTailer(this); - AuthenticationConfig authConfig = AuthenticationConfig.builder() .clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin()) .clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters()) @@ -145,6 +152,41 @@ public class FunctionRuntimeManager implements AutoCloseable{ } /** + * Initializes the FunctionRuntimeManager. Does the following: + * 1. Consume all existing assignments to establish existing/latest set of assignments + * 2. 
After current assignments are read, assignments belonging to this worker will be processed + */ + public void initialize() { + log.info("/** Initializing Runtime Manager **/"); + try { + Reader<byte[]> reader = this.getWorkerService().getClient().newReader() + .topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true) + .startMessageId(MessageId.earliest).create(); + + this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader); + // read all existing messages + this.setInitializePhase(true); + while (reader.hasMessageAvailable()) { + this.functionAssignmentTailer.processAssignment(reader.readNext()); + } + this.setInitializePhase(false); + // realize existing assignments + Map<String, Assignment> assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId()); + if (assignmentMap != null) { + for (Assignment assignment : assignmentMap.values()) { + startFunctionInstance(assignment); + } + } + // start assignment tailer + this.functionAssignmentTailer.start(); + + } catch (Exception e) { + log.error("Failed to initialize function runtime manager: ", e.getMessage(), e); + throw new RuntimeException(e); + } + } + + /** * Starts the function runtime manager */ public void start() { @@ -623,27 +665,29 @@ public class FunctionRuntimeManager implements AutoCloseable{ } private void addAssignment(Assignment assignment) { - String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); - //add new function this.setAssignment(assignment); //Assigned to me if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) { - if (!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) { - this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new FunctionRuntimeInfo() - .setFunctionInstance(assignment.getInstance())); + startFunctionInstance(assignment); + } + } - } else { - //Somehow this function is already started - log.warn("Function {} already running. 
Going to restart function.", - this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId)); - this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId)); - } - FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); - this.insertStartAction(functionRuntimeInfo); + private void startFunctionInstance(Assignment assignment) { + String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + if (!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) { + this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new FunctionRuntimeInfo() + .setFunctionInstance(assignment.getInstance())); + + } else { + //Somehow this function is already started + log.warn("Function {} already running. Going to restart function.", + this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId)); + this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId)); } - + FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); + this.insertStartAction(functionRuntimeInfo); } public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() { @@ -675,26 +719,29 @@ public class FunctionRuntimeManager implements AutoCloseable{ @VisibleForTesting void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) { - FunctionAction functionAction = new FunctionAction(); - functionAction.setAction(FunctionAction.Action.STOP); - functionAction.setFunctionRuntimeInfo(functionRuntimeInfo); - try { - actionQueue.put(functionAction); - } catch (InterruptedException ex) { - throw new RuntimeException("Interrupted while putting action"); + if (!this.isInitializePhase) { + FunctionAction functionAction = new FunctionAction(); + functionAction.setAction(FunctionAction.Action.STOP); + functionAction.setFunctionRuntimeInfo(functionRuntimeInfo); + try { + actionQueue.put(functionAction); + } catch (InterruptedException ex) { + throw new 
RuntimeException("Interrupted while putting action"); + } } - } @VisibleForTesting void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) { - FunctionAction functionAction = new FunctionAction(); - functionAction.setAction(FunctionAction.Action.START); - functionAction.setFunctionRuntimeInfo(functionRuntimeInfo); - try { - actionQueue.put(functionAction); - } catch (InterruptedException ex) { - throw new RuntimeException("Interrupted while putting action"); + if (!this.isInitializePhase) { + FunctionAction functionAction = new FunctionAction(); + functionAction.setAction(FunctionAction.Action.START); + functionAction.setFunctionRuntimeInfo(functionRuntimeInfo); + try { + actionQueue.put(functionAction); + } catch (InterruptedException ex) { + throw new RuntimeException("Interrupted while putting action"); + } } } @@ -731,11 +778,16 @@ public class FunctionRuntimeManager implements AutoCloseable{ } private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) { - this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); + if (!this.isInitializePhase) { + this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); + } } private void setFunctionRuntimeInfo(String fullyQualifiedInstanceId, FunctionRuntimeInfo functionRuntimeInfo) { - this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo); + // Don't modify Function Runtime Infos when initializing + if (!this.isInitializePhase) { + this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo); + } } @Override diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 488bcd7..2b9c632 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -150,6 +150,9 @@ public 
class WorkerService { this.functionRuntimeManager = new FunctionRuntimeManager( this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager); + // initialize function runtime manager + this.functionRuntimeManager.initialize(); + // Setting references to managers in scheduler this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager); this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 5e1ed02..490be77 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -18,23 +18,31 @@ */ package org.apache.pulsar.functions.worker; +import io.netty.buffer.Unpooled; +import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.functions.metrics.MetricsSink; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.proto.Request; -import org.apache.pulsar.functions.metrics.MetricsSink; import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.HashMap; +import java.util.Iterator; import 
java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; @@ -46,7 +54,9 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +@Slf4j public class FunctionRuntimeManagerTest { public static class TestSink implements MetricsSink { @@ -384,4 +394,122 @@ public class FunctionRuntimeManagerTest { Assert.assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment3); } + + @Test + public void testRuntimeManagerInitialize() throws Exception { + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test")); + workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); + workerConfig.setStateStorageServiceUrl("foo"); + workerConfig.setFunctionAssignmentTopicName("assignments"); + + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build(); + + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build(); + + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + Function.Assignment assignment2 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + 
.setFunctionMetaData(function2).setInstanceId(0).build()) + .build(); + + Function.Assignment assignment3 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(0).build()) + .build(); + + List<Message<byte[]>> messageList = new LinkedList<>(); + Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(), + new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null)); + doReturn(Utils.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey(); + + Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(), + new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null)); + doReturn(Utils.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey(); + + // delete function2 + Message message3 = spy(new MessageImpl("foo", MessageId.latest.toString(), + new HashMap<>(), Unpooled.copiedBuffer("".getBytes()), null)); + doReturn(Utils.getFullyQualifiedInstanceId(assignment3.getInstance())).when(message3).getKey(); + + messageList.add(message1); + messageList.add(message2); + messageList.add(message3); + + PulsarClient pulsarClient = mock(PulsarClient.class); + + Reader<byte[]> reader = mock(Reader.class); + + Iterator<Message<byte[]>> it = messageList.iterator(); + + when(reader.readNext()).thenAnswer(new Answer<Message<byte[]>>() { + @Override + public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable { + return it.next(); + } + }); + + when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() { + @Override + public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable { + return new CompletableFuture<>(); + } + }); + + + when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { + 
return it.hasNext(); + } + }); + + + + ReaderBuilder readerBuilder = mock(ReaderBuilder.class); + doReturn(readerBuilder).when(pulsarClient).newReader(); + doReturn(readerBuilder).when(readerBuilder).topic(anyString()); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); + + doReturn(reader).when(readerBuilder).create(); + WorkerService workerService = mock(WorkerService.class); + doReturn(pulsarClient).when(workerService).getClient(); + doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); + + // test new assignment add functions + FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( + workerConfig, + workerService, + mock(Namespace.class), + mock(MembershipManager.class), + mock(ConnectorsManager.class) + )); + + + functionRuntimeManager.initialize(); + + Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1); + log.info("actionQueue: {}", functionRuntimeManager.actionQueue); + Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1); + + FunctionAction functionAction = functionRuntimeManager.actionQueue.poll(); + + // only actually start function1 + Assert.assertEquals(functionAction.getAction(), FunctionAction.Action.START); + Assert.assertEquals(functionAction.getFunctionRuntimeInfo().getFunctionInstance(), assignment1.getInstance()); + + } }