Hi Jason,

The copying is not a problem in terms of performance; it's just annoying to write the extra code. My point about the copy is that since the client already makes a copy when it returns the set to me, why would it matter if I modify that copy? Creating an unmodifiable set on top of a copy seems redundant. It would be easiest for us as users to do something like this:

final Set<TopicPartition> partitions = consumer.assignment(); // This already returns a copy of the underlying assignment, thus ensuring that the internal data structures are protected.
partitions.add(myNewTopicPartition); // This is fine to modify since consumer.assignment() returns a copy.
partitions.remove(topicPartitionToBeRemoved);
consumer.assign(partitions);

Instead we have to do something like this right now.

final Set<TopicPartition> partitions = consumer.assignment(); // This returns a copy of the underlying assignment wrapped in an UnmodifiableSet which seems redundant.
final Set<TopicPartition> yetAnotherCopy = new HashSet<>(partitions); // We need this copy since consumer.assignment() is unmodifiable, even though it is a copy.
yetAnotherCopy.add(myNewTopicPartition);
yetAnotherCopy.remove(topicPartitionToBeRemoved);
List<TopicPartition> wayTooManyCopies = new ArrayList<>(yetAnotherCopy);
consumer.assign(wayTooManyCopies);

Thanks,
Rajiv

On Tue, Dec 15, 2015 at 2:35 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Rajiv,
>
> I agree the Set/List inconsistency is a little unfortunate (another
> annoying one is pause() which uses a vararg). I think we should probably
> add the following variants:
>
> assign(Collection<TopicPartition>)
> subscribe(Collection<String>)
> pause(Collection<TopicPartition>)
>
> I can open a JIRA to fix this. As for returning the unmodifiable set, I
> can see your point, but I think it's a little dangerous for user code to
> depend on being able to modify a collection returned from the API. Making
> it immutable reduces the coupling with user code and gives us more freedom
> in the future (not that we have any intention of changing the set type,
> but we could). I think the way I might try to implement your use case
> would be to maintain the assignment set yourself. You can make changes to
> that set and always pass it to assign(), which would avoid the need to use
> assignment().
> Also, I probably wouldn't be overly concerned about the copying overhead
> unless profiling shows that it is actually a problem. Are your partition
> assignments generally very large?
>
> -Jason
>
> On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > We are trying to use the Kafka 0.9 consumer API to poll specific
> > partitions. We consume partitions based on our own logic instead of
> > delegating that to Kafka. One of our use cases is handling a change in
> > the partitions that we consume. This means that sometimes we need to
> > consume additional partitions and other times we need to stop consuming
> > (not pause but stop entirely) some of the partitions that we are
> > currently polling.
> >
> > The semantics of the assign() call at
> > http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > is that we need to provide the entire list of subscriptions. So when we
> > want to add or remove partitions we call the assignment() method to get
> > the existing set of TopicPartitions being polled, and then modify this
> > set and pass it back to the assign() call. However it seems weird that
> > the assign() call takes a List<TopicPartitions> whereas the assignment
> > call returns a Set<TopicPartitions>. Further the Set returned by the
> > method is an unmodifiable set which means to change this set we need to
> > create a new List/Set from it and then modify the new collection.
> > Looking at the code for the assignment() method further shows that a
> > copy of the underlying set is made and then wrapped in an unmodifiable
> > set. The wrapping seems unnecessary given that a copy is already being
> > made. Excerpt here:
> >
> > public Set<TopicPartition> assignment() {
> >     acquire();
> >     try {
> >         return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.assignedPartitions()));
> >     } finally {
> >         release();
> >     }
> > }
> >
> > Ideally the API would take and return a Set instead of taking in a List
> > and returning a Set. Further given that the Set returned is a copy of
> > the existing assignments, wrapping it in an unmodifiable set seems
> > overkill which requires the user of the API to make yet another copy
> > just to modify what is already a copy.
> >
> > Thanks,
> > Rajiv
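
For reference, the approach Jason describes above (keep the assignment set yourself and always pass it to assign()) might look roughly like the sketch below. AssignmentTracker and its methods are hypothetical names, not part of the Kafka client. The assign call is passed in as a plain callback so the sketch does not depend on the Kafka jars; with the 0.9 consumer discussed in this thread, where assign() takes a List<TopicPartition>, that callback would presumably be consumer::assign.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of the Kafka client): owns the desired
 * assignment and pushes the full list to the consumer on every change,
 * so consumer.assignment() never needs to be called or copied.
 */
final class AssignmentTracker<P> {
    // The application's own view of what it wants to consume.
    private final Set<P> assigned = new LinkedHashSet<>();
    // Assumed to wrap the real call, e.g. consumer::assign with P = TopicPartition.
    private final Consumer<List<P>> assignFn;

    AssignmentTracker(Consumer<List<P>> assignFn) {
        this.assignFn = assignFn;
    }

    /** Starts consuming a partition; re-assigns only if the set changed. */
    void add(P partition) {
        if (assigned.add(partition)) {
            push();
        }
    }

    /** Stops consuming a partition; re-assigns only if the set changed. */
    void remove(P partition) {
        if (assigned.remove(partition)) {
            push();
        }
    }

    /** Returns a fresh list copy of the current desired assignment. */
    List<P> snapshot() {
        return new ArrayList<>(assigned);
    }

    // assign() replaces the whole assignment, so always send the full list.
    private void push() {
        assignFn.accept(snapshot());
    }
}
```

Usage would be something like new AssignmentTracker<TopicPartition>(consumer::assign), followed by tracker.add(myNewTopicPartition) and tracker.remove(topicPartitionToBeRemoved). This still makes one list copy per change, but only one, and it does not rely on the returned collection being modifiable.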