[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r168178825 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map, when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; Review comment: Yes, the changes for TopicMessageIdImpl, TopicMessageImpl, and UnAckedMessageTracker is aimed to make PartitionedConsumerImpl and TopicsConsumerImpl could be easy merged in the future. opened issue #1236 tracking this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167877509 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map, when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; Review comment: Thanks. Yes. There is a plan to make this extends PartitionedConsumerImpl, that is also the reason that this change keep use a lot of same interface. But it maybe good to make PartitionedConsumerImpl and TopicsConsumerImpl separate at first, this could avoid bring bugs into PartitionedConsumerImpl. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167915637 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map, when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167915413 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map, when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167915437 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map, when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167914612 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java ## @@ -124,4 +124,14 @@ * @return the key of the message */ String getKey(); + +/** + * Get the topic name of this message. + * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic. Review comment: Thanks. will change it This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167914515 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java ## @@ -124,4 +124,14 @@ * @return the key of the message */ String getKey(); + +/** + * Get the topic name of this message. + * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic. + * + * @return the topic name + */ +default String getTopicName() { Review comment: thanks. will change it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167911587 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map, when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167907583 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map, when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167906488 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java ## @@ -18,26 +18,24 @@ */ package org.apache.pulsar.client.impl; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import java.io.Closeable; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; - +import java.util.function.Predicate; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; - public class UnAckedMessageTracker implements Closeable { Review comment: Thanks, will make a class UnAckedTopicMessageTracker extends UnAckedMessageTracker This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167879404 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java ## @@ -468,17 +468,20 @@ public void redeliverUnacknowledgedMessages() { } @Override -public void redeliverUnacknowledgedMessages(Set messageIds) { +public void redeliverUnacknowledgedMessages(Set messageIds) { Review comment: Oh, it is in ConsumerBase.java, sorry for the wrong reference of Consumer.java. TopicsMessageIdImpl is more need to be a wrapper for MessageIdImpl, and as the reply below, may be better to implements MessageId, instead of extends MessageIdImpl. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167252486 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java ## @@ -37,6 +36,16 @@ */ byte[] toByteArray(); +/** + * Get the topic name of this MessageId. + * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic. + * + * @return the topic name + */ +default String getTopicName() { Review comment: Yes. It is as @merlimat metioned. for MessageId, we also need `topicName` for `redeliverUnacknowledgedMessages`. will remove it from MessageId This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167877509 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map, when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; Review comment: Thanks. Yes. There is a plan to make this extends PartitionedConsumerImpl, that is also the reason that this change keep use a lot of same interface. But it maybe good to make them separate at first, this could avoid bring bugs into PartitionedConsumerImpl. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167864929 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java ## @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl; + +import java.util.Map; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; + +public class TopicMessageImpl implements Message { Review comment: Thanks. Here it is also mainly a wrapper for original MessageImpl, the original MessageImpl will be used for one internal-sub-consumer. a extend may be not easy to handle constructor and get. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167864929 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java ## @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl; + +import java.util.Map; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; + +public class TopicMessageImpl implements Message { Review comment: Thanks. Here it is also mainly a wrapper for original MessageImpl, the original MessageImpl will be used for one internal-sub-consumer. a extend may be not easy to handle it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167863222 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java ## @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import org.apache.pulsar.client.api.MessageId; + +public class TopicMessageIdImpl implements MessageId { Review comment: Thanks. Here TopicMessageIdImpl is mainly a wrapper for MessageIdImpl, We need keep a reference of MessageIdImpl, because it will be used for one internal-sub-consumer. If extending, the constructor and getInnerMessageIdInner may be not easy to handle. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167863222 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java ## @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import org.apache.pulsar.client.api.MessageId; + +public class TopicMessageIdImpl implements MessageId { Review comment: Thanks. Here TopicMessageIdImpl is mainly a wrapper for MessageIdImpl, We need keep a reference of MessageIdImpl, because it will be used for each internal-sub-consumer. If extending, the constructor and getInnerMessageIdInner may be not easy to handle. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167813669 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map, when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167372153 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map, when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167264860 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map, when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167264860 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map, when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167263964 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java ## @@ -18,26 +18,24 @@ */ package org.apache.pulsar.client.impl; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import java.io.Closeable; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; - +import java.util.function.Predicate; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; - public class UnAckedMessageTracker implements Closeable { Review comment: Thanks. UnackedMessageTracker is not only a member of ConsumerImpl, it should also a member of PartitionedConsumer and TopicsConsumer here. Most of the change is to leverage MessageId::compareTo to make the code clearer. and seems it not casts the type too often here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167263964 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java ## @@ -18,26 +18,24 @@ */ package org.apache.pulsar.client.impl; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import java.io.Closeable; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; - +import java.util.function.Predicate; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; - public class UnAckedMessageTracker implements Closeable { Review comment: UnackedMessageTracker is not only a member of ConsumerImpl, it should also a member of PartitionedConsumer and TopicsConsumer here. Most of the change is to leverage MessageId::compareTo to make the code clearer. and seems it not casts the type too often here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167261106 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java ## @@ -468,17 +468,20 @@ public void redeliverUnacknowledgedMessages() { } @Override -public void redeliverUnacknowledgedMessages(Set messageIds) { +public void redeliverUnacknowledgedMessages(Set messageIds) { Review comment: Thanks, As above reply, we also need this redeliverUnacknowledgedMessages method in Consumer.java handling TopicMessageIdImpl. would like to change this in Consumer.java and make the case in each child class. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167258867 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { Review comment: HandlerBase seems only useful for ConsumerImpl and ProducerImpl, There was an issue opened, which plan to remove it from ConsumerBase. Since ConsumerBase has did some of the work, would like to leverage it and not to do replicated work. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167252486 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java ## @@ -37,6 +36,16 @@ */ byte[] toByteArray(); +/** + * Get the topic name of this MessageId. + * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic. + * + * @return the topic name + */ +default String getTopicName() { Review comment: Yes. It is as @merlimat metioned. for MessageId, we also need `topicName` for `redeliverUnacknowledgedMessages` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services