Right, I could do that. Thanks for creating the JIRA!
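
For the archive, here is a minimal sketch of what maintaining the assignment set myself could look like. AssignmentTracker is a hypothetical helper name, not part of the Kafka client. T stands in for TopicPartition, and the callback stands in for consumer::assign, so the sketch compiles without the Kafka jar:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical helper, not part of the Kafka client. T stands in for
// TopicPartition, and assignFn stands in for consumer::assign.
final class AssignmentTracker<T> {
    // Our own mutable view of the assignment, so assignment() is never called.
    private final Set<T> assigned = new LinkedHashSet<>();
    private final Consumer<List<T>> assignFn;

    AssignmentTracker(Consumer<List<T>> assignFn) {
        this.assignFn = assignFn;
    }

    // Start consuming a partition; re-assign only if the set actually changed.
    void add(T partition) {
        if (assigned.add(partition)) {
            push();
        }
    }

    // Stop consuming a partition; re-assign only if the set actually changed.
    void remove(T partition) {
        if (assigned.remove(partition)) {
            push();
        }
    }

    // The one remaining copy: assign() in 0.9 takes a List, so convert the Set.
    private void push() {
        assignFn.accept(new ArrayList<>(assigned));
    }
}
```

With a real consumer this would presumably be wired up as new AssignmentTracker<TopicPartition>(consumer::assign), and used only from the thread that owns the consumer, since KafkaConsumer is not thread-safe.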

On Tue, Dec 15, 2015 at 3:01 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Rajiv,
>
> My point was that you could maintain the assignment set yourself in a
> field, which would eliminate the need to copy the set returned by
> assignment(). Then it's just one copy to convert it to a list, and we can
> fix this by adding the assign() variant I suggested above.
>
> By the way, here's a link to the JIRA I created:
> https://issues.apache.org/jira/browse/KAFKA-2991.
>
> -Jason
>
> On Tue, Dec 15, 2015 at 2:53 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > Hi Jason,
> >
> > The copying is not a problem in terms of performance. It's just annoying to
> > write the extra code. My point with the copy is that since the client is
> > already making a copy when it returns the set to me, why would it matter if
> > I modify the copy? Creating an unmodifiable set on top of a copy seems
> > redundant. It would be easiest for us as users to do something like this:
> >
> > // assignment() already returns a copy of the underlying assignment, thus
> > // ensuring that the internal data structures are protected.
> > final Set<TopicPartition> partitions = consumer.assignment();
> > // This is fine to modify since consumer.assignment() returns a copy.
> > partitions.add(myNewTopicPartition);
> > partitions.remove(topicPartitionToBeRemoved);
> > consumer.assign(partitions);
> >
> > Instead we have to do something like this right now.
> >
> > // assignment() returns a copy of the underlying assignment wrapped in an
> > // UnmodifiableSet, which seems redundant.
> > final Set<TopicPartition> partitions = consumer.assignment();
> > // We need this copy since the set returned by consumer.assignment() is
> > // unmodifiable, even though it is a copy.
> > final Set<TopicPartition> yetAnotherCopy = new HashSet<>(partitions);
> > yetAnotherCopy.add(myNewTopicPartition);
> > yetAnotherCopy.remove(topicPartitionToBeRemoved);
> > List<TopicPartition> wayTooManyCopies = new ArrayList<>(yetAnotherCopy);
> > consumer.assign(wayTooManyCopies);
> >
> > Thanks,
> > Rajiv
> >
> >
> > On Tue, Dec 15, 2015 at 2:35 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hey Rajiv,
> > >
> > > I agree the Set/List inconsistency is a little unfortunate (another
> > > annoying one is pause() which uses a vararg). I think we should probably
> > > add the following variants:
> > >
> > > assign(Collection<TopicPartition>)
> > > subscribe(Collection<String>)
> > > pause(Collection<TopicPartition>)
> > >
> > > I can open a JIRA to fix this. As for returning the unmodifiable set, I can
> > > see your point, but I think it's a little dangerous for user code to depend
> > > on being able to modify a collection returned from the API. Making it
> > > immutable reduces the coupling with user code and gives us more freedom in
> > > the future (not that we have any intention of changing the set type, but we
> > > could). I think the way I might try to implement your use case would be to
> > > maintain the assignment set yourself. You can make changes to that set and
> > > always pass it to assign(), which would avoid the need to use assignment().
> > > Also, I probably wouldn't be overly concerned about the copying overhead
> > > unless profiling shows that it is actually a problem. Are your partition
> > > assignments generally very large?
> > >
> > > -Jason
> > >
> > >
> > > On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> > >
> > > > We are trying to use the Kafka 0.9 consumer API to poll specific
> > > > partitions. We consume partitions based on our own logic instead of
> > > > delegating that to Kafka. One of our use cases is handling a change in the
> > > > partitions that we consume. This means that sometimes we need to consume
> > > > additional partitions and other times we need to stop consuming (not pause
> > > > but stop entirely) some of the partitions that we are currently polling.
> > > >
> > > > The semantics of the assign() call at
> > > > http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > is that we need to provide the entire list of subscriptions. So when we
> > > > want to add or remove partitions we call the assignment() method to get the
> > > > existing set of TopicPartitions being polled, and then modify this set and
> > > > pass it back to the assign() call. However, it seems weird that the assign()
> > > > call takes a List<TopicPartition> whereas the assignment() call returns a
> > > > Set<TopicPartition>. Further, the Set returned by the method is an
> > > > unmodifiable set, which means that to change this set we need to create a
> > > > new List/Set from it and then modify the new collection. Looking at the code
> > > > for the assignment() method further shows that a copy of the underlying set
> > > > is made and then wrapped in an unmodifiable set. The wrapping seems
> > > > unnecessary given that a copy is already being made. Excerpt here:
> > > >
> > > > public Set<TopicPartition> assignment() {
> > > >     acquire();
> > > >     try {
> > > >         return Collections.unmodifiableSet(
> > > >                 new HashSet<>(this.subscriptions.assignedPartitions()));
> > > >     } finally {
> > > >         release();
> > > >     }
> > > > }
> > > >
> > > > Ideally the API would take and return a Set instead of taking in a List and
> > > > returning a Set. Further, given that the Set returned is a copy of the
> > > > existing assignments, wrapping it in an unmodifiable set seems like overkill,
> > > > since it requires the user of the API to make yet another copy just to
> > > > modify what is already a copy.
> > > >
> > > > Thanks,
> > > > Rajiv
> > > >
> > >
> >
>
