[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r168085822 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/ActiveConsumerListener.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * Listener on the consumer state changes. + */ +public interface ActiveConsumerListener { Review comment: Sure, that makes sense. One other thing that I was thinking is that we could have this listener to be more general oriented, so that we could in future reuse it for other kinds of notifications. Something like `ConsumerEventsListener` so that is more neutral? What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r168085850 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java ## @@ -127,6 +129,33 @@ public ConsumerConfiguration setMessageListener(MessageListener messageListener) return this; } +/** + * @return this configured {@link ActiveConsumerListener} for the consumer. + * @see #setActiveConsumerListener(ActiveConsumerListener) + * @since 1.22.0 Review comment: since 2.0 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167413466 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java ## @@ -108,8 +122,15 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce consumers.add(consumer); -// Pick an active consumer and start it -pickAndScheduleActiveConsumer(); +if (!pickAndScheduleActiveConsumer()) { +// the active consumer is not changed +Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this); +if (null == currentActiveConsumer) { +log.warn("Current active consumer disappears while adding consumer {}", consumer); Review comment: This should be a debug log This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167412905 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java ## @@ -145,6 +146,19 @@ public String consumerName() { return consumerName; } +void notifyConsumerGroupChange(long activeConsumerId) { +if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) { Review comment: I was thinking here that we could have more descriptive methods in `Commands` class to group all these feature checks in a single place. Eg: ```java if (!Commands.peerSupportsActiveConsumerListener(cnx)) { return; } ... ``` (I know in other places we're checking the version like this, and we should also change separately from this PR) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167412855 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java ## @@ -145,6 +146,19 @@ public String consumerName() { return consumerName; } +void notifyConsumerGroupChange(long activeConsumerId) { +if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) { +// if the client is older than `v11`, we don't need to send consumer group changes. +return; +} + +if (log.isDebugEnabled()) { +log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}", +consumerId, topicName, subscription.getName(), activeConsumerId); +} +cnx.ctx().write(Commands.newConsumerGroupChange(consumerId, activeConsumerId)); Review comment: Use `writeAndFlush()`, otherwise the message won't be written on the socket until and unrelated flush is triggered. Also, pass `cnx.ctx().voidPromise()` as the 2nd parameter to avoid creating the write promise since we don't care here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167413350 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java ## @@ -81,20 +82,33 @@ public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int part protected abstract void cancelPendingRead(); -protected void pickAndScheduleActiveConsumer() { +protected void notifyConsumerGroupChanged(Consumer activeConsumer) { +if (null != activeConsumer && subscriptionType == SubType.Failover) { +consumers.forEach(consumer -> Review comment: We should try to avoid to send multiple "inactive" notifications to a consumer. If I was already "inactive", I don't need to be notified when a new consumer is promoted and I'm not involved. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167413443 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java ## @@ -81,20 +82,33 @@ public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int part protected abstract void cancelPendingRead(); -protected void pickAndScheduleActiveConsumer() { +protected void notifyConsumerGroupChanged(Consumer activeConsumer) { +if (null != activeConsumer && subscriptionType == SubType.Failover) { +consumers.forEach(consumer -> + consumer.notifyConsumerGroupChange(activeConsumer.consumerId())); Review comment: The `consumerId` is a per-connection identifier (unique within a single TCP connection), but different connections from different clients can very well use the same Id. (The id is just a shortcut to refer to a particular topic/subscription without passing the names each time). For consumers on a single subscription, there is no single unique identifier. `consumerName` is generally a random generated string, though there is collision detection (since collisions wont affect the correctness) and it can be manually overridden. I think the only way is to use the Consumer object identify. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167413003 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java ## @@ -145,6 +146,19 @@ public String consumerName() { return consumerName; } +void notifyConsumerGroupChange(long activeConsumerId) { +if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) { Review comment: One thing we need to be careful here is that C++ code gets compiled and automatically will present itself with the latest version v11 (or whatever). In this case, we should make sure that C++ client can at least discard the notification This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167413190 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -735,6 +735,20 @@ private void failPendingReceive() { } } +void consumerGroupChanged(long activeConsumerId) { Review comment: `consumerGroupChanged()` --> `activeConsumerChanged()` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167413168 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/ActiveConsumerListener.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * Listener on the consumer state changes. + */ +public interface ActiveConsumerListener { Review comment: If we use a single method, it becomes accessible through a lambda. Eg: ```java interface ActiveConsumerListener { void statusChanged(Consumer, int partitionId, Status status); } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services