This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 672a167  Race condition function-runtime-manager read old assignments 
(#2437)
672a167 is described below

commit 672a167a31c719dab7623770d39666e8d7e7a0fd
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Thu Aug 23 18:34:44 2018 -0700

    Race condition function-runtime-manager read old assignments (#2437)
---
 .../apache/pulsar/client/impl/ConsumerBase.java    |  4 ++++
 .../pulsar/functions/worker/MembershipManager.java | 26 ++++++++++++++++++----
 .../pulsar/functions/worker/WorkerService.java     |  2 +-
 .../functions/worker/MembershipManagerTest.java    | 24 ++++++++++++--------
 4 files changed, 42 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 016324e..a6fda6f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -335,6 +335,10 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         return subscription;
     }
 
+    public String getConsumerName() {
+        return this.consumerName;
+    }
+
     /**
      * Redelivers the given unacknowledged messages. In Failover mode, the 
request is ignored if the consumer is not
      * active for the given topic. In Shared mode, the consumers messages to 
be redelivered are distributed across all
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 7994ef3..b18fd12 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.proto.Function;
@@ -53,7 +54,7 @@ import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 public class MembershipManager implements AutoCloseable, ConsumerEventListener 
{
 
     private final String consumerName;
-    private final Consumer<byte[]> consumer;
+    private final ConsumerImpl<byte[]> consumer;
     private final WorkerConfig workerConfig;
     private PulsarAdmin pulsarAdminClient;
     private final CompletableFuture<Void> firstConsumerEventFuture;
@@ -68,9 +69,9 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
     @VisibleForTesting
     Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<>();
 
-    MembershipManager(WorkerConfig workerConfig, PulsarClient client)
+    MembershipManager(WorkerService service, PulsarClient client)
             throws PulsarClientException {
-        this.workerConfig = workerConfig;
+        this.workerConfig = service.getWorkerConfig();
         consumerName = String.format(
             "%s:%s:%d",
             workerConfig.getWorkerId(),
@@ -82,13 +83,15 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
         // we don't produce any messages into this topic, we only use the 
`failover` subscription
         // to elect an active consumer as the leader worker. The leader worker 
will be responsible
         // for scheduling snapshots for FMT and doing task assignment.
-        consumer = client.newConsumer()
+        consumer = (ConsumerImpl<byte[]>) client.newConsumer()
                 .topic(workerConfig.getClusterCoordinationTopic())
                 .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
                 .subscriptionType(SubscriptionType.Failover)
                 .consumerEventListener(this)
                 .property(WORKER_IDENTIFIER, consumerName)
                 .subscribe();
+        
+        isLeader.set(checkLeader(service, consumer.getConsumerName()));
     }
 
     @Override
@@ -282,4 +285,19 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
         return this.pulsarAdminClient;
     }
 
+    private boolean checkLeader(WorkerService service, String consumerName) {
+        try {
+            TopicStats stats = service.getBrokerAdmin().topics()
+                    
.getStats(service.getWorkerConfig().getClusterCoordinationTopic());
+            String activeConsumerName = stats != null
+                    && 
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null
+                            ? 
stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName
+                            : null;
+            return consumerName != null && 
consumerName.equalsIgnoreCase(activeConsumerName);
+        } catch (Exception e) {
+            log.warn("Failed to check leader {}", e.getMessage());
+        }
+        return false;
+    }
+    
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 0850766..7fc0cc9 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -137,7 +137,7 @@ public class WorkerService {
             this.connectorsManager = new ConnectorsManager(workerConfig);
 
             //create membership manager
-            this.membershipManager = new MembershipManager(this.workerConfig, 
this.client);
+            this.membershipManager = new MembershipManager(this, this.client);
 
             // create function runtime manager
             this.functionRuntimeManager = new FunctionRuntimeManager(
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 7ed6aca..2753bf1 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.proto.Function;
 import org.mockito.ArgumentMatcher;
@@ -68,7 +69,7 @@ public class MembershipManagerTest {
     public void testConsumerEventListener() throws Exception {
         PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
 
-        Consumer<byte[]> mockConsumer = mock(Consumer.class);
+        ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
         ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
 
         
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
@@ -77,6 +78,8 @@ public class MembershipManagerTest {
         when(mockConsumerBuilder.property(anyString(), 
anyString())).thenReturn(mockConsumerBuilder);
 
         when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(workerConfig).when(workerService).getWorkerConfig();
 
         AtomicReference<ConsumerEventListener> listenerHolder = new 
AtomicReference<>();
         
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock
 -> {
@@ -89,7 +92,7 @@ public class MembershipManagerTest {
 
         when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
 
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockClient));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockClient));
         assertFalse(membershipManager.isLeader());
         verify(mockClient, times(1))
             .newConsumer();
@@ -104,7 +107,7 @@ public class MembershipManagerTest {
     private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
         PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
 
-        Consumer<byte[]> mockConsumer = mock(Consumer.class);
+        ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
         ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
 
         
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
@@ -124,7 +127,7 @@ public class MembershipManagerTest {
     @Test
     public void testCheckFailuresNoFailures() throws Exception {
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        PulsarClient pulsarClient = mockPulsarClient();
         ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -132,6 +135,7 @@ public class MembershipManagerTest {
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(workerConfig).when(workerService).getWorkerConfig();
         
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
         
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
@@ -142,7 +146,7 @@ public class MembershipManagerTest {
                 mock(ConnectorsManager.class)
         ));
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, pulsarClient));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -193,7 +197,7 @@ public class MembershipManagerTest {
     public void testCheckFailuresSomeFailures() throws Exception {
         workerConfig.setRescheduleTimeoutMs(30000);
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        PulsarClient pulsarClient = mockPulsarClient();
         ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -201,6 +205,7 @@ public class MembershipManagerTest {
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(workerConfig).when(workerService).getWorkerConfig();
         
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
         
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
@@ -212,7 +217,7 @@ public class MembershipManagerTest {
         ));
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient()));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
@@ -287,7 +292,7 @@ public class MembershipManagerTest {
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setRescheduleTimeoutMs(30000);
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
-        PulsarClient pulsarClient = mock(PulsarClient.class);
+        PulsarClient pulsarClient = mockPulsarClient();
         ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -295,6 +300,7 @@ public class MembershipManagerTest {
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(workerConfig).when(workerService).getWorkerConfig();
         
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
         
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
@@ -305,7 +311,7 @@ public class MembershipManagerTest {
                 mock(ConnectorsManager.class)
         ));
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient()));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));

Reply via email to