[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-14 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r168178825
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
 
 Review comment:
   Yes, the changes for TopicMessageIdImpl, TopicMessageImpl, and 
UnAckedMessageTracker is aimed to make PartitionedConsumerImpl and 
TopicsConsumerImpl could be easy merged in the future.  opened issue #1236 
tracking this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-14 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167877509
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
 
 Review comment:
   Thanks. Yes. There is a plan to make this extends PartitionedConsumerImpl, 
that is also the reason that this change keep use a lot of same interface. 
   But it maybe good to make PartitionedConsumerImpl and TopicsConsumerImpl 
separate at first, this could avoid bring bugs into PartitionedConsumerImpl.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167915637
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, 
String subscription,
+   ConsumerConfiguration conf, ExecutorService 
listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics 
Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167915413
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, 
String subscription,
+   ConsumerConfiguration conf, ExecutorService 
listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics 
Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167915437
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, 
String subscription,
+   ConsumerConfiguration conf, ExecutorService 
listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics 
Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167914612
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
 ##
 @@ -124,4 +124,14 @@
  * @return the key of the message
  */
 String getKey();
+
+/**
+ * Get the topic name of this message.
+ * This is mainly for TopicsConsumerImpl to identify a message belongs to 
which topic.
 
 Review comment:
   Thanks. will change it


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167914515
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
 ##
 @@ -124,4 +124,14 @@
  * @return the key of the message
  */
 String getKey();
+
+/**
+ * Get the topic name of this message.
+ * This is mainly for TopicsConsumerImpl to identify a message belongs to 
which topic.
+ *
+ * @return the topic name
+ */
+default String getTopicName() {
 
 Review comment:
   thanks. will change it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167911587
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, 
String subscription,
+   ConsumerConfiguration conf, ExecutorService 
listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics 
Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167907583
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, 
String subscription,
+   ConsumerConfiguration conf, ExecutorService 
listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics 
Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167906488
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 ##
 @@ -18,26 +18,24 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import java.util.function.Predicate;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
 public class UnAckedMessageTracker implements Closeable {
 
 Review comment:
   Thanks, will make a class UnAckedTopicMessageTracker extends 
UnAckedMessageTracker


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167879404
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 ##
 @@ -468,17 +468,20 @@ public void redeliverUnacknowledgedMessages() {
 }
 
 @Override
-public void redeliverUnacknowledgedMessages(Set messageIds) 
{
+public void redeliverUnacknowledgedMessages(Set messageIds) {
 
 Review comment:
   Oh, it is in ConsumerBase.java, sorry for the wrong reference of 
Consumer.java. 
   TopicsMessageIdImpl is more need to be a wrapper for MessageIdImpl, and as 
the reply below, may be better to implements MessageId, instead of extends 
MessageIdImpl.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167252486
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
 ##
 @@ -37,6 +36,16 @@
  */
 byte[] toByteArray();
 
+/**
+ * Get the topic name of this MessageId.
+ * This is mainly for TopicsConsumerImpl to identify a message belongs to 
which topic.
+ *
+ * @return the topic name
+ */
+default String getTopicName() {
 
 Review comment:
   Yes. It is as @merlimat metioned. for MessageId, we also need `topicName` 
for `redeliverUnacknowledgedMessages`.
   will remove it from MessageId


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167877509
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
 
 Review comment:
   Thanks. Yes. There is a plan to make this extends PartitionedConsumerImpl, 
that is also the reason that this change keep use a lot of same interface. 
   But it maybe good to make them separate at first, this could avoid bring 
bugs into PartitionedConsumerImpl.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167864929
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 ##
 @@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageImpl implements Message {
 
 Review comment:
   Thanks. Here it is also mainly a wrapper for original MessageImpl, the 
original MessageImpl will be used for one internal-sub-consumer. a extend may 
be not easy to handle constructor and get.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167864929
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 ##
 @@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageImpl implements Message {
 
 Review comment:
   Thanks. Here it is also mainly a wrapper for original MessageImpl, the 
original MessageImpl will be used for one internal-sub-consumer. a extend may 
be not easy to handle it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167863222
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 ##
 @@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageIdImpl implements MessageId {
 
 Review comment:
   Thanks. Here TopicMessageIdImpl is mainly a wrapper for MessageIdImpl, We 
need keep a reference of MessageIdImpl, because it will be used for one 
internal-sub-consumer. 
   If extending,  the constructor and getInnerMessageIdInner may be not easy to 
handle.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167863222
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
 ##
 @@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageIdImpl implements MessageId {
 
 Review comment:
   Thanks. Here TopicMessageIdImpl is mainly a wrapper for MessageIdImpl, We 
need keep a reference of MessageIdImpl, because it will be used for each 
internal-sub-consumer. 
   If extending,  the constructor and getInnerMessageIdInner may be not easy to 
handle.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-13 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167813669
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, 
String subscription,
+   ConsumerConfiguration conf, ExecutorService 
listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics 
Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167372153
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, 
String subscription,
+   ConsumerConfiguration conf, ExecutorService 
listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics 
Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167264860
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, 
String subscription,
+   ConsumerConfiguration conf, ExecutorService 
listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics 
Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167264860
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+// All topics should be in same namespace
+protected NamespaceName namespaceName;
+
+// Map , when get do ACK, consumer will by find 
by topic name
+private final ConcurrentHashMap consumers;
+
+// Map , store partition number for each topic
+private final ConcurrentHashMap topics;
+
+// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
+// shared incoming queue was full
+private final ConcurrentLinkedQueue pausedConsumers;
+
+// Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
+// resume receiving from the paused consumer partitions
+private final int sharedQueueResumeThreshold;
+
+// sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
+AtomicInteger numberTopicPartitions;
+
+private final ReadWriteLock lock = new ReentrantReadWriteLock();
+private final ConsumerStats stats;
+private final UnAckedMessageTracker unAckedMessageTracker;
+private final ConsumerConfiguration internalConfig;
+
+TopicsConsumerImpl(PulsarClientImpl client, Collection topics, 
String subscription,
+   ConsumerConfiguration conf, ExecutorService 
listenerExecutor,
+   CompletableFuture subscribeFuture) {
+super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), subscription,
+conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+subscribeFuture);
+
+checkArgument(conf.getReceiverQueueSize() > 0,
+"Receiver queue size needs to be greater than 0 for Topics 
Consumer");
+
+this.topics = new ConcurrentHashMap<>();
+this.consumers = new ConcurrentHashMap<>();
+this.pausedConsumers = new ConcurrentLinkedQueue<>();
+this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+this.numberTopicPartitions = new AtomicInteger(0);
+
+if (conf.getAckTimeoutMillis() != 0) {
+

[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167263964
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 ##
 @@ -18,26 +18,24 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import java.util.function.Predicate;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
 public class UnAckedMessageTracker implements Closeable {
 
 Review comment:
   Thanks. UnackedMessageTracker is not only a member of ConsumerImpl, it 
should also a member of PartitionedConsumer and TopicsConsumer here. Most of 
the change is to leverage MessageId::compareTo to make the code clearer. and 
seems it not casts the type too often here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167263964
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 ##
 @@ -18,26 +18,24 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import java.util.function.Predicate;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
 public class UnAckedMessageTracker implements Closeable {
 
 Review comment:
   UnackedMessageTracker is not only a member of ConsumerImpl, it should also a 
member of PartitionedConsumer and TopicsConsumer here. Most of the change is to 
leverage MessageId::compareTo to make the code clearer. and seems it not casts 
the type too often here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167261106
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 ##
 @@ -468,17 +468,20 @@ public void redeliverUnacknowledgedMessages() {
 }
 
 @Override
-public void redeliverUnacknowledgedMessages(Set messageIds) 
{
+public void redeliverUnacknowledgedMessages(Set messageIds) {
 
 Review comment:
   Thanks, As above reply, we also need this redeliverUnacknowledgedMessages 
method in Consumer.java  handling TopicMessageIdImpl. would like to change this 
in  Consumer.java  and make the case in each child class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167258867
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 ##
 @@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
 
 Review comment:
   HandlerBase seems only useful for ConsumerImpl and ProducerImpl, There was 
an issue opened, which plan to remove it from ConsumerBase. 
   Since ConsumerBase has did some of the  work, would like to leverage it and 
not to do replicated work.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

2018-02-09 Thread GitBox
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide 
`TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167252486
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
 ##
 @@ -37,6 +36,16 @@
  */
 byte[] toByteArray();
 
+/**
+ * Get the topic name of this MessageId.
+ * This is mainly for TopicsConsumerImpl to identify a message belongs to 
which topic.
+ *
+ * @return the topic name
+ */
+default String getTopicName() {
 
 Review comment:
   Yes. It is as @merlimat metioned. for MessageId, we also need `topicName` 
for `redeliverUnacknowledgedMessages`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services