Right, I could do that. Thanks for creating the JIRA!
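For illustration, the approach Jason suggests could look roughly like the following: keep the assignment set in a field, change it, and pass it to assign() on every change. This is a sketch under stated assumptions, not Kafka code. TopicPartition here is a hand-rolled stand-in for org.apache.kafka.common.TopicPartition, and Assignable and AssignmentTracker are hypothetical names. Assignable.assign(List) only mirrors the 0.9 KafkaConsumer.assign(List<TopicPartition>) signature so the example compiles without the client jar.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class ManualAssignment {

    // Minimal stand-in for org.apache.kafka.common.TopicPartition (assumption:
    // just topic + partition, with value equality so it behaves in a HashSet).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Hypothetical interface that only mirrors the 0.9 assign(List) signature.
    interface Assignable {
        void assign(List<TopicPartition> partitions);
    }

    // Owns the authoritative assignment set, so assignment() is never needed.
    static final class AssignmentTracker {
        private final Set<TopicPartition> assigned = new HashSet<>();
        private final Assignable consumer;

        AssignmentTracker(Assignable consumer) {
            this.consumer = consumer;
        }

        // Reassigns only if the partition was not already assigned.
        void add(TopicPartition tp) {
            if (assigned.add(tp)) {
                push();
            }
        }

        // Reassigns only if the partition was actually assigned.
        void remove(TopicPartition tp) {
            if (assigned.remove(tp)) {
                push();
            }
        }

        // The one remaining copy: Set -> List, because assign() takes a List in 0.9.
        private void push() {
            consumer.assign(new ArrayList<>(assigned));
        }
    }

    public static void main(String[] args) {
        AssignmentTracker tracker = new AssignmentTracker(
                partitions -> System.out.println("assign(" + partitions + ")"));
        tracker.add(new TopicPartition("events", 0));
        tracker.add(new TopicPartition("events", 1));
        tracker.remove(new TopicPartition("events", 0));
    }
}
```

With the set owned by the caller, assignment() is never consulted, and the only copy left is the Set-to-List conversion that the assign() variant discussed in this thread would make unnecessary.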
On Tue, Dec 15, 2015 at 3:01 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Rajiv,
>
> My point was that you could maintain the assignment set yourself in a
> field, which would eliminate the need to copy the set returned by
> assignment(). Then it's just one copy to convert it to a list, and we can
> fix this by adding the assign() variant I suggested above.
>
> By the way, here's a link to the JIRA I created:
> https://issues.apache.org/jira/browse/KAFKA-2991.
>
> -Jason
>
> On Tue, Dec 15, 2015 at 2:53 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > Hi Jason,
> >
> > The copying is not a problem in terms of performance. It's just annoying
> > to write the extra code. My point with the copy is that since the client
> > is already making a copy when it returns the set to me, why would it
> > matter if I modify the copy? Creating an unmodifiable set on top of a
> > copy seems redundant. It would be easiest for us as users to do something
> > like this:
> >
> > // This already returns a copy of the underlying assignment, thus
> > // ensuring that the internal data structures are protected.
> > final Set<TopicPartition> partitions = consumer.assignment();
> > // This is fine to modify since consumer.assignment() returns a copy.
> > partitions.add(myNewTopicPartition);
> > partitions.remove(topicPartitionToBeRemoved);
> > consumer.assign(partitions);
> >
> > Instead, we have to do something like this right now:
> >
> > // This returns a copy of the underlying assignment wrapped in an
> > // UnmodifiableSet, which seems redundant.
> > final Set<TopicPartition> partitions = consumer.assignment();
> > // We need this copy since consumer.assignment() is unmodifiable, even
> > // though it is a copy.
> > final Set<TopicPartition> yetAnotherCopy = new HashSet<>(partitions);
> > yetAnotherCopy.add(myNewTopicPartition);
> > yetAnotherCopy.remove(topicPartitionToBeRemoved);
> > List<TopicPartition> wayTooManyCopies = new ArrayList<>(yetAnotherCopy);
> > consumer.assign(wayTooManyCopies);
> >
> > Thanks,
> > Rajiv
> >
> > On Tue, Dec 15, 2015 at 2:35 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > > Hey Rajiv,
> > >
> > > I agree the Set/List inconsistency is a little unfortunate (another
> > > annoying one is pause() which uses a vararg). I think we should
> > > probably add the following variants:
> > >
> > > assign(Collection<TopicPartition>)
> > > subscribe(Collection<String>)
> > > pause(Collection<TopicPartition>)
> > >
> > > I can open a JIRA to fix this. As for returning the unmodifiable set, I
> > > can see your point, but I think it's a little dangerous for user code to
> > > depend on being able to modify a collection returned from the API.
> > > Making it immutable reduces the coupling with user code and gives us
> > > more freedom in the future (not that we have any intention of changing
> > > the set type, but we could). I think the way I might try to implement
> > > your use case would be to maintain the assignment set yourself. You can
> > > make changes to that set and always pass it to assign(), which would
> > > avoid the need to use assignment(). Also, I probably wouldn't be overly
> > > concerned about the copying overhead unless profiling shows that it is
> > > actually a problem. Are your partition assignments generally very
> > > large?
> > >
> > > -Jason
> > >
> > > On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> > >
> > > > We are trying to use the Kafka 0.9 consumer API to poll specific
> > > > partitions. We consume partitions based on our own logic instead of
> > > > delegating that to Kafka. One of our use cases is handling a change in
> > > > the partitions that we consume.
> > > > This means that sometimes we need to consume additional partitions and
> > > > other times we need to stop consuming (not pause but stop entirely)
> > > > some of the partitions that we are currently polling.
> > > >
> > > > The semantics of the assign() call at
> > > > http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > is that we need to provide the entire list of subscriptions. So when we
> > > > want to add or remove partitions, we call the assignment() method to get
> > > > the existing set of TopicPartitions being polled, then modify this set
> > > > and pass it back to the assign() call. However, it seems weird that the
> > > > assign() call takes a List<TopicPartition> whereas the assignment() call
> > > > returns a Set<TopicPartition>. Further, the Set returned by the method is
> > > > an unmodifiable set, which means that to change it we need to create a
> > > > new List/Set from it and then modify the new collection. Looking at the
> > > > code for the assignment() method further shows that a copy of the
> > > > underlying set is made and then wrapped in an unmodifiable set. The
> > > > wrapping seems unnecessary given that a copy is already being made.
> > > > Excerpt here:
> > > >
> > > > public Set<TopicPartition> assignment() {
> > > >     acquire();
> > > >     try {
> > > >         return Collections.unmodifiableSet(
> > > >                 new HashSet<>(this.subscriptions.assignedPartitions()));
> > > >     } finally {
> > > >         release();
> > > >     }
> > > > }
> > > >
> > > > Ideally the API would take and return a Set instead of taking in a List
> > > > and returning a Set.
> > > > Further, given that the Set returned is a copy of the existing
> > > > assignments, wrapping it in an unmodifiable set seems like overkill,
> > > > since it requires the user of the API to make yet another copy just to
> > > > modify what is already a copy.
> > > >
> > > > Thanks,
> > > > Rajiv
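The following standalone sketch makes the behavior described in this thread concrete. It reproduces the copy-then-wrap pattern from the assignment() excerpt, using plain strings instead of TopicPartition. UnmodifiableCopyDemo, assignmentLike(), assignList() and assignCollection() are hypothetical names for illustration only. assignCollection() merely imitates the spirit of the proposed assign(Collection<TopicPartition>) variant and is not a real Kafka method. The sketch shows three things: the returned view rejects add() even though it wraps a private copy; the caller must copy again (and once more into a List) to change it; and a Collection-typed parameter would accept the modified Set as-is.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UnmodifiableCopyDemo {

    // Hypothetical stand-in for the consumer's internal assignment state.
    private final Set<String> internal = new HashSet<>();

    UnmodifiableCopyDemo(Collection<String> initial) {
        internal.addAll(initial);
    }

    // Same shape as the 0.9 assignment() excerpt: copy, then wrap read-only.
    Set<String> assignmentLike() {
        return Collections.unmodifiableSet(new HashSet<>(internal));
    }

    // List-only parameter, mirroring assign(List<TopicPartition>) in 0.9.
    void assignList(List<String> partitions) {
        internal.clear();
        internal.addAll(partitions);
    }

    // Hypothetical Collection-typed parameter: accepts a Set without conversion.
    void assignCollection(Collection<String> partitions) {
        internal.clear();
        internal.addAll(partitions);
    }

    // Returns true if the set rejects modification.
    static boolean isReadOnly(Set<String> view) {
        try {
            view.add("probe");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        UnmodifiableCopyDemo consumer =
                new UnmodifiableCopyDemo(Arrays.asList("events-0", "events-1"));

        Set<String> view = consumer.assignmentLike();
        System.out.println("read-only view: " + isReadOnly(view)); // prints "read-only view: true"

        // Current pattern: copy the read-only view to modify it...
        Set<String> copy = new HashSet<>(view);
        copy.add("events-2");
        copy.remove("events-0");
        // ...then copy again because the parameter type is List.
        consumer.assignList(new ArrayList<>(copy));

        // With a Collection-typed parameter, the modified Set is passed directly.
        consumer.assignCollection(copy);
        System.out.println(consumer.assignmentLike().size()); // prints 2
    }
}
```

Because assignmentLike() wraps a fresh copy, any view obtained earlier stays a snapshot and does not change when the internal set is later reassigned.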