Re: Kafka 0.9 consumer API question

2015-12-17 Thread hsy...@gmail.com
Hi Rajiv,

I think it makes sense to return a read-only assignment. What we could
improve here is to add an addPartition method to the consumer. Then we
wouldn't have to do any operations on the set returned by the
assignment() method.

BTW, I think you can implement the PartitionAssignor interface to solve
your use case. I couldn't find the javadoc for that interface, but here is
the method you can use:

/**
 * Perform the group assignment given the member subscriptions and current cluster metadata.
 * @param metadata Current topic/broker metadata known by consumer
 * @param subscriptions Subscriptions from all members provided through {@link #subscription(Set)}
 * @return A map from the members to their respective assignment. This should have one entry
 *         for all members who in the input subscription map.
 */
Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);

The subscription map has each consumer's member id as key. It can be used
as a reference to the consumer and you can adjust the assignments there.
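
To make the shape of such an assignment function concrete, here is a minimal sketch under loud assumptions: it does not use Kafka's Cluster, Subscription, or Assignment types at all. Topic names, partition counts, and strings like "events-0" are hypothetical stand-ins, and the class and method names are invented for illustration. It only shows the idea of taking a map keyed by member id and returning one entry per member.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for an assignor; this is not Kafka code.
class AssignSketch {

    // partitionsPerTopic: topic name -> number of partitions (stands in for cluster metadata).
    // subscriptions: member id -> topics that member subscribed to.
    // Returns member id -> assigned partitions, with one entry per member.
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, Set<String>> subscriptions) {
        Map<String, List<String>> result = new TreeMap<>();
        for (String member : subscriptions.keySet()) {
            result.put(member, new ArrayList<>()); // every member gets an entry, even if empty
        }
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            // Collect the members interested in this topic, in sorted member-id order.
            List<String> interested = new ArrayList<>();
            for (Map.Entry<String, Set<String>> sub : new TreeMap<>(subscriptions).entrySet()) {
                if (sub.getValue().contains(topic.getKey())) {
                    interested.add(sub.getKey());
                }
            }
            if (interested.isEmpty()) {
                continue; // nobody subscribed to this topic
            }
            // Hand out the topic's partitions round-robin among the interested members.
            for (int p = 0; p < topic.getValue(); p++) {
                String owner = interested.get(p % interested.size());
                result.get(owner).add(topic.getKey() + "-" + p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new HashMap<>();
        partitions.put("events", 3);
        Map<String, Set<String>> subs = new HashMap<>();
        subs.put("consumer-a", Collections.singleton("events"));
        subs.put("consumer-b", Collections.singleton("events"));
        System.out.println(assign(partitions, subs));
    }
}
```

Because the input is keyed by member id, custom logic can special-case individual consumers at the point where the owner is chosen.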





Re: Kafka 0.9 consumer API question

2015-12-15 Thread Jason Gustafson
Hey Rajiv,

I agree the Set/List inconsistency is a little unfortunate (another
annoying one is pause() which uses a vararg). I think we should probably
add the following variants:

assign(Collection<TopicPartition>)
subscribe(Collection<String>)
pause(Collection<TopicPartition>)

I can open a JIRA to fix this. As for returning the unmodifiable set, I can
see your point, but I think it's a little dangerous for user code to depend
on being able to modify a collection returned from the API. Making it
immutable reduces the coupling with user code and gives us more freedom in
the future (not that we have any intention of changing the set type, but we
could). I think the way I might try to implement your use case would be to
maintain the assignment set yourself. You can make changes to that set and
always pass it to assign(), which would avoid the need to use assignment().
Also, I probably wouldn't be overly concerned about the copying overhead
unless profiling shows that it is actually a problem. Are your partition
assignments generally very large?
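
As a rough illustration of why a Collection parameter removes the Set/List friction, here is a small sketch. It is not Kafka code: the class is a hypothetical stand-in for a consumer, and plain strings such as "events-0" stand in for TopicPartition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a consumer; none of this is Kafka code.
class CollectionParamSketch {
    private final Set<String> assigned = new HashSet<>();

    // Accepting Collection lets callers pass a Set or a List without converting first.
    void assign(Collection<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
    }

    int assignedCount() {
        return assigned.size();
    }

    public static void main(String[] args) {
        CollectionParamSketch consumer = new CollectionParamSketch();

        Set<String> asSet = new HashSet<>(Arrays.asList("events-0", "events-1"));
        consumer.assign(asSet);                       // a Set is accepted directly
        System.out.println(consumer.assignedCount());

        List<String> asList = new ArrayList<>(Arrays.asList("events-0", "events-1", "events-2"));
        consumer.assign(asList);                      // so is a List
        System.out.println(consumer.assignedCount());
    }
}
```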

-Jason




Kafka 0.9 consumer API question

2015-12-15 Thread Rajiv Kurian
We are trying to use the Kafka 0.9 consumer API to poll specific
partitions. We consume partitions based on our own logic instead of
delegating that to Kafka. One of our use cases is handling a change in the
partitions that we consume. This means that sometimes we need to consume
additional partitions and other times we need to stop consuming (not pause
but stop entirely) some of the partitions that we are currently polling.

The semantics of the assign() call at
http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
is that we need to provide the entire list of subscriptions. So when we
want to add or remove partitions we call the assignment() method to get the
existing set of TopicPartitions being polled, and then modify this set and
pass it back to the assign() call. However it seems weird that the assign()
call takes a List<TopicPartition> whereas the assignment() call returns a
Set<TopicPartition>. Further the Set<TopicPartition> returned by the method is an
unmodifiable set which means to change this set we need to create a new
List/Set from it and then modify the new collection. Looking at the code
for the assignment() method further shows that a copy of the underlying set
is made and then wrapped in an unmodifiable set. The wrapping seems
unnecessary given that a copy is already being made. Excerpt here:

public Set<TopicPartition> assignment() {
    acquire();
    try {
        return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.assignedPartitions()));
    } finally {
        release();
    }
}

Ideally the API would take and return a Set instead of taking in a List and
returning a Set. Further given that the Set returned is a copy of the
existing assignments, wrapping it in an unmodifiable set seems overkill
which requires the user of the API to make yet another copy just to modify
what is already a copy.
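
The behavior described above can be reproduced with a small sketch that needs no Kafka dependency. The class below is a hypothetical stand-in that mimics the copy-then-wrap pattern from the excerpt, with strings standing in for TopicPartition.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in that mimics the copy-then-wrap pattern; not Kafka code.
class UnmodifiableCopyDemo {
    private static final Set<String> INTERNAL =
            new HashSet<>(Arrays.asList("events-0", "events-1"));

    // Same shape as the excerpt: copy the internal set, then wrap the copy.
    static Set<String> assignment() {
        return Collections.unmodifiableSet(new HashSet<>(INTERNAL));
    }

    // Returns true if the set accepted the add() call, false if it rejected it.
    static boolean tryAdd(Set<String> set, String partition) {
        try {
            set.add(partition);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Set<String> returned = assignment();
        System.out.println(tryAdd(returned, "events-2")); // false: the wrapper rejects mutation

        Set<String> copy = new HashSet<>(returned);       // the extra copy the caller must make
        System.out.println(tryAdd(copy, "events-2"));     // true

        System.out.println(assignment().size());          // 2: internal state is untouched
    }
}
```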

Thanks,
Rajiv


Re: Kafka 0.9 consumer API question

2015-12-15 Thread Jason Gustafson
Hey Rajiv,

My point was that you could maintain the assignment set yourself in a
field, which would eliminate the need to copy the set returned by
assignment(). Then it's just one copy to convert it to a list, and we can
fix this by adding the assign() variant I suggested above.
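
A minimal sketch of that idea, under stated assumptions: AssignmentTracker is a hypothetical helper, strings stand in for TopicPartition, and the callback passed to the constructor stands in for a call such as consumer.assign(list). It is not part of the Kafka client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical helper that owns the assignment set; not part of the Kafka client.
class AssignmentTracker {
    private final Set<String> partitions = new LinkedHashSet<>();
    private final Consumer<List<String>> assignCall; // stands in for consumer.assign(list)

    AssignmentTracker(Consumer<List<String>> assignCall) {
        this.assignCall = assignCall;
    }

    // Add a partition and push the full assignment if anything changed.
    void add(String partition) {
        if (partitions.add(partition)) {
            push();
        }
    }

    // Remove a partition and push the full assignment if anything changed.
    void remove(String partition) {
        if (partitions.remove(partition)) {
            push();
        }
    }

    // The single remaining copy: Set to List, because assign() takes a List in 0.9.
    private void push() {
        assignCall.accept(new ArrayList<>(partitions));
    }

    // Read-only view for callers that only need to inspect the current state.
    Set<String> current() {
        return Collections.unmodifiableSet(partitions);
    }

    public static void main(String[] args) {
        AssignmentTracker tracker = new AssignmentTracker(list -> System.out.println("assign " + list));
        tracker.add("events-0");
        tracker.add("events-1");
        tracker.remove("events-0");
    }
}
```

With this arrangement the application never calls assignment(); the tracker is the source of truth and the consumer is only told about changes.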

By the way, here's a link to the JIRA I created:
https://issues.apache.org/jira/browse/KAFKA-2991.

-Jason


Re: Kafka 0.9 consumer API question

2015-12-15 Thread Rajiv Kurian
Hi Jason,

The copying is not a problem in terms of performance. It's just annoying to
write the extra code. My point with the copy is that since the client is
already making a copy when it returns the set to me, why would it matter if
I modify the copy? Creating an unmodifiable set on top of a copy seems
redundant. It would be easiest for us as users to do something like this:

// consumer.assignment() already returns a copy of the underlying
// assignment, thus ensuring that the internal data structures are protected.
final Set<TopicPartition> partitions = consumer.assignment();
// This is fine to modify since consumer.assignment() returns a copy.
partitions.add(myNewTopicPartition);
partitions.remove(topicPartitionToBeRemoved);
consumer.assign(partitions);

Instead, we have to do something like this right now:

// This returns a copy of the underlying assignment wrapped in an
// UnmodifiableSet, which seems redundant.
final Set<TopicPartition> partitions = consumer.assignment();
// We need this copy since the set returned by consumer.assignment() is
// unmodifiable, even though it is a copy.
final Set<TopicPartition> yetAnotherCopy = new HashSet<>(partitions);
yetAnotherCopy.add(myNewTopicPartition);
yetAnotherCopy.remove(topicPartitionToBeRemoved);
List<TopicPartition> wayTooManyCopies = new ArrayList<>(yetAnotherCopy);
consumer.assign(wayTooManyCopies);

Thanks,
Rajiv




Re: Kafka 0.9 consumer API question

2015-12-15 Thread Rajiv Kurian
Right, I could do that. Thanks for creating the JIRA!

