Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-09-04 Thread Boyang Chen
From offline discussion, the eventual conclusion is to use a top-level
Consumer#getMetadata() API to fetch the latest group metadata information
for offset fencing, so that we only need to call initTransactions once in
the producer's lifetime.
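
For illustration, a minimal consume-transform-produce loop under this model
could look like the sketch below. Note that getMetadata() and the
sendOffsetsToTransaction overload are the APIs proposed in this discussion,
so treat all names as provisional:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;

public class Kip447LoopSketch {
  static void run(KafkaConsumer<String, String> consumer,
                  KafkaProducer<String, String> producer) {
    producer.initTransactions(); // called once, in the producer's lifetime
    while (true) {
      ConsumerRecords<String, String> records =
          consumer.poll(Duration.ofMillis(100));
      producer.beginTransaction();
      Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
      for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("output", record.key(), record.value()));
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
      }
      // the getter always reflects the latest generation, so a zombie
      // producer from an older generation is fenced when committing offsets
      producer.sendOffsetsToTransaction(offsets, consumer.getMetadata());
      producer.commitTransaction();
    }
  }
}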

Since no further questions have been raised on this thread, I will start the
vote today. In the meantime, feel free to keep commenting on this discussion
thread. Thank you!

Boyang

On Sun, Aug 25, 2019 at 11:43 PM Boyang Chen 
wrote:

> Hey Guozhang and Jason,
>
> I'm OK with either way. With Guozhang's approach, it is simpler to implement
> a consumer-producer pair if we avoid the callback pattern and only do the
> group metadata initialization once; however, the access pattern for consumer
> rebalance state becomes scattered, since we end up with both a rebalance
> listener and a metadata getter. Jason's approach overloads the
> initTransactions API, which could make it even more confusing than it
> already is today. Comparing the two, I lean toward Guozhang's approach: it
> is not conclusive that a new metadata getter class will confuse users, at
> the cost of some cleanness in the future implementation around consumer
> state. WDYT?
>
> Boyang
>
> On Wed, Aug 14, 2019 at 10:45 AM Guozhang Wang  wrote:
>
>> My main concern is that the overloaded `initTransactions` must be called
>> repeatedly while the original `initTransactions` is still called only once
>> over the producer's lifetime, which is a bit confusing.
>>
>> Looking into the current POC PR, we actually only need the latest
>> generation id when fetching offsets, so we can just make the GroupMetadata
>> returned from the consumer a wrapper of the underlying values, and the
>> getters of this object would always return the latest value.
>> The values would be updated internally during rebalances, and the new
>> `initTransactions` would still only be called once.
>>
>> Guozhang
>>
>>
>> On Wed, Aug 14, 2019 at 9:53 AM Jason Gustafson 
>> wrote:
>>
>> > Yeah, my reasoning is that the group metadata is only relevant to the
>> > subscription API. So it makes sense to only expose it to the rebalance
>> > listener.
>> >
>> > One option we could consider is bringing back the `initTransactions`
>> overload.
>> > Then usage looks something like this:
>> >
>> > consumer.subscribe(topics, new RebalanceListener() {
>> >   public void onGroupJoined(GroupMetadata metadata) {
>> >     producer.initTransactions(metadata);
>> >   }
>> > });
>> >
>> > That seems pretty clean. What do you think?
>> >
>> > -Jason
>> >
>> > On Tue, Aug 13, 2019 at 6:07 PM Boyang Chen > >
>> > wrote:
>> >
>> > > Hey Guozhang,
>> > >
>> > > thanks for the suggestion. Could you elaborate more on why defining a
>> > > direct consumer API would be easier? The benefit of reusing the
>> > > consumer rebalance listener is to consolidate the entry point for
>> > > consumer internal state. Compared with letting the consumer generate a
>> > > deep copy of the metadata every time we call
>> > > #sendOffsetsToTransaction, using a callback seems to reduce
>> > > unnecessary updates to the metadata. WDYT?
>> > >
>> > > Boyang
>> > >
>> > > On Tue, Aug 13, 2019 at 2:14 PM Guozhang Wang 
>> > wrote:
>> > >
>> > > > Hi Boyang, Jason,
>> > > >
>> > > > If we are going to expose the generation id / group.instance.id etc.
>> > > > anyway, I think it's slightly better to just add a new API on
>> > > > KafkaConsumer returning the ConsumerGroupMetadata (option 3) than to
>> > > > pass it in on an additional callback of ConsumerRebalanceListener.
>> > > > It feels easier to leverage than requiring users to pass in the
>> > > > listener.
>> > > >
>> > > > Guozhang
>> > > >
>> > > > On Mon, Aug 12, 2019 at 3:41 PM Boyang Chen <
>> > reluctanthero...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Thanks Jason. The intuition behind defining a separate callback
>> > > > > function is that, with KIP-429, we no longer guarantee to call
>> > > > > onPartitionsAssigned() or onPartitionsRevoked() with each
>> > > > > rebalance. Our requirement is to stay up-to-date with group
>> > > > > metadata such as generation information, so a callback like
>> > > > > onGroupJoined() would make more sense, as it would be invoked
>> > > > > after every successful rebalance.
>> > > > >
>> > > > > Best,
>> > > > > Boyang
>> > > > >
>> > > > > On Mon, Aug 12, 2019 at 2:02 PM Jason Gustafson <
>> ja...@confluent.io>
>> > > > > wrote:
>> > > > >
>> > > > > > Hey Boyang,
>> > > > > >
>> > > > > > I favor option 4 as well. It's a little more cumbersome than 3
>> for
>> > > this
>> > > > > use
>> > > > > > case, but it seems like a cleaner separation of concerns. The
>> > > rebalance
>> > > > > > listener is already concerned with events affecting the
>> assignment
>> > > > > > lifecycle and group membership. I think the only thing I'm
>> > wondering
>> > > is
>> > > > > > whether it should be a separate callback as you've suggested,
>> or if
>> > > it
>> > > > > > would make sense to overload 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-26 Thread Boyang Chen
Hey Guozhang and Jason,

I'm OK with either way. With Guozhang's approach, it is simpler to implement
a consumer-producer pair if we avoid the callback pattern and only do the
group metadata initialization once; however, the access pattern for consumer
rebalance state becomes scattered, since we end up with both a rebalance
listener and a metadata getter. Jason's approach overloads the
initTransactions API, which could make it even more confusing than it already
is today. Comparing the two, I lean toward Guozhang's approach: it is not
conclusive that a new metadata getter class will confuse users, at the cost
of some cleanness in the future implementation around consumer state. WDYT?

Boyang

On Wed, Aug 14, 2019 at 10:45 AM Guozhang Wang  wrote:

> My main concern is that the overloaded `initTransactions` must be called
> repeatedly while the original `initTransactions` is still called only once
> over the producer's lifetime, which is a bit confusing.
>
> Looking into the current POC PR, we actually only need the latest
> generation id when fetching offsets, so we can just make the GroupMetadata
> returned from the consumer a wrapper of the underlying values, and the
> getters of this object would always return the latest value.
> The values would be updated internally during rebalances, and the new
> `initTransactions` would still only be called once.
>
> Guozhang
>
>
> On Wed, Aug 14, 2019 at 9:53 AM Jason Gustafson 
> wrote:
>
> > Yeah, my reasoning is that the group metadata is only relevant to the
> > subscription API. So it makes sense to only expose it to the rebalance
> > listener.
> >
> > One option we could consider is bringing back the `initTransactions`
> overload.
> > Then usage looks something like this:
> >
> > consumer.subscribe(topics, new RebalanceListener() {
> >   public void onGroupJoined(GroupMetadata metadata) {
> >     producer.initTransactions(metadata);
> >   }
> > });
> >
> > That seems pretty clean. What do you think?
> >
> > -Jason
> >
> > On Tue, Aug 13, 2019 at 6:07 PM Boyang Chen 
> > wrote:
> >
> > > Hey Guozhang,
> > >
> > > thanks for the suggestion. Could you elaborate more on why defining a
> > > direct consumer API would be easier? The benefit of reusing the consumer
> > > rebalance listener is to consolidate the entry point for consumer
> > > internal state. Compared with letting the consumer generate a deep copy
> > > of the metadata every time we call #sendOffsetsToTransaction, using a
> > > callback seems to reduce unnecessary updates to the metadata. WDYT?
> > >
> > > Boyang
> > >
> > > On Tue, Aug 13, 2019 at 2:14 PM Guozhang Wang 
> > wrote:
> > >
> > > > Hi Boyang, Jason,
> > > >
> > > > If we are going to expose the generation id / group.instance.id etc.
> > > > anyway, I think it's slightly better to just add a new API on
> > > > KafkaConsumer returning the ConsumerGroupMetadata (option 3) than to
> > > > pass it in on an additional callback of ConsumerRebalanceListener.
> > > > It feels easier to leverage than requiring users to pass in the
> > > > listener.
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Aug 12, 2019 at 3:41 PM Boyang Chen <
> > reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks Jason. The intuition behind defining a separate callback
> > > > > function is that, with KIP-429, we no longer guarantee to call
> > > > > onPartitionsAssigned() or onPartitionsRevoked() with each rebalance.
> > > > > Our requirement is to stay up-to-date with group metadata such as
> > > > > generation information, so a callback like onGroupJoined() would
> > > > > make more sense, as it would be invoked after every successful
> > > > > rebalance.
> > > > >
> > > > > Best,
> > > > > Boyang
> > > > >
> > > > > On Mon, Aug 12, 2019 at 2:02 PM Jason Gustafson <
> ja...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hey Boyang,
> > > > > >
> > > > > > I favor option 4 as well. It's a little more cumbersome than 3
> for
> > > this
> > > > > use
> > > > > > case, but it seems like a cleaner separation of concerns. The
> > > rebalance
> > > > > > listener is already concerned with events affecting the
> assignment
> > > > > > lifecycle and group membership. I think the only thing I'm
> > wondering
> > > is
> > > > > > whether it should be a separate callback as you've suggested, or
> if
> > > it
> > > > > > would make sense to overload `onPartitionsAssigned`. If it's
> > > separate,
> > > > > > maybe a name like `onGroupJoined` would be clearer?
> > > > > >
> > > > > > Thanks,
> > > > > > Jason
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Aug 8, 2019 at 10:59 PM Boyang Chen <
> > > > reluctanthero...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Thank you Jason. We had some offline discussion on properly
> > keeping
> > > > > group
> > > > > > > metadata up to date, and here are some of our options
> > brainstormed:
> > > > > > > 1. Let the caller of `sendOffsetsToTransaction(offset,
> metadata)`
> > > > > > 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-14 Thread Guozhang Wang
My main concern is that the overloaded `initTransactions` must be called
repeatedly while the original `initTransactions` is still called only once
over the producer's lifetime, which is a bit confusing.

Looking into the current POC PR, we actually only need the latest
generation id when fetching offsets, so we can just make the GroupMetadata
returned from the consumer a wrapper of the underlying values, and the
getters of this object would always return the latest value.
The values would be updated internally during rebalances, and the new
`initTransactions` would still only be called once.
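
A rough sketch of that wrapper idea (all class and method names here are
hypothetical, not part of any released API):

// A live view over mutable fields that the consumer refreshes internally on
// every rebalance, so the getters always return the current values and the
// object only needs to be handed to initTransactions once.
public class LiveGroupMetadata {
  private volatile String memberId = "";
  private volatile int generationId = -1;

  // invoked by consumer internals after each successful rebalance
  void update(String memberId, int generationId) {
    this.memberId = memberId;
    this.generationId = generationId;
  }

  public String memberId() { return memberId; }
  public int generationId() { return generationId; }
}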

Guozhang


On Wed, Aug 14, 2019 at 9:53 AM Jason Gustafson  wrote:

> Yeah, my reasoning is that the group metadata is only relevant to the
> subscription API. So it makes sense to only expose it to the rebalance
> listener.
>
> One option we could consider is bringing back the `initTransactions` overload.
> Then usage looks something like this:
>
> consumer.subscribe(topics, new RebalanceListener() {
>   public void onGroupJoined(GroupMetadata metadata) {
>     producer.initTransactions(metadata);
>   }
> });
>
> That seems pretty clean. What do you think?
>
> -Jason
>
> On Tue, Aug 13, 2019 at 6:07 PM Boyang Chen 
> wrote:
>
> > Hey Guozhang,
> >
> > thanks for the suggestion. Could you elaborate more on why defining a
> > direct consumer API would be easier? The benefit of reusing the consumer
> > rebalance listener is to consolidate the entry point for consumer internal
> > state. Compared with letting the consumer generate a deep copy of the
> > metadata every time we call #sendOffsetsToTransaction, using a callback
> > seems to reduce unnecessary updates to the metadata. WDYT?
> >
> > Boyang
> >
> > On Tue, Aug 13, 2019 at 2:14 PM Guozhang Wang 
> wrote:
> >
> > > Hi Boyang, Jason,
> > >
> > > If we are going to expose the generation id / group.instance.id etc.
> > > anyway, I think it's slightly better to just add a new API on
> > > KafkaConsumer returning the ConsumerGroupMetadata (option 3) than to
> > > pass it in on an additional callback of ConsumerRebalanceListener.
> > > It feels easier to leverage than requiring users to pass in the
> > > listener.
> > >
> > > Guozhang
> > >
> > > On Mon, Aug 12, 2019 at 3:41 PM Boyang Chen <
> reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Jason. The intuition behind defining a separate callback
> > > > function is that, with KIP-429, we no longer guarantee to call
> > > > onPartitionsAssigned() or onPartitionsRevoked() with each rebalance.
> > > > Our requirement is to stay up-to-date with group metadata such as
> > > > generation information, so a callback like onGroupJoined() would make
> > > > more sense, as it would be invoked after every successful rebalance.
> > > >
> > > > Best,
> > > > Boyang
> > > >
> > > > On Mon, Aug 12, 2019 at 2:02 PM Jason Gustafson 
> > > > wrote:
> > > >
> > > > > Hey Boyang,
> > > > >
> > > > > I favor option 4 as well. It's a little more cumbersome than 3 for
> > this
> > > > use
> > > > > case, but it seems like a cleaner separation of concerns. The
> > rebalance
> > > > > listener is already concerned with events affecting the assignment
> > > > > lifecycle and group membership. I think the only thing I'm
> wondering
> > is
> > > > > whether it should be a separate callback as you've suggested, or if
> > it
> > > > > would make sense to overload `onPartitionsAssigned`. If it's
> > separate,
> > > > > maybe a name like `onGroupJoined` would be clearer?
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Aug 8, 2019 at 10:59 PM Boyang Chen <
> > > reluctanthero...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thank you Jason. We had some offline discussion on properly
> keeping
> > > > group
> > > > > > metadata up to date, and here are some of our options
> brainstormed:
> > > > > > 1. Let the caller of `sendOffsetsToTransaction(offset, metadata)`
> > > > > maintain
> > > > > > the ever-changing group metadata. This could be done on stream
> > side,
> > > > but
> > > > > > for non-stream EOS the sample code will become complicated as the
> > > user
> > > > > > needs to implement the partition assignor interface to get the
> > update
> > > > > from
> > > > > > `onAssignment`
> > > > > >
> > > > > > 2. Get a new API on producer like
> `refreshGroupMetadata(metadata)`.
> > > > This
> > > > > is
> > > > > > similar to option 1 except that now in the partition assignor
> > > callback
> > > > we
> > > > > could directly pass in the producer instance, which simplifies
> > the
> > > > > > non-stream EOS, however this new API seems weird to define on
> > > producer.
> > > > > >
> > > > > > 3. Make an accessing interface to group metadata, or just expose
> > the
> > > > > group
> > > > > > metadata through a consumer API like `consumer.GroupMetadata()`.
> > This
> > > > is
> > > > > > the old way which avoids the users’ effort to implement partition
> > > > > assignor
> > 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-14 Thread Jason Gustafson
Yeah, my reasoning is that the group metadata is only relevant to the
subscription API. So it makes sense to only expose it to the rebalance
listener.

One option we could consider is bringing back the `initTransactions` overload.
Then usage looks something like this:

consumer.subscribe(topics, new RebalanceListener() {
  public void onGroupJoined(GroupMetadata metadata) {
    producer.initTransactions(metadata);
  }
});

That seems pretty clean. What do you think?

-Jason

On Tue, Aug 13, 2019 at 6:07 PM Boyang Chen 
wrote:

> Hey Guozhang,
>
> thanks for the suggestion. Could you elaborate more on why defining a
> direct consumer API would be easier? The benefit of reusing the consumer
> rebalance listener is to consolidate the entry point for consumer internal
> state. Compared with letting the consumer generate a deep copy of the
> metadata every time we call #sendOffsetsToTransaction, using a callback
> seems to reduce unnecessary updates to the metadata. WDYT?
>
> Boyang
>
> On Tue, Aug 13, 2019 at 2:14 PM Guozhang Wang  wrote:
>
> > Hi Boyang, Jason,
> >
> > If we are going to expose the generation id / group.instance.id etc.
> > anyway, I think it's slightly better to just add a new API on
> > KafkaConsumer returning the ConsumerGroupMetadata (option 3) than to pass
> > it in on an additional callback of ConsumerRebalanceListener.
> > It feels easier to leverage than requiring users to pass in the listener.
> >
> > Guozhang
> >
> > On Mon, Aug 12, 2019 at 3:41 PM Boyang Chen 
> > wrote:
> >
> > > Thanks Jason. The intuition behind defining a separate callback
> > > function is that, with KIP-429, we no longer guarantee to call
> > > onPartitionsAssigned() or onPartitionsRevoked() with each rebalance.
> > > Our requirement is to stay up-to-date with group metadata such as
> > > generation information, so a callback like onGroupJoined() would make
> > > more sense, as it would be invoked after every successful rebalance.
> > >
> > > Best,
> > > Boyang
> > >
> > > On Mon, Aug 12, 2019 at 2:02 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Hey Boyang,
> > > >
> > > > I favor option 4 as well. It's a little more cumbersome than 3 for
> this
> > > use
> > > > case, but it seems like a cleaner separation of concerns. The
> rebalance
> > > > listener is already concerned with events affecting the assignment
> > > > lifecycle and group membership. I think the only thing I'm wondering
> is
> > > > whether it should be a separate callback as you've suggested, or if
> it
> > > > would make sense to overload `onPartitionsAssigned`. If it's
> separate,
> > > > maybe a name like `onGroupJoined` would be clearer?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > >
> > > > On Thu, Aug 8, 2019 at 10:59 PM Boyang Chen <
> > reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you Jason. We had some offline discussion on properly keeping
> > > group
> > > > > metadata up to date, and here are some of our options brainstormed:
> > > > > 1. Let the caller of `sendOffsetsToTransaction(offset, metadata)`
> > > > maintain
> > > > > the ever-changing group metadata. This could be done on stream
> side,
> > > but
> > > > > for non-stream EOS the sample code will become complicated as the
> > user
> > > > > needs to implement the partition assignor interface to get the
> update
> > > > from
> > > > > `onAssignment`
> > > > >
> > > > > 2. Get a new API on producer like `refreshGroupMetadata(metadata)`.
> > > This
> > > > is
> > > > > similar to option 1 except that now in the partition assignor
> > callback
> > > we
> > > > > could directly pass in the producer instance, which simplifies
> the
> > > > > non-stream EOS, however this new API seems weird to define on
> > producer.
> > > > >
> > > > > 3. Make an accessing interface to group metadata, or just expose
> the
> > > > group
> > > > > metadata through a consumer API like `consumer.GroupMetadata()`.
> This
> > > is
> > > > > the old way which avoids the users’ effort to implement partition
> > > > assignor
> > > > > directly.
> > > > >
> > > > > 4. Expose the group metadata through rebalance listener, which is a
> > > more
> > > > > well-known and adopted callback interface. We could do something like
> > > > > `onGroupMetadataUpdated(ConsumerGroupMetadata metadata)`
> > > > >
> > > > > To simplify the code logic, we believe option 3 & 4 are better
> > > solutions,
> > > > > and of which I slightly prefer option 4, as it is the cleanest
> > > > > solution with the least intrusion into both consumer and producer
> > > > > APIs.
> > > > >
> > > > > WDYT?
> > > > >
> > > > > Boyang
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Aug 7, 2019 at 9:20 AM Jason Gustafson  >
> > > > wrote:
> > > > >
> > > > > > Hi Boyang,
> > > > > >
> > > > > > > We already persist member.id, instance.id and generation.id in
> > the
> > > > > > offset
> > > > > > topic, what extra fields do we need to store?
> > > > > >
> > > > > > Yeah, you're right. I was a little confused and 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-13 Thread Boyang Chen
Hey Guozhang,

thanks for the suggestion. Could you elaborate more on why defining a
direct consumer API would be easier? The benefit of reusing the consumer
rebalance listener is to consolidate the entry point for consumer internal
state. Compared with letting the consumer generate a deep copy of the
metadata every time we call #sendOffsetsToTransaction, using a callback
seems to reduce unnecessary updates to the metadata. WDYT?
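
To make the contrast concrete, a sketch of the two access patterns (both
method names are assumptions from this discussion, not final APIs):

// Pull model: every transactional commit asks the consumer for a fresh
// snapshot of the group metadata.
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());

// Push model: the consumer invokes a callback only when membership actually
// changes, and the application reuses the cached value between rebalances.
final AtomicReference<ConsumerGroupMetadata> latest = new AtomicReference<>();
// ... inside the rebalance listener (hypothetical callback):
//   public void onGroupMetadataUpdated(ConsumerGroupMetadata metadata) {
//       latest.set(metadata);
//   }
producer.sendOffsetsToTransaction(offsets, latest.get());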

Boyang

On Tue, Aug 13, 2019 at 2:14 PM Guozhang Wang  wrote:

> Hi Boyang, Jason,
>
> If we are going to expose the generation id / group.instance.id etc. anyway,
> I think it's slightly better to just add a new API on KafkaConsumer
> returning the ConsumerGroupMetadata (option 3) than to pass it in on an
> additional callback of ConsumerRebalanceListener.
> It feels easier to leverage than requiring users to pass in the listener.
>
> Guozhang
>
> On Mon, Aug 12, 2019 at 3:41 PM Boyang Chen 
> wrote:
>
> > Thanks Jason. The intuition behind defining a separate callback function
> > is that, with KIP-429, we no longer guarantee to call
> > onPartitionsAssigned() or onPartitionsRevoked() with each rebalance. Our
> > requirement is to stay up-to-date with group metadata such as generation
> > information, so a callback like onGroupJoined() would make more sense, as
> > it would be invoked after every successful rebalance.
> >
> > Best,
> > Boyang
> >
> > On Mon, Aug 12, 2019 at 2:02 PM Jason Gustafson 
> > wrote:
> >
> > > Hey Boyang,
> > >
> > > I favor option 4 as well. It's a little more cumbersome than 3 for this
> > use
> > > case, but it seems like a cleaner separation of concerns. The rebalance
> > > listener is already concerned with events affecting the assignment
> > > lifecycle and group membership. I think the only thing I'm wondering is
> > > whether it should be a separate callback as you've suggested, or if it
> > > would make sense to overload `onPartitionsAssigned`. If it's separate,
> > > maybe a name like `onGroupJoined` would be clearer?
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > >
> > > On Thu, Aug 8, 2019 at 10:59 PM Boyang Chen <
> reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Thank you Jason. We had some offline discussion on properly keeping
> > group
> > > > metadata up to date, and here are some of our options brainstormed:
> > > > 1. Let the caller of `sendOffsetsToTransaction(offset, metadata)`
> > > maintain
> > > > the ever-changing group metadata. This could be done on stream side,
> > but
> > > > for non-stream EOS the sample code will become complicated as the
> user
> > > > needs to implement the partition assignor interface to get the update
> > > from
> > > > `onAssignment`
> > > >
> > > > 2. Get a new API on producer like `refreshGroupMetadata(metadata)`.
> > This
> > > is
> > > > similar to option 1 except that now in the partition assignor
> callback
> > we
> > > > could directly pass in the producer instance, which simplifies the
> > > > non-stream EOS, however this new API seems weird to define on
> producer.
> > > >
> > > > 3. Make an accessing interface to group metadata, or just expose the
> > > group
> > > > metadata through a consumer API like `consumer.GroupMetadata()`. This
> > is
> > > > the old way which avoids the users’ effort to implement partition
> > > assignor
> > > > directly.
> > > >
> > > > 4. Expose the group metadata through rebalance listener, which is a
> > more
> > > > well-known and adopted callback interface. We could do something like
> > > > `onGroupMetadataUpdated(ConsumerGroupMetadata metadata)`
> > > >
> > > > To simplify the code logic, we believe option 3 & 4 are better
> > solutions,
> > > > and of which I slightly prefer option 4, as it is the cleanest
> > > > solution with the least intrusion into both consumer and producer
> > > > APIs.
> > > >
> > > > WDYT?
> > > >
> > > > Boyang
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Aug 7, 2019 at 9:20 AM Jason Gustafson 
> > > wrote:
> > > >
> > > > > Hi Boyang,
> > > > >
> > > > > > We already persist member.id, instance.id and generation.id in
> the
> > > > > offset
> > > > > topic, what extra fields do we need to store?
> > > > >
> > > > > Yeah, you're right. I was a little confused and thought this
> > > information
> > > > > was needed by the transaction coordinator.
> > > > >
> > > > > > This should be easily done on the stream side as we have
> > > > > StreamsPartitionAssignor to reflect metadata changes upon
> > > > #onAssignment(),
> > > > > but a non-stream user has to code the callback by hand; do you
> > > > > think the convenience we sacrifice here is worth the simplification
> > > > > benefit?
> > > > >
> > > > > Either way, you need a reference to the consumer. I was mostly just
> > > > > thinking it would be better to reduce the integration point to its
> > > > minimum.
> > > > > Have you thought through the implications of needing to keep
> around a
> > > > > reference to the consumer in the producer? What if it gets closed?
> It
> > > > seems
> > > > > better 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-13 Thread Guozhang Wang
Hi Boyang, Jason,

If we are going to expose the generation id / group.instance.id etc. anyway,
I think it's slightly better to just add a new API on KafkaConsumer
returning the ConsumerGroupMetadata (option 3) than to pass it in on an
additional callback of ConsumerRebalanceListener.
It feels easier to leverage than requiring users to pass in the listener.
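
In code, option 3 reduces to a plain getter read at the point of use
(sketch; the getter name here is an assumption):

ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
producer.sendOffsetsToTransaction(offsets, groupMetadata);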

Guozhang

On Mon, Aug 12, 2019 at 3:41 PM Boyang Chen 
wrote:

> Thanks Jason. The intuition behind defining a separate callback function is
> that, with KIP-429, we no longer guarantee to call onPartitionsAssigned() or
> onPartitionsRevoked() with each rebalance. Our requirement is to stay
> up-to-date with group metadata such as generation information, so a callback
> like onGroupJoined() would make more sense, as it would be invoked after
> every successful rebalance.
>
> Best,
> Boyang
>
> On Mon, Aug 12, 2019 at 2:02 PM Jason Gustafson 
> wrote:
>
> > Hey Boyang,
> >
> > I favor option 4 as well. It's a little more cumbersome than 3 for this
> use
> > case, but it seems like a cleaner separation of concerns. The rebalance
> > listener is already concerned with events affecting the assignment
> > lifecycle and group membership. I think the only thing I'm wondering is
> > whether it should be a separate callback as you've suggested, or if it
> > would make sense to overload `onPartitionsAssigned`. If it's separate,
> > maybe a name like `onGroupJoined` would be clearer?
> >
> > Thanks,
> > Jason
> >
> >
> >
> > On Thu, Aug 8, 2019 at 10:59 PM Boyang Chen 
> > wrote:
> >
> > > Thank you Jason. We had some offline discussion on properly keeping
> group
> > > metadata up to date, and here are some of our options brainstormed:
> > > 1. Let the caller of `sendOffsetsToTransaction(offset, metadata)`
> > maintain
> > > the ever-changing group metadata. This could be done on stream side,
> but
> > > for non-stream EOS the sample code will become complicated as the user
> > > needs to implement the partition assignor interface to get the update
> > from
> > > `onAssignment`
> > >
> > > 2. Get a new API on producer like `refreshGroupMetadata(metadata)`.
> This
> > is
> > > similar to option 1 except that now in the partition assignor callback
> we
> > > could directly pass in the producer instance, which simplifies the
> > > non-stream EOS, however this new API seems weird to define on producer.
> > >
> > > 3. Make an accessing interface to group metadata, or just expose the
> > group
> > > metadata through a consumer API like `consumer.GroupMetadata()`. This
> is
> > > the old way which avoids the users’ effort to implement partition
> > assignor
> > > directly.
> > >
> > > 4. Expose the group metadata through rebalance listener, which is a
> more
> > well-known and adopted callback interface. We could do something like
> > > `onGroupMetadataUpdated(ConsumerGroupMetadata metadata)`
> > >
> > > To simplify the code logic, we believe option 3 & 4 are better
> solutions,
> > > and of which I slightly prefer option 4, as it is the cleanest
> > > solution with the least intrusion into both consumer and producer APIs.
> > >
> > > WDYT?
> > >
> > > Boyang
> > >
> > >
> > >
> > >
> > > On Wed, Aug 7, 2019 at 9:20 AM Jason Gustafson 
> > wrote:
> > >
> > > > Hi Boyang,
> > > >
> > > > > We already persist member.id, instance.id and generation.id in the
> > > > offset
> > > > topic, what extra fields do we need to store?
> > > >
> > > > Yeah, you're right. I was a little confused and thought this
> > information
> > > > was needed by the transaction coordinator.
> > > >
> > > > > This should be easily done on the stream side as we have
> > > > StreamsPartitionAssignor to reflect metadata changes upon
> > > #onAssignment(),
> > > > but a non-stream user has to code the callback by hand; do you think
> > > > the convenience we sacrifice here is worth the simplification benefit?
> > > >
> > > > Either way, you need a reference to the consumer. I was mostly just
> > > > thinking it would be better to reduce the integration point to its
> > > minimum.
> > > > Have you thought through the implications of needing to keep around a
> > > > reference to the consumer in the producer? What if it gets closed? It
> > > seems
> > > > better not to have to think about these cases.
> > > >
> > > > -Jason
> > > >
> > > > On Tue, Aug 6, 2019 at 9:53 PM Boyang Chen <
> reluctanthero...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Thank you for the suggestions Jason. And a side note for Guozhang,
> I
> > > > > updated the KIP to reflect the dependency on 447.
> > > > >
> > > > > On Tue, Aug 6, 2019 at 11:35 AM Jason Gustafson <
> ja...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hi Boyang, thanks for the updates. I have a few more comments:
> > > > > >
> > > > > > 1. We are adding some new fields to TxnOffsetCommit to support
> > > > > group-based
> > > > > > fencing. Do we need these fields to be persisted in the offsets
> > topic
> > > > to
> > > > > > ensure that the fencing still works 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-12 Thread Boyang Chen
Thanks Jason. The intuition behind defining a separate callback function is
that, with KIP-429, we no longer guarantee to call onPartitionsAssigned() or
onPartitionsRevoked() with each rebalance. Our requirement is to stay
up-to-date with group metadata such as generation information, so a callback
like onGroupJoined() would make more sense, as it would be invoked after
every successful rebalance.
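
A sketch of the listener shape this implies (onGroupJoined() is the
hypothetical addition; the rest follows the existing interface):

consumer.subscribe(topics, new ConsumerRebalanceListener() {
  // per the discussion above, these two may be skipped under KIP-429's
  // incremental protocol when a rebalance changes nothing for this member,
  // yet the generation still advances
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
  // hypothetical: fires after every successful rebalance, keeping the
  // fencing metadata (e.g. generation id) fresh
  public void onGroupJoined(ConsumerGroupMetadata metadata) {
    latestMetadata.set(metadata); // e.g. an AtomicReference held by the app
  }
});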

Best,
Boyang

On Mon, Aug 12, 2019 at 2:02 PM Jason Gustafson  wrote:

> Hey Boyang,
>
> I favor option 4 as well. It's a little more cumbersome than 3 for this use
> case, but it seems like a cleaner separation of concerns. The rebalance
> listener is already concerned with events affecting the assignment
> lifecycle and group membership. I think the only thing I'm wondering is
> whether it should be a separate callback as you've suggested, or if it
> would make sense to overload `onPartitionsAssigned`. If it's separate,
> maybe a name like `onGroupJoined` would be clearer?
>
> Thanks,
> Jason
>
>
>
> On Thu, Aug 8, 2019 at 10:59 PM Boyang Chen 
> wrote:
>
> > Thank you Jason. We had some offline discussion on properly keeping group
> > metadata up to date, and here are some of our options brainstormed:
> > 1. Let the caller of `sendOffsetsToTransaction(offset, metadata)`
> maintain
> > the ever-changing group metadata. This could be done on stream side, but
> > for non-stream EOS the sample code will become complicated as the user
> > needs to implement the partition assignor interface to get the update
> from
> > `onAssignment`
> >
> > 2. Get a new API on producer like `refreshGroupMetadata(metadata)`. This
> is
> > similar to option 1 except that now in the partition assignor callback we
> > could directly pass in the producer instance, which simplifies the
> > non-stream EOS, however this new API seems weird to define on producer.
> >
> > 3. Make an accessing interface to group metadata, or just expose the
> group
> > metadata through a consumer API like `consumer.GroupMetadata()`. This is
> > the old way which avoids the users’ effort to implement partition
> assignor
> > directly.
> >
> > 4. Expose the group metadata through rebalance listener, which is a more
> > well-known and adopted callback interface. We could do something like
> > `onGroupMetadataUpdated(ConsumerGroupMetadata metadata)`
> >
> > To simplify the code logic, we believe option 3 & 4 are better solutions,
> > and of which I slightly prefer option 4, as it is the cleanest solution
> > with the least intrusion into both consumer and producer APIs.
> >
> > WDYT?
> >
> > Boyang
> >
> >
> >
> >
> > On Wed, Aug 7, 2019 at 9:20 AM Jason Gustafson 
> wrote:
> >
> > > Hi Boyang,
> > >
> > > > We already persist member.id, instance.id and generation.id in the
> > > offset
> > > topic, what extra fields do we need to store?
> > >
> > > Yeah, you're right. I was a little confused and thought this
> information
> > > was needed by the transaction coordinator.
> > >
> > > > This should be easily done on the stream side as we have
> > > StreamsPartitionAssignor to reflect metadata changes upon
> > #onAssignment(),
> > > but a non-stream user has to code the callback by hand; do you think
> > > the convenience we sacrifice here is worth the simplification benefit?
> > >
> > > Either way, you need a reference to the consumer. I was mostly just
> > > thinking it would be better to reduce the integration point to its
> > minimum.
> > > Have you thought through the implications of needing to keep around a
> > > reference to the consumer in the producer? What if it gets closed? It
> > seems
> > > better not to have to think about these cases.
> > >
> > > -Jason
> > >
> > > On Tue, Aug 6, 2019 at 9:53 PM Boyang Chen  >
> > > wrote:
> > >
> > > > Thank you for the suggestions Jason. And a side note for Guozhang, I
> > > > updated the KIP to reflect the dependency on 447.
> > > >
> > > > On Tue, Aug 6, 2019 at 11:35 AM Jason Gustafson 
> > > > wrote:
> > > >
> > > > > Hi Boyang, thanks for the updates. I have a few more comments:
> > > > >
> > > > > 1. We are adding some new fields to TxnOffsetCommit to support
> > > > group-based
> > > > > fencing. Do we need these fields to be persisted in the offsets
> topic
> > > to
> > > > > ensure that the fencing still works after a coordinator failover?
> > > > >
> > > > > We already persist member.id, instance.id and generation.id in the
> > > > offset
> > > > topic, what extra fields do we need to store?
> > > >
> > > >
> > > > > 2. Since you are proposing a new `groupMetadata` API, have you
> > > considered
> > > > > whether we still need the `initTransactions` overload? Another way
> > > would
> > > > be
> > > > > to pass it through the `sendOffsetsToTransaction` API:
> > > > >
> > > > > void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
> > > > > offsets, GroupMetadata groupMetadata) throws
> > > > > ProducerFencedException, IllegalGenerationException;
> > > > >
> > > > > This seems a little more consistent with the current API and 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-12 Thread Jason Gustafson
Hey Boyang,

I favor option 4 as well. It's a little more cumbersome than 3 for this use
case, but it seems like a cleaner separation of concerns. The rebalance
listener is already concerned with events affecting the assignment
lifecycle and group membership. I think the only thing I'm wondering is
whether it should be a separate callback as you've suggested, or if it
would make sense to overload `onPartitionsAssigned`. If it's separate,
maybe a name like `onGroupJoined` would be clearer?

Thanks,
Jason



On Thu, Aug 8, 2019 at 10:59 PM Boyang Chen 
wrote:

> Thank you Jason. We had some offline discussion on properly keeping group
> metadata up to date, and here are some of our options brainstormed:
> 1. Let the caller of `sendOffsetsToTransaction(offset, metadata)` maintain
> the ever-changing group metadata. This could be done on stream side, but
> for non-stream EOS the sample code will become complicated as the user
> needs to implement the partition assignor interface to get the update from
> `onAssignment`
>
> 2. Get a new API on producer like `refreshGroupMetadata(metadata)`. This is
> similar to option 1 except that now in the partition assignor callback we
> could directly pass in the producer instance, which simplifies the
> non-stream EOS, however this new API seems weird to define on producer.
>
> 3. Make an accessing interface to group metadata, or just expose the group
> metadata through a consumer API like `consumer.GroupMetadata()`. This is
> the old way which avoids the users’ effort to implement partition assignor
> directly.
>
> 4. Expose the group metadata through rebalance listener, which is a more
> well-known and adopted callback interface. We could do something like
> `onGroupMetadataUpdated(ConsumerGroupMetadata metadata)`
>
> To simplify the code logic, we believe option 3 & 4 are better solutions,
> and of which I slightly prefer option 4, as it is the cleanest solution
> with the least intrusion into both consumer and producer APIs.
>
> WDYT?
>
> Boyang
>
>
>
>
> On Wed, Aug 7, 2019 at 9:20 AM Jason Gustafson  wrote:
>
> > Hi Boyang,
> >
> > > We already persist member.id, instance.id and generation.id in the
> > offset
> > topic, what extra fields do we need to store?
> >
> > Yeah, you're right. I was a little confused and thought this information
> > was needed by the transaction coordinator.
> >
> > > This should be easily done on the stream side as we have
> > StreamsPartitionAssignor to reflect metadata changes upon
> #onAssignment(),
> > but a non-stream user has to code the callback by hand; do you think the
> > convenience we sacrifice here is worth the simplification benefit?
> >
> > Either way, you need a reference to the consumer. I was mostly just
> > thinking it would be better to reduce the integration point to its
> minimum.
> > Have you thought through the implications of needing to keep around a
> > reference to the consumer in the producer? What if it gets closed? It
> seems
> > better not to have to think about these cases.
> >
> > -Jason
> >
> > On Tue, Aug 6, 2019 at 9:53 PM Boyang Chen 
> > wrote:
> >
> > > Thank you for the suggestions Jason. And a side note for Guozhang, I
> > > updated the KIP to reflect the dependency on 447.
> > >
> > > On Tue, Aug 6, 2019 at 11:35 AM Jason Gustafson 
> > > wrote:
> > >
> > > > Hi Boyang, thanks for the updates. I have a few more comments:
> > > >
> > > > 1. We are adding some new fields to TxnOffsetCommit to support
> > > group-based
> > > > fencing. Do we need these fields to be persisted in the offsets topic
> > to
> > > > ensure that the fencing still works after a coordinator failover?
> > > >
> > > > We already persist member.id, instance.id and generation.id in the
> > > offset
> > > topic, what extra fields do we need to store?
> > >
> > >
> > > > 2. Since you are proposing a new `groupMetadata` API, have you
> > considered
> > > > whether we still need the `initTransactions` overload? Another way
> > would
> > > be
> > > > to pass it through the `sendOffsetsToTransaction` API:
> > > >
> > > > void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
> > > > offsets, GroupMetadata groupMetadata) throws
> > > > ProducerFencedException, IllegalGenerationException;
> > > >
> > > > This seems a little more consistent with the current API and avoids
> the
> > > > direct dependence on the Consumer in the producer.
> > > >
> > > Note that although we avoid one dependency on the consumer, the
> > > producer needs to periodically update
> > > its group metadata, or in this case the caller of
> > > *sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>*
> > > *offsets, GroupMetadata groupMetadata) *is responsible for getting the
> > > latest value of group metadata.
> > > This should be easily done on the stream side as we have
> > > StreamsPartitionAssignor to reflect metadata changes upon
> > #onAssignment(),
> > > but a non-stream user has to code the callback by hand; do you think
> > > the convenience we sacrifice here is worth the simplification benefit?
> > >
> 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-09 Thread Boyang Chen
Thank you Jason. We had some offline discussion on properly keeping the group
metadata up to date, and here are some of the options we brainstormed:
1. Let the caller of `sendOffsetsToTransaction(offset, metadata)` maintain
the ever-changing group metadata. This could be done on the Streams side, but
for non-stream EOS the sample code becomes complicated, as the user needs to
implement the partition assignor interface to get the update from
`onAssignment`.

2. Get a new API on the producer like `refreshGroupMetadata(metadata)`. This
is similar to option 1, except that now in the partition assignor callback we
could directly pass in the producer instance, which simplifies the non-stream
EOS; however, this new API seems weird to define on the producer.

3. Make an accessor interface for the group metadata, or just expose the
group metadata through a consumer API like `consumer.GroupMetadata()`. This
is the old way, which avoids the users' effort of implementing the partition
assignor directly.

4. Expose the group metadata through the rebalance listener, which is a more
well-known and widely adopted callback interface. We could do something like
`onGroupMetadataUpdated(ConsumerGroupMetadata metadata)`.

To simplify the code logic, we believe options 3 & 4 are the better
solutions, of which I slightly prefer option 4, as it is the cleanest
solution with the least intrusion into both consumer and producer APIs.
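
For concreteness, options 3 and 4 could look roughly like this (signatures
illustrative, not final):

// Option 3: a getter on the consumer
public ConsumerGroupMetadata groupMetadata();

// Option 4: an extra callback on the rebalance listener; a default method
// would keep existing listener implementations source-compatible
public interface ConsumerRebalanceListener {
  void onPartitionsRevoked(Collection<TopicPartition> partitions);
  void onPartitionsAssigned(Collection<TopicPartition> partitions);
  default void onGroupMetadataUpdated(ConsumerGroupMetadata metadata) {}
}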

WDYT?

Boyang




On Wed, Aug 7, 2019 at 9:20 AM Jason Gustafson  wrote:

> Hi Boyang,
>
> > We already persist member.id, instance.id and generation.id in the
> offset
> topic, what extra fields do we need to store?
>
> Yeah, you're right. I was a little confused and thought this information
> was needed by the transaction coordinator.
>
> > This should be easily done on the stream side as we have
> StreamsPartitionAssignor to reflect metadata changes upon #onAssignment(),
> but a non-stream user has to code the callback by hand; do you think the
> convenience we sacrifice here is worth the simplification benefit?
>
> Either way, you need a reference to the consumer. I was mostly just
> thinking it would be better to reduce the integration point to its minimum.
> Have you thought through the implications of needing to keep around a
> reference to the consumer in the producer? What if it gets closed? It seems
> better not to have to think about these cases.
>
> -Jason
>
> On Tue, Aug 6, 2019 at 9:53 PM Boyang Chen 
> wrote:
>
> > Thank you for the suggestions Jason. And a side note for Guozhang, I
> > updated the KIP to reflect the dependency on 447.
> >
> > On Tue, Aug 6, 2019 at 11:35 AM Jason Gustafson 
> > wrote:
> >
> > > Hi Boyang, thanks for the updates. I have a few more comments:
> > >
> > > 1. We are adding some new fields to TxnOffsetCommit to support
> > group-based
> > > fencing. Do we need these fields to be persisted in the offsets topic
> to
> > > ensure that the fencing still works after a coordinator failover?
> > >
> > > We already persist member.id, instance.id and generation.id in the
> > offset
> > topic, what extra fields do we need to store?
> >
> >
> > > 2. Since you are proposing a new `groupMetadata` API, have you
> considered
> > > whether we still need the `initTransactions` overload? Another way
> would
> > be
> > > to pass it through the `sendOffsetsToTransaction` API:
> > >
> > > void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
> > > offsets, GroupMetadata groupMetadata) throws
> > > ProducerFencedException, IllegalGenerationException;
> > >
> > > This seems a little more consistent with the current API and avoids the
> > > direct dependence on the Consumer in the producer.
> > >
> > Note that although we avoid one dependency on the consumer, the producer
> > needs to periodically update
> > its group metadata, or in this case the caller of
> > *sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>*
> > *offsets, GroupMetadata groupMetadata) *is responsible for getting the
> > latest value of group metadata.
> > This should be easily done on the stream side as we have
> > StreamsPartitionAssignor to reflect metadata changes upon
> #onAssignment(),
> > but a non-stream user has to code the callback by hand; do you think the
> > convenience we sacrifice here is worth the simplification benefit?
> >
> >
> > > 3. Can you clarify the behavior of the clients when the brokers do not
> > > support the latest API versions? This is both for the new
> TxnOffsetCommit
> > > and the OffsetFetch APIs. I guess the high level idea in streams is to
> > > detect broker support before instantiating the producer and consumer. I
> > > think that's reasonable, but we might need some approach for
> non-streams
> > > use cases. One option I was considering is enforcing the latest version
> > > through the new `sendOffsetsToTransaction` API. Basically when you use
> > the
> > > new API, we require support for the latest TxnOffsetCommit version.
> This
> > > puts some burden on users, but it avoids breaking correctness
> assumptions
> > > when the new APIs are in use. What do you think?

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-07 Thread Jason Gustafson
Hi Boyang,

> We already persist member.id, instance.id and generation.id in the offset
topic, what extra fields do we need to store?

Yeah, you're right. I was a little confused and thought this information
was needed by the transaction coordinator.

> This should be easily done on the stream side as we have
StreamsPartitionAssignor to reflect metadata changes upon #onAssignment(),
but a non-stream user has to code the callback by hand; do you think the
convenience we sacrifice here is worth the simplification benefit?

Either way, you need a reference to the consumer. I was mostly just
thinking it would be better to reduce the integration point to its minimum.
Have you thought through the implications of needing to keep around a
reference to the consumer in the producer? What if it gets closed? It seems
better not to have to think about these cases.

-Jason

On Tue, Aug 6, 2019 at 9:53 PM Boyang Chen 
wrote:

> Thank you for the suggestions Jason. And a side note for Guozhang, I
> updated the KIP to reflect the dependency on 447.
>
> On Tue, Aug 6, 2019 at 11:35 AM Jason Gustafson 
> wrote:
>
> > Hi Boyang, thanks for the updates. I have a few more comments:
> >
> > 1. We are adding some new fields to TxnOffsetCommit to support
> group-based
> > fencing. Do we need these fields to be persisted in the offsets topic to
> > ensure that the fencing still works after a coordinator failover?
> >
> > We already persist member.id, instance.id and generation.id in the
> offset
> topic, what extra fields do we need to store?
>
>
> > 2. Since you are proposing a new `groupMetadata` API, have you considered
> > whether we still need the `initTransactions` overload? Another way would
> be
> > to pass it through the `sendOffsetsToTransaction` API:
> >
> > void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
> > offsets, GroupMetadata groupMetadata) throws
> > ProducerFencedException, IllegalGenerationException;
> >
> > This seems a little more consistent with the current API and avoids the
> > direct dependence on the Consumer in the producer.
> >
> Note that although we avoid one dependency on the consumer, the producer
> needs to periodically update
> its group metadata, or in this case the caller of
> *sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>*
> *offsets, GroupMetadata groupMetadata) *is responsible for getting the
> latest value of group metadata.
> This should be easily done on the stream side as we have
> StreamsPartitionAssignor to reflect metadata changes upon #onAssignment(),
> but a non-stream user has to code the callback by hand; do you think the
> convenience we sacrifice here is worth the simplification benefit?
>
>
> > 3. Can you clarify the behavior of the clients when the brokers do not
> > support the latest API versions? This is both for the new TxnOffsetCommit
> > and the OffsetFetch APIs. I guess the high level idea in streams is to
> > detect broker support before instantiating the producer and consumer. I
> > think that's reasonable, but we might need some approach for non-streams
> > use cases. One option I was considering is enforcing the latest version
> > through the new `sendOffsetsToTransaction` API. Basically when you use
> the
> > new API, we require support for the latest TxnOffsetCommit version. This
> > puts some burden on users, but it avoids breaking correctness assumptions
> > when the new APIs are in use. What do you think?
> >
> Yes, I think we haven't covered this case, so the plan is to crash the
> non-stream application when the job is using the new sendOffsets API.
>
> >
> > -Jason
> >
> >
> >
> >
> > On Mon, Aug 5, 2019 at 6:06 PM Boyang Chen 
> > wrote:
> >
> > > Yep, Guozhang I think that would be best as passing in an entire
> consumer
> > > instance is indeed cumbersome.
> > >
> > > Just saw you updated KIP-429, I will follow-up to change 447 as well.
> > >
> > > Best,
> > > Boyang
> > >
> > > On Mon, Aug 5, 2019 at 3:18 PM Guozhang Wang 
> wrote:
> > >
> > > > okay I think I understand your concerns about ConsumerGroupMetadata
> > now:
> > > if
> > > > we still want to only call initTxns once, then we should allow
> > > whatever
> > > > passed-in parameter to reflect the latest value of generation id
> > whenever
> > > > sending the offset fetch request.
> > > >
> > > > Whereas the current ConsumerGroupMetadata is a static object.
> > > >
> > > > Maybe we can consider having an extended class of
> ConsumerGroupMetadata
> > > > whose values are updated from the consumer's rebalance callback?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, Aug 5, 2019 at 9:26 AM Boyang Chen <
> reluctanthero...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Thank you Guozhang for the reply! I'm curious whether KIP-429 has
> > > > reflected
> > > > > the latest change on ConsumerGroupMetadata? Also regarding question
> > > one,
> > > > > the group metadata needs to be accessed via callback, does that
> mean
> > we
> > > > need a separate producer API such as
> > > > > 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-06 Thread Boyang Chen
Thank you for the suggestions Jason. And a side note for Guozhang, I
updated the KIP to reflect the dependency on 447.

On Tue, Aug 6, 2019 at 11:35 AM Jason Gustafson  wrote:

> Hi Boyang, thanks for the updates. I have a few more comments:
>
> 1. We are adding some new fields to TxnOffsetCommit to support group-based
> fencing. Do we need these fields to be persisted in the offsets topic to
> ensure that the fencing still works after a coordinator failover?
>
> We already persist member.id, instance.id and generation.id in the offset
topic, what extra fields do we need to store?


> 2. Since you are proposing a new `groupMetadata` API, have you considered
> whether we still need the `initTransactions` overload? Another way would be
> to pass it through the `sendOffsetsToTransaction` API:
>
> void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
> offsets, GroupMetadata groupMetadata) throws
> ProducerFencedException, IllegalGenerationException;
>
> This seems a little more consistent with the current API and avoids the
> direct dependence on the Consumer in the producer.
>
Note that although we avoid one dependency on the consumer, the producer
needs to periodically update
its group metadata, or in this case the caller of
*sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>*
*offsets, GroupMetadata groupMetadata)* is responsible for getting the
latest value of group metadata.
This should be easily done on the stream side as we have
StreamsPartitionAssignor to reflect metadata changes upon #onAssignment(),
but a non-stream user has to code the callback by hand; do you think the
convenience we sacrifice here is worth the simplification benefit?


> 3. Can you clarify the behavior of the clients when the brokers do not
> support the latest API versions? This is both for the new TxnOffsetCommit
> and the OffsetFetch APIs. I guess the high level idea in streams is to
> detect broker support before instantiating the producer and consumer. I
> think that's reasonable, but we might need some approach for non-streams
> use cases. One option I was considering is enforcing the latest version
> through the new `sendOffsetsToTransaction` API. Basically when you use the
> new API, we require support for the latest TxnOffsetCommit version. This
> puts some burden on users, but it avoids breaking correctness assumptions
> when the new APIs are in use. What do you think?
>
Yes, I think we haven't covered this case, so the plan is to crash the
non-stream application when the job is using the new sendOffsets API.

>
> -Jason
>
>
>
>
> On Mon, Aug 5, 2019 at 6:06 PM Boyang Chen 
> wrote:
>
> > Yep, Guozhang I think that would be best as passing in an entire consumer
> > instance is indeed cumbersome.
> >
> > Just saw you updated KIP-429, I will follow-up to change 447 as well.
> >
> > Best,
> > Boyang
> >
> > On Mon, Aug 5, 2019 at 3:18 PM Guozhang Wang  wrote:
> >
> > > okay I think I understand your concerns about ConsumerGroupMetadata
> now:
> > if
> > > we still want to only call initTxns once, then we should allow
> > whatever
> > > passed-in parameter to reflect the latest value of generation id
> whenever
> > > sending the offset fetch request.
> > >
> > > Whereas the current ConsumerGroupMetadata is a static object.
> > >
> > > Maybe we can consider having an extended class of ConsumerGroupMetadata
> > > whose values are updated from the consumer's rebalance callback?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Aug 5, 2019 at 9:26 AM Boyang Chen  >
> > > wrote:
> > >
> > > > Thank you Guozhang for the reply! I'm curious whether KIP-429 has
> > > reflected
> > > > the latest change on ConsumerGroupMetadata? Also regarding question
> > one,
> > > > the group metadata needs to be accessed via callback, does that mean
> we
> > > > need a separate producer API such as
> > > > "producer.refreshMetadata(groupMetadata)" to be able to access it
> > instead
> > > > of passing in the consumer instance?
> > > >
> > > > Boyang
> > > >
> > > > On Fri, Aug 2, 2019 at 4:36 PM Guozhang Wang 
> > wrote:
> > > >
> > > > > Thanks Boyang,
> > > > >
> > > > > I've made another pass on KIP-447 as well as
> > > > > https://github.com/apache/kafka/pull/7078, and have some minor
> > > comments
> > > > > about the proposed API:
> > > > >
> > > > > 1. it seems instead of needing the whole KafkaConsumer object,
> you'd
> > > only
> > > > > need the "ConsumerGroupMetadata"; in that case, can we just pass
> > > > > that object into the initTxns call?
> > > > >
> > > > > 2. the current trunk already has a public class named
> > > > > (ConsumerGroupMetadata)
> > > > > under o.a.k.clients.consumer created by KIP-429. If we want to just
> > use
> > > > > that then maybe it makes less sense to declare a base GroupMetadata
> > as
> > > we
> > > > > are already leaking such information on the assignor anyways.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Jul 30, 2019 at 1:55 PM Boyang Chen <
> > > reluctanthero...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thank 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-06 Thread Jason Gustafson
Hi Boyang, thanks for the updates. I have a few more comments:

1. We are adding some new fields to TxnOffsetCommit to support group-based
fencing. Do we need these fields to be persisted in the offsets topic to
ensure that the fencing still works after a coordinator failover?

2. Since you are proposing a new `groupMetadata` API, have you considered
whether we still need the `initTransactions` overload? Another way would be
to pass it through the `sendOffsetsToTransaction` API:

void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
offsets, GroupMetadata groupMetadata) throws
ProducerFencedException, IllegalGenerationException;

This seems a little more consistent with the current API and avoids the
direct dependence on the Consumer in the producer.

3. Can you clarify the behavior of the clients when the brokers do not
support the latest API versions? This is both for the new TxnOffsetCommit
and the OffsetFetch APIs. I guess the high level idea in streams is to
detect broker support before instantiating the producer and consumer. I
think that's reasonable, but we might need some approach for non-streams
use cases. One option I was considering is enforcing the latest version
through the new `sendOffsetsToTransaction` API. Basically when you use the
new API, we require support for the latest TxnOffsetCommit version. This
puts some burden on users, but it avoids breaking correctness assumptions
when the new APIs are in use. What do you think?


-Jason






On Mon, Aug 5, 2019 at 6:06 PM Boyang Chen 
wrote:

> Yep, Guozhang I think that would be best as passing in an entire consumer
> instance is indeed cumbersome.
>
> Just saw you updated KIP-429, I will follow-up to change 447 as well.
>
> Best,
> Boyang
>
> On Mon, Aug 5, 2019 at 3:18 PM Guozhang Wang  wrote:
>
> > okay I think I understand your concerns about ConsumerGroupMetadata now:
> if
> > we still want to only call initTxns once, then we should allow the
> whatever
> > passed-in parameter to reflect the latest value of generation id whenever
> > sending the offset fetch request.
> >
> > Whereas the current ConsumerGroupMetadata is a static object.
> >
> > Maybe we can consider having an extended class of ConsumerGroupMetadata
> > whose values are updated from the consumer's rebalance callback?
> >
> >
> > Guozhang
> >
> >
> > On Mon, Aug 5, 2019 at 9:26 AM Boyang Chen 
> > wrote:
> >
> > > Thank you Guozhang for the reply! I'm curious whether KIP-429 has
> > reflected
> > > the latest change on ConsumerGroupMetadata? Also regarding question
> one,
> > > the group metadata needs to be accessed via callback, does that mean we
> > > need a separate producer API such as
> > > "producer.refreshMetadata(groupMetadata)" to be able to access it
> instead
> > > of passing in the consumer instance?
> > >
> > > Boyang
> > >
> > > On Fri, Aug 2, 2019 at 4:36 PM Guozhang Wang 
> wrote:
> > >
> > > > Thanks Boyang,
> > > >
> > > > I've made another pass on KIP-447 as well as
> > > > https://github.com/apache/kafka/pull/7078, and have some minor
> > comments
> > > > about the proposed API:
> > > >
> > > > 1. it seems instead of needing the whole KafkaConsumer object, you'd
> > only
> > > > need the "ConsumerGroupMetadata", in that case can we just pass in
> that
> > > > object into the initTxns call?
> > > >
> > > > 2. the current trunk already has a public class named
> > > > (ConsumerGroupMetadata)
> > > > under o.a.k.clients.consumer created by KIP-429. If we want to just
> use
> > > > that then maybe it makes less sense to declare a base GroupMetadata
> as
> > we
> > > > are already leaking such information on the assignor anyways.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Jul 30, 2019 at 1:55 PM Boyang Chen <
> > reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you Guozhang for the reply. We will consider the interface
> > change
> > > > > from 429 as a backup plan for 447.
> > > > >
> > > > > And bumping this thread for more discussion.
> > > > >
> > > > > On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang 
> > > > wrote:
> > > > >
> > > > > > On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen <
> > > > reluctanthero...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Thank you Guozhang for the suggestion! I would normally prefer
> > > > naming a
> > > > > > > flag corresponding to its functionality. Seems to me
> > > > `isolation_level`
> > > > > > > adds another hop on the information track.
> > > > > > >
> > > > > > > Fair enough, let's use a separate flag name then :)
> > > > > >
> > > > > >
> > > > > > > As for the generation.id exposure, I'm fine leveraging the new
> > API
> > > > > from
> > > > > > > 429, but is that design finalized yet, and whether the
> > API
> > > > will
> > > > > > be
> > > > > > > added on the generic Consumer interface?
> > > > > > >
> > > > > > > The current PartitionAssignor is inside `internals` package and
> > in
> > > > > > KIP-429
> > > > > > we are going to create a new interface out of `internals` 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-05 Thread Boyang Chen
Yep, Guozhang I think that would be best as passing in an entire consumer
instance is indeed cumbersome.

Just saw you updated KIP-429, I will follow-up to change 447 as well.

Best,
Boyang

On Mon, Aug 5, 2019 at 3:18 PM Guozhang Wang  wrote:

> okay I think I understand your concerns about ConsumerGroupMetadata now: if
> we still want to only call initTxns once, then we should allow the whatever
> passed-in parameter to reflect the latest value of generation id whenever
> sending the offset fetch request.
>
> Whereas the current ConsumerGroupMetadata is a static object.
>
> Maybe we can consider having an extended class of ConsumerGroupMetadata
> whose values are updated from the consumer's rebalance callback?
>
>
> Guozhang
>
>
> On Mon, Aug 5, 2019 at 9:26 AM Boyang Chen 
> wrote:
>
> > Thank you Guozhang for the reply! I'm curious whether KIP-429 has
> reflected
> > the latest change on ConsumerGroupMetadata? Also regarding question one,
> > the group metadata needs to be accessed via callback, does that mean we
> > need a separate producer API such as
> > "producer.refreshMetadata(groupMetadata)" to be able to access it instead
> > of passing in the consumer instance?
> >
> > Boyang
> >
> > On Fri, Aug 2, 2019 at 4:36 PM Guozhang Wang  wrote:
> >
> > > Thanks Boyang,
> > >
> > > I've made another pass on KIP-447 as well as
> > > https://github.com/apache/kafka/pull/7078, and have some minor
> comments
> > > about the proposed API:
> > >
> > > 1. it seems instead of needing the whole KafkaConsumer object, you'd
> only
> > > need the "ConsumerGroupMetadata", in that case can we just pass in that
> > > object into the initTxns call?
> > >
> > > 2. the current trunk already has a public class named
> > > (ConsumerGroupMetadata)
> > > under o.a.k.clients.consumer created by KIP-429. If we want to just use
> > > that then maybe it makes less sense to declare a base GroupMetadata as
> we
> > > are already leaking such information on the assignor anyways.
> > >
> > >
> > > Guozhang
> > >
> > > On Tue, Jul 30, 2019 at 1:55 PM Boyang Chen <
> reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Thank you Guozhang for the reply. We will consider the interface
> change
> > > > from 429 as a backup plan for 447.
> > > >
> > > > And bumping this thread for more discussion.
> > > >
> > > > On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang 
> > > wrote:
> > > >
> > > > > On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen <
> > > reluctanthero...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thank you Guozhang for the suggestion! I would normally prefer
> > > naming a
> > > > > > flag corresponding to its functionality. Seems to me
> > > `isolation_level`
> > > > > > adds another hop on the information track.
> > > > > >
> > > > > > Fair enough, let's use a separate flag name then :)
> > > > >
> > > > >
> > > > > > As for the generation.id exposure, I'm fine leveraging the new
> API
> > > > from
> > > > > > 429, but is that design finalized yet, and whether the
> API
> > > will
> > > > > be
> > > > > > added on the generic Consumer interface?
> > > > > >
> > > > > > The current PartitionAssignor is inside `internals` package and
> in
> > > > > KIP-429
> > > > > we are going to create a new interface out of `internals` to really
> > > make
> > > > it
> > > > > public APIs, and as part of that we are refactoring some of its
> > method
> > > > > signatures. I just feel some of the newly introduced classes can be
> > > > reused
> > > > > in your KIP as well, i.e. just for code succinctness, but no
> > semantical
> > > > > indications.
> > > > >
> > > > >
> > > > > > Boyang
> > > > > >
> > > > > > On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Boyang, thanks for the updated proposal!
> > > > > > >
> > > > > > > 3.a. As Jason mentioned, with EOS enabled we still need to
> > augment
> > > > the
> > > > > > > offset fetch request with a boolean to indicate "give me a
> > > retriable
> > > > > > error
> > > > > > > code if there's pending offset, rather than sending me the
> > > committed
> > > > > > offset
> > > > > > > immediately". Personally I still feel it is okay to piggy-back
> on
> > > the
> > > > > > > ISOLATION_LEVEL boolean, but I'm also fine with another
> > > > > > `await_transaction`
> > > > > > > boolean if you feel strongly about it.
> > > > > > >
> > > > > > > 10. About the exposure of generation id, there may be some
> > > > refactoring
> > > > > > work
> > > > > > > coming from KIP-429 that can benefit KIP-447 as well since we
> are
> > > > > > wrapping
> > > > > > > the consumer subscription / assignment data in new classes.
> Note
> > > that
> > > > > > > current proposal does not include `generationId` since with the
> > cooperative
> > > > > > sticky
> > > > > > > assignor we think it is not necessary for correctness, but also
> > if
> > > we
> > > > > > agree
> > > > > > > it is okay to expose it we can potentially include it in
> > > > > 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-05 Thread Guozhang Wang
okay I think I understand your concerns about ConsumerGroupMetadata now: if
we still want to only call initTxns once, then we should allow the whatever
passed-in parameter to reflect the latest value of generation id whenever
sending the offset fetch request.

Whereas the current ConsumerGroupMetadata is a static object.

Maybe we can consider having an extended class of ConsumerGroupMetadata
whose values are updated from the consumer's rebalance callback?
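
For illustration, that idea could be sketched as below; this assumes
ConsumerGroupMetadata is extensible, and every other name is hypothetical:

// Sketch: getters always reflect the latest rebalance result, because the
// consumer overwrites the field from its rebalance callback.
class LiveGroupMetadata extends ConsumerGroupMetadata {
    private volatile int latestGenerationId;

    LiveGroupMetadata(String groupId) {
        super(groupId);
    }

    // Invoked by the consumer after each completed rebalance.
    void onRebalanceComplete(int newGenerationId) {
        this.latestGenerationId = newGenerationId;
    }

    @Override
    public int generationId() {
        return latestGenerationId;
    }
}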


Guozhang


On Mon, Aug 5, 2019 at 9:26 AM Boyang Chen 
wrote:

> Thank you Guozhang for the reply! I'm curious whether KIP-429 has reflected
> the latest change on ConsumerGroupMetadata? Also regarding question one,
> the group metadata needs to be accessed via callback, does that mean we
> need a separate producer API such as
> "producer.refreshMetadata(groupMetadata)" to be able to access it instead
> of passing in the consumer instance?
>
> Boyang
>
> On Fri, Aug 2, 2019 at 4:36 PM Guozhang Wang  wrote:
>
> > Thanks Boyang,
> >
> > I've made another pass on KIP-447 as well as
> > https://github.com/apache/kafka/pull/7078, and have some minor comments
> > about the proposed API:
> >
> > 1. it seems instead of needing the whole KafkaConsumer object, you'd only
> > need the "ConsumerGroupMetadata", in that case can we just pass in that
> > object into the initTxns call?
> >
> > 2. the current trunk already has a public class named
> > (ConsumerGroupMetadata)
> > under o.a.k.clients.consumer created by KIP-429. If we want to just use
> > that then maybe it makes less sense to declare a base GroupMetadata as we
> > are already leaking such information on the assignor anyways.
> >
> >
> > Guozhang
> >
> > On Tue, Jul 30, 2019 at 1:55 PM Boyang Chen 
> > wrote:
> >
> > > Thank you Guozhang for the reply. We will consider the interface change
> > > from 429 as a backup plan for 447.
> > >
> > > And bumping this thread for more discussion.
> > >
> > > On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang 
> > wrote:
> > >
> > > > On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen <
> > reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you Guozhang for the suggestion! I would normally prefer
> > naming a
> > > > > flag corresponding to its functionality. Seems to me
> > `isolation_level`
> > > > > adds another hop on the information track.
> > > > >
> > > > > Fair enough, let's use a separate flag name then :)
> > > >
> > > >
> > > > > As for the generation.id exposure, I'm fine leveraging the new API
> > > from
> > > > > 429, but is that design finalized yet, and whether the API
> > will
> > > > be
> > > > > added on the generic Consumer interface?
> > > > >
> > > > > The current PartitionAssignor is inside `internals` package and in
> > > > KIP-429
> > > > we are going to create a new interface out of `internals` to really
> > make
> > > it
> > > > public APIs, and as part of that we are refactoring some of its
> method
> > > > signatures. I just feel some of the newly introduced classes can be
> > > reused
> > > > in your KIP as well, i.e. just for code succinctness, but no
> semantical
> > > > indications.
> > > >
> > > >
> > > > > Boyang
> > > > >
> > > > > On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang 
> > > > wrote:
> > > > >
> > > > > > Boyang, thanks for the updated proposal!
> > > > > >
> > > > > > 3.a. As Jason mentioned, with EOS enabled we still need to
> augment
> > > the
> > > > > > offset fetch request with a boolean to indicate "give me a
> > retriable
> > > > > error
> > > > > > code if there's pending offset, rather than sending me the
> > committed
> > > > > offset
> > > > > > immediately". Personally I still feel it is okay to piggy-back on
> > the
> > > > > > ISOLATION_LEVEL boolean, but I'm also fine with another
> > > > > `await_transaction`
> > > > > > boolean if you feel strongly about it.
> > > > > >
> > > > > > 10. About the exposure of generation id, there may be some
> > > refactoring
> > > > > work
> > > > > > coming from KIP-429 that can benefit KIP-447 as well since we are
> > > > > wrapping
> > > > > > the consumer subscription / assignment data in new classes. Note
> > that
> > > > > > current proposal does not include `generationId` since with the
> cooperative
> > > > > sticky
> > > > > > assignor we think it is not necessary for correctness, but also
> if
> > we
> > > > > agree
> > > > > > it is okay to expose it we can potentially include it in
> > > > > > `ConsumerAssignmentData` as well.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen <
> > > > reluctanthero...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Thank you Jason for the ideas.
> > > > > > >
> > > > > > > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson <
> > > ja...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Boyang,
> > > > > > > >
> > > > > > > > Thanks for the updates. A few comments below:
> > > > > > > >
> > > > > > > > 1. The KIP mentions 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-05 Thread Boyang Chen
Thank you Guozhang for the reply! I'm curious whether KIP-429 has reflected
the latest change on ConsumerGroupMetadata? Also regarding question one,
the group metadata needs to be accessed via callback, does that mean we
need a separate producer API such as
"producer.refreshMetadata(groupMetadata)" to be able to access it instead
of passing in the consumer instance?

Boyang

On Fri, Aug 2, 2019 at 4:36 PM Guozhang Wang  wrote:

> Thanks Boyang,
>
> I've made another pass on KIP-447 as well as
> https://github.com/apache/kafka/pull/7078, and have some minor comments
> about the proposed API:
>
> 1. it seems instead of needing the whole KafkaConsumer object, you'd only
> need the "ConsumerGroupMetadata", in that case can we just pass in that
> object into the initTxns call?
>
> 2. the current trunk already has a public class named
> (ConsumerGroupMetadata)
> under o.a.k.clients.consumer created by KIP-429. If we want to just use
> that then maybe it makes less sense to declare a base GroupMetadata as we
> are already leaking such information on the assignor anyways.
>
>
> Guozhang
>
> On Tue, Jul 30, 2019 at 1:55 PM Boyang Chen 
> wrote:
>
> > Thank you Guozhang for the reply. We will consider the interface change
> > from 429 as a backup plan for 447.
> >
> > And bumping this thread for more discussion.
> >
> > On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang 
> wrote:
> >
> > > On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen <
> reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Thank you Guozhang for the suggestion! I would normally prefer
> naming a
> > > > flag corresponding to its functionality. Seems to me
> `isolation_level`
> > > > adds another hop on the information track.
> > > >
> > > > Fair enough, let's use a separate flag name then :)
> > >
> > >
> > > > As for the generation.id exposure, I'm fine leveraging the new API
> > from
> > > > 429, but is that design finalized yet, and whether the API
> will
> > > be
> > > > added on the generic Consumer interface?
> > > >
> > > > The current PartitionAssignor is inside `internals` package and in
> > > KIP-429
> > > we are going to create a new interface out of `internals` to really
> make
> > it
> > > public APIs, and as part of that we are refactoring some of its method
> > > signatures. I just feel some of the newly introduced classes can be
> > reused
> > > in your KIP as well, i.e. just for code succinctness, but no semantical
> > > indications.
> > >
> > >
> > > > Boyang
> > > >
> > > > On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang 
> > > wrote:
> > > >
> > > > > Boyang, thanks for the updated proposal!
> > > > >
> > > > > 3.a. As Jason mentioned, with EOS enabled we still need to augment
> > the
> > > > > offset fetch request with a boolean to indicate "give me a
> retriable
> > > > error
> > > > > code if there's pending offset, rather than sending me the
> committed
> > > > offset
> > > > > immediately". Personally I still feel it is okay to piggy-back on
> the
> > > > > ISOLATION_LEVEL boolean, but I'm also fine with another
> > > > `await_transaction`
> > > > > boolean if you feel strongly about it.
> > > > >
> > > > > 10. About the exposure of generation id, there may be some
> > refactoring
> > > > work
> > > > > coming from KIP-429 that can benefit KIP-447 as well since we are
> > > > wrapping
> > > > > the consumer subscription / assignment data in new classes. Note
> that
> > > > > current proposal does not include `generationId` since with the cooperative
> > > > sticky
> > > > > assignor we think it is not necessary for correctness, but also if
> we
> > > > agree
> > > > > it is okay to expose it we can potentially include it in
> > > > > `ConsumerAssignmentData` as well.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen <
> > > reluctanthero...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thank you Jason for the ideas.
> > > > > >
> > > > > > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson <
> > ja...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Boyang,
> > > > > > >
> > > > > > > Thanks for the updates. A few comments below:
> > > > > > >
> > > > > > > 1. The KIP mentions that `transaction.timeout.ms` should be
> > > reduced
> > > > to
> > > > > > > 10s.
> > > > > > > I think this makes sense for Kafka Streams which is tied to the
> > > > > consumer
> > > > > > > group semantics and uses a default 10s session timeout.
> However,
> > it
> > > > > > seems a
> > > > > > > bit dangerous to make this change for the producer generally.
> > Could
> > > > we
> > > > > > just
> > > > > > > change it for streams?
> > > > > > >
> > > > > > > That sounds good to me.
> > > > > >
> > > > > > > 2. The new `initTransactions` API takes a `Consumer` instance.
> I
> > > > think
> > > > > > the
> > > > > > > idea is to basically put in a backdoor to give the producer
> > access
> > > to
> > > > > the
> > > > > > > group generationId. It's not clear to me how this 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-08-02 Thread Guozhang Wang
Thanks Boyang,

I've made another pass on KIP-447 as well as
https://github.com/apache/kafka/pull/7078, and have some minor comments
about the proposed API:

1. it seems instead of needing the whole KafkaConsumer object, you'd only
need the "ConsumerGroupMetadata", in that case can we just pass in that
object into the initTxns call?
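
For illustration, that would reduce the call site to roughly the following,
where the groupMetadata() accessor name is an assumption rather than a
settled API:

producer.initTransactions(consumer.groupMetadata());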

2. the current trunk already has a public class named (ConsumerGroupMetadata)
under o.a.k.clients.consumer created by KIP-429. If we want to just use
that then maybe it makes less sense to declare a base GroupMetadata as we
are already leaking such information on the assignor anyways.


Guozhang

On Tue, Jul 30, 2019 at 1:55 PM Boyang Chen 
wrote:

> Thank you Guozhang for the reply. We will consider the interface change
> from 429 as a backup plan for 447.
>
> And bumping this thread for more discussion.
>
> On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang  wrote:
>
> > On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen 
> > wrote:
> >
> > > Thank you Guozhang for the suggestion! I would normally prefer naming a
> > > flag corresponding to its functionality. Seems to me `isolation_level`
> > > adds another hop on the information track.
> > >
> > > Fair enough, let's use a separate flag name then :)
> >
> >
> > > As for the generation.id exposure, I'm fine leveraging the new API
> from
> > > 429, but is that design finalized yet, and whether the API will
> > be
> > > added on the generic Consumer interface?
> > >
> > > The current PartitionAssignor is inside `internals` package and in
> > KIP-429
> > we are going to create a new interface out of `internals` to really make
> it
> > public APIs, and as part of that we are refactoring some of its method
> > signatures. I just feel some of the newly introduced classes can be
> reused
> > in your KIP as well, i.e. just for code succinctness, but no semantical
> > indications.
> >
> >
> > > Boyang
> > >
> > > On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang 
> > wrote:
> > >
> > > > Boyang, thanks for the updated proposal!
> > > >
> > > > 3.a. As Jason mentioned, with EOS enabled we still need to augment
> the
> > > > offset fetch request with a boolean to indicate "give me a retriable
> > > error
> > > > code if there's pending offset, rather than sending me the committed
> > > offset
> > > > immediately". Personally I still feel it is okay to piggy-back on the
> > > > ISOLATION_LEVEL boolean, but I'm also fine with another
> > > `await_transaction`
> > > > boolean if you feel strongly about it.
> > > >
> > > > 10. About the exposure of generation id, there may be some
> refactoring
> > > work
> > > > coming from KIP-429 that can benefit KIP-447 as well since we are
> > > wrapping
> > > > the consumer subscription / assignment data in new classes. Note that
> > > > current proposal does not include `generationId` since with the cooperative
> > > sticky
> > > > assignor we think it is not necessary for correctness, but also if we
> > > agree
> > > > it is okay to expose it we can potentially include it in
> > > > `ConsumerAssignmentData` as well.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen <
> > reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you Jason for the ideas.
> > > > >
> > > > > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson <
> ja...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hi Boyang,
> > > > > >
> > > > > > Thanks for the updates. A few comments below:
> > > > > >
> > > > > > 1. The KIP mentions that `transaction.timeout.ms` should be
> > reduced
> > > to
> > > > > > 10s.
> > > > > > I think this makes sense for Kafka Streams which is tied to the
> > > > consumer
> > > > > > group semantics and uses a default 10s session timeout. However,
> it
> > > > > seems a
> > > > > > bit dangerous to make this change for the producer generally.
> Could
> > > we
> > > > > just
> > > > > > change it for streams?
> > > > > >
> > > > > > That sounds good to me.
> > > > >
> > > > > > 2. The new `initTransactions` API takes a `Consumer` instance. I
> > > think
> > > > > the
> > > > > > idea is to basically put in a backdoor to give the producer
> access
> > to
> > > > the
> > > > > > group generationId. It's not clear to me how this would work
> given
> > > > > package
> > > > > > restrictions. I wonder if it would be better to just expose the
> > state
> > > > we
> > > > > > need from the consumer. I know we have been reluctant to do this
> so
> > > far
> > > > > > because we treat the generationId as an implementation detail.
> > > > However, I
> > > > > > think we might just bite the bullet and expose it rather than
> > coming
> > > up
> > > > > > with a messy hack. Concepts such as memberIds have already been
> > > exposed
> > > > > in
> > > > > > the AdminClient, so maybe it is not too bad. Alternatively, we
> > could
> > > > use
> > > > > an
> > > > > > opaque type. For example:
> > > > > >
> > > > > > // public
> > > > > > interface GroupMetadata {}
> > > 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-30 Thread Boyang Chen
Thank you Guozhang for the reply. We will consider the interface change
from 429 as a backup plan for 447.

And bumping this thread for more discussion.

On Mon, Jul 22, 2019 at 6:28 PM Guozhang Wang  wrote:

> On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen 
> wrote:
>
> > Thank you Guozhang for the suggestion! I would normally prefer naming a
> > flag corresponding to its functionality. Seems to me `isolation_level`
> > adds another hop on the information track.
> >
> > Fair enough, let's use a separate flag name then :)
>
>
> > As for the generation.id exposure, I'm fine leveraging the new API from
> > 429, but is that design finalized yet, and whether the API will
> be
> > added on the generic Consumer interface?
> >
> > The current PartitionAssignor is inside `internals` package and in
> KIP-429
> we are going to create a new interface out of `internals` to really make it
> public APIs, and as part of that we are refactoring some of its method
> signatures. I just feel some of the newly introduced classes can be reused
> in your KIP as well, i.e. just for code succinctness, but no semantical
> indications.
>
>
> > Boyang
> >
> > On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang 
> wrote:
> >
> > > Boyang, thanks for the updated proposal!
> > >
> > > 3.a. As Jason mentioned, with EOS enabled we still need to augment the
> > > offset fetch request with a boolean to indicate "give me a retriable
> > error
> > > code if there's pending offset, rather than sending me the committed
> > offset
> > > immediately". Personally I still feel it is okay to piggy-back on the
> > > ISOLATION_LEVEL boolean, but I'm also fine with another
> > `await_transaction`
> > > boolean if you feel strongly about it.
> > >
> > > 10. About the exposure of generation id, there may be some refactoring
> > work
> > > coming from KIP-429 that can benefit KIP-447 as well since we are
> > wrapping
> > > the consumer subscription / assignment data in new classes. Note that
> > > current proposal does not include `generationId` since with the cooperative
> > sticky
> > > assignor we think it is not necessary for correctness, but also if we
> > agree
> > > it is okay to expose it we can potentially include it in
> > > `ConsumerAssignmentData` as well.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen <
> reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Thank you Jason for the ideas.
> > > >
> > > > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson 
> > > > wrote:
> > > >
> > > > > Hi Boyang,
> > > > >
> > > > > Thanks for the updates. A few comments below:
> > > > >
> > > > > 1. The KIP mentions that `transaction.timeout.ms` should be
> reduced
> > to
> > > > > 10s.
> > > > > I think this makes sense for Kafka Streams which is tied to the
> > > consumer
> > > > > group semantics and uses a default 10s session timeout. However, it
> > > > seems a
> > > > > bit dangerous to make this change for the producer generally. Could
> > we
> > > > just
> > > > > change it for streams?
> > > > >
> > > > > That sounds good to me.
> > > >
> > > > > 2. The new `initTransactions` API takes a `Consumer` instance. I
> > think
> > > > the
> > > > > idea is to basically put in a backdoor to give the producer access
> to
> > > the
> > > > > group generationId. It's not clear to me how this would work given
> > > > package
> > > > > restrictions. I wonder if it would be better to just expose the
> state
> > > we
> > > > > need from the consumer. I know we have been reluctant to do this so
> > far
> > > > > because we treat the generationId as an implementation detail.
> > > However, I
> > > > > think we might just bite the bullet and expose it rather than
> coming
> > up
> > > > > with a messy hack. Concepts such as memberIds have already been
> > exposed
> > > > in
> > > > > the AdminClient, so maybe it is not too bad. Alternatively, we
> could
> > > use
> > > > an
> > > > > opaque type. For example:
> > > > >
> > > > > // public
> > > > > interface GroupMetadata {}
> > > > >
> > > > > // private
> > > > > interface ConsumerGroupMetadata {
> > > > >   final int generationId;
> > > > >   final String memberId;
> > > > > }
> > > > >
> > > > > // Consumer API
> > > > > public GroupMetadata groupMetadata();
> > > > >
> > > > > I am probably leaning toward just exposing the state we need.
> > > > >
> > > > > Yes, also to mention that Kafka Streams uses the generic Consumer API
> > which
> > > > doesn't have rich
> > > > states like a full `KafkaConsumer`. The hack will not work as
> expected.
> > > >
> > > > Instead, just exposing the consumer generation.id seems a much easier
> > > approach.
> > > > We could consolidate
> > > > the API and make it
> > > >
> > > > 3. Given that we are already providing a way to propagate group state
> > > from
> > > > > the consumer to the producer, I wonder if we may as well include
> the
> > > > > memberId and groupInstanceId. This would make the validation we do
> > for
> > > > > TxnOffsetCommit 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-22 Thread Guozhang Wang
On Sat, Jul 20, 2019 at 9:50 AM Boyang Chen 
wrote:

> Thank you Guozhang for the suggestion! I would normally prefer naming a
> flag corresponding to its functionality. Seems to me `isolation_level`
> adds another hop on the information track.
>
> Fair enough, let's use a separate flag name then :)


> As for the generation.id exposure, I'm fine leveraging the new API from
> 429, but is that design finalized yet, and whether the API will be
> added on the generic Consumer interface?
>
> The current PartitionAssignor is inside `internals` package and in KIP-429
we are going to create a new interface out of `internals` to really make it
public APIs, and as part of that we are refactoring some of its method
signatures. I just feel some of the newly introduced classes can be reused
in your KIP as well, i.e. just for code succinctness, but no semantical
indications.


> Boyang
>
> On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang  wrote:
>
> > Boyang, thanks for the updated proposal!
> >
> > 3.a. As Jason mentioned, with EOS enabled we still need to augment the
> > offset fetch request with a boolean to indicate "give me a retriable
> error
> > code if there's pending offset, rather than sending me the committed
> offset
> > immediately". Personally I still feel it is okay to piggy-back on the
> > ISOLATION_LEVEL boolean, but I'm also fine with another
> `await_transaction`
> > boolean if you feel strongly about it.
> >
> > 10. About the exposure of generation id, there may be some refactoring
> work
> > coming from KIP-429 that can benefit KIP-447 as well since we are
> wrapping
> > the consumer subscription / assignment data in new classes. Note that
> > current proposal does not include `generationId` since with the cooperative
> sticky
> > assignor we think it is not necessary for correctness, but also if we
> agree
> > it is okay to expose it we can potentially include it in
> > `ConsumerAssignmentData` as well.
> >
> >
> > Guozhang
> >
> >
> > On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen 
> > wrote:
> >
> > > Thank you Jason for the ideas.
> > >
> > > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Hi Boyang,
> > > >
> > > > Thanks for the updates. A few comments below:
> > > >
> > > > 1. The KIP mentions that `transaction.timeout.ms` should be reduced
> to
> > > > 10s.
> > > > I think this makes sense for Kafka Streams which is tied to the
> > consumer
> > > > group semantics and uses a default 10s session timeout. However, it
> > > seems a
> > > > bit dangerous to make this change for the producer generally. Could
> we
> > > just
> > > > change it for streams?
> > > >
> > > > That sounds good to me.
> > >
> > > > 2. The new `initTransactions` API takes a `Consumer` instance. I
> think
> > > the
> > > > idea is to basically put in a backdoor to give the producer access to
> > the
> > > > group generationId. It's not clear to me how this would work given
> > > package
> > > > restrictions. I wonder if it would be better to just expose the state
> > we
> > > > need from the consumer. I know we have been reluctant to do this so
> far
> > > > because we treat the generationId as an implementation detail.
> > However, I
> > > > think we might just bite the bullet and expose it rather than coming
> up
> > > > with a messy hack. Concepts such as memberIds have already been
> exposed
> > > in
> > > > the AdminClient, so maybe it is not too bad. Alternatively, we could
> > use
> > > an
> > > > opaque type. For example:
> > > >
> > > > // public
> > > > interface GroupMetadata {}
> > > >
> > > > // private
> > > > interface ConsumerGroupMetadata {
> > > >   final int generationId;
> > > >   final String memberId;
> > > > }
> > > >
> > > > // Consumer API
> > > > public GroupMetadata groupMetadata();
> > > >
> > > > I am probably leaning toward just exposing the state we need.
> > > >
> > > > Yes, also to mention that Kafka Streams uses the generic Consumer API
> which
> > > doesn't have rich
> > > states like a full `KafkaConsumer`. The hack will not work as expected.
> > >
> > > Instead, just exposing the consumer generation.id seems a much easier
> > approach.
> > > We could consolidate
> > > the API and make it
> > >
> > > 3. Given that we are already providing a way to propagate group state
> > from
> > > > the consumer to the producer, I wonder if we may as well include the
> > > > memberId and groupInstanceId. This would make the validation we do
> for
> > > > TxnOffsetCommit consistent with OffsetCommit. If for no other
> benefit,
> > at
> > > > least this may help with debugging.
> > > >
> > >
> > > Yes, we could put them into the GroupMetadata struct.
> > >
> > >
> > > > 4. I like the addition of isolation_level to the offset fetch. At the
> > > same
> > > > time, its behavior is a bit inconsistent with how it is used in the
> > > > consumer generally. There is no reason for the group coordinator to
> > ever
> > > > expose aborted data, so this is mostly about awaiting pending offset
> > > 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-20 Thread Boyang Chen
Thank you Guozhang for the suggestion! I would normally prefer naming a
flag corresponding to its functionality. Seems to me `isolation_level`
adds another hop on the information track.

As for the generation.id exposure, I'm fine leveraging the new API from
429, but is that design finalized yet, and whether the API will be
added on the generic Consumer interface?

Boyang

On Fri, Jul 19, 2019 at 3:57 PM Guozhang Wang  wrote:

> Boyang, thanks for the updated proposal!
>
> 3.a. As Jason mentioned, with EOS enabled we still need to augment the
> offset fetch request with a boolean to indicate "give me a retriable error
> code if there's pending offset, rather than sending me the committed offset
> immediately". Personally I still feel it is okay to piggy-back on the
> ISOLATION_LEVEL boolean, but I'm also fine with another `await_transaction`
> boolean if you feel strongly about it.
>
> 10. About the exposure of generation id, there may be some refactoring work
> coming from KIP-429 that can benefit KIP-447 as well since we are wrapping
> the consumer subscription / assignment data in new classes. Note that
> current proposal does not include `generationId` since with the cooperative sticky
> assignor we think it is not necessary for correctness, but also if we agree
> it is okay to expose it we can potentially include it in
> `ConsumerAssignmentData` as well.
>
>
> Guozhang
>
>
> On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen 
> wrote:
>
> > Thank you Jason for the ideas.
> >
> > On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson 
> > wrote:
> >
> > > Hi Boyang,
> > >
> > > Thanks for the updates. A few comments below:
> > >
> > > 1. The KIP mentions that `transaction.timeout.ms` should be reduced to
> > > 10s.
> > > I think this makes sense for Kafka Streams which is tied to the
> consumer
> > > group semantics and uses a default 10s session timeout. However, it
> > seems a
> > > bit dangerous to make this change for the producer generally. Could we
> > just
> > > change it for streams?
> > >
> > > That sounds good to me.
> >
> > > 2. The new `initTransactions` API takes a `Consumer` instance. I think
> > the
> > > idea is to basically put in a backdoor to give the producer access to
> the
> > > group generationId. It's not clear to me how this would work given
> > package
> > > restrictions. I wonder if it would be better to just expose the state
> we
> > > need from the consumer. I know we have been reluctant to do this so far
> > > because we treat the generationId as an implementation detail.
> However, I
> > > think we might just bite the bullet and expose it rather than coming up
> > > with a messy hack. Concepts such as memberIds have already been exposed
> > in
> > > the AdminClient, so maybe it is not too bad. Alternatively, we could
> use
> > an
> > > opaque type. For example:
> > >
> > > // public
> > > interface GroupMetadata {}
> > >
> > > // private
> > > interface ConsumerGroupMetadata {
> > >   final int generationId;
> > >   final String memberId;
> > > }
> > >
> > > // Consumer API
> > > public GroupMetadata groupMetadata();
> > >
> > > I am probably leaning toward just exposing the state we need.
> > >
> > > Yes, also to mention that Kafka Streams uses the generic Consumer API which
> > doesn't have rich
> > states like a full `KafkaConsumer`. The hack will not work as expected.
> >
> > Instead, just exposing the consumer generation.id seems a much easier
> approach.
> > We could consolidate
> > the API and make it
> >
> > 3. Given that we are already providing a way to propagate group state
> from
> > > the consumer to the producer, I wonder if we may as well include the
> > > memberId and groupInstanceId. This would make the validation we do for
> > > TxnOffsetCommit consistent with OffsetCommit. If for no other benefit,
> at
> > > least this may help with debugging.
> > >
> >
> > Yes, we could put them into the GroupMetadata struct.
> >
> >
> > > 4. I like the addition of isolation_level to the offset fetch. At the
> > same
> > > time, its behavior is a bit inconsistent with how it is used in the
> > > consumer generally. There is no reason for the group coordinator to
> ever
> > > expose aborted data, so this is mostly about awaiting pending offset
> > > commits, not reading uncommitted data. Perhaps instead of calling this
> > > "isolation level," it should be more like "await_pending_transaction"
> or
> > > something like that?
> > >
> > > Also, just to be clear, the consumer would treat this as an optional
> > field,
> > > right? So if the broker does not support the latest OffsetFetch API, it
> > > would silently revert to reading the old data. Basically it would be up
> > to
> > > the streams version probing logic to ensure that the expectation on
> this
> > > API fits with the usage of `transactional.id`.
> > >
> > > Sounds like a better naming to me, while I think it could be shortened
> to
> > `await_transaction`.
> > I think the field should be optional, too.
> >
> >
> > > Thanks,
> > > 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-19 Thread Guozhang Wang
Boyang, thanks for the updated proposal!

3.a. As Jason mentioned, with EOS enabled we still need to augment the
offset fetch request with a boolean to indicate "give me a retriable error
code if there's pending offset, rather than sending me the committed offset
immediately". Personally I still feel it is okay to piggy-back on the
ISOLATION_LEVEL boolean, but I'm also fine with another `await_transaction`
boolean if you feel strongly about it.
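
For illustration, the coordinator side of that boolean could be sketched as
below; every name here is hypothetical, including the error code, since none
of this is pinned down yet:

// Sketch: if the flag is set and the group still has an open transactional
// offset commit for the requested partitions, return a retriable error so
// the client backs off and retries instead of reading a stale offset.
if (awaitTransaction && group.hasPendingTxnOffsets(requestedPartitions)) {
    return errorResponse(retriableErrorCode);
}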

10. About the exposure of generation id, there may be some refactoring work
coming from KIP-429 that can benefit KIP-447 as well since we are wrapping
the consumer subscription / assignment data in new classes. Note that
current proposal does not include `generationId` since with the cooperative sticky
assignor we think it is not necessary for correctness, but also if we agree
it is okay to expose it we can potentially include it in
`ConsumerAssignmentData` as well.


Guozhang


On Thu, Jul 18, 2019 at 3:55 PM Boyang Chen 
wrote:

> Thank you Jason for the ideas.
>
> On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson 
> wrote:
>
> > Hi Boyang,
> >
> > Thanks for the updates. A few comments below:
> >
> > 1. The KIP mentions that `transaction.timeout.ms` should be reduced to
> > 10s.
> > I think this makes sense for Kafka Streams which is tied to the consumer
> > group semantics and uses a default 10s session timeout. However, it
> seems a
> > bit dangerous to make this change for the producer generally. Could we
> just
> > change it for streams?
> >
> > That sounds good to me.
>
> > 2. The new `initTransactions` API takes a `Consumer` instance. I think
> the
> > idea is to basically put in a backdoor to give the producer access to the
> > group generationId. It's not clear to me how this would work given
> package
> > restrictions. I wonder if it would be better to just expose the state we
> > need from the consumer. I know we have been reluctant to do this so far
> > because we treat the generationId as an implementation detail. However, I
> > think we might just bite the bullet and expose it rather than coming up
> > with a messy hack. Concepts such as memberIds have already been exposed
> in
> > the AdminClient, so maybe it is not too bad. Alternatively, we could use
> an
> > opaque type. For example:
> >
> > // public
> > interface GroupMetadata {}
> >
> > // private
> > interface ConsumerGroupMetadata {
> >   final int generationId;
> >   final String memberId;
> > }
> >
> > // Consumer API
> > public GroupMetadata groupMetadata();
> >
> > I am probably leaning toward just exposing the state we need.
> >
> > Yes, also to mention that Kafka Streams uses the generic Consumer API which
> doesn't have rich
> states like a full `KafkaConsumer`. The hack will not work as expected.
>
> Instead, just exposing the consumer generation.id seems a much easier approach.
> We could consolidate
> the API and make it
>
> 3. Given that we are already providing a way to propagate group state from
> > the consumer to the producer, I wonder if we may as well include the
> > memberId and groupInstanceId. This would make the validation we do for
> > TxnOffsetCommit consistent with OffsetCommit. If for no other benefit, at
> > least this may help with debugging.
> >
>
> Yes, we could put them into the GroupMetadata struct.
>
>
> > 4. I like the addition of isolation_level to the offset fetch. At the
> same
> > time, its behavior is a bit inconsistent with how it is used in the
> > consumer generally. There is no reason for the group coordinator to ever
> > expose aborted data, so this is mostly about awaiting pending offset
> > commits, not reading uncommitted data. Perhaps instead of calling this
> > "isolation level," it should be more like "await_pending_transaction" or
> > something like that?
> >
> > Also, just to be clear, the consumer would treat this as an optional
> field,
> > right? So if the broker does not support the latest OffsetFetch API, it
> > would silently revert to reading the old data. Basically it would be up
> to
> > the streams version probing logic to ensure that the expectation on this
> > API fits with the usage of `transactional.id`.
> >
> > Sounds like a better naming to me, while I think it could be shortened to
> `await_transaction`.
> I think the field should be optional, too.
>
>
> > Thanks,
> > Jason
> >
> >
> >
> >
> >
> > On Mon, Jul 8, 2019 at 3:19 PM Boyang Chen 
> > wrote:
> >
> > > Hey Guozhang,
> > >
> > > I will correct my statement from last email. I don't think the
> > > read_committed (3.a) is necessary to be added to the OffsetFetch
> request,
> > > as if we are using EOS application, the underlying consumers within the
> > > group should always back off when there is pending offsets.
> > >
> > > Let me know if you think this is correct.
> > >
> > > On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen  >
> > > wrote:
> > >
> > > > Thank you Guozhang for the questions, inline answers are below.
> > > >
> > > > On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen <
> 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-18 Thread Boyang Chen
Thank you Jason for the ideas.

On Mon, Jul 15, 2019 at 5:28 PM Jason Gustafson  wrote:

> Hi Boyang,
>
> Thanks for the updates. A few comments below:
>
> 1. The KIP mentions that `transaction.timeout.ms` should be reduced to
> 10s.
> I think this makes sense for Kafka Streams which is tied to the consumer
> group semantics and uses a default 10s session timeout. However, it seems a
> bit dangerous to make this change for the producer generally. Could we just
> change it for streams?
>
> That sounds good to me.

> 2. The new `initTransactions` API takes a `Consumer` instance. I think the
> idea is to basically put in a backdoor to give the producer access to the
> group generationId. It's not clear to me how this would work given package
> restrictions. I wonder if it would be better to just expose the state we
> need from the consumer. I know we have been reluctant to do this so far
> because we treat the generationId as an implementation detail. However, I
> think we might just bite the bullet and expose it rather than coming up
> with a messy hack. Concepts such as memberIds have already been exposed in
> the AdminClient, so maybe it is not too bad. Alternatively, we could use an
> opaque type. For example:
>
> // public
> interface GroupMetadata {}
>
> // private
> interface ConsumerGroupMetadata {
>   final int generationId;
>   final String memberId;
> }
>
> // Consumer API
> public GroupMetadata groupMetadata();
>
> I am probably leaning toward just exposing the state we need.
>
> Yes, also to mention that Kafka Streams uses the generic Consumer API which
doesn't have rich
states like a full `KafkaConsumer`. The hack will not work as expected.

Instead, just exposing the consumer generation.id seems a much easier approach.
We could consolidate
the API and make it

3. Given that we are already providing a way to propagate group state from
> the consumer to the producer, I wonder if we may as well include the
> memberId and groupInstanceId. This would make the validation we do for
> TxnOffsetCommit consistent with OffsetCommit. If for no other benefit, at
> least this may help with debugging.
>

Yes, we could put them into the GroupMetadata struct.
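
For illustration, the struct would then carry roughly the fields below; the
shape is an assumption that simply mirrors what OffsetCommit validation
already checks:

// Illustrative only: the opaque metadata carries everything the
// TxnOffsetCommit validation path needs.
class ConsumerGroupMetadata implements GroupMetadata {
    final int generationId;
    final String memberId;
    final java.util.Optional<String> groupInstanceId;

    ConsumerGroupMetadata(int generationId, String memberId,
                          java.util.Optional<String> groupInstanceId) {
        this.generationId = generationId;
        this.memberId = memberId;
        this.groupInstanceId = groupInstanceId;
    }
}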


> 4. I like the addition of isolation_level to the offset fetch. At the same
> time, its behavior is a bit inconsistent with how it is used in the
> consumer generally. There is no reason for the group coordinator to ever
> expose aborted data, so this is mostly about awaiting pending offset
> commits, not reading uncommitted data. Perhaps instead of calling this
> "isolation level," it should be more like "await_pending_transaction" or
> something like that?
>
> Also, just to be clear, the consumer would treat this as an optional field,
> right? So if the broker does not support the latest OffsetFetch API, it
> would silently revert to reading the old data. Basically it would be up to
> the streams version probing logic to ensure that the expectation on this
> API fits with the usage of `transactional.id`.
>
> Sounds like a better naming to me, while I think it could be shortened to
`await_transaction`.
I think the field should be optional, too.


> Thanks,
> Jason
>
>
>
>
>
> On Mon, Jul 8, 2019 at 3:19 PM Boyang Chen 
> wrote:
>
> > Hey Guozhang,
> >
> > I will correct my statement from last email. I don't think the
> > read_committed (3.a) is necessary to be added to the OffsetFetch request,
> > as if we are using an EOS application, the underlying consumers within the
> > group should always back off when there are pending offsets.
> >
> > Let me know if you think this is correct.
> >
> > On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen 
> > wrote:
> >
> > > Thank you Guozhang for the questions, inline answers are below.
> > >
> > > On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen  >
> > > wrote:
> > >
> > >> Hey all,
> > >>
> > >> I have done a fundamental polish of KIP-447
> > >> <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> >
> > and
> > >> written a design doc
> > >> <
> >
> https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#
> >
> > depicting
> > >> internal changes. We stripped off many implementation details from the
> > KIP,
> > >> and simplified the public changes by a lot. For reviewers, it is
> highly
> > >> recommended to fully understand EOS design in KIP-98 and read its
> > >> corresponding design doc
> > >> <
> >
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit
> >
> > if
> > >> you haven't done so already.
> > >>
> > >> Let me know if you found anything confusing around the KIP or the
> > design.
> > >> Would be happy to discuss in depth.
> > >>
> > >> Best,
> > >> Boyang
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang 
> > >> wrote:
> > >>
> > >>> 2. The reason we did not expose generation.id from KafkaConsumer
> > public
> > >>> APIs directly is to abstract this notion from 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-15 Thread Jason Gustafson
Hi Boyang,

Thanks for the updates. A few comments below:

1. The KIP mentions that `transaction.timeout.ms` should be reduced to 10s.
I think this makes sense for Kafka Streams which is tied to the consumer
group semantics and uses a default 10s session timeout. However, it seems a
bit dangerous to make this change for the producer generally. Could we just
change it for streams?

2. The new `initTransactions` API takes a `Consumer` instance. I think the
idea is to basically put in a backdoor to give the producer access to the
group generationId. It's not clear to me how this would work given package
restrictions. I wonder if it would be better to just expose the state we
need from the consumer. I know we have been reluctant to do this so far
because we treat the generationId as an implementation detail. However, I
think we might just bite the bullet and expose it rather than coming up
with a messy hack. Concepts such as memberIds have already been exposed in
the AdminClient, so maybe it is not too bad. Alternatively, we could use an
opaque type. For example:

// public
interface GroupMetadata {}

// private
interface ConsumerGroupMetadata {
  final int generationId;
  final String memberId;
}

// Consumer API
public GroupMetadata groupMetadata();

I am probably leaning toward just exposing the state we need.
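
For illustration, a compilable form of that sketch might read as follows,
with the constructor and accessors filled in as assumptions:

// public opaque handle
public interface GroupMetadata { }

// package-private carrier of the state the producer actually needs
class ConsumerGroupMetadata implements GroupMetadata {
    private final int generationId;
    private final String memberId;

    ConsumerGroupMetadata(int generationId, String memberId) {
        this.generationId = generationId;
        this.memberId = memberId;
    }

    int generationId() { return generationId; }
    String memberId() { return memberId; }
}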

3. Given that we are already providing a way to propagate group state from
the consumer to the producer, I wonder if we may as well include the
memberId and groupInstanceId. This would make the validation we do for
TxnOffsetCommit consistent with OffsetCommit. If for no other benefit, at
least this may help with debugging.

4. I like the addition of isolation_level to the offset fetch. At the same
time, its behavior is a bit inconsistent with how it is used in the
consumer generally. There is no reason for the group coordinator to ever
expose aborted data, so this is mostly about awaiting pending offset
commits, not reading uncommitted data. Perhaps instead of calling this
"isolation level," it should be more like "await_pending_transaction" or
something like that?

Also, just to be clear, the consumer would treat this as an optional field,
right? So if the broker does not support the latest OffsetFetch API, it
would silently revert to reading the old data. Basically it would be up to
the streams version probing logic to ensure that the expectation on this
API fits with the usage of `transactional.id`.

Thanks,
Jason





On Mon, Jul 8, 2019 at 3:19 PM Boyang Chen 
wrote:

> Hey Guozhang,
>
> I will correct my statement from last email. I don't think the
> read_committed (3.a) is necessary to be added to the OffsetFetch request,
> as if we are using an EOS application, the underlying consumers within the
> group should always back off when there are pending offsets.
>
> Let me know if you think this is correct.
>
> On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen 
> wrote:
>
> > Thank you Guozhang for the questions, inline answers are below.
> >
> > On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen 
> > wrote:
> >
> >> Hey all,
> >>
> >> I have done a fundamental polish of KIP-447
> >> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics>
> and
> >> written a design doc
> >> <
> https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#>
> depicting
> >> internal changes. We stripped off many implementation details from the
> KIP,
> >> and simplified the public changes by a lot. For reviewers, it is highly
> >> recommended to fully understand EOS design in KIP-98 and read its
> >> corresponding design doc
> >> <
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit>
> if
> >> you haven't done so already.
> >>
> >> Let me know if you found anything confusing around the KIP or the
> design.
> >> Would be happy to discuss in depth.
> >>
> >> Best,
> >> Boyang
> >>
> >>
> >>
> >>
> >>
> >> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang 
> >> wrote:
> >>
> >>> 2. The reason we did not expose generation.id from KafkaConsumer
> public
> >>> APIs directly is to abstract this notion from users (since it is an
> >>> implementation detail of the rebalance protocol itself, e.g. if user
> >>> calls
> >>> consumer.assign() they do not need to invoke ConsumerCoordinator and no
> >>> need to be aware of generation.id at all).
> >>>
> >>> On the other hand, with the current proposal the txn.coordiantor did
> not
> >>> know about the latest generation from the source-of-truth
> >>> group.coordinator; instead, it will only bump up the generation from
> the
> >>> producer's InitProducerIdRequest only.
> >>>
> >>> The key here is that GroupCoordinator, when handling
> >>> `InitProducerIdRequest
> >>>
> >> In the new design, we just pass the entire consumer instance into the
> > producer through
> > #initTransaction, so no public API will be created.
> >
> >> 3. I agree that if we rely on the group coordinator to block 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-08 Thread Boyang Chen
Hey Guozhang,

I will correct my statement from last email. I don't think the
read_committed (3.a) is necessary to be added to the OffsetFetch request,
as if we are using an EOS application, the underlying consumers within the
group should always back off when there are pending offsets.

Let me know if you think this is correct.

On Tue, Jul 2, 2019 at 3:21 PM Boyang Chen 
wrote:

> Thank you Guozhang for the questions, inline answers are below.
>
> On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen 
> wrote:
>
>> Hey all,
>>
>> I have done a fundamental polish of KIP-447
>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics>
>>  and
>> written a design doc
>> <https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#>
>>  depicting
>> internal changes. We stripped off many implementation details from the KIP,
>> and simplified the public changes by a lot. For reviewers, it is highly
>> recommended to fully understand EOS design in KIP-98 and read its
>> corresponding design doc
>> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit>
>>  if
>> you haven't done so already.
>>
>> Let me know if you found anything confusing around the KIP or the design.
>> Would be happy to discuss in depth.
>>
>> Best,
>> Boyang
>>
>>
>>
>>
>>
>> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang 
>> wrote:
>>
>>> 2. The reason we did not expose generation.id from KafkaConsumer public
>>> APIs directly is to abstract this notion from users (since it is an
>>> implementation detail of the rebalance protocol itself, e.g. if user
>>> calls
>>> consumer.assign() they do not need to invoke ConsumerCoordinator and no
>>> need to be aware of generation.id at all).
>>>
>>> On the other hand, with the current proposal the txn.coordinator did not
>>> know about the latest generation from the source-of-truth
>>> group.coordinator; instead, it will only bump up the generation from the
>>> producer's InitProducerIdRequest.
>>>
>>> The key here is that GroupCoordinator, when handling
>>> `InitProducerIdRequest
>>>
> In the new design, we just pass the entire consumer instance into the
> producer through #initTransactions, so no new public API will be created.
>
>> 3. I agree that if we rely on the group coordinator to block on returning
>>> offset-fetch-response if read-committed is enabled, then we do not need
>>> to
>>> store partition assignment on txn coordinator and therefore it's better
>>> to
>>> still decouple them. For that case we still need to update the KIP wiki
>>> page that includes:
>>>
>>> 3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
>>> 3.b. Add new error code in OffsetFetchResponse to let client backoff and
>>> retry if there are pending txns including the interested partitions.
>>> 3.c. Also in the worst case we would let the client be blocked for the
>>> txn.timeout period, and for that rationale we may need to consider
>>> reducing
>>> our default txn.timeout value as well.
>>>
>>> Addressed 3.b and 3.c, will do 3.a.
>
>> 4. According to Colin it seems we do not need to create another KIP and we
>>> can just complete it as part of KIP-117 / KAFKA-5214; and we need to do
>>> some cleanup to have BrokerApiVersion exposed from AdminClient (@Colin
>>> please let us know if you have any concerns exposing it).
>>>
>> I think we no longer need to rely on api version for initialization,
> since we will be using the upgrade.from config anyway.
>
>>
>>> Guozhang
>>>
>>>
>>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson 
>>> wrote:
>>>
>>> > For reference, we have BrokerApiVersionCommand already as a public
>>> > interface. We have a bit of tech debt at the moment because it uses a
>>> > custom AdminClient. It would be nice to clean that up. In general, I
>>> think
>>> > it is reasonable to expose from AdminClient. It can be used by
>>> management
>>> > tools to inspect running Kafka versions for example.
>>> >
>>> > -Jason
>>> >
>>> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen <
>>> reluctanthero...@gmail.com>
>>> > wrote:
>>> >
>>> > > Thank you for the context Colin. The groupId was indeed a copy-paste
>>> > error.
>>> > > Our use case here for 447 is (Quoted from Guozhang):
>>> > > '''
>>> > > I think if we can do something else to
>>> > > avoid this config though, for example we can use the embedded
>>> AdminClient
>>> > > to send the APIVersion request upon starting up, and based on the
>>> > returned
>>> > > value decides whether to go to the old code path or the new behavior.
>>> > > '''
>>> > > The benefit we get is to avoid adding a new configuration to make a
>>> > > decision simply based on broker version. If you have concerns with
>>> > exposing
>>> > > ApiVersion for client, we could
>>> > > try to think of alternative solutions too.
>>> > >
>>> > > Boyang
>>> > >
>>> > >
>>> > >
>>> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe 
>>> wrote:
>>> > >
>>> > > > kafka.api.ApiVersion is 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-02 Thread Boyang Chen
Thank you Guozhang for the questions; inline answers are below.

On Tue, Jul 2, 2019 at 3:14 PM Boyang Chen 
wrote:

> Hey all,
>
> I have done a fundamental polish of KIP-447
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics>
> and written a design doc
> <https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#>
> depicting internal changes. We stripped off many implementation details from the KIP,
> and simplified the public changes by a lot. For reviewers, it is highly
> recommended to fully understand EOS design in KIP-98 and read its
> corresponding design doc
> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit>
> if you haven't done so already.
>
> Let me know if you found anything confusing around the KIP or the design.
> Would be happy to discuss in depth.
>
> Best,
> Boyang
>
>
>
>
>
> On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang  wrote:
>
>> 2. The reason we did not expose generation.id from KafkaConsumer public
>> APIs directly is to abstract this notion away from users (it is an
>> implementation detail of the rebalance protocol itself; e.g. if a user
>> calls consumer.assign(), they never invoke the ConsumerCoordinator and
>> need not be aware of generation.id at all).
>>
>> On the other hand, with the current proposal the txn.coordinator did not
>> know about the latest generation from the source-of-truth
>> group.coordinator; instead, it will only bump up the generation from the
>> producer's InitProducerIdRequest.
>>
>> The key here is that GroupCoordinator, when handling
>> `InitProducerIdRequest
>>
In the new design, we just pass the entire consumer instance into the
producer through #initTransactions, so no new public API will be created.
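
To make the shape of that proposal concrete, a sketch is below. The
overload is hypothetical — no released KafkaProducer has it — but it
captures the idea that the producer would read group.id and the current
generation from the consumer instead of through a new public metadata type:

// Hypothetical producer overload from this stage of the design; not a
// shipped API. The producer introspects the consumer for group.id and
// the current generation, so nothing new is exposed to users.
void initTransactions(Consumer<?, ?> consumer);

// Intended usage (sketch):
//   producer.initTransactions(consumer);
//   ... consume, process, produce, and commit offsets transactionally ...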

> 3. I agree that if we rely on the group coordinator to block on returning
>> offset-fetch-response if read-committed is enabled, then we do not need to
>> store partition assignment on txn coordinator and therefore it's better to
>> still decouple them. For that case we still need to update the KIP wiki
>> page that includes:
>>
>> 3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
>> 3.b. Add new error code in OffsetFetchResponse to let client backoff and
>> retry if there are pending txns including the interested partitions.
>> 3.c. Also in the worst case we would let the client be blocked for the
>> txn.timeout period, and for that rationale we may need to consider
>> reducing
>> our default txn.timeout value as well.
>>
>> Addressed 3.b and 3.c, will do 3.a.
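
To spell out the client-side behavior of 3.b/3.c, a generic sketch is
below. The helper names (sendOffsetFetch, isPendingTxnError) are
hypothetical stand-ins for client internals; the point is simply that the
new error code is retriable, each retry is backed off, and the worst case
is bounded by the pending transaction's timeout:

import java.util.function.Predicate;
import java.util.function.Supplier;

static <T> T fetchWhenStable(Supplier<T> sendOffsetFetch,
                             Predicate<T> isPendingTxnError) throws InterruptedException {
    while (true) {
        T response = sendOffsetFetch.get();  // hypothetical OffsetFetch round trip
        if (!isPendingTxnError.test(response))
            return response;                 // offsets are stable, or a fatal error
        Thread.sleep(50L);                   // back off; worst case bounded by txn.timeout (3.c)
    }
}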

> 4. According to Colin it seems we do not need to create another KIP and we
>> can just complete it as part of KIP-117 / KAFKA-5214; and we need to do
>> some cleanup to have BrokerApiVersion exposed from AdminClient (@Colin
>> please let us know if you have any concerns exposing it).
>>
I think we no longer need to rely on the API version for initialization,
since we will be using the upgrade.from config anyway.
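
For context, upgrade.from here refers to the existing Kafka Streams config
used for rolling upgrades. A sketch of the first rolling bounce (the
version value "2.3" and the application id are illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-app");           // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// First bounce: new binaries keep the old behavior until every instance
// (and broker) is upgraded; the config is removed in a second bounce.
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");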

>
>> Guozhang
>>
>>
>> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson 
>> wrote:
>>
>> > For reference, we have BrokerApiVersionCommand already as a public
>> > interface. We have a bit of tech debt at the moment because it uses a
>> > custom AdminClient. It would be nice to clean that up. In general, I
>> think
>> > it is reasonable to expose from AdminClient. It can be used by
>> management
>> > tools to inspect running Kafka versions for example.
>> >
>> > -Jason
>> >
>> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen > >
>> > wrote:
>> >
>> > > Thank you for the context Colin. The groupId was indeed a copy-paste
>> > error.
>> > > Our use case here for 447 is (Quoted from Guozhang):
>> > > '''
>> > > I think if we can do something else to
>> > > avoid this config though, for example we can use the embedded
>> AdminClient
>> > > to send the APIVersion request upon starting up, and based on the
>> > returned
>> > > value decides whether to go to the old code path or the new behavior.
>> > > '''
>> > > The benefit we get is to avoid adding a new configuration to make a
>> > > decision simply based on broker version. If you have concerns with
>> > exposing
>> > > ApiVersion for client, we could
>> > > try to think of alternative solutions too.
>> > >
>> > > Boyang
>> > >
>> > >
>> > >
>> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe 
>> wrote:
>> > >
>> > > > kafka.api.ApiVersion is an internal class, not suitable for exposing
>> > > > through AdminClient.  That class is not even accessible without
>> having
>> > > the
>> > > > broker jars on your CLASSPATH.
>> > > >
>> > > > Another question is, what is the groupId parameter doing in the
>> call?
>> > > The
>> > > > API versions are the same no matter what consumer group we use,
>> right?
>> > > > Perhaps this was a copy and paste error?
>> > > >
>> > > > This is not the first time we have discussed having a method in
>> > > > AdminClient to retrieve API version 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-07-02 Thread Boyang Chen
Hey all,

I have done a fundamental polish of KIP-447
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics>
and written a design doc
<https://docs.google.com/document/d/1LhzHGeX7_Lay4xvrEXxfciuDWATjpUXQhrEIkph9qRE/edit#>
depicting internal changes. We stripped off many implementation details from the KIP,
and simplified the public changes by a lot. For reviewers, it is highly
recommended to fully understand EOS design in KIP-98 and read its
corresponding design doc
<https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit>
if you haven't done so already.

Let me know if you found anything confusing around the KIP or the design.
Would be happy to discuss in depth.

Best,
Boyang





On Wed, Jun 26, 2019 at 11:00 AM Guozhang Wang  wrote:

> 2. The reason we did not expose generation.id from KafkaConsumer public
> APIs directly is to abstract this notion away from users (it is an
> implementation detail of the rebalance protocol itself; e.g. if a user
> calls consumer.assign(), they never invoke the ConsumerCoordinator and
> need not be aware of generation.id at all).
>
> On the other hand, with the current proposal the txn.coordinator did not
> know about the latest generation from the source-of-truth
> group.coordinator; instead, it will only bump up the generation from the
> producer's InitProducerIdRequest.
>
> The key here is that GroupCoordinator, when handling `InitProducerIdRequest
>
> 3. I agree that if we rely on the group coordinator to block on returning
> offset-fetch-response if read-committed is enabled, then we do not need to
> store partition assignment on txn coordinator and therefore it's better to
> still decouple them. For that case we still need to update the KIP wiki
> page that includes:
>
> 3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
> 3.b. Add new error code in OffsetFetchResponse to let client backoff and
> retry if there are pending txns including the interested partitions.
> 3.c. Also in the worst case we would let the client be blocked for the
> txn.timeout period, and for that rationale we may need to consider reducing
> our default txn.timeout value as well.
>
> 4. According to Colin it seems we do not need to create another KIP and we
> can just complete it as part of KIP-117 / KAFKA-5214; and we need to do
> some cleanup to have BrokerApiVersion exposed from AdminClient (@Colin
> please let us know if you have any concerns exposing it).
>
>
> Guozhang
>
>
> On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson 
> wrote:
>
> > For reference, we have BrokerApiVersionCommand already as a public
> > interface. We have a bit of tech debt at the moment because it uses a
> > custom AdminClient. It would be nice to clean that up. In general, I
> think
> > it is reasonable to expose from AdminClient. It can be used by management
> > tools to inspect running Kafka versions for example.
> >
> > -Jason
> >
> > On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen 
> > wrote:
> >
> > > Thank you for the context Colin. The groupId was indeed a copy-paste
> > error.
> > > Our use case here for 447 is (Quoted from Guozhang):
> > > '''
> > > I think if we can do something else to
> > > avoid this config though, for example we can use the embedded
> AdminClient
> > > to send the APIVersion request upon starting up, and based on the
> > returned
> > > value decides whether to go to the old code path or the new behavior.
> > > '''
> > > The benefit we get is to avoid adding a new configuration to make a
> > > decision simply based on broker version. If you have concerns with
> > exposing
> > > ApiVersion for client, we could
> > > try to think of alternative solutions too.
> > >
> > > Boyang
> > >
> > >
> > >
> > > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe 
> wrote:
> > >
> > > > kafka.api.ApiVersion is an internal class, not suitable for exposing
> > > > through AdminClient.  That class is not even accessible without
> having
> > > the
> > > > broker jars on your CLASSPATH.
> > > >
> > > > Another question is, what is the groupId parameter doing in the call?
> > > The
> > > > API versions are the same no matter what consumer group we use,
> right?
> > > > Perhaps this was a copy and paste error?
> > > >
> > > > This is not the first time we have discussed having a method in
> > > > AdminClient to retrieve API version information.  In fact, the
> original
> > > KIP
> > > > which created KafkaAdminClient specified an API for fetching version
> > > > information.  It was called apiVersions and it is still there on the
> > > wiki.
> > > > See
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> > > >
> > > > However, this API wasn't ready in time for 0.11.0 so we shipped
> without
> > > > it.  There was a JIRA to implement it for later versions,
> > > > https://issues.apache.org/jira/browse/KAFKA-5214 , as well 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-26 Thread Guozhang Wang
2. The reason we did not expose generation.id from KafkaConsumer public
APIs directly is to abstract this notion away from users (it is an
implementation detail of the rebalance protocol itself; e.g. if a user
calls consumer.assign(), they never invoke the ConsumerCoordinator and
need not be aware of generation.id at all).

On the other hand, with the current proposal the txn.coordinator did not
know about the latest generation from the source-of-truth
group.coordinator; instead, it will only bump up the generation from the
producer's InitProducerIdRequest.

The key here is that GroupCoordinator, when handling `InitProducerIdRequest

3. I agree that if we rely on the group coordinator to block on returning
offset-fetch-response if read-committed is enabled, then we do not need to
store partition assignment on txn coordinator and therefore it's better to
still decouple them. For that case we still need to update the KIP wiki
page that includes:

3.a. Augment OffsetFetchRequest with the ISOLATION_LEVEL as well.
3.b. Add new error code in OffsetFetchResponse to let client backoff and
retry if there are pending txns including the interested partitions.
3.c. Also in the worst case we would let the client be blocked for the
txn.timeout period, and for that rationale we may need to consider reducing
our default txn.timeout value as well.

4. According to Colin it seems we do not need to create another KIP and we
can just complete it as part of KIP-117 / KAFKA-5214; and we need to do
some cleanup to have BrokerApiVersion exposed from AdminClient (@Colin
please let us know if you have any concerns exposing it).


Guozhang


On Tue, Jun 25, 2019 at 6:43 PM Jason Gustafson  wrote:

> For reference, we have BrokerApiVersionCommand already as a public
> interface. We have a bit of tech debt at the moment because it uses a
> custom AdminClient. It would be nice to clean that up. In general, I think
> it is reasonable to expose from AdminClient. It can be used by management
> tools to inspect running Kafka versions for example.
>
> -Jason
>
> On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen 
> wrote:
>
> > Thank you for the context Colin. The groupId was indeed a copy-paste
> error.
> > Our use case here for 447 is (Quoted from Guozhang):
> > '''
> > I think if we can do something else to
> > avoid this config though, for example we can use the embedded AdminClient
> > to send the APIVersion request upon starting up, and based on the
> returned
> > value decides whether to go to the old code path or the new behavior.
> > '''
> > The benefit we get is to avoid adding a new configuration to make a
> > decision simply based on broker version. If you have concerns with
> exposing
> > ApiVersion for client, we could
> > try to think of alternative solutions too.
> >
> > Boyang
> >
> >
> >
> > On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe  wrote:
> >
> > > kafka.api.ApiVersion is an internal class, not suitable for exposing
> > > through AdminClient.  That class is not even accessible without having
> > the
> > > broker jars on your CLASSPATH.
> > >
> > > Another question is, what is the groupId parameter doing in the call?
> > The
> > > API versions are the same no matter what consumer group we use, right?
> > > Perhaps this was a copy and paste error?
> > >
> > > This is not the first time we have discussed having a method in
> > > AdminClient to retrieve API version information.  In fact, the original
> > KIP
> > > which created KafkaAdminClient specified an API for fetching version
> > > information.  It was called apiVersions and it is still there on the
> > wiki.
> > > See
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> > >
> > > However, this API wasn't ready in time for 0.11.0 so we shipped without
> > > it.  There was a JIRA to implement it for later versions,
> > > https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR,
> > > https://github.com/apache/kafka/pull/3012 .  However, we started to
> > > rethink whether this AdminClient function was even necessary.  Most of
> > the
> > > use-cases we could think of seemed like horrible hacks.  So it has
> never
> > > really been implemented (yet?).
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > > > Actually, after a second thought, I think it actually makes sense to
> > > > support auto upgrade through admin client to help us get the api version
> > > > from
> > > > broker.
> > > > A draft KIP is here:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > > >
> > > > Boyang
> > > >
> > > > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <
> > reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you Guozhang, some of my understandings are inline below.
> > > > >
> > > > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson <
> ja...@confluent.io
> 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Jason Gustafson
For reference, we have BrokerApiVersionCommand already as a public
interface. We have a bit of tech debt at the moment because it uses a
custom AdminClient. It would be nice to clean that up. In general, I think
it is reasonable to expose from AdminClient. It can be used by management
tools to inspect running Kafka versions for example.

-Jason

On Tue, Jun 25, 2019 at 4:37 PM Boyang Chen 
wrote:

> Thank you for the context Colin. The groupId was indeed a copy-paste error.
> Our use case here for 447 is (Quoted from Guozhang):
> '''
> I think if we can do something else to
> avoid this config though, for example we can use the embedded AdminClient
> to send the APIVersion request upon starting up, and based on the returned
> value decides whether to go to the old code path or the new behavior.
> '''
> The benefit we get is to avoid adding a new configuration to make a
> decision simply based on broker version. If you have concerns with exposing
> ApiVersion for client, we could
> try to think of alternative solutions too.
>
> Boyang
>
>
>
> On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe  wrote:
>
> > kafka.api.ApiVersion is an internal class, not suitable for exposing
> > through AdminClient.  That class is not even accessible without having
> the
> > broker jars on your CLASSPATH.
> >
> > Another question is, what is the groupId parameter doing in the call?
> The
> > API versions are the same no matter what consumer group we use, right?
> > Perhaps this was a copy and paste error?
> >
> > This is not the first time we have discussed having a method in
> > AdminClient to retrieve API version information.  In fact, the original
> KIP
> > which created KafkaAdminClient specified an API for fetching version
> > information.  It was called apiVersions and it is still there on the
> wiki.
> > See
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> >
> > However, this API wasn't ready in time for 0.11.0 so we shipped without
> > it.  There was a JIRA to implement it for later versions,
> > https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR,
> > https://github.com/apache/kafka/pull/3012 .  However, we started to
> > rethink whether this AdminClient function was even necessary.  Most of
> the
> > use-cases we could think of seemed like horrible hacks.  So it has never
> > really been implemented (yet?).
> >
> > best,
> > Colin
> >
> >
> > On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > > Actually, after a second thought, I think it actually makes sense to
> > > support auto upgrade through admin client to help us get the api version
> > > from
> > > broker.
> > > A draft KIP is here:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> > >
> > > Boyang
> > >
> > > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen <
> reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Thank you Guozhang, some of my understandings are inline below.
> > > >
> > > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson  >
> > > > wrote:
> > > >
> > > >> >
> > > >> > I think co-locating does have some merits here, i.e. letting the
> > > >> > ConsumerCoordinator which has the source-of-truth of assignment to
> > act
> > > >> as
> > > >> > the TxnCoordinator as well; but I agree there's also some cons of
> > > >> coupling
> > > >> > them together. I'm still a bit inclined towards colocation but if
> > there
> > > >> > are good rationales not to do so I can be convinced as well.
> > > >>
> > > >>
> > > >> The good rationale is that we have no mechanism to colocate
> > partitions ;).
> > > >> Are you suggesting we store the group and transaction state in the
> > same
> > > >> log? Can you be more concrete about the benefit?
> > > >>
> > > >> -Jason
> > > >>
> > > >> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang 
> > > >> wrote:
> > > >>
> > > >> > Hi Boyang,
> > > >> >
> > > >> > 1. One advantage of retry against on-hold is that it will not
> > tie-up a
> > > >> > handler thread (of course the latter could do the same but that
> > involves
> > > >> > using a purgatory which is more complicated), and also it is less
> > > >> likely to
> > > >> > violate request timeout. So I think there are some rationales to
> > prefer
> > > >> > retries.
> > > >> >
> > > >>
> > > > That sounds fair to me; we also avoid using another purgatory
> > > > instance. Usually a single back-off only delays about 50ms during
> > > > startup, which is a trivial cost. This behavior shouldn't be changed.
> > > >
> > > > > 2. Regarding "ConsumerRebalanceListener": both
> > ConsumerRebalanceListener
> > > >> > and PartitionAssignors are user-customizable modules, and only
> > > >> difference
> > > >> > is that the former is specified via code and the latter is
> > specified via
> > > >> > config.
> > > >> >
> > > >> > Regarding Jason's proposal of ConsumerAssignment, one thing to
> note
> > > >> though

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Boyang Chen
Thank you for the context Colin. The groupId was indeed a copy-paste error.
Our use case here for 447 is (Quoted from Guozhang):
'''
I think if we can do something else to
avoid this config though, for example we can use the embedded AdminClient
to send the APIVersion request upon starting up, and based on the returned
value decides whether to go to the old code path or the new behavior.
'''
The benefit we get is to avoid adding a new configuration to make a
decision simply based on broker version. If you have concerns with exposing
ApiVersion for client, we could
try to think of alternative solutions too.

Boyang



On Tue, Jun 25, 2019 at 4:20 PM Colin McCabe  wrote:

> kafka.api.ApiVersion is an internal class, not suitable for exposing
> through AdminClient.  That class is not even accessible without having the
> broker jars on your CLASSPATH.
>
> Another question is, what is the groupId parameter doing in the call?  The
> API versions are the same no matter what consumer group we use, right?
> Perhaps this was a copy and paste error?
>
> This is not the first time we have discussed having a method in
> AdminClient to retrieve API version information.  In fact, the original KIP
> which created KafkaAdminClient specified an API for fetching version
> information.  It was called apiVersions and it is still there on the wiki.
> See
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
>
> However, this API wasn't ready in time for 0.11.0 so we shipped without
> it.  There was a JIRA to implement it for later versions,
> https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR,
> https://github.com/apache/kafka/pull/3012 .  However, we started to
> rethink whether this AdminClient function was even necessary.  Most of the
> use-cases we could think of seemed like horrible hacks.  So it has never
> really been implemented (yet?).
>
> best,
> Colin
>
>
> On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> > Actually, after a second thought, I think it actually makes sense to
> > support auto upgrade through admin client to help us get the api version
> > from
> > broker.
> > A draft KIP is here:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> >
> > Boyang
> >
> > On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen 
> > wrote:
> >
> > > Thank you Guozhang, some of my understandings are inline below.
> > >
> > > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson 
> > > wrote:
> > >
> > >> >
> > >> > I think co-locating does have some merits here, i.e. letting the
> > >> > ConsumerCoordinator which has the source-of-truth of assignment to
> act
> > >> as
> > >> > the TxnCoordinator as well; but I agree there's also some cons of
> > >> coupling
> > >> > them together. I'm still a bit inclined towards colocation but if
> there
> > >> > are good rationales not to do so I can be convinced as well.
> > >>
> > >>
> > >> The good rationale is that we have no mechanism to colocate
> partitions ;).
> > >> Are you suggesting we store the group and transaction state in the
> same
> > >> log? Can you be more concrete about the benefit?
> > >>
> > >> -Jason
> > >>
> > >> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang 
> > >> wrote:
> > >>
> > >> > Hi Boyang,
> > >> >
> > >> > 1. One advantage of retry against on-hold is that it will not
> tie-up a
> > >> > handler thread (of course the latter could do the same but that
> involves
> > >> > using a purgatory which is more complicated), and also it is less
> > >> likely to
> > >> > violate request timeout. So I think there are some rationales to
> prefer
> > >> > retries.
> > >> >
> > >>
> > > That sounds fair to me; we also avoid using another purgatory
> > > instance. Usually a single back-off only delays about 50ms during
> > > startup, which is a trivial cost. This behavior shouldn't be changed.
> > >
> > > > 2. Regarding "ConsumerRebalanceListener": both
> ConsumerRebalanceListener
> > >> > and PartitionAssignors are user-customizable modules, and only
> > >> difference
> > >> > is that the former is specified via code and the latter is
> specified via
> > >> > config.
> > >> >
> > >> > Regarding Jason's proposal of ConsumerAssignment, one thing to note
> > >> though
> > >> > with KIP-429 the onPartitionsAssigned may not be called if the
> assignment
> > >> > does not change, whereas onAssignment would always be called at the
> end
> > >> of
> > >> > sync-group response. My proposed semantics is that
> > >> > `RebalanceListener#onPartitionsXXX` are used for notifications to
> user,
> > >> and
> > >> > hence if there's no changes these will not be called, whereas
> > >> > `PartitionAssignor` is used for assignor logic, whose callback would
> > >> always
> > >> > be called no matter if the partitions have changed or not.
> > >>
> > >> I think a third option is to gracefully expose generation id as part
> of
> > > consumer API, so that 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Colin McCabe
kafka.api.ApiVersion is an internal class, not suitable for exposing through
AdminClient.  That class is not even accessible without having the broker jars 
on your CLASSPATH.

Another question is, what is the groupId parameter doing in the call?  The API 
versions are the same no matter what consumer group we use, right?  Perhaps 
this was a copy and paste error?

This is not the first time we have discussed having a method in AdminClient to 
retrieve API version information.  In fact, the original KIP which created 
KafkaAdminClient specified an API for fetching version information.  It was 
called apiVersions and it is still there on the wiki.  See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations

However, this API wasn't ready in time for 0.11.0 so we shipped without it.  
There was a JIRA to implement it for later versions, 
https://issues.apache.org/jira/browse/KAFKA-5214 , as well as a PR, 
https://github.com/apache/kafka/pull/3012 .  However, we started to rethink 
whether this AdminClient function was even necessary.  Most of the use-cases we 
could think of seemed like horrible hacks.  So it has never really been 
implemented (yet?).

best,
Colin


On Tue, Jun 25, 2019, at 15:46, Boyang Chen wrote:
> Actually, after a second thought, I think it actually makes sense to
> support auto upgrade through admin client to help us get the api version
> from
> broker.
> A draft KIP is here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client
> 
> Boyang
> 
> On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen 
> wrote:
> 
> > Thank you Guozhang, some of my understandings are inline below.
> >
> > On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson 
> > wrote:
> >
> >> >
> >> > I think co-locating does have some merits here, i.e. letting the
> >> > ConsumerCoordinator which has the source-of-truth of assignment to act
> >> as
> >> > the TxnCoordinator as well; but I agree there's also some cons of
> >> coupling
> >> > them together. I'm still a bit inclined towards colocation but if there
> >> > are good rationales not to do so I can be convinced as well.
> >>
> >>
> >> The good rationale is that we have no mechanism to colocate partitions ;).
> >> Are you suggesting we store the group and transaction state in the same
> >> log? Can you be more concrete about the benefit?
> >>
> >> -Jason
> >>
> >> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang 
> >> wrote:
> >>
> >> > Hi Boyang,
> >> >
> >> > 1. One advantage of retry against on-hold is that it will not tie-up a
> >> > handler thread (of course the latter could do the same but that involves
> >> > using a purgatory which is more complicated), and also it is less
> >> likely to
> >> > violate request timeout. So I think there are some rationales to prefer
> >> > retries.
> >> >
> >>
> > That sounds fair to me; we also avoid using another purgatory
> > instance. Usually a single back-off only delays about 50ms during
> > startup, which is a trivial cost. This behavior shouldn't be changed.
> >
> > > 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener
> >> > and PartitionAssignors are user-customizable modules, and only
> >> difference
> >> > is that the former is specified via code and the latter is specified via
> >> > config.
> >> >
> >> > Regarding Jason's proposal of ConsumerAssignment, one thing to note
> >> though
> >> > with KIP-429 the onPartitionsAssigned may not be called if the assignment
> >> > does not change, whereas onAssignment would always be called at the end
> >> of
> >> > sync-group response. My proposed semantics is that
> >> > `RebalanceListener#onPartitionsXXX` are used for notifications to user,
> >> and
> >> > hence if there's no changes these will not be called, whereas
> >> > `PartitionAssignor` is used for assignor logic, whose callback would
> >> always
> >> > be called no matter if the partitions have changed or not.
> >>
> > I think a third option is to gracefully expose the generation id as
> > part of the consumer API, so that we don't need to bother overloading
> > various callbacks. Of course, this builds upon the assumption that
> > topic partitions will not be included in the new initTransactions API.
> > > 3. I feel it is a bit awkward to let the TxnCoordinator keeping partition
> >> > assignments since it is sort of taking over the job of the
> >> > ConsumerCoordinator, and may likely cause a split-brain problem as two
> >> > coordinators keep a copy of this assignment which may be different.
> >> >
> >> > I think co-locating does have some merits here, i.e. letting the
> >> > ConsumerCoordinator which has the source-of-truth of assignment to act
> >> as
> >> > the TxnCoordinator as well; but I agree there's also some cons of
> >> coupling
>> > them together. I'm still a bit inclined towards colocation but if there
> >> > are good rationales not to do so I can be convinced as 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Boyang Chen
Actually, after a second thought, I think it actually makes sense to
support auto upgrade through the admin client to help us get the API version
from the broker.
A draft KIP is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-483%3A++Add+Broker+Version+API+in+Admin+Client

Boyang

On Tue, Jun 25, 2019 at 2:57 PM Boyang Chen 
wrote:

> Thank you Guozhang, some of my understandings are inline below.
>
> On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson 
> wrote:
>
>> >
>> > I think co-locating does have some merits here, i.e. letting the
>> > ConsumerCoordinator which has the source-of-truth of assignment to act
>> as
>> > the TxnCoordinator as well; but I agree there's also some cons of
>> coupling
>> > them together. I'm still a bit inclined towards colocation but if there
>> > are good rationales not to do so I can be convinced as well.
>>
>>
>> The good rationale is that we have no mechanism to colocate partitions ;).
>> Are you suggesting we store the group and transaction state in the same
>> log? Can you be more concrete about the benefit?
>>
>> -Jason
>>
>> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang 
>> wrote:
>>
>> > Hi Boyang,
>> >
>> > 1. One advantage of retry against on-hold is that it will not tie-up a
>> > handler thread (of course the latter could do the same but that involves
>> > using a purgatory which is more complicated), and also it is less
>> likely to
>> > violate request timeout. So I think there are some rationales to prefer
>> > retries.
>> >
>>
> That sounds fair to me; we also avoid using another purgatory instance.
> Usually a single back-off only delays about 50ms during startup, which
> is a trivial cost. This behavior shouldn't be changed.
>
> > 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener
>> > and PartitionAssignors are user-customizable modules, and only
>> difference
>> > is that the former is specified via code and the latter is specified via
>> > config.
>> >
>> > Regarding Jason's proposal of ConsumerAssignment, one thing to note
>> though
>> > with KIP-429 the onPartitionsAssigned may not be called if the assignment
>> > does not change, whereas onAssignment would always be called at the end
>> of
>> > sync-group response. My proposed semantics is that
>> > `RebalanceListener#onPartitionsXXX` are used for notifications to user,
>> and
>> > hence if there's no changes these will not be called, whereas
>> > `PartitionAssignor` is used for assignor logic, whose callback would
>> always
>> > be called no matter if the partitions have changed or not.
>>
> I think a third option is to gracefully expose the generation id as part
> of the consumer API, so that we don't need to bother overloading various
> callbacks. Of course, this builds upon the assumption that topic
> partitions will not be included in the new initTransactions API.
>
> > 3. I feel it is a bit awkward to let the TxnCoordinator keeping partition
>> > assignments since it is sort of taking over the job of the
>> > ConsumerCoordinator, and may likely cause a split-brain problem as two
>> > coordinators keep a copy of this assignment which may be different.
>> >
>> > I think co-locating does have some merits here, i.e. letting the
>> > ConsumerCoordinator which has the source-of-truth of assignment to act
>> as
>> > the TxnCoordinator as well; but I agree there's also some cons of
>> coupling
>> > them together. I'm still a bit inclined towards colocation but if there
>> > are good rationales not to do so I can be convinced as well.
>> >
>>
> The purpose of co-location is to let the txn coordinator see the group
> assignment. That priority is weakened now that we already have a defense
> on the consumer offset fetch, so I guess it's not super important anymore.
>
>
>> > 4. I guess I'm preferring the philosophy of "only add configs if
>> there's no
>> > other ways", since more and more configs would make it less and less
>> > intuitive out of the box to use.
>> >
>> > I think it's a valid point that checks upon starting up does not cope
>> with
>> > brokers downgrading but even with a config, but it is still hard for
>> users
>> > to determine when they can be ensured the broker would never downgrade
>> > anymore and hence can safely switch the config. So my feeling is that
>> this
>> > config would not be helping too much still. If we want to be at the
>> safer
>> > side, then I'd suggest we modify the Coordinator -> NetworkClient
>> hierarchy
>> > to allow the NetworkClient being able to pass the APIVersion metadata to
>> > Coordinator, so that Coordinator can rely on that logic to change its
>> > behavior dynamically.
>>
> The stream thread init could not be supported by an on-the-fly client
> coordinator behavior change; we lose that option once initialization is
> done (the main thread exits and no thread has the global picture
> anymore). If we do want to support auto version detection, an admin
> client request seems easier in this sense.
>
>
>> >
>> > 5. I do not have a 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Boyang Chen
Thank you Guozhang, some of my understandings are inline below.

On Tue, Jun 25, 2019 at 11:05 AM Jason Gustafson  wrote:

> >
> > I think co-locating does have some merits here, i.e. letting the
> > ConsumerCoordinator which has the source-of-truth of assignment to act as
> > the TxnCoordinator as well; but I agree there's also some cons of
> coupling
> > them together. I'm still a bit inclined towards colocation but if there
> > are good rationales not to do so I can be convinced as well.
>
>
> The good rationale is that we have no mechanism to colocate partitions ;).
> Are you suggesting we store the group and transaction state in the same
> log? Can you be more concrete about the benefit?
>
> -Jason
>
> On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang  wrote:
>
> > Hi Boyang,
> >
> > 1. One advantage of retry against on-hold is that it will not tie-up a
> > handler thread (of course the latter could do the same but that involves
> > using a purgatory which is more complicated), and also it is less likely
> to
> > violate request timeout. So I think there are some rationales to prefer
> > retries.
> >
>
That sounds fair to me; we also avoid using another purgatory instance.
Usually a single back-off only delays about 50ms during startup, which is
a trivial cost. This behavior shouldn't be changed.

> 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener
> > and PartitionAssignors are user-customizable modules, and only difference
> > is that the former is specified via code and the latter is specified via
> > config.
> >
> > Regarding Jason's proposal of ConsumerAssignment, one thing to note
> though
> > with KIP-429 the onPartitionsAssigned may not be called if the assignment
> > does not change, whereas onAssignment would always be called at the end
> of
> > sync-group response. My proposed semantics is that
> > `RebalanceListener#onPartitionsXXX` are used for notifications to user,
> and
> > hence if there's no changes these will not be called, whereas
> > `PartitionAssignor` is used for assignor logic, whose callback would
> always
> > be called no matter if the partitions have changed or not.
>
I think a third option is to gracefully expose the generation id as part of
the consumer API, so that we don't need to bother overloading various
callbacks. Of course, this builds upon the assumption that topic partitions
will not be included in the new initTransactions API.
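
For reference, this "expose it from the consumer" direction is essentially
where KIP-447 ended up. The sketch below uses the API names that eventually
shipped in Kafka 2.5 (groupMetadata() and the ConsumerGroupMetadata overload
of sendOffsetsToTransaction), which postdate this stage of the thread; the
surrounding variables (consumer, producer, running, "output") are
placeholders:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;

producer.initTransactions();  // called once; no group information needed here
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("output", record.key(), record.value()));
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
    }
    // The group metadata (group.id, generation, member.id) travels with the
    // offsets, letting the group coordinator fence zombie producers.
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}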

> 3. I feel it is a bit awkward to let the TxnCoordinator keeping partition
> > assignments since it is sort of taking over the job of the
> > ConsumerCoordinator, and may likely cause a split-brain problem as two
> > coordinators keep a copy of this assignment which may be different.
> >
> > I think co-locating does have some merits here, i.e. letting the
> > ConsumerCoordinator which has the source-of-truth of assignment to act as
> > the TxnCoordinator as well; but I agree there's also some cons of
> coupling
> > them together. I'm still a bit inclined towards colocation but if there
> > are good rationales not to do so I can be convinced as well.
> >
>
The purpose of co-location is to let the txn coordinator see the group
assignment. That priority is weakened now that we already have a defense
on the consumer offset fetch, so I guess it's not super important anymore.


> > 4. I guess I'm preferring the philosophy of "only add configs if there's
> no
> > other ways", since more and more configs would make it less and less
> > intuitive out of the box to use.
> >
> > I think it's a valid point that checks upon starting up does not cope
> with
> > brokers downgrading but even with a config, but it is still hard for
> users
> > to determine when they can be ensured the broker would never downgrade
> > anymore and hence can safely switch the config. So my feeling is that
> this
> > config would not be helping too much still. If we want to be at the safer
> > side, then I'd suggest we modify the Coordinator -> NetworkClient
> hierarchy
> > to allow the NetworkClient being able to pass the APIVersion metadata to
> > Coordinator, so that Coordinator can rely on that logic to change its
> > behavior dynamically.
>
The stream thread init could not be supported by an on-the-fly client
coordinator behavior change; we lose that option once initialization is
done (the main thread exits and no thread has the global picture anymore).
If we do want to support auto version detection, an admin client request
seems easier in this sense.


> >
> > 5. I do not have a concrete idea about how the impact on Connect would
> > make, maybe Randall or Konstantine can help here?
> >
>
Sounds good, let's see their thoughts.


> > Guozhang
> >
> > On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen  >
> > wrote:
> >
> > > Hey Jason,
> > >
> > > thank you for the proposal here. Some of my thoughts below.
> > >
> > > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Hi Boyang,
> > > >
> > > > Thanks for picking this up! Still reading 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Jason Gustafson
>
> I think co-locating does have some merits here, i.e. letting the
> ConsumerCoordinator which has the source-of-truth of assignment to act as
> the TxnCoordinator as well; but I agree there's also some cons of coupling
> them together. I'm still a bit inclined towards colocation but if there
> are good rationales not to do so I can be convinced as well.


The good rationale is that we have no mechanism to colocate partitions ;).
Are you suggesting we store the group and transaction state in the same
log? Can you be more concrete about the benefit?

-Jason

On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang  wrote:

> Hi Boyang,
>
> 1. One advantage of retry against on-hold is that it will not tie-up a
> handler thread (of course the latter could do the same but that involves
> using a purgatory which is more complicated), and also it is less likely to
> violate request timeout. So I think there are some rationales to prefer
> retries.
>
> 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener
> and PartitionAssignors are user-customizable modules, and only difference
> is that the former is specified via code and the latter is specified via
> config.
>
> Regarding Jason's proposal of ConsumerAssignment, one thing to note though
> with KIP-429 the onPartitionsAssigned may not be called if the assignment
> does not change, whereas onAssignment would always be called at the end of
> sync-group response. My proposed semantics is that
> `RebalanceListener#onPartitionsXXX` are used for notifications to user, and
> hence if there's no changes these will not be called, whereas
> `PartitionAssignor` is used for assignor logic, whose callback would always
> be called no matter if the partitions have changed or not.
>
> 3. I feel it is a bit awkward to let the TxnCoordinator keeping partition
> assignments since it is sort of taking over the job of the
> ConsumerCoordinator, and may likely cause a split-brain problem as two
> coordinators keep a copy of this assignment which may be different.
>
> I think co-locating does have some merits here, i.e. letting the
> ConsumerCoordinator which has the source-of-truth of assignment to act as
> the TxnCoordinator as well; but I agree there's also some cons of coupling
> them together. I'm still a bit inclined towards colocation but if there
> are good rationales not to do so I can be convinced as well.
>
> 4. I guess I'm preferring the philosophy of "only add configs if there's no
> other ways", since more and more configs would make it less and less
> intuitive out of the box to use.
>
> I think it's a valid point that checks upon starting up does not cope with
> brokers downgrading but even with a config, but it is still hard for users
> to determine when they can be ensured the broker would never downgrade
> anymore and hence can safely switch the config. So my feeling is that this
> config would not be helping too much still. If we want to be at the safer
> side, then I'd suggest we modify the Coordinator -> NetworkClient hierarchy
> to allow the NetworkClient being able to pass the APIVersion metadata to
> Coordinator, so that Coordinator can rely on that logic to change its
> behavior dynamically.
>
> 5. I do not have a concrete idea about how the impact on Connect would
> make, maybe Randall or Konstantine can help here?
>
>
> Guozhang
>
> On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen 
> wrote:
>
> > Hey Jason,
> >
> > thank you for the proposal here. Some of my thoughts below.
> >
> > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson 
> > wrote:
> >
> > > Hi Boyang,
> > >
> > > Thanks for picking this up! Still reading through the updates, but here
> > are
> > > a couple initial comments on the APIs:
> > >
> > > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are
> > trying
> > > to encapsulate state from the current group assignment. Maybe something
> > > like `ConsumerAssignment` would be clearer? If we make the usage
> > consistent
> > > across the consumer and producer, then we can avoid exposing internal
> > state
> > > like the generationId.
> > >
> > > For example:
> > >
> > > // Public API
> > > interface ConsumerAssignment {
> > >   Set<TopicPartition> partitions();
> > > }
> > >
> > > // Not a public API
> > > class InternalConsumerAssignment implements ConsumerAssignment {
> > >   Set<TopicPartition> partitions;
> > >   int generationId;
> > > }
> > >
> > > Then we can change the rebalance listener to something like this:
> > > onPartitionsAssigned(ConsumerAssignment assignment)
> > >
> > > And on the producer:
> > > void initTransactions(String groupId, ConsumerAssignment assignment);
> > >
> > > 2. Another bit of awkwardness is the fact that we have to pass the
> > > groupId through both initTransactions() and sendOffsetsToTransaction().
> > > We could consider a config instead. Maybe something like
> > > `transactional.group.id`?
> > > Then we could simplify the producer APIs, potentially even deprecating
> > > the current 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Guozhang Wang
Hi Boyang,

1. One advantage of retry against on-hold is that it will not tie-up a
handler thread (of course the latter could do the same but that involves
using a purgatory which is more complicated), and also it is less likely to
violate request timeout. So I think there are some rationales to prefer
retries.

2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener
and PartitionAssignors are user-customizable modules, and only difference
is that the former is specified via code and the latter is specified via
config.

Regarding Jason's proposal of ConsumerAssignment, one thing to note though
with KIP-429 the onPartitionsAssigned may not be called if the assignment
does not change, whereas onAssignment would always be called at the end of
sync-group response. My proposed semantics is that
`RebalanceListener#onPartitionsXXX` are used for notifications to user, and
hence if there's no changes these will not be called, whereas
`PartitionAssignor` is used for assignor logic, whose callback would always
be called no matter if the partitions have changed or not.
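
To make the distinction concrete: the notification-style hooks described
here are the standard ConsumerRebalanceListener callbacks, which under the
KIP-429 semantics above may be skipped when the assignment is unchanged,
while an assignor's onAssignment always runs after the sync-group response.
A sketch of the listener side (topics is a placeholder):

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // user notification: fires only when these partitions actually change hands
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // user notification: under KIP-429 this may be skipped if the
        // assignment did not change, per the semantics described above
    }
});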

3. I feel it is a bit awkward to let the TxnCoordinator keeping partition
assignments since it is sort of taking over the job of the
ConsumerCoordinator, and may likely cause a split-brain problem as two
coordinators keep a copy of this assignment which may be different.

I think co-locating does have some merits here, i.e. letting the
ConsumerCoordinator which has the source-of-truth of assignment to act as
the TxnCoordinator as well; but I agree there's also some cons of coupling
them together. I'm still a bit inclined towards colocation but if there
are good rationales not to do so I can be convinced as well.

4. I guess I'm preferring the philosophy of "only add configs if there's no
other ways", since more and more configs would make it less and less
intuitive out of the box to use.

I think it's a valid point that checks upon starting up does not cope with
brokers downgrading but even with a config, but it is still hard for users
to determine when they can be ensured the broker would never downgrade
anymore and hence can safely switch the config. So my feeling is that this
config would not be helping too much still. If we want to be at the safer
side, then I'd suggest we modify the Coordinator -> NetworkClient hierarchy
to allow the NetworkClient being able to pass the APIVersion metadata to
Coordinator, so that Coordinator can rely on that logic to change its
behavior dynamically.

5. I do not have a concrete idea about how the impact on Connect would
make, maybe Randall or Konstantine can help here?


Guozhang

On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen 
wrote:

> Hey Jason,
>
> thank you for the proposal here. Some of my thoughts below.
>
> On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson 
> wrote:
>
> > Hi Boyang,
> >
> > Thanks for picking this up! Still reading through the updates, but here
> are
> > a couple initial comments on the APIs:
> >
> > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are
> trying
> > to encapsulate state from the current group assignment. Maybe something
> > like `ConsumerAssignment` would be clearer? If we make the usage
> consistent
> > across the consumer and producer, then we can avoid exposing internal
> state
> > like the generationId.
> >
> > For example:
> >
> > // Public API
> > interface ConsumerAssignment {
> >   Set<TopicPartition> partitions();
> > }
> >
> > // Not a public API
> > class InternalConsumerAssignment implements ConsumerAssignment {
> >   Set<TopicPartition> partitions;
> >   int generationId;
> > }
> >
> > Then we can change the rebalance listener to something like this:
> > onPartitionsAssigned(ConsumerAssignment assignment)
> >
> > And on the producer:
> > void initTransactions(String groupId, ConsumerAssignment assignment);
> >
> > 2. Another bit of awkwardness is the fact that we have to pass the
> > groupId through both initTransactions() and sendOffsetsToTransaction().
> > We could consider a config instead. Maybe something like
> > `transactional.group.id`?
> > Then we could simplify the producer APIs, potentially even deprecating
> > the current sendOffsetsToTransaction. In fact, for this new usage, the
> > `transactional.id` config is not needed. It would be nice if we don't
> > have to provide it.
> >
> I like the idea of combining 1 and 2. We could definitely pass in a
> group.id config so that we could avoid exposing that information in a
> public API. The question I have is whether we should name the interface
> `GroupAssignment` instead, so that Connect later could also extend on the
> same interface, just to echo Guozhang's point here. Also, the base
> interface is better defined empty for easy extension, or we could define
> an abstract type called `Resource` to be shareable later, IMHO.
>
>
> > By the way, I'm a bit confused about discussion above about colocating
> the
> > txn and group coordinators. That is not actually necessary, is it?
> >
> > 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-25 Thread Jason Gustafson
>
> The question I have is whether we should name the interface
> `GroupAssignment` instead, so
> that Connect later could also extend on the same interface, just to echo
> Guozhang's point here,


Are you referring to the API used for initTransactions? There would be no
reason to use a more generic interface in ConsumerRebalanceListener since
that is already tied to the consumer. Possibly you can have GroupAssignment
as a supertype of ConsumerAssignment, to try and leave the door open for a ConnectAssignment
implementation in the future. Then we could have the following APIs:

// ConsumerRebalanceListener
onPartitionsAssigned(ConsumerAssignment assignment)
// Producer
void initTransactions(String groupId, GroupAssignment assignment);

The mechanism Connect uses for offsets is quite different. They are stored
in a separate topic, so we don't have the convenience of the group
coordinator to gate access. This begs another question. The current
proposal still has my initial suggestion to include the partition
assignment in the InitProducerId API. Do we still need this since we are
doing fencing in the group coordinator? To handle Connect, we would need to
generalize the notion of an assignment in the transaction coordinator.
Alternatively, we can rely on group coordinator and leave the assignment
out of transaction coordinator for now. This could always be revisited in
the future.

-Jason


On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen 
wrote:

> Hey Jason,
>
> thank you for the proposal here. Some of my thoughts below.
>
> On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson 
> wrote:
>
> > Hi Boyang,
> >
> > Thanks for picking this up! Still reading through the updates, but here
> are
> > a couple initial comments on the APIs:
> >
> > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are
> trying
> > to encapsulate state from the current group assignment. Maybe something
> > like `ConsumerAssignment` would be clearer? If we make the usage
> consistent
> > across the consumer and producer, then we can avoid exposing internal
> state
> > like the generationId.
> >
> > For example:
> >
> > // Public API
> > interface ConsumerAssignment {
> >   Set<TopicPartition> partitions();
> > }
> >
> > // Not a public API
> > class InternalConsumerAssignment implements ConsumerAssignment {
> >   Set<TopicPartition> partitions;
> >   int generationId;
> > }
> >
> > Then we can change the rebalance listener to something like this:
> > onPartitionsAssigned(ConsumerAssignment assignment)
> >
> > And on the producer:
> > void initTransactions(String groupId, ConsumerAssignment assignment);
> >
> > 2. Another bit of awkwardness is the fact that we have to pass the
> > groupId through both initTransactions() and sendOffsetsToTransaction().
> > We could consider a config instead. Maybe something like
> > `transactional.group.id`?
> > Then we could simplify the producer APIs, potentially even deprecating
> > the current sendOffsetsToTransaction. In fact, for this new usage, the
> > `transactional.id` config is not needed. It would be nice if we don't
> > have to provide it.
> >
> I like the idea of combining 1 and 2. We could definitely pass in a
> group.id config so that we could avoid exposing that information in a
> public API. The question I have is whether we should name the interface
> `GroupAssignment` instead, so that Connect later could also extend on the
> same interface, just to echo Guozhang's point here. Also, the base
> interface is better defined empty for easy extension, or we could define
> an abstract type called `Resource` to be shareable later, IMHO.
>
>
> > By the way, I'm a bit confused about the discussion above about colocating
> > the txn and group coordinators. That is not actually necessary, is it?
> >
> Yes, this is not a requirement for this KIP, because it is inherently
> impossible to guarantee co-location of the transaction log and consumed
> offset topic partitions.
>
>
> > Thanks,
> > Jason
> >
> > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen  wrote:
> >
> > > Thank you Ismael for the suggestion. We will attempt to address it by
> > > giving more details in the rejected alternatives section.
> > >
> > >
> > > Thank you for the comment Guozhang! Answers are inline below.
> > >
> > >
> > >
> > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang  wrote:
> > >
> > > > Hello Boyang,
> > > >
> > > > Thanks for the KIP, I have some comments below:
> > > >
> > > > 1. "Once transactions are complete, the call will return." This seems
> > > > different from the existing behavior, in which we would return a
> > > retriable
> > > > CONCURRENT_TRANSACTIONS and let the client to retry, is this
> > intentional?
> > > >
> > >
> > > I don't think it is intentional, and I will defer this question to Jason
> > > when he gets time to answer, since from what I understand, retrying and
> > > holding both seem like valid approaches.
> > >
> > >
> > > > 2. "an overload to onPartitionsAssigned in the consumer's rebalance
> > > > listener 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-24 Thread Boyang Chen
Hey Jason,

thank you for the proposal here. Some of my thoughts below.

On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson  wrote:

> Hi Boyang,
>
> Thanks for picking this up! Still reading through the updates, but here are
> a couple initial comments on the APIs:
>
> 1. The `TxnProducerIdentity` class is a bit awkward. I think we are trying
> to encapsulate state from the current group assignment. Maybe something
> like `ConsumerAssignment` would be clearer? If we make the usage consistent
> across the consumer and producer, then we can avoid exposing internal state
> like the generationId.
>
> For example:
>
> // Public API
> interface ConsumerAssignment {
>   Set<TopicPartition> partitions();
> }
>
> // Not a public API
> class InternalConsumerAssignment implements ConsumerAssignment {
>   Set<TopicPartition> partitions;
>   int generationId;
> }
>
> Then we can change the rebalance listener to something like this:
> onPartitionsAssigned(ConsumerAssignment assignment)
>
> And on the producer:
> void initTransactions(String groupId, ConsumerAssignment assignment);
>
> 2. Another bit of awkwardness is the fact that we have to pass the groupId
> through both initTransactions() and sendOffsetsToTransaction(). We could
> consider a config instead. Maybe something like `transactional.group.id`?
> Then we could simplify the producer APIs, potentially even deprecating the
> current sendOffsetsToTransaction. In fact, for this new usage, the
> `transactional.id` config is not needed. It would be nice if we didn't
> have to provide it.
>
I like the idea of combining 1 and 2. We could definitely pass in a group.id
config so that we could avoid exposing that information in a public API. The
question I have is whether we should name the interface `GroupAssignment`
instead, so that Connect could later extend the same interface, just to echo
Guozhang's point here. Also, the base interface is better defined empty for
easy extension, or we could define an abstract type called `Resource` to be
shareable later, IMHO.


> By the way, I'm a bit confused about the discussion above about colocating
> the txn and group coordinators. That is not actually necessary, is it?
>
Yes, this is not a requirement for this KIP, because it is inherently
impossible to guarantee co-location of the transaction log and consumed
offset topic partitions.


> Thanks,
> Jason
>
> On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen  wrote:
>
> > Thank you Ismael for the suggestion. We will attempt to address it by
> > giving more details in the rejected alternatives section.
> >
> >
> > Thank you for the comment Guozhang! Answers are inline below.
> >
> >
> >
> > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang  wrote:
> >
> > > Hello Boyang,
> > >
> > > Thanks for the KIP, I have some comments below:
> > >
> > > 1. "Once transactions are complete, the call will return." This seems
> > > different from the existing behavior, in which we would return a
> > retriable
> > > CONCURRENT_TRANSACTIONS and let the client to retry, is this
> intentional?
> > >
> >
> > I don't think it is intentional, and I will defer this question to Jason
> > when he gets time to answer, since from what I understand, retrying and
> > holding both seem like valid approaches.
> >
> >
> > > 2. "an overload to onPartitionsAssigned in the consumer's rebalance
> > > listener interface": as part of KIP-341 we've already add this
> > information
> > > to the onAssignment callback. Would this be sufficient? Or more
> generally
> > > speaking, which information have to be passed around in rebalance
> > callback
> > > while others can be passed around in PartitionAssignor callback? In
> > Streams
> > > for example both callbacks are used but most critical information is
> > passed
> > > via onAssignment.
> > >
> >
> > We still need to extend ConsumerRebalanceListener because it's the
> > interface we have public access to. The #onAssignment call is defined at
> > the PartitionAssignor level, which is not easy to work with for external
> > producers.
> >
> >
> > > 3. "We propose to use a separate record type in order to store the
> group
> > > assignment.": hmm, I thought with the third typed FindCoordinator, the
> > same
> > > broker that act as the  consumer coordinator would always be selected
> as
> > > the txn coordinator, in which case it can access its local cache
> > metadata /
> > > offset topic to get this information already? We just need to think
> about
> > > how to make these two modules directly exchange information without
> > messing
> > > up the code hierarchy.
> > >
> >
> > These two coordinators will be on the same broker only when the numbers of
> > partitions for the transaction state topic and the consumer offset topic
> > are the same. This normally holds true, but I'm afraid we can't make this
> > assumption.
> >
> > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of
> > > this config is just to avoid the case where an old-versioned broker cannot
> > > recognize a newer-versioned client. I think 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-24 Thread Jason Gustafson
Hi Boyang,

Thanks for picking this up! Still reading through the updates, but here are
a couple initial comments on the APIs:

1. The `TxnProducerIdentity` class is a bit awkward. I think we are trying
to encapsulate state from the current group assignment. Maybe something
like `ConsumerAssignment` would be clearer? If we make the usage consistent
across the consumer and producer, then we can avoid exposing internal state
like the generationId.

For example:

// Public API
interface ConsumerAssignment {
  Set<TopicPartition> partitions();
}

// Not a public API
class InternalConsumerAssignment implements ConsumerAssignment {
  Set<TopicPartition> partitions;
  int generationId;
}

Then we can change the rebalance listener to something like this:
onPartitionsAssigned(ConsumerAssignment assignment)

And on the producer:
void initTransactions(String groupId, ConsumerAssignment assignment);
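
For context, here is a condensed sketch of today's consume-transform-produce
loop that this proposal reshapes (usual imports omitted; the producer, consumer,
and groupId are assumed to be configured elsewhere, and offsetsToCommit() is a
hypothetical helper that maps each partition to its next offset to commit):

producer.initTransactions();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty())
        continue;
    producer.beginTransaction();
    for (ConsumerRecord<String, String> record : records)
        producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
    // Today the group id is passed here, at offset-commit time.
    producer.sendOffsetsToTransaction(offsetsToCommit(records), groupId);
    producer.commitTransaction();
}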

2. Another bit of awkwardness is the fact that we have to pass the groupId
through both initTransactions() and sendOffsetsToTransaction(). We could
consider a config instead. Maybe something like `transactional.group.id`?
Then we could simplify the producer APIs, potentially even deprecating the
current sendOffsetsToTransaction. In fact, for this new usage, the
`transactional.id` config is not needed. It would be nice if we didn't have
to provide it.
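
As a rough illustration of the config-based flavor (`transactional.group.id` is
the hypothetical property floated here, not an existing producer config):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// Hypothetical: scopes the producer's transactions to the consumer group,
// so neither transactional.id nor a per-call groupId argument is needed.
props.put("transactional.group.id", "my-consumer-group");
Producer<byte[], byte[]> producer = new KafkaProducer<>(props);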

By the way, I'm a bit confused about the discussion above about colocating the
txn and group coordinators. That is not actually necessary, is it?

Thanks,
Jason

On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen 
wrote:

> Thank you Ismael for the suggestion. We will attempt to address it by
> giving more details in the rejected alternatives section.
>
>
> Thank you for the comment Guozhang! Answers are inline below.
>
>
>
> On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang  wrote:
>
> > Hello Boyang,
> >
> > Thanks for the KIP, I have some comments below:
> >
> > 1. "Once transactions are complete, the call will return." This seems
> > different from the existing behavior, in which we would return a
> retriable
> > CONCURRENT_TRANSACTIONS and let the client to retry, is this intentional?
> >
>
> I don't think it is intentional, and I will defer this question to Jason
> when he gets time to answer, since from what I understand, retrying and
> holding both seem like valid approaches.
>
>
> > 2. "an overload to onPartitionsAssigned in the consumer's rebalance
> > listener interface": as part of KIP-341 we've already add this
> information
> > to the onAssignment callback. Would this be sufficient? Or more generally
> > speaking, which information have to be passed around in rebalance
> callback
> > while others can be passed around in PartitionAssignor callback? In
> Streams
> > for example both callbacks are used but most critical information is
> passed
> > via onAssignment.
> >
>
> We still need to extend ConsumerRebalanceListener because it's the
> interface we have public access to. The #onAssignment call is defined at
> the PartitionAssignor level, which is not easy to work with for external
> producers.
>
>
> > 3. "We propose to use a separate record type in order to store the group
> > assignment.": hmm, I thought with the third typed FindCoordinator, the
> same
> > broker that act as the  consumer coordinator would always be selected as
> > the txn coordinator, in which case it can access its local cache
> metadata /
> > offset topic to get this information already? We just need to think about
> > how to make these two modules directly exchange information without
> messing
> > up the code hierarchy.
> >
>
> These two coordinators will be on the same broker only when the numbers of
> partitions for the transaction state topic and the consumer offset topic
> are the same. This normally holds true, but I'm afraid we can't make this
> assumption.
>
> > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of
> > this config is just to avoid the case where an old-versioned broker cannot
> > recognize a newer-versioned client. I think we can do something else to
> > avoid this config though; for example, we can use the embedded AdminClient
> > to send the APIVersion request upon starting up and, based on the returned
> > value, decide whether to go to the old code path or the new behavior.
> > Admittedly, asking a random broker about APIVersion does not guarantee the
> > whole cluster's versions, but what we can do is to first 1) find the
> > coordinator (and if the random broker does not even recognize the new
> > discover type, fall back to the old path directly), and then 2) ask the
> > discovered coordinator about its supported APIVersion.
> >
>
> The caveat here is that we have to make sure both the group coordinator and
> the transaction coordinator are on the latest version during the init stage.
> This is potentially doable, as we only need a consumer group.id to check
> that. In the meantime, a hard-coded config is still a favorable backup in
> case the server has been downgraded and you want to use a new-version
> client without 

Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-24 Thread Boyang Chen
Thank you Ismael for the suggestion. We will attempt to address it by
giving more details in the rejected alternatives section.


Thank you for the comment Guozhang! Answers are inline below.



On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang  wrote:

> Hello Boyang,
>
> Thanks for the KIP, I have some comments below:
>
> 1. "Once transactions are complete, the call will return." This seems
> different from the existing behavior, in which we would return a retriable
> CONCURRENT_TRANSACTIONS and let the client retry; is this intentional?
>

I don't think it is intentional, and I will defer this question to Jason
when he gets time to answer, since from what I understand, retrying and
holding both seem like valid approaches.


> 2. "an overload to onPartitionsAssigned in the consumer's rebalance
> listener interface": as part of KIP-341 we've already add this information
> to the onAssignment callback. Would this be sufficient? Or more generally
> speaking, which information have to be passed around in rebalance callback
> while others can be passed around in PartitionAssignor callback? In Streams
> for example both callbacks are used but most critical information is passed
> via onAssignment.
>

We still need to extend ConsumerRebalanceListener because it's the
interface we have public access to. The #onAssignment call is defined at
the PartitionAssignor level, which is not easy to work with for external
producers.
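
For reference, this is the publicly reachable hook in question; a minimal
sketch assuming a consumer and topics already in scope:

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// The public callback KIP-447 would build on; PartitionAssignor#onAssignment,
// by contrast, is not reachable from an application holding only a producer.
consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // e.g. finish or commit in-flight transactional work before losing ownership
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // e.g. (re)initialize transactional state for the new assignment
    }
});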


> 3. "We propose to use a separate record type in order to store the group
> assignment.": hmm, I thought with the third typed FindCoordinator, the same
> broker that acts as the consumer coordinator would always be selected as
> the txn coordinator, in which case it can access its local cache metadata /
> offset topic to get this information already? We just need to think about
> how to make these two modules directly exchange information without messing
> up the code hierarchy.
>

These two coordinators will be on the same broker only when the numbers of
partitions for the transaction state topic and the consumer offset topic are
the same. This normally holds true, but I'm afraid we can't make this
assumption.
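
To spell the assumption out: each coordinator is located by hashing a key onto
its own internal topic. A simplified sketch of the broker-side partitionFor
logic (variable names illustrative):

// coordinator = leader of partition abs(hash(key)) % partitionCount of the
// internal topic (__consumer_offsets or __transaction_state respectively).
static int coordinatorPartition(String key, int internalTopicPartitionCount) {
    return Math.abs(key.hashCode()) % internalTopicPartitionCount;
}

// Even when hashing the same key (the group.id), the two partition indices
// only coincide when both internal topics have the same partition count:
int groupPartition = coordinatorPartition(groupId, offsetsTopicPartitionCount);
int txnPartition   = coordinatorPartition(groupId, txnLogTopicPartitionCount);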

> 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of
> this config is just to avoid the case where an old-versioned broker cannot
> recognize a newer-versioned client. I think we can do something else to
> avoid this config though; for example, we can use the embedded AdminClient
> to send the APIVersion request upon starting up and, based on the returned
> value, decide whether to go to the old code path or the new behavior.
> Admittedly, asking a random broker about APIVersion does not guarantee the
> whole cluster's versions, but what we can do is to first 1) find the
> coordinator (and if the random broker does not even recognize the new
> discover type, fall back to the old path directly), and then 2) ask the
> discovered coordinator about its supported APIVersion.
>

The caveat here is that we have to make sure both the group coordinator and
the transaction coordinator are on the latest version during the init stage.
This is potentially doable, as we only need a consumer group.id to check
that. In the meantime, a hard-coded config is still a favorable backup in
case the server has been downgraded and you want to use a new-version
client without `consumer group` transactional support.

> 5. This is a meta question: have you considered how this can be applied to
> Kafka Connect as well? For example, for source connectors, the assignment
> is not by "partitions", but by some other sort of "resources" based on the
> source systems; how would KIP-447 affect Kafka Connectors that implement
> EOS as well?
>

No, it's not currently included in the scope. Could you point me to a
sample source connector that uses EOS? We could always piggy-back more
information, such as tasks, into the TxnProducerIdentity struct. If
this is something to support in the near term, an abstract type called
"Resource" could be provided, letting topic partition and connect task
implement it.
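
A rough sketch of that shape (every name here is hypothetical):

import org.apache.kafka.common.TopicPartition;

// Hypothetical base type for whatever a worker owns exclusively.
interface Resource { }

// Consumers would fence on topic partitions...
class PartitionResource implements Resource {
    final TopicPartition partition;
    PartitionResource(TopicPartition partition) { this.partition = partition; }
}

// ...while Connect source tasks could fence on connector/task ids.
class ConnectTaskResource implements Resource {
    final String connectorName;
    final int taskId;
    ConnectTaskResource(String connectorName, int taskId) {
        this.connectorName = connectorName;
        this.taskId = taskId;
    }
}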


>
> Guozhang
>
>
> On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma  wrote:
>
> > Hi Boyang,
> >
> > Thanks for the KIP. It's good that we listed a number of rejected
> > alternatives. It would be helpful to have an explanation of why they were
> > rejected.
> >
> > Ismael
> >
> > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen  wrote:
> >
> > > Hey all,
> > >
> > > I would like to start a discussion for KIP-447:
> > >
> > >
> >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > >
> > > This is work originated by Jason Gustafson, and we would like to proceed
> > > into the discussion stage.
> > >
> > > Let me know your thoughts, thanks!
> > >
> > > Boyang
> > >
> >
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-23 Thread Guozhang Wang
Hello Boyang,

Thanks for the KIP, I have some comments below:

1. "Once transactions are complete, the call will return." This seems
different from the existing behavior, in which we would return a retriable
CONCURRENT_TRANSACTIONS and let the client retry; is this intentional?

2. "an overload to onPartitionsAssigned in the consumer's rebalance
listener interface": as part of KIP-341 we've already add this information
to the onAssignment callback. Would this be sufficient? Or more generally
speaking, which information have to be passed around in rebalance callback
while others can be passed around in PartitionAssignor callback? In Streams
for example both callbacks are used but most critical information is passed
via onAssignment.

3. "We propose to use a separate record type in order to store the group
assignment.": hmm, I thought with the third typed FindCoordinator, the same
broker that acts as the consumer coordinator would always be selected as
the txn coordinator, in which case it can access its local cache metadata /
offset topic to get this information already? We just need to think about
how to make these two modules directly exchange information without messing
up the code hierarchy.

4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of
this config is just to avoid the case where an old-versioned broker cannot
recognize a newer-versioned client. I think we can do something else to
avoid this config though; for example, we can use the embedded AdminClient
to send the APIVersion request upon starting up and, based on the returned
value, decide whether to go to the old code path or the new behavior.
Admittedly, asking a random broker about APIVersion does not guarantee the
whole cluster's versions, but what we can do is to first 1) find the
coordinator (and if the random broker does not even recognize the new
discover type, fall back to the old path directly), and then 2) ask the
discovered coordinator about its supported APIVersion.

5. This is a meta question: have you considered how this can be applied to
Kafka Connect as well? For example, for source connectors, the assignment
is not by "partitions", but by some other sort of "resources" based on the
source systems; how would KIP-447 affect Kafka Connectors that implement
EOS as well?


Guozhang


On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma  wrote:

> Hi Boyang,
>
> Thanks for the KIP. It's good that we listed a number of rejected
> alternatives. It would be helpful to have an explanation of why they were
> rejected.
>
> Ismael
>
> On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen  wrote:
>
> > Hey all,
> >
> > I would like to start a discussion for KIP-447:
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> >
> > This is work originated by Jason Gustafson, and we would like to proceed
> > into the discussion stage.
> >
> > Let me know your thoughts, thanks!
> >
> > Boyang
> >
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-22 Thread Ismael Juma
Hi Boyang,

Thanks for the KIP. It's good that we listed a number of rejected
alternatives. It would be helpful to have an explanation of why they were
rejected.

Ismael

On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen  wrote:

> Hey all,
>
> I would like to start a discussion for KIP-447:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
>
> This is work originated by Jason Gustafson, and we would like to proceed
> into the discussion stage.
>
> Let me know your thoughts, thanks!
>
> Boyang
>


[DISCUSS] KIP-447: Producer scalability for exactly once semantics

2019-06-22 Thread Boyang Chen
Hey all,

I would like to start a discussion for KIP-447:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics

This is work originated by Jason Gustafson, and we would like to proceed into
the discussion stage.

Let me know your thoughts, thanks!

Boyang