Hi Jason,

The copying is not a problem in terms of performance. It's just annoying to
write the extra code. My point with the copy is that since the client is
already making a copy when it returns the set to me, why would it matter if
I modify the copy? Creating an unmodifiable set on top of a copy seems
redundant. It would be easiest for us as users to do something like this:

// assignment() already returns a copy of the underlying assignment, thus
// ensuring that the internal data structures are protected.
final Set<TopicPartition> partitions = consumer.assignment();
// This is fine to modify since consumer.assignment() returns a copy.
partitions.add(myNewTopicPartition);
partitions.remove(topicPartitionToBeRemoved);
consumer.assign(partitions);

Instead, we have to do something like this right now:

// assignment() returns a copy of the underlying assignment wrapped in an
// unmodifiable set, which seems redundant.
final Set<TopicPartition> partitions = consumer.assignment();
// We need this copy since the set returned by consumer.assignment() is
// unmodifiable, even though it is already a copy.
final Set<TopicPartition> yetAnotherCopy = new HashSet<>(partitions);
yetAnotherCopy.add(myNewTopicPartition);
yetAnotherCopy.remove(topicPartitionToBeRemoved);
List<TopicPartition> wayTooManyCopies = new ArrayList<>(yetAnotherCopy);
consumer.assign(wayTooManyCopies);
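
For now the extra copying can at least be hidden in one place. Below is a
minimal sketch of such a helper. The class and method names are made up for
illustration, and the method is generic so that it does not depend on the
Kafka client at all:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the Kafka client.
public final class AssignmentChanges {
    private AssignmentChanges() {}

    /**
     * Copies {@code current} (which may be unmodifiable), applies the
     * additions and then the removals to the copy, and returns the result
     * as a List. The input set is never modified.
     */
    public static <T> List<T> withChanges(Set<T> current,
                                          Collection<T> toAdd,
                                          Collection<T> toRemove) {
        Set<T> copy = new HashSet<>(current); // mutable copy of the input
        copy.addAll(toAdd);
        copy.removeAll(toRemove);
        return new ArrayList<>(copy);         // List form, as assign() expects in 0.9
    }
}
```

With the 0.9 consumer this would presumably be called as
consumer.assign(AssignmentChanges.withChanges(consumer.assignment(), toAdd, toRemove)).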

Thanks,
Rajiv
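
P.S. Your suggestion of maintaining the assignment set ourselves might look
roughly like the following. This is only a sketch: TrackedAssignment is a
hypothetical name, and it is generic so that it compiles without the Kafka
client on the classpath.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical holder for the partitions the application wants to consume. */
public final class TrackedAssignment<T> {
    // Application-owned set; the consumer's assignment() is never read back.
    private final Set<T> partitions = new LinkedHashSet<>();

    /** Returns true if the partition was not already tracked. */
    public boolean add(T partition) {
        return partitions.add(partition);
    }

    /** Returns true if the partition was tracked and has been removed. */
    public boolean remove(T partition) {
        return partitions.remove(partition);
    }

    /** Copy of the tracked partitions, in insertion order, as a List. */
    public List<T> snapshot() {
        return new ArrayList<>(partitions);
    }
}
```

After each add() or remove() we would call consumer.assign(tracked.snapshot()),
so assignment() is never needed.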


On Tue, Dec 15, 2015 at 2:35 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Rajiv,
>
> I agree the Set/List inconsistency is a little unfortunate (another
> annoying one is pause() which uses a vararg). I think we should probably
> add the following variants:
>
> assign(Collection<TopicPartition>)
> subscribe(Collection<String>)
> pause(Collection<TopicPartition>)
>
> I can open a JIRA to fix this. As for returning the unmodifiable set, I can
> see your point, but I think it's a little dangerous for user code to depend
> on being able to modify a collection returned from the API. Making it
> immutable reduces the coupling with user code and gives us more freedom in
> the future (not that we have any intention of changing the set type, but we
> could). I think the way I might try to implement your use case would be to
> maintain the assignment set yourself. You can make changes to that set and
> always pass it to assign(), which would avoid the need to use assignment().
> Also, I probably wouldn't be overly concerned about the copying overhead
> unless profiling shows that it is actually a problem. Are your partition
> assignments generally very large?
>
> -Jason
>
>
> On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > We are trying to use the Kafka 0.9 consumer API to poll specific
> > partitions. We consume partitions based on our own logic instead of
> > delegating that to Kafka. One of our use cases is handling a change in the
> > partitions that we consume. This means that sometimes we need to consume
> > additional partitions and other times we need to stop consuming (not pause
> > but stop entirely) some of the partitions that we are currently polling.
> >
> > The semantics of the assign() call at
> >
> > http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> >
> > is that we need to provide the entire list of subscriptions. So when we
> > want to add or remove partitions we call the assignment() method to get the
> > existing set of TopicPartitions being polled, and then modify this set and
> > pass it back to the assign() call. However it seems weird that the assign()
> > call takes a List<TopicPartition> whereas the assignment() call returns a
> > Set<TopicPartition>. Further, the Set returned by the method is an
> > unmodifiable set, which means to change this set we need to create a new
> > List/Set from it and then modify the new collection. Looking at the code
> > for the assignment() method further shows that a copy of the underlying set
> > is made and then wrapped in an unmodifiable set. The wrapping seems
> > unnecessary given that a copy is already being made. Excerpt here:
> >
> > public Set<TopicPartition> assignment() {
> >     acquire();
> >     try {
> >         return Collections.unmodifiableSet(
> >                 new HashSet<>(this.subscriptions.assignedPartitions()));
> >     } finally {
> >         release();
> >     }
> > }
> >
> > Ideally the API would take and return a Set instead of taking in a List and
> > returning a Set. Further given that the Set returned is a copy of the
> > existing assignments, wrapping it in an unmodifiable set seems overkill
> > which requires the user of the API to make yet another copy just to modify
> > what is already a copy.
> >
> > Thanks,
> > Rajiv
> >
>
