[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1156: Introduce 
ActiveConsumerListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r168085822
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/ActiveConsumerListener.java
 ##
 @@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Listener on the consumer state changes.
+ */
+public interface ActiveConsumerListener {
 
 Review comment:
   Sure, that makes sense. 
   
   One other thing that I was thinking is that we could have this listener to 
be more general oriented, so that we could in future reuse it for other kinds 
of notifications. 
   
   Something like `ConsumerEventsListener` so that is more neutral? What do you 
think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-13 Thread GitBox
merlimat commented on a change in pull request #1156: Introduce 
ActiveConsumerListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r168085850
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
 ##
 @@ -127,6 +129,33 @@ public ConsumerConfiguration 
setMessageListener(MessageListener messageListener)
 return this;
 }
 
+/**
+ * @return this configured {@link ActiveConsumerListener} for the consumer.
+ * @see #setActiveConsumerListener(ActiveConsumerListener)
+ * @since 1.22.0
 
 Review comment:
   since 2.0


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-10 Thread GitBox
merlimat commented on a change in pull request #1156: Introduce 
ActiveConsumerListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167413466
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 ##
 @@ -108,8 +122,15 @@ public synchronized void addConsumer(Consumer consumer) 
throws BrokerServiceExce
 
 consumers.add(consumer);
 
-// Pick an active consumer and start it
-pickAndScheduleActiveConsumer();
+if (!pickAndScheduleActiveConsumer()) {
+// the active consumer is not changed
+Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+if (null == currentActiveConsumer) {
+log.warn("Current active consumer disappears while adding 
consumer {}", consumer);
 
 Review comment:
   This should be a debug log


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-10 Thread GitBox
merlimat commented on a change in pull request #1156: Introduce 
ActiveConsumerListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167412905
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##
 @@ -145,6 +146,19 @@ public String consumerName() {
 return consumerName;
 }
 
+void notifyConsumerGroupChange(long activeConsumerId) {
+if (cnx.getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v11.getNumber()) {
 
 Review comment:
   I was thinking here that we could have more descriptive methods in 
`Commands` class to group all these feature checks in a single place. Eg: 
   
   ```java
   if (!Commands.peerSupportsActiveConsumerListener(cnx)) {
   return;
   }
   ...
   ```
   
   (I know in other places we're checking the version like this, and we should 
also change separately from this PR)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-10 Thread GitBox
merlimat commented on a change in pull request #1156: Introduce 
ActiveConsumerListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167412855
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##
 @@ -145,6 +146,19 @@ public String consumerName() {
 return consumerName;
 }
 
+void notifyConsumerGroupChange(long activeConsumerId) {
+if (cnx.getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v11.getNumber()) {
+// if the client is older than `v11`, we don't need to send 
consumer group changes.
+return;
+}
+
+if (log.isDebugEnabled()) {
+log.debug("notify consumer {} - that [{}] for subscription {} has 
new active consumer : {}",
+consumerId, topicName, subscription.getName(), 
activeConsumerId);
+}
+cnx.ctx().write(Commands.newConsumerGroupChange(consumerId, 
activeConsumerId));
 
 Review comment:
   Use `writeAndFlush()`, otherwise the message won't be written on the socket 
until and unrelated flush is triggered.
   
   Also, pass `cnx.ctx().voidPromise()` as the 2nd parameter to avoid creating 
the write promise since we don't care here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-10 Thread GitBox
merlimat commented on a change in pull request #1156: Introduce 
ActiveConsumerListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167413350
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 ##
 @@ -81,20 +82,33 @@ public AbstractDispatcherSingleActiveConsumer(SubType 
subscriptionType, int part
 
 protected abstract void cancelPendingRead();
 
-protected void pickAndScheduleActiveConsumer() {
+protected void notifyConsumerGroupChanged(Consumer activeConsumer) {
+if (null != activeConsumer && subscriptionType == SubType.Failover) {
+consumers.forEach(consumer ->
 
 Review comment:
   We should try to avoid to send multiple "inactive" notifications to a 
consumer. 
   If I was already "inactive", I don't need to be notified when a new consumer 
is promoted and I'm not involved.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-10 Thread GitBox
merlimat commented on a change in pull request #1156: Introduce 
ActiveConsumerListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167413443
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 ##
 @@ -81,20 +82,33 @@ public AbstractDispatcherSingleActiveConsumer(SubType 
subscriptionType, int part
 
 protected abstract void cancelPendingRead();
 
-protected void pickAndScheduleActiveConsumer() {
+protected void notifyConsumerGroupChanged(Consumer activeConsumer) {
+if (null != activeConsumer && subscriptionType == SubType.Failover) {
+consumers.forEach(consumer ->
+
consumer.notifyConsumerGroupChange(activeConsumer.consumerId()));
 
 Review comment:
   The  `consumerId` is a per-connection identifier (unique within a single TCP 
connection), but different connections from different clients can very well use 
the same Id. (The id is just a shortcut to refer to a particular 
topic/subscription without passing the names each time). 
   
   For consumers on a single subscription, there is no single unique 
identifier. `consumerName` is generally a random generated string, though there 
is collision detection (since collisions wont affect the correctness) and it 
can be manually overridden.
   
   I think the only way is to use the Consumer object identify.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-10 Thread GitBox
merlimat commented on a change in pull request #1156: Introduce 
ActiveConsumerListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167413003
 
 

 ##
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##
 @@ -145,6 +146,19 @@ public String consumerName() {
 return consumerName;
 }
 
+void notifyConsumerGroupChange(long activeConsumerId) {
+if (cnx.getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v11.getNumber()) {
 
 Review comment:
   One thing we need to be careful here is that C++ code gets compiled and 
automatically will present itself with the latest version v11 (or whatever). 
   
   In this case, we should make sure that C++ client can at least discard the 
notification


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-10 Thread GitBox
merlimat commented on a change in pull request #1156: Introduce 
ActiveConsumerListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167413190
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
 ##
 @@ -735,6 +735,20 @@ private void failPendingReceive() {
 }
 }
 
+void consumerGroupChanged(long activeConsumerId) {
 
 Review comment:
   `consumerGroupChanged()` --> `activeConsumerChanged()`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group

2018-02-10 Thread GitBox
merlimat commented on a change in pull request #1156: Introduce 
ActiveConsumerListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167413168
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/ActiveConsumerListener.java
 ##
 @@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Listener on the consumer state changes.
+ */
+public interface ActiveConsumerListener {
 
 Review comment:
   If we use a single method, it becomes accessible through a lambda. Eg: 
   
   ```java
   interface ActiveConsumerListener {
  void statusChanged(Consumer, int partitionId, Status status);
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services