[DISCUSS] KafkaConsumer pause(Collection&lt;TopicPartition&gt; partitions)

2021-11-17 Thread Riven Sun
Hi Kafka devs,
I think the existing KafkaConsumer pause method needs to be improved, and at the
same time KafkaConsumer should be able to support a pause method that is not
affected by groupRebalance.
I have given some suggestions on this and hope you will actively join the
discussion.
I would like everyone to discuss it together first, and the solution may not be
completed by me alone, so I chose to create an Improvement-type Jira instead of
directly creating a KIP.
Jira link: https://issues.apache.org/jira/browse/KAFKA-13463
Jira link: https://issues.apache.org/jira/browse/KAFKA-13463

Looking forward to any suggestions from everyone

Best Regards
RivenSun


Re: [DISCUSS] KIP-796: Interactive Query v2

2021-11-17 Thread John Roesler
Ah, sorry, Guozhang,

It seems I was a bit too eager with starting the vote thread.

13: I think that makes perfect sense. I've updated the KIP.

14: Oof, I can't believe I overlooked those newer
exceptions. Some of them will become exceptions in IQv2,
whereas others will become individual partition QueryResult
failures. Here is an accounting of what will become of those
proposed exceptions:

* StreamsNotStartedException: thrown when the stream thread
state is CREATED; the user can retry until it reaches RUNNING.

* StreamsRebalancingException: thrown when the stream thread is
not running and the stream state is REBALANCING. This exception
is no longer applicable. Regardless of the rebalancing
state of the store's task, the state will either be up to
the requested bound or not.

* StateStoreMigratedException: thrown when the state store is
already closed and the stream state is RUNNING. This is a per-
partition failure, so it now maps to the
FailureReason.NOT_PRESENT failure.


* StateStoreNotAvailableException: thrown when the state store is
closed and the stream state is PENDING_SHUTDOWN / NOT_RUNNING /
ERROR. I (subjectively) felt the name was ambiguous with
respect to the prior condition in which a store partition is
not locally available. This is replaced with the thrown
exception, StreamsStoppedException (the JavaDoc states
that it is thrown when Streams is in any terminal state).

* UnknownStateStoreException: thrown when passing an unknown
state store. This is still a thrown exception.

* InvalidStateStorePartitionException: thrown when the
requested partition is not available on the Streams instance.
If the partition actually does exist, then we will now
return a per-partition FailureReason.NOT_PRESENT. If the
requested partition is actually not present in the topology
at all, then we will return the per-partition
FailureReason.DOES_NOT_EXIST.
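To make the mapping concrete, here is a rough sketch of how a caller might
inspect per-partition results, using the request/response shape proposed in
the KIP. The package and accessor names (getPartitionResults, getFailureReason,
getFailureMessage) are illustrative assumptions, not the final API.

import java.util.Map;

import org.apache.kafka.streams.KafkaStreams;
// Assumed package and accessors, following the shapes proposed in the KIP.
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResponse;

public class Iqv2FailureHandlingSketch {
    static <R> void handle(KafkaStreams streams, StateQueryRequest<R> request) {
        StateQueryResponse<R> response = streams.query(request);
        for (Map.Entry<Integer, QueryResult<R>> entry :
                response.getPartitionResults().entrySet()) {
            QueryResult<R> result = entry.getValue();
            if (result.isSuccess()) {
                System.out.println("partition " + entry.getKey() + ": " + result.getResult());
            } else if (result.getFailureReason() == FailureReason.NOT_PRESENT) {
                // Store partition is not (or no longer) locally present; retry elsewhere.
            } else if (result.getFailureReason() == FailureReason.DOES_NOT_EXIST) {
                // The requested partition does not exist in the topology at all.
            } else {
                System.err.println("query failed: " + result.getFailureMessage());
            }
        }
    }
}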

Sorry for the oversight. The KIP has been updated.

Thanks,
-John

On Wed, 2021-11-17 at 15:48 -0800, Guozhang Wang wrote:
> Thanks John.
> 
> I made another pass on the KIP and overall it already looks pretty good. I
> just have a couple more minor comments:
> 
> 13: What do you think about just removing the following function in
> QueryResult
> 
>   // returns a failed query result because caller requested a "latest"
> bound, but the task was
>   // not active and running.
>   public static <R> QueryResult<R> notActive(String currentState);
> 
> Instead just use `notUpToBound` for the case when `latest` bound is
> requested but the node is not the active replica. My main motivation is
> trying to abstract away the notion of active/standby from the public APIs
> itself, and hence capturing both this case as well as just a
> normal "position bound not achieved" in the same return signal, until later
> when we think it is indeed needed to separate them with different returns.
> 
> 14: Regarding the possible exceptions being thrown from `query`, it seems
> more exception types are possible from KIP-216:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors,
> should we include all in the javadocs?
> 
> 
> Guozhang
> 
> 
> 
> On Wed, Nov 17, 2021 at 3:25 PM John Roesler  wrote:
> 
> > Thanks for the reply, Guozhang!
> > 
> > I have updated the KIP to tie up the remaining points that
> > we have discussed. I really appreciate your time in refining
> > the proposal. I included a quick summary of the final state
> > of our discussion points below.
> > 
> > Since it seems like this discussion thread is pretty
> > convergent, I'll go ahead and start the voting thread soon.
> > 
> > Thanks again!
> > -John
> > 
> > P.S.: the final state of our discussion points:
> > 
> > 1. I removed serdesForStore from the proposal (and moved it
> > to Rejected Alternatives)
> > 
> > 2. Thanks for that reference. I had overlooked that
> > implementation. I'd note that the ListValuesStore is
> > currently only used in the KStream API, which doesn't
> > support queries at all. Due to its interface, it could
> > theoretically be used to materialize a KTable, though it has
> > no supplier provided in the typical Stores factory class.
> > 
> > Regardless, I think that it would still be a similar story
> > to the Segmented store. The ListValues store would simply
> > choose to terminate the query on its own and not delegate to
> > any of the wrapped KeyValue stores. It wouldn't matter that
> > the wrapped stores have a query-handling facility of their
> > own, if the wrapping store doesn't choose to delegate, the
> > wrapped store will not try to execute any queries.
> > 
> > Specifically regarding the key transformation that these
> > "formatted" stores perform, when they handle the query, they
> > would have the ability to execute the query in any way that
> > makes sense OR to just reject the query if it doesn't make
> > sense.
> > 
> > 3, 4: nothing to do
> > 
> > 5: I updated the KIP to specify the exceptions that may be
> > thrown in `KafkaStreams#query` 

[jira] [Created] (KAFKA-13463) Improvement: KafkaConsumer pause(Collection&lt;TopicPartition&gt; partitions)

2021-11-17 Thread RivenSun (Jira)
RivenSun created KAFKA-13463:


 Summary: Improvement: KafkaConsumer pause(Collection&lt;TopicPartition&gt; partitions)
 Key: KAFKA-13463
 URL: https://issues.apache.org/jira/browse/KAFKA-13463
 Project: Kafka
  Issue Type: Improvement
Reporter: RivenSun


h1. 1.Background

When users call the KafkaConsumer#pause(...) method, they may not realize that the
pause can silently stop working, and data may be lost.

For example, the following simple code:
{code:java}
while (true) {
    try {
        // Pause every currently assigned partition before polling.
        kafkaConsumer.pause(kafkaConsumer.assignment());
        // With everything paused, poll() is expected to return no records.
        ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2));
        if (!records.isEmpty()) {
            log.error("kafka poll for rebalance discarded some records!");
        }
    } catch (Exception e) {
        log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
    }
}{code}
Even if you call pause(assignment) before the poll method every time, the poll 
method may still return messages.
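For illustration, a common user-level mitigation is to re-apply pause() from a
ConsumerRebalanceListener after each assignment. This is a hedged sketch of my own,
not part of this Jira's proposal: it only restores the paused mark and does not
address the records already buffered inside the Fetcher, which is the root cause
described below.
{code:java}
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Re-applies the paused mark after every rebalance. Rebalance listener callbacks
// run on the polling thread, so a plain HashSet is sufficient here.
public class RepausingRebalanceListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    private final Set<TopicPartition> wantPaused = new HashSet<>();

    public RepausingRebalanceListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    // Call this instead of consumer.pause(...) so the intent survives rebalances.
    public void pause(Collection<TopicPartition> partitions) {
        wantPaused.addAll(partitions);
        consumer.pause(partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do: wantPaused is kept so it can be re-applied on re-assignment.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        Set<TopicPartition> toPause = new HashSet<>(partitions);
        toPause.retainAll(wantPaused);
        consumer.pause(toPause); // restore the paused mark lost during the rebalance
    }
}{code}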

 
h1. 2. RootCause:

In short, during a group rebalance, ConsumerCoordinator#invokePartitionsRevoked(...)
clears the paused mark on the partitions previously held by the KafkaConsumer.
However, while the paused mark is cleared, the corresponding messages already
buffered in memory (Fetcher.completedFetches) for those paused partitions are not
cleared, so Fetcher#fetchedRecords() still returns them to the consumer.

For a more detailed analysis, if you are interested, you can read KAFKA-13425
("KafkaConsumer#pause() will lose its effect after groupRebalance occurs, which may
cause data loss on the consumer side"). Looking forward to your reply.

 
h1. 3. Discuss: Can KafkaConsumer support a pause method that is not affected by
groupRebalance?

The design of the KafkaConsumer#pause method actually stated one point from the
beginning:
 * Rebalance does not preserve pause/resume state.

link: KAFKA-2350 "Add KafkaConsumer pause capability" (ASF JIRA)

Unfortunately, this is not mentioned in the Javadoc of the KafkaConsumer#pause(...)
method. In addition, ConsumerCoordinator#invokePartitionsRevoked does not produce
any log output when it clears the paused mark. I believe this causes many users to
use the KafkaConsumer#pause(...) method incorrectly.

But I think it is necessary for KafkaConsumer to provide a pause method that is 
not affected by groupRebalance.

 
h1. 4. Suggestions

Below I suggest optimizing the existing pause method from several different
perspectives, or providing some new {{pause}} methods; each point is an independent
solution.
h2. 1) ConsumerCoordinator#invokePartitionsRevoked should also trigger Fetcher to
clean up the in-memory messages of revokedAndPausedPartitions when clearing the
paused mark

This prevents the Fetcher#fetchedRecords() method from mistakenly treating
revokedAndPausedPartitions as legal and returning their messages (the fetchedRecords
method performs various checks on each partition).

The cost is that if the user does not call pause(...) again before the next poll, a
new Fetch request may be initiated, which causes additional network traffic.

 
h2. 2) Preserve the old paused marks on the KafkaConsumer side

<1> In the ConsumerCoordinator#onJoinPrepare(...) method, record all
pausedTopicPartitions from the current assignment of the KafkaConsumer;

<2> In the ConsumerCoordinator#onJoinComplete(...) method, compare
pausedTopicPartitions against the latest assignment and restore the paused marks of
the partitions that are still in the latest assignment.

{*}Note{*}: If the new assignment of the KafkaConsumer no longer contains
topicPartitions that were paused before the rebalance, the paused mark of those
topicPartitions is lost forever on the KafkaConsumer side, even if the KafkaConsumer
is assigned those partitions again in a future rebalance.

At the end of KAFKA-13425 mentioned above, I gave a draft code suggestion on this
point.
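A rough, hypothetical sketch of the same idea (this is not the draft from the Jira;
the class and method names are invented for illustration):
{code:java}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper: snapshot the paused partitions before the rebalance and
// compute which paused marks to restore afterwards. Names and placement are
// illustrative only.
public class PausedStateKeeper {
    private Set<TopicPartition> pausedBeforeRebalance = Collections.emptySet();

    // Would be called from the equivalent of ConsumerCoordinator#onJoinPrepare(...)
    public void snapshot(Set<TopicPartition> currentlyPaused) {
        pausedBeforeRebalance = new HashSet<>(currentlyPaused);
    }

    // Would be called from the equivalent of ConsumerCoordinator#onJoinComplete(...);
    // returns the partitions of the new assignment whose paused mark should be restored.
    public Set<TopicPartition> partitionsToRepause(Set<TopicPartition> newAssignment) {
        Set<TopicPartition> restore = new HashSet<>(pausedBeforeRebalance);
        restore.retainAll(newAssignment);
        return restore;
    }
}{code}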

<3> In fact, for consumers that use the RebalanceProtocol.COOPERATIVE protocol (for
example, the currently supported PartitionAssignor CooperativeStickyAssignor), code
analysis shows that the default behavior is already to keep the old paused flag,
whereas consumers that use the RebalanceProtocol.EAGER protocol clear all paused
marks by default.

I suggest that the KafkaConsumer behavior under the two RebalanceProtocols should be
consistent; otherwise the existing KafkaConsumer#pause(...) becomes ambiguous and
will greatly confuse users.

 
h2. 3)In the groupRebalance process, pass the paused flag of topicPartitions

In the JoinGroup request, 

[jira] [Created] (KAFKA-13462) KRaft server does not return internal topics on list topics RPC

2021-11-17 Thread dengziming (Jira)
dengziming created KAFKA-13462:
--

 Summary: KRaft server does not return internal topics on list 
topics RPC
 Key: KAFKA-13462
 URL: https://issues.apache.org/jira/browse/KAFKA-13462
 Project: Kafka
  Issue Type: Bug
Reporter: dengziming








Re: [DISCUSS] KIP-797 Accept duplicate listener on port for IPv4/IPv6

2021-11-17 Thread Luke Chen
Hi Matthew,
Thanks for the KIP.

I have a question:
If I remember correctly, "advertised.listeners" already supports duplicated ports,
so your KIP should only focus on the "listeners" configuration, is that right? If
so, could you please make it clear in the KIP that your change applies only to
"listeners", not "advertised.listeners"?
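For illustration, this is the kind of configuration I understand the KIP would
allow (my own hypothetical example; listener names and addresses are placeholders,
and the extra listener name must still be mapped in listener.security.protocol.map):

    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,PLAINTEXT_V6:PLAINTEXT
    listeners=PLAINTEXT://203.0.113.10:9092,PLAINTEXT_V6://[2001:db8::10]:9092

i.e. two listeners sharing port 9092, one bound to an IPv4 address and one to an
IPv6 address, while "advertised.listeners" is untouched by the KIP.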

You should also mention in the KIP that the doc for "listeners" will be updated. (I
checked your PR and found you missed that.)

Thank you.
Luke

On Tue, Nov 16, 2021 at 10:24 PM Matthew de Detrich
 wrote:

> Since no one has commented on either this thread or the original one I will
> summon a vote by the end of this week.
>
> Regards
>
> On Wed, Nov 10, 2021 at 5:28 PM Matthew de Detrich <
> matthew.dedetr...@aiven.io> wrote:
>
> > Hello everyone,
> >
> > I would like to start a discussion for KIP-797 which is about allowing
> > duplicate listeners on the same port in the specific case where one host
> is
> > an IPv4 address and the other host is an IPv6 address.
> >
> > The proposal is here
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195726330
> >
> > Regards
> > --
> >
> > Matthew de Detrich
> >
> > *Aiven Deutschland GmbH*
> >
> > Immanuelkirchstraße 26, 10405 Berlin
> >
> > Amtsgericht Charlottenburg, HRB 209739 B
> >
> > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >
> > *m:* +491603708037
> >
> > *w:* aiven.io *e:* matthew.dedetr...@aiven.io
> >
>
>
> --
>
> Matthew de Detrich
>
> *Aiven Deutschland GmbH*
>
> Immanuelkirchstraße 26, 10405 Berlin
>
> Amtsgericht Charlottenburg, HRB 209739 B
>
> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>
> *m:* +491603708037
>
> *w:* aiven.io *e:* matthew.dedetr...@aiven.io
>


Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-17 Thread Jason Gustafson
Hi Colin/David,

> Like David said, basically it boils down to creating a feature flag for
the new proposed __consumer_offsets version, or using a new
IBP/metadata.version for it. Both approaches have pros and cons. Using an
IBP/metadata.version bump reduces the size of the testing matrix. But using
a feature flag allows people to avoid any bugs or pain associated with the
change if they don't care about enabling it. This is basically the classic
"should I use a feature flag or not?" discussion and we need to have it on
a case-by-case basis.

I think most users are not going to care to manage versions for a bunch of
different features. The IBP today has many shortcomings, but at least it's
tied to a version that users understand (i.e. the release version). How
would users know after upgrading to Kafka 3.1, for example, that they need
to upgrade the metadata.version to 3  and offsets.version to 4 (or
whatever)? It's a lot of overhead trying to understand all of the potential
features and what each upgrade actually means to them. I am wondering if we
could give them something more convenient which is tied to the release
version. For example, maybe we could use a command like `kafka-features
upgrade --release 3.1`, which the broker would then translate to an upgrade
to the latest versions of the individual features at the time of the 3.1
release. Basically it's just a static map from release version to feature
versions to make the upgrade process more convenient.
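To make that concrete, here is a rough sketch of the kind of static mapping I mean;
the feature names and levels below are invented for illustration, not a proposal of
actual values:

import java.util.Map;

// Sketch only: maps a release version to the feature levels it implies.
public final class ReleaseFeatureMap {
    private static final Map<String, Map<String, Short>> RELEASE_TO_FEATURES = Map.of(
            "3.1", Map.of("metadata.version", (short) 3, "offsets.version", (short) 4),
            "3.2", Map.of("metadata.version", (short) 4, "offsets.version", (short) 4));

    // `kafka-features upgrade --release 3.1` would resolve to featuresFor("3.1")
    // and then upgrade each listed feature to its target level.
    public static Map<String, Short> featuresFor(String release) {
        Map<String, Short> features = RELEASE_TO_FEATURES.get(release);
        if (features == null) {
            throw new IllegalArgumentException("Unknown release: " + release);
        }
        return features;
    }
}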

Thanks,
Jason




On Wed, Nov 17, 2021 at 6:20 PM Jason Gustafson  wrote:

> A few additional questions:
>
> 1. Currently the IBP tells us what version of individual inter-broker RPCs
> will be used. I think the plan in this KIP is to use ApiVersions request
> instead to find the highest compatible version (just like clients). Do I
> have that right?
>
> 2. The following wasn't very clear to me:
>
> > Brokers will be able to observe changes to metadata.version by observing
> the metadata log, and could then submit a new ApiVersionsRequest to the
> other Kafka nodes.
>
> Is the purpose of submitting new ApiVersions requests to update the
> features or the RPC versions? Does metadata.version also influence the
> versions that a broker advertises? It would help to have more detail about
> this.
>
> 3. I imagine users will want to know before performing an upgrade whether
> downgrading will be safe. Would the --dry-run flag tell them this?
>
> Thanks,
> Jason
>
>
>
>
>
> On Wed, Nov 17, 2021 at 3:55 PM Colin McCabe  wrote:
>
>> On Wed, Nov 17, 2021, at 11:28, Jason Gustafson wrote:
>> > Hi David,
>> >
>> > Forgive me if this ground has been covered already. Today, we have a few
>> > other things that we have latched onto the IBP, such as upgrades to the
>> > format of records in __consumer_offsets. I've been assuming that
>> > metadata.version is not covering this. Is that right or is there some
>> other
>> > plan to take care of cases like this?
>> >
>>
>> I think metadata.version could cover changes to things like
>> __consumer_offsets, if people want it to. Or to put it another way, that is
>> out of scope for this KIP.
>>
>> Like David said, basically it boils down to creating a feature flag for
>> the new proposed __consumer_offsets version, or using a new
>> IBP/metadata.version for it. Both approaches have pros and cons. Using an
>> IBP/metadata.version bump reduces the size of the testing matrix. But using
>> a feature flag allows people to avoid any bugs or pain associated with the
>> change if they don't care about enabling it. This is basically the classic
>> "should I use a feature flag or not?" discussion and we need to have it on
>> a case-by-case basis.
>>
>> I think it's worth calling out that having a 1:1 mapping between IBP
>> versions and metadata.versions will result in some metadata.versions that
>> "don't do anything" (aka they do the same thing as the previous
>> metadata.version). For example, if we change StopReplicaRequest again, that
>> will not affect KRaft mode, but probably would require an IBP bump and
>> hence a metadata.version bump. I think that's OK. It's not that different
>> from updating your IBP and getting support for ZStandard, when your
>> deployment doesn't use ZStandard compression.
>>
>> best,
>> Colin
>>
>> > Thanks,
>> > Jason
>> >
>> >
>> >
>> > On Wed, Nov 17, 2021 at 10:17 AM Jun Rao 
>> wrote:
>> >
>> >> Hi, Colin,
>> >>
>> >> Thanks for the reply.
>> >>
>> >> For case b, I am not sure that I understand your suggestion. Does "each
>> >> subsequent level for metadata.version corresponds to an IBP version"
>> mean
>> >> that we need to keep IBP forever? Could you describe the upgrade
>> process in
>> >> this case?
>> >>
>> >> Jun
>> >>
>> >> On Tue, Nov 16, 2021 at 3:45 PM Colin McCabe 
>> wrote:
>> >>
>> >> > On Tue, Nov 16, 2021, at 15:13, Jun Rao wrote:
>> >> > > Hi, David, Colin,
>> >> > >
>> >> > > Thanks for the reply.
>> >> > >
>> >> > > 16. Discussed with David offline a bit. We have 3 cases.
>> 

Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-17 Thread Jason Gustafson
A few additional questions:

1. Currently the IBP tells us what version of individual inter-broker RPCs
will be used. I think the plan in this KIP is to use ApiVersions request
instead to find the highest compatible version (just like clients). Do I
have that right?

2. The following wasn't very clear to me:

> Brokers will be able to observe changes to metadata.version by observing
the metadata log, and could then submit a new ApiVersionsRequest to the
other Kafka nodes.

Is the purpose of submitting new ApiVersions requests to update the
features or the RPC versions? Does metadata.version also influence the
versions that a broker advertises? It would help to have more detail about
this.

3. I imagine users will want to know before performing an upgrade whether
downgrading will be safe. Would the --dry-run flag tell them this?

Thanks,
Jason





On Wed, Nov 17, 2021 at 3:55 PM Colin McCabe  wrote:

> On Wed, Nov 17, 2021, at 11:28, Jason Gustafson wrote:
> > Hi David,
> >
> > Forgive me if this ground has been covered already. Today, we have a few
> > other things that we have latched onto the IBP, such as upgrades to the
> > format of records in __consumer_offsets. I've been assuming that
> > metadata.version is not covering this. Is that right or is there some
> other
> > plan to take care of cases like this?
> >
>
> I think metadata.version could cover changes to things like
> __consumer_offsets, if people want it to. Or to put it another way, that is
> out of scope for this KIP.
>
> Like David said, basically it boils down to creating a feature flag for
> the new proposed __consumer_offsets version, or using a new
> IBP/metadata.version for it. Both approaches have pros and cons. Using an
> IBP/metadata.version bump reduces the size of the testing matrix. But using
> a feature flag allows people to avoid any bugs or pain associated with the
> change if they don't care about enabling it. This is basically the classic
> "should I use a feature flag or not?" discussion and we need to have it on
> a case-by-case basis.
>
> I think it's worth calling out that having a 1:1 mapping between IBP
> versions and metadata.versions will result in some metadata.versions that
> "don't do anything" (aka they do the same thing as the previous
> metadata.version). For example, if we change StopReplicaRequest again, that
> will not affect KRaft mode, but probably would require an IBP bump and
> hence a metadata.version bump. I think that's OK. It's not that different
> from updating your IBP and getting support for ZStandard, when your
> deployment doesn't use ZStandard compression.
>
> best,
> Colin
>
> > Thanks,
> > Jason
> >
> >
> >
> > On Wed, Nov 17, 2021 at 10:17 AM Jun Rao 
> wrote:
> >
> >> Hi, Colin,
> >>
> >> Thanks for the reply.
> >>
> >> For case b, I am not sure that I understand your suggestion. Does "each
> >> subsequent level for metadata.version corresponds to an IBP version"
> mean
> >> that we need to keep IBP forever? Could you describe the upgrade
> process in
> >> this case?
> >>
> >> Jun
> >>
> >> On Tue, Nov 16, 2021 at 3:45 PM Colin McCabe 
> wrote:
> >>
> >> > On Tue, Nov 16, 2021, at 15:13, Jun Rao wrote:
> >> > > Hi, David, Colin,
> >> > >
> >> > > Thanks for the reply.
> >> > >
> >> > > 16. Discussed with David offline a bit. We have 3 cases.
> >> > > a. We upgrade from an old version where the metadata.version has
> >> already
> >> > > been finalized. In this case it makes sense to stay with that
> feature
> >> > > version after the upgrade.
> >> >
> >> > +1
> >> >
> >> > > b. We upgrade from an old version where no metadata.version has been
> >> > > finalized. In this case, it makes sense to leave metadata.version
> >> > disabled
> >> > > since we don't know if all brokers have been upgraded.
> >> >
> >> > This is the scenario I was hoping to avoid by saying that ALL KRaft
> >> > clusters have metadata.version of at least 1, and each subsequent
> level
> >> for
> >> > metadata.version corresponds to an IBP version. The existing KRaft
> >> clusters
> >> > in 3.0 and earlier are preview (not for production) so I think this
> >> change
> >> > is OK for 3.x (given that it affects only KRaft). Then IBP is
> irrelevant
> >> > for KRaft clusters (the config is ignored, possibly with a WARN or
> ERROR
> >> > message generated if it is set).
> >> >
> >> > > c. We are starting from a brand new cluster and of course no
> >> > > metadata.version has been finalized. In this case, the KIP says it
> will
> >> > > pick the metadata.version in meta.properties. In the common case,
> >> people
> >> > > probably won't set the metadata.version in the meta.properties file
> >> > > explicitly. So, it will be useful to put a default (stable) version
> >> there
> >> > > when the meta.properties.
> >> >
> >> > Hmm. I was assuming that clusters where the admin didn't specify any
> >> > metadata.version during formatting would get the latest
> metadata.version.
> >> > Partly, because this is what we do for IBP 

Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-17 Thread Colin McCabe
On Wed, Nov 17, 2021, at 11:28, Jason Gustafson wrote:
> Hi David,
>
> Forgive me if this ground has been covered already. Today, we have a few
> other things that we have latched onto the IBP, such as upgrades to the
> format of records in __consumer_offsets. I've been assuming that
> metadata.version is not covering this. Is that right or is there some other
> plan to take care of cases like this?
>

I think metadata.version could cover changes to things like __consumer_offsets, 
if people want it to. Or to put it another way, that is out of scope for this 
KIP.

Like David said, basically it boils down to creating a feature flag for the new 
proposed __consumer_offsets version, or using a new IBP/metadata.version for 
it. Both approaches have pros and cons. Using an IBP/metadata.version bump 
reduces the size of the testing matrix. But using a feature flag allows people 
to avoid any bugs or pain associated with the change if they don't care about 
enabling it. This is basically the classic "should I use a feature flag or 
not?" discussion and we need to have it on a case-by-case basis.

I think it's worth calling out that having a 1:1 mapping between IBP versions 
and metadata.versions will result in some metadata.versions that "don't do 
anything" (aka they do the same thing as the previous metadata.version). For 
example, if we change StopReplicaRequest again, that will not affect KRaft 
mode, but probably would require an IBP bump and hence a metadata.version bump. 
I think that's OK. It's not that different from updating your IBP and getting 
support for ZStandard, when your deployment doesn't use ZStandard compression.

best,
Colin

> Thanks,
> Jason
>
>
>
> On Wed, Nov 17, 2021 at 10:17 AM Jun Rao  wrote:
>
>> Hi, Colin,
>>
>> Thanks for the reply.
>>
>> For case b, I am not sure that I understand your suggestion. Does "each
>> subsequent level for metadata.version corresponds to an IBP version" mean
>> that we need to keep IBP forever? Could you describe the upgrade process in
>> this case?
>>
>> Jun
>>
>> On Tue, Nov 16, 2021 at 3:45 PM Colin McCabe  wrote:
>>
>> > On Tue, Nov 16, 2021, at 15:13, Jun Rao wrote:
>> > > Hi, David, Colin,
>> > >
>> > > Thanks for the reply.
>> > >
>> > > 16. Discussed with David offline a bit. We have 3 cases.
>> > > a. We upgrade from an old version where the metadata.version has
>> already
>> > > been finalized. In this case it makes sense to stay with that feature
>> > > version after the upgrade.
>> >
>> > +1
>> >
>> > > b. We upgrade from an old version where no metadata.version has been
>> > > finalized. In this case, it makes sense to leave metadata.version
>> > disabled
>> > > since we don't know if all brokers have been upgraded.
>> >
>> > This is the scenario I was hoping to avoid by saying that ALL KRaft
>> > clusters have metadata.version of at least 1, and each subsequent level
>> for
>> > metadata.version corresponds to an IBP version. The existing KRaft
>> clusters
>> > in 3.0 and earlier are preview (not for production) so I think this
>> change
>> > is OK for 3.x (given that it affects only KRaft). Then IBP is irrelevant
>> > for KRaft clusters (the config is ignored, possibly with a WARN or ERROR
>> > message generated if it is set).
>> >
>> > > c. We are starting from a brand new cluster and of course no
>> > > metadata.version has been finalized. In this case, the KIP says it will
>> > > pick the metadata.version in meta.properties. In the common case,
>> people
>> > > probably won't set the metadata.version in the meta.properties file
>> > > explicitly. So, it will be useful to put a default (stable) version
>> there
>> > > when the meta.properties.
>> >
>> > Hmm. I was assuming that clusters where the admin didn't specify any
>> > metadata.version during formatting would get the latest metadata.version.
>> > Partly, because this is what we do for IBP today. It would be good to
>> > clarify this...
>> >
>> > >
>> > > Also, it would be useful to clarify that if a FeatureLevelRecord exists
>> > for
>> > > metadata.version, the metadata.version in meta.properties will be
>> > ignored.
>> > >
>> >
>> > Yeah, I agree.
>> >
>> > best,
>> > Colin
>> >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Tue, Nov 16, 2021 at 12:39 PM Colin McCabe 
>> > wrote:
>> > >
>> > >> On Fri, Nov 5, 2021, at 15:18, Jun Rao wrote:
>> > >> > Hi, David,
>> > >> >
>> > >> > Thanks for the reply.
>> > >> >
>> > >> > 16. My first concern is that the KIP picks up meta.version
>> > inconsistently
>> > >> > during the deployment. If a new cluster is started, we pick up the
>> > >> highest
>> > >> > version. If we upgrade, we leave the feature version unchanged.
>> > >>
>> > >> Hi Jun,
>> > >>
>> > >> Thanks again for taking a look.
>> > >>
>> > >> The proposed behavior in KIP-778 is consistent with how it works
>> today.
>> > >> Upgrading the software is distinct from upgrading the IBP.
>> > >>
>> > >> I think it is important to keep these two operations ("upgrading

Re: [DISCUSS] KIP-796: Interactive Query v2

2021-11-17 Thread Guozhang Wang
Thanks John.

I made another pass on the KIP and overall it already looks pretty good. I
just have a couple more minor comments:

13: What do you think about just removing the following function in
QueryResult

  // returns a failed query result because caller requested a "latest"
bound, but the task was
  // not active and running.
  public static <R> QueryResult<R> notActive(String currentState);

Instead just use `notUpToBound` for the case when `latest` bound is
requested but the node is not the active replica. My main motivation is
trying to abstract away the notion of active/standby from the public APIs
itself, and hence capturing both this case as well as just a
normal "position bound not achieved" in the same return signal, until later
when we think it is indeed needed to separate them with different returns.

14: Regarding the possible exceptions being thrown from `query`, it seems
more exception types are possible from KIP-216:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors,
should we include all in the javadocs?


Guozhang



On Wed, Nov 17, 2021 at 3:25 PM John Roesler  wrote:

> Thanks for the reply, Guozhang!
>
> I have updated the KIP to tie up the remaining points that
> we have discussed. I really appreciate your time in refining
> the proposal. I included a quick summary of the final state
> of our discussion points below.
>
> Since it seems like this discussion thread is pretty
> convergent, I'll go ahead and start the voting thread soon.
>
> Thanks again!
> -John
>
> P.S.: the final state of our discussion points:
>
> 1. I removed serdesForStore from the proposal (and moved it
> to Rejected Alternatives)
>
> 2. Thanks for that reference. I had overlooked that
> implementation. I'd note that the ListValuesStore is
> currently only used in the KStream API, which doesn't
> support queries at all. Due to its interface, it could
> theoretically be used to materialize a KTable, though it has
> no supplier provided in the typical Stores factory class.
>
> Regardless, I think that it would still be a similar story
> to the Segmented store. The ListValues store would simply
> choose to terminate the query on its own and not delegate to
> any of the wrapped KeyValue stores. It wouldn't matter that
> the wrapped stores have a query-handling facility of their
> own, if the wrapping store doesn't choose to delegate, the
> wrapped store will not try to execute any queries.
>
> Specifically regarding the key transformation that these
> "formatted" stores perform, when they handle the query, they
> would have the ability to execute the query in any way that
> makes sense OR to just reject the query if it doesn't make
> sense.
>
> 3, 4: nothing to do
>
> 5: I updated the KIP to specify the exceptions that may be
> thrown in `KafkaStreams#query` and to clarify that per-
> partition failures will be reported as per-partition failed
> QueryResult objects instead of thrown exceptions. That
> allows us to successfully serve some partitions of the
> request even if others fail.
>
> 6: I added a note that updating the metadata APIs is left
> for future work.
>
> 7: nothing to do
>
> 8. I went with StateQueryRequest and StateQueryResponse.
>
> 9, 10: nothing to do.
>
> 11: Ah, I see. That's a good point, but it's not fundamental
> to the framework. I think we can tackle it when we propose
> the actual queries.
>
> 12: Cool. I went ahead and dropped the "serdesForStore"
> method. I think you're onto something there, and we should
> tackle it separately when we propose the actual queries.
>
>
>
>
> On Tue, 2021-11-16 at 15:59 -0800, Guozhang Wang wrote:
> > Thanks John! Some more thoughts inlined below.
> >
> > On Mon, Nov 15, 2021 at 10:07 PM John Roesler 
> wrote:
> >
> > > Thanks for the review, Guozhang!
> > >
> > > 1. This is a great point. I fell into the age-old trap of
> > > only considering the simplest store type and forgot about
> > > this extra wrinkle of the "key schema" that we use in
> > > Windowed and Session stores.
> > >
> > > Depending on how we want to forge forward with our provided
> > > queries, I think it can still work out ok. The simplest
> > > solution is just to have windowed versions of our queries
> > > for use on the windowed stores. That should work naively
> > > because we're basically just preserving the existing
> > > interactions. It might not be ideal in the long run, but at
> > > least it lets us make IQv2 orthogonal from other efforts to
> > > simplify the stores themselves.
> > >
> > > If we do that, then it would actually be correct to go ahead
> > > and just return the serdes that are present in the Metered
> > > stores today. For example, if I have a Windowed store with
> > > Integer keys, then the key serde I get from serdesForStore
> > > would just be the IntegerSerde. The query I'd use the
> > > serialized key with would be a RawWindowedKeyQuery, which
> > > takes a byte[] key and a timestamp. Then, 

Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-17 Thread Colin McCabe
On Wed, Nov 17, 2021, at 12:28, David Arthur wrote:
> Colin,
>
> I wasn't intending to suggest that IBP and metadata.version co-exist long
> term in KRaft. I was thinking we would have one final IBP that would signal
> KRaft clusters to start looking at metadata.version instead. This
> was working under the assumption that we should be able to upgrade 3.0
> clusters to a version with metadata.version. I don't think it would be too
> difficult since we could treat the absence of a feature flag as an implicit
> version 0 which would be compatible with 3.0.
>

Hi David,

Hmm... do we really have to have "one final IBP" for upgraded 3.0 / 3.1 KRaft 
clusters? It seems like we could just initialize their metadata.version to 1 
when the software is upgraded to 3.2.

Admittedly this is a bit of a hack -- in general, we don't want to couple 
software upgrades and metadata version bumps. But 3.0 and 3.1 were "preview," 
not intended for production, and this is a pretty minor speed bump for someone 
testing out pre-production software.

This would greatly simplify the system by getting rid of IBP completely in 
KRaft clusters, by substituting it with metadata.version.

>
> Assuming this lands in 3.2, we would have
>
> Software    IBP    metadata.version    effective version
> 3.0         3.0    -                   0
> 3.2         3.0    -                   0
> 3.2         3.2    -                   0
> 3.2         3.2    1                   1
>
> where metadata.version=0 is compatible with what we have today in KRaft.
>
> If we don't support 3.0 -> 3.x KRaft upgrades, then early adopters may be
> rather inconvenienced :) I can't say for sure that the extra complexity is
> worth the convenience of allowing upgrades from the preview versions.
>

So let's drill down to what the inconvenience would be. It would basically be 
an unexpected (if they didn't read the upgrade docs) IBP bump to whatever the 
IBP is in 3.2. Assuming that they started on KAFKA_3_0_IV0 and got 
auto-upgraded to the latest, after the upgrade they would have these additional 
features:

> 1. Introduce ListOffsets V7 (KIP-724)
> 2. Add topic IDs to Fetch requests/responses (KIP-516)

That seems pretty minor (again, noting that this ONLY applies to KRaft 3.0 
Preview users, NOT to ZK users). I don't think it's worth keeping IBP around 
just to prevent this inconvenience.

We have a limited amount of time and effort available to manage versioning 
issues. I don't think it's realistic to have to think about the cross-product 
of all possible metadata.version and IBP values when making a change.

We also have to expect new features to keep going in while the transition is 
taking place. That's why I think having a 1:1 mapping between new IBP versions 
and metadata.versions is helpful.

best,
Colin

>
> We probably need to make a decision on this since it impacts a few things
> in the KIP. What do folks think?
>
> -David
>
>
> On Wed, Nov 17, 2021 at 2:28 PM Jason Gustafson 
> wrote:
>
>> Hi David,
>>
>> Forgive me if this ground has been covered already. Today, we have a few
>> other things that we have latched onto the IBP, such as upgrades to the
>> format of records in __consumer_offsets. I've been assuming that
>> metadata.version is not covering this. Is that right or is there some other
>> plan to take care of cases like this?
>>
>> Thanks,
>> Jason
>>
>>
>>
>> On Wed, Nov 17, 2021 at 10:17 AM Jun Rao  wrote:
>>
>> > Hi, Colin,
>> >
>> > Thanks for the reply.
>> >
>> > For case b, I am not sure that I understand your suggestion. Does "each
>> > subsequent level for metadata.version corresponds to an IBP version" mean
>> > that we need to keep IBP forever? Could you describe the upgrade process
>> in
>> > this case?
>> >
>> > Jun
>> >
>> > On Tue, Nov 16, 2021 at 3:45 PM Colin McCabe  wrote:
>> >
>> > > On Tue, Nov 16, 2021, at 15:13, Jun Rao wrote:
>> > > > Hi, David, Colin,
>> > > >
>> > > > Thanks for the reply.
>> > > >
>> > > > 16. Discussed with David offline a bit. We have 3 cases.
>> > > > a. We upgrade from an old version where the metadata.version has
>> > already
>> > > > been finalized. In this case it makes sense to stay with that feature
>> > > > version after the upgrade.
>> > >
>> > > +1
>> > >
>> > > > b. We upgrade from an old version where no metadata.version has been
>> > > > finalized. In this case, it makes sense to leave metadata.version
>> > > disabled
>> > > > since we don't know if all brokers have been upgraded.
>> > >
>> > > This is the scenario I was hoping to avoid by saying that ALL KRaft
>> > > clusters have metadata.version of at least 1, and each subsequent level
>> > for
>> > > metadata.version corresponds to an IBP version. The existing KRaft
>> > clusters
>> > > in 3.0 and earlier are preview (not for production) so I think this
>> > change
>> > > is OK for 3.x (given that it affects only KRaft). Then IBP is
>> irrelevant
>> > > for KRaft clusters (the config is ignored, possibly with a WARN or
>> ERROR
>> > > 

Re: [VOTE] KIP-796: Interactive Query v2

2021-11-17 Thread John Roesler
Obviously, I will be voting +1 (binding)

Thanks,
-John

On Wed, 2021-11-17 at 17:27 -0600, John Roesler wrote:
> Hello all,
> 
> I'd like to open the vote for KIP-796, which proposes
> a revamp of the Interactive Query APIs in Kafka Streams.
> 
> The proposal is here:
> https://cwiki.apache.org/confluence/x/34xnCw
> 
> Thanks to all who reviewed the proposal, and thanks in
> advance for taking the time to vote!
> 
> Thank you,
> -John



[VOTE] KIP-796: Interactive Query v2

2021-11-17 Thread John Roesler
Hello all,

I'd like to open the vote for KIP-796, which proposes
a revamp of the Interactive Query APIs in Kafka Streams.

The proposal is here:
https://cwiki.apache.org/confluence/x/34xnCw

Thanks to all who reviewed the proposal, and thanks in
advance for taking the time to vote!

Thank you,
-John


Re: [DISCUSS] KIP-796: Interactive Query v2

2021-11-17 Thread John Roesler
Thanks for the reply, Guozhang!

I have updated the KIP to tie up the remaining points that
we have discussed. I really appreciate your time in refining
the proposal. I included a quick summary of the final state
of our discussion points below.

Since it seems like this discussion thread is pretty
convergent, I'll go ahead and start the voting thread soon.

Thanks again!
-John

P.S.: the final state of our discussion points:

1. I removed serdesForStore from the proposal (and moved it
to Rejected Alternatives)

2. Thanks for that reference. I had overlooked that
implementation. I'd note that the ListValuesStore is
currently only used in the KStream API, which doesn't
support queries at all. Due to its interface, it could
theoretically be used to materialize a KTable, though it has
no supplier provided in the typical Stores factory class.

Regardless, I think that it would still be a similar story
to the Segmented store. The ListValues store would simply
choose to terminate the query on its own and not delegate to
any of the wrapped KeyValue stores. It wouldn't matter that
the wrapped stores have a query-handling facility of their
own, if the wrapping store doesn't choose to delegate, the
wrapped store will not try to execute any queries.

Specifically regarding the key transformation that these
"formatted" stores perform, when they handle the query, they
would have the ability to execute the query in any way that
makes sense OR to just reject the query if it doesn't make
sense.

3, 4: nothing to do

5: I updated the KIP to specify the exceptions that may be
thrown in `KafkaStreams#query` and to clarify that per-
partition failures will be reported as per-partition failed
QueryResult objects instead of thrown exceptions. That
allows us to successfully serve some partitions of the
request even if others fail.

6: I added a note that updating the metadata APIs is left
for future work.

7: nothing to do

8. I went with StateQueryRequest and StateQueryResponse.

9, 10: nothing to do.

11: Ah, I see. That's a good point, but it's not fundamental
to the framework. I think we can tackle it when we propose
the actual queries.

12: Cool. I went ahead and dropped the "serdesForStore"
method. I think you're onto something there, and we should
tackle it separately when we propose the actual queries.




On Tue, 2021-11-16 at 15:59 -0800, Guozhang Wang wrote:
> Thanks John! Some more thoughts inlined below.
> 
> On Mon, Nov 15, 2021 at 10:07 PM John Roesler  wrote:
> 
> > Thanks for the review, Guozhang!
> > 
> > 1. This is a great point. I fell into the age-old trap of
> > only considering the simplest store type and forgot about
> > this extra wrinkle of the "key schema" that we use in
> > Windowed and Session stores.
> > 
> > Depending on how we want to forge forward with our provided
> > queries, I think it can still work out ok. The simplest
> > solution is just to have windowed versions of our queries
> > for use on the windowed stores. That should work naively
> > because we're basically just preserving the existing
> > interactions. It might not be ideal in the long run, but at
> > least it lets us make IQv2 orthogonal from other efforts to
> > simplify the stores themselves.
> > 
> > If we do that, then it would actually be correct to go ahead
> > and just return the serdes that are present in the Metered
> > stores today. For example, if I have a Windowed store with
> > Integer keys, then the key serde I get from serdesForStore
> > would just be the IntegerSerde. The query I'd use the
> > serialized key with would be a RawWindowedKeyQuery, which
> > takes a byte[] key and a timestamp. Then, the low-level
> > store (the segmented store in this case) would have to take
> > the next step to use its schema before making that last-mile
> > query. Note, this is precisely how fetch is implemented
> > today in RocksDBWindowStore:
> > 
> > public byte[] fetch(final Bytes key, final long timestamp) {
> >   return wrapped().get(WindowKeySchema.toStoreKeyBinary(key,
> > timestamp, seqnum));
> > }
> > 
> > In other words, if we set up our provided Query types to
> > stick close to the current store query methods, then
> > everything "should work out" (tm).
> > 
> > I think where things start to get more complicated would be
> > if we wanted to expose the actual, raw, on-disk binary key
> > to the user, along with a serde that can interpret it. Then,
> > we would have to pack up the serde and the schema. If we go
> > down that road, then knowing which one (the key serde or the
> > windowed schema + the key serde) the user wants when they
> > ask for "the serde" would be a challenge.
> > 
> > I'm actually thinking maybe we don't need to include the
> > serdesForStore method in this proposal, but instead leave it
> > for follow-on work (if desired) to add it along with raw
> > queries, since it's really only needed if you want raw
> > queries and (as you mentioned later) there may be better
> > ways to present the serdes, 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.1 #19

2021-11-17 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-17 Thread David Arthur
Jason, thanks for the review! No, I don't think that has been covered
explicitly, and no, metadata.version will not gate things like our offsets
format. The intention is that we will introduce new KIP-584 feature flags
for those formats once the need arises. For ZK clusters, we'll probably
keep using the IBP mechanism and only worry about introducing the feature
flag for KRaft clusters. I'll add a short section to the KIP with this
clarification.

Thanks!
-David

On Wed, Nov 17, 2021 at 3:28 PM David Arthur 
wrote:

> Colin,
>
> I wasn't intending to suggest that IBP and metadata.version co-exist long
> term in KRaft. I was thinking we would have one final IBP that would signal
> KRaft clusters to start looking at metadata.version instead. This
> was working under the assumption that we should be able to upgrade 3.0
> clusters to a version with metadata.version. I don't think it would be too
> difficult since we could treat the absence of a feature flag as an implicit
> version 0 which would be compatible with 3.0.
>
> Assuming this lands in 3.2, we would have
>
> Software    IBP    metadata.version    effective version
> 3.0         3.0    -                   0
> 3.2         3.0    -                   0
> 3.2         3.2    -                   0
> 3.2         3.2    1                   1
>
> where metadata.version=0 is compatible with what we have today in KRaft.
>
> If we don't support 3.0 -> 3.x KRaft upgrades, then early adopters may be
> rather inconvenienced :) I can't say for sure that the extra complexity is
> worth the convenience of allowing upgrades from the preview versions.
>
> We probably need to make a decision on this since it impacts a few things
> in the KIP. What do folks think?
>
> -David
>
>
> On Wed, Nov 17, 2021 at 2:28 PM Jason Gustafson 
> wrote:
>
>> Hi David,
>>
>> Forgive me if this ground has been covered already. Today, we have a few
>> other things that we have latched onto the IBP, such as upgrades to the
>> format of records in __consumer_offsets. I've been assuming that
>> metadata.version is not covering this. Is that right or is there some
>> other
>> plan to take care of cases like this?
>>
>> Thanks,
>> Jason
>>
>>
>>
>> On Wed, Nov 17, 2021 at 10:17 AM Jun Rao 
>> wrote:
>>
>> > Hi, Colin,
>> >
>> > Thanks for the reply.
>> >
>> > For case b, I am not sure that I understand your suggestion. Does "each
>> > subsequent level for metadata.version corresponds to an IBP version"
>> mean
>> > that we need to keep IBP forever? Could you describe the upgrade
>> process in
>> > this case?
>> >
>> > Jun
>> >
>> > On Tue, Nov 16, 2021 at 3:45 PM Colin McCabe 
>> wrote:
>> >
>> > > On Tue, Nov 16, 2021, at 15:13, Jun Rao wrote:
>> > > > Hi, David, Colin,
>> > > >
>> > > > Thanks for the reply.
>> > > >
>> > > > 16. Discussed with David offline a bit. We have 3 cases.
>> > > > a. We upgrade from an old version where the metadata.version has
>> > already
>> > > > been finalized. In this case it makes sense to stay with that
>> feature
>> > > > version after the upgrade.
>> > >
>> > > +1
>> > >
>> > > > b. We upgrade from an old version where no metadata.version has been
>> > > > finalized. In this case, it makes sense to leave metadata.version
>> > > disabled
>> > > > since we don't know if all brokers have been upgraded.
>> > >
>> > > This is the scenario I was hoping to avoid by saying that ALL KRaft
>> > > clusters have metadata.version of at least 1, and each subsequent
>> level
>> > for
>> > > metadata.version corresponds to an IBP version. The existing KRaft
>> > clusters
>> > > in 3.0 and earlier are preview (not for production) so I think this
>> > change
>> > > is OK for 3.x (given that it affects only KRaft). Then IBP is
>> irrelevant
>> > > for KRaft clusters (the config is ignored, possibly with a WARN or
>> ERROR
>> > > message generated if it is set).
>> > >
>> > > > c. We are starting from a brand new cluster and of course no
>> > > > metadata.version has been finalized. In this case, the KIP says it
>> will
>> > > > pick the metadata.version in meta.properties. In the common case,
>> > people
>> > > > probably won't set the metadata.version in the meta.properties file
>> > > > explicitly. So, it will be useful to put a default (stable) version
>> > there
>> > > > when the meta.properties.
>> > >
>> > > Hmm. I was assuming that clusters where the admin didn't specify any
>> > > metadata.version during formatting would get the latest
>> metadata.version.
>> > > Partly, because this is what we do for IBP today. It would be good to
>> > > clarify this...
>> > >
>> > > >
>> > > > Also, it would be useful to clarify that if a FeatureLevelRecord
>> exists
>> > > for
>> > > > metadata.version, the metadata.version in meta.properties will be
>> > > ignored.
>> > > >
>> > >
>> > > Yeah, I agree.
>> > >
>> > > best,
>> > > Colin
>> > >
>> > > > Thanks,
>> > > >
>> > > > Jun
>> > > >
>> > > >
>> > > > On Tue, Nov 16, 2021 at 12:39 PM Colin McCabe 
>> > > wrote:
>> > > >
>> > > 

Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-17 Thread David Arthur
Colin,

I wasn't intending to suggest that IBP and metadata.version co-exist long
term in KRaft. I was thinking we would have one final IBP that would signal
KRaft clusters to start looking at metadata.version instead. This
was working under the assumption that we should be able to upgrade 3.0
clusters to a version with metadata.version. I don't think it would be too
difficult since we could treat the absence of a feature flag as an implicit
version 0 which would be compatible with 3.0.

Assuming this lands in 3.2, we would have

Software    IBP    metadata.version    effective version
3.0         3.0    -                   0
3.2         3.0    -                   0
3.2         3.2    -                   0
3.2         3.2    1                   1

where metadata.version=0 is compatible with what we have today in KRaft.

If we don't support 3.0 -> 3.x KRaft upgrades, then early adopters may be
rather inconvenienced :) I can't say for sure that the extra complexity is
worth the convenience of allowing upgrades from the preview versions.

We probably need to make a decision on this since it impacts a few things
in the KIP. What do folks think?

-David


On Wed, Nov 17, 2021 at 2:28 PM Jason Gustafson 
wrote:

> Hi David,
>
> Forgive me if this ground has been covered already. Today, we have a few
> other things that we have latched onto the IBP, such as upgrades to the
> format of records in __consumer_offsets. I've been assuming that
> metadata.version is not covering this. Is that right or is there some other
> plan to take care of cases like this?
>
> Thanks,
> Jason
>
>
>
> On Wed, Nov 17, 2021 at 10:17 AM Jun Rao  wrote:
>
> > Hi, Colin,
> >
> > Thanks for the reply.
> >
> > For case b, I am not sure that I understand your suggestion. Does "each
> > subsequent level for metadata.version corresponds to an IBP version" mean
> > that we need to keep IBP forever? Could you describe the upgrade process
> in
> > this case?
> >
> > Jun
> >
> > On Tue, Nov 16, 2021 at 3:45 PM Colin McCabe  wrote:
> >
> > > On Tue, Nov 16, 2021, at 15:13, Jun Rao wrote:
> > > > Hi, David, Colin,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 16. Discussed with David offline a bit. We have 3 cases.
> > > > a. We upgrade from an old version where the metadata.version has
> > already
> > > > been finalized. In this case it makes sense to stay with that feature
> > > > version after the upgrade.
> > >
> > > +1
> > >
> > > > b. We upgrade from an old version where no metadata.version has been
> > > > finalized. In this case, it makes sense to leave metadata.version
> > > disabled
> > > > since we don't know if all brokers have been upgraded.
> > >
> > > This is the scenario I was hoping to avoid by saying that ALL KRaft
> > > clusters have metadata.version of at least 1, and each subsequent level
> > for
> > > metadata.version corresponds to an IBP version. The existing KRaft
> > clusters
> > > in 3.0 and earlier are preview (not for production) so I think this
> > change
> > > is OK for 3.x (given that it affects only KRaft). Then IBP is
> irrelevant
> > > for KRaft clusters (the config is ignored, possibly with a WARN or
> ERROR
> > > message generated if it is set).
> > >
> > > > c. We are starting from a brand new cluster and of course no
> > > > metadata.version has been finalized. In this case, the KIP says it
> will
> > > > pick the metadata.version in meta.properties. In the common case,
> > people
> > > > probably won't set the metadata.version in the meta.properties file
> > > > explicitly. So, it will be useful to put a default (stable) version
> > there
> > > > when the meta.properties.
> > >
> > > Hmm. I was assuming that clusters where the admin didn't specify any
> > > metadata.version during formatting would get the latest
> metadata.version.
> > > Partly, because this is what we do for IBP today. It would be good to
> > > clarify this...
> > >
> > > >
> > > > Also, it would be useful to clarify that if a FeatureLevelRecord
> exists
> > > for
> > > > metadata.version, the metadata.version in meta.properties will be
> > > ignored.
> > > >
> > >
> > > Yeah, I agree.
> > >
> > > best,
> > > Colin
> > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Tue, Nov 16, 2021 at 12:39 PM Colin McCabe 
> > > wrote:
> > > >
> > > >> On Fri, Nov 5, 2021, at 15:18, Jun Rao wrote:
> > > >> > Hi, David,
> > > >> >
> > > >> > Thanks for the reply.
> > > >> >
> > > >> > 16. My first concern is that the KIP picks up meta.version
> > > inconsistently
> > > >> > during the deployment. If a new cluster is started, we pick up the
> > > >> highest
> > > >> > version. If we upgrade, we leave the feature version unchanged.
> > > >>
> > > >> Hi Jun,
> > > >>
> > > >> Thanks again for taking a look.
> > > >>
> > > >> The proposed behavior in KIP-778 is consistent with how it works
> > today.
> > > >> Upgrading the software is distinct from upgrading the IBP.
> > > >>
> > > >> I think it is important to keep these two 

[jira] [Created] (KAFKA-13461) KafkaController stops functioning as active controller after ZooKeeperClient auth failure

2021-11-17 Thread Vincent Jiang (Jira)
Vincent Jiang created KAFKA-13461:
-

 Summary: KafkaController stops functioning as active controller 
after ZooKeeperClient auth failure
 Key: KAFKA-13461
 URL: https://issues.apache.org/jira/browse/KAFKA-13461
 Project: Kafka
  Issue Type: Bug
  Components: zkclient
Reporter: Vincent Jiang


When java.security.auth.login.config is present but there is no "Client" section, 
ZookeeperSaslClient creation fails and raises a LoginException, resulting in a 
warning log:
{code:java}
WARN SASL configuration failed: javax.security.auth.login.LoginException: No 
JAAS configuration section named 'Client' was found in specified JAAS 
configuration file: '***'. Will continue connection to Zookeeper server without 
SASL authentication, if Zookeeper server allows it.{code}
When this happens after initial startup, ClientCnxn enqueues an AuthFailed event, 
which triggers the following sequence:
 # zkclient reinitialization is triggered
 # the controller resigns.
 # Before the controller's ZK session expires, the controller successfully 
reconnects to ZK and maintains the current session
 # In KafkaController.elect(), the controller sets activeControllerId to itself 
and short-circuits the rest of the election. Since the controller resigned earlier 
and the call to onControllerFailover() is skipped, the controller is not actually 
functioning as the active controller (e.g. the necessary ZK watchers haven't been 
registered).
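For reference, a minimal example of the kind of "Client" section whose absence
triggers this path (values are placeholders; the exact login module depends on the
ZooKeeper authentication scheme in use):
{code}
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="zkclient"
    password="zkclient-secret";
};
{code}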

 





Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.1 #18

2021-11-17 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-17 Thread Jason Gustafson
Hi David,

Forgive me if this ground has been covered already. Today, we have a few
other things that we have latched onto the IBP, such as upgrades to the
format of records in __consumer_offsets. I've been assuming that
metadata.version is not covering this. Is that right or is there some other
plan to take care of cases like this?

Thanks,
Jason



On Wed, Nov 17, 2021 at 10:17 AM Jun Rao  wrote:

> Hi, Colin,
>
> Thanks for the reply.
>
> For case b, I am not sure that I understand your suggestion. Does "each
> subsequent level for metadata.version corresponds to an IBP version" mean
> that we need to keep IBP forever? Could you describe the upgrade process in
> this case?
>
> Jun
>
> On Tue, Nov 16, 2021 at 3:45 PM Colin McCabe  wrote:
>
> > On Tue, Nov 16, 2021, at 15:13, Jun Rao wrote:
> > > Hi, David, Colin,
> > >
> > > Thanks for the reply.
> > >
> > > 16. Discussed with David offline a bit. We have 3 cases.
> > > a. We upgrade from an old version where the metadata.version has
> already
> > > been finalized. In this case it makes sense to stay with that feature
> > > version after the upgrade.
> >
> > +1
> >
> > > b. We upgrade from an old version where no metadata.version has been
> > > finalized. In this case, it makes sense to leave metadata.version
> > disabled
> > > since we don't know if all brokers have been upgraded.
> >
> > This is the scenario I was hoping to avoid by saying that ALL KRaft
> > clusters have metadata.version of at least 1, and each subsequent level
> for
> > metadata.version corresponds to an IBP version. The existing KRaft
> clusters
> > in 3.0 and earlier are preview (not for production) so I think this
> change
> > is OK for 3.x (given that it affects only KRaft). Then IBP is irrelevant
> > for KRaft clusters (the config is ignored, possibly with a WARN or ERROR
> > message generated if it is set).
> >
> > > c. We are starting from a brand new cluster and of course no
> > > metadata.version has been finalized. In this case, the KIP says it will
> > > pick the metadata.version in meta.properties. In the common case,
> people
> > > probably won't set the metadata.version in the meta.properties file
> > > explicitly. So, it will be useful to put a default (stable) version
> there
> > > when the meta.properties.
> >
> > Hmm. I was assuming that clusters where the admin didn't specify any
> > metadata.version during formatting would get the latest metadata.version.
> > Partly, because this is what we do for IBP today. It would be good to
> > clarify this...
> >
> > >
> > > Also, it would be useful to clarify that if a FeatureLevelRecord exists
> > for
> > > metadata.version, the metadata.version in meta.properties will be
> > ignored.
> > >
> >
> > Yeah, I agree.
> >
> > best,
> > Colin
> >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Tue, Nov 16, 2021 at 12:39 PM Colin McCabe 
> > wrote:
> > >
> > >> On Fri, Nov 5, 2021, at 15:18, Jun Rao wrote:
> > >> > Hi, David,
> > >> >
> > >> > Thanks for the reply.
> > >> >
> > >> > 16. My first concern is that the KIP picks up meta.version
> > inconsistently
> > >> > during the deployment. If a new cluster is started, we pick up the
> > >> highest
> > >> > version. If we upgrade, we leave the feature version unchanged.
> > >>
> > >> Hi Jun,
> > >>
> > >> Thanks again for taking a look.
> > >>
> > >> The proposed behavior in KIP-778 is consistent with how it works
> today.
> > >> Upgrading the software is distinct from upgrading the IBP.
> > >>
> > >> I think it is important to keep these two operations ("upgrading
> > >> IBP/metadata version" and "upgrading software version") separate. If
> > they
> > >> are coupled it will create a situation where software upgrades are
> > >> difficult and dangerous.
> > >>
> > >> Consider a situation where you find some bug in your current software,
> > and
> > >> you want to upgrade to new software that fixes the bug. If upgrades
> and
> > IBP
> > >> bumps are coupled, you can't do this without also bumping the IBP,
> > which is
> > >> usually considered a high-risk change. That means that either you have
> > to
> > >> make a special build that includes only the fix (time-consuming and
> > >> error-prone), live with the bug for longer, or be very conservative
> > about
> > >> ever introducing new IBP/metadata versions. None of those are really
> > good
> > >> choices.
> > >>
> > >> > Intuitively, it seems that independent of how a cluster is deployed,
> > we
> > >> > should always pick the same feature version.
> > >>
> > >> I think it makes sense to draw a distinction between upgrading an
> > existing
> > >> cluster and deploying a new one. What most people want out of upgrades
> > is
> > >> that things should keep working, but with bug fixes. If we change
> that,
> > it
> > >> just makes people more reluctant to upgrade (which is always a
> > problem...)
> > >>
> > >> > I think we need to think this through in this KIP. My second concern
> > is
> > >> > that as a particular version 

Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-17 Thread Jun Rao
Hi, Colin,

Thanks for the reply.

For case b, I am not sure that I understand your suggestion. Does "each
subsequent level for metadata.version corresponds to an IBP version" mean
that we need to keep IBP forever? Could you describe the upgrade process in
this case?

Jun

On Tue, Nov 16, 2021 at 3:45 PM Colin McCabe  wrote:

> On Tue, Nov 16, 2021, at 15:13, Jun Rao wrote:
> > Hi, David, Colin,
> >
> > Thanks for the reply.
> >
> > 16. Discussed with David offline a bit. We have 3 cases.
> > a. We upgrade from an old version where the metadata.version has already
> > been finalized. In this case it makes sense to stay with that feature
> > version after the upgrade.
>
> +1
>
> > b. We upgrade from an old version where no metadata.version has been
> > finalized. In this case, it makes sense to leave metadata.version
> disabled
> > since we don't know if all brokers have been upgraded.
>
> This is the scenario I was hoping to avoid by saying that ALL KRaft
> clusters have metadata.version of at least 1, and each subsequent level for
> metadata.version corresponds to an IBP version. The existing KRaft clusters
> in 3.0 and earlier are preview (not for production) so I think this change
> is OK for 3.x (given that it affects only KRaft). Then IBP is irrelevant
> for KRaft clusters (the config is ignored, possibly with a WARN or ERROR
> message generated if it is set).
>
> > c. We are starting from a brand new cluster and of course no
> > metadata.version has been finalized. In this case, the KIP says it will
> > pick the metadata.version in meta.properties. In the common case, people
> > probably won't set the metadata.version in the meta.properties file
> > explicitly. So, it will be useful to put a default (stable) version there
> > when the meta.properties.
>
> Hmm. I was assuming that clusters where the admin didn't specify any
> metadata.version during formatting would get the latest metadata.version.
> Partly, because this is what we do for IBP today. It would be good to
> clarify this...
>
> >
> > Also, it would be useful to clarify that if a FeatureLevelRecord exists
> for
> > metadata.version, the metadata.version in meta.properties will be
> ignored.
> >
>
> Yeah, I agree.
>
> best,
> Colin
>
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Nov 16, 2021 at 12:39 PM Colin McCabe 
> wrote:
> >
> >> On Fri, Nov 5, 2021, at 15:18, Jun Rao wrote:
> >> > Hi, David,
> >> >
> >> > Thanks for the reply.
> >> >
> >> > 16. My first concern is that the KIP picks up meta.version
> inconsistently
> >> > during the deployment. If a new cluster is started, we pick up the
> >> highest
> >> > version. If we upgrade, we leave the feature version unchanged.
> >>
> >> Hi Jun,
> >>
> >> Thanks again for taking a look.
> >>
> >> The proposed behavior in KIP-778 is consistent with how it works today.
> >> Upgrading the software is distinct from upgrading the IBP.
> >>
> >> I think it is important to keep these two operations ("upgrading
> >> IBP/metadata version" and "upgrading software version") separate. If
> they
> >> are coupled it will create a situation where software upgrades are
> >> difficult and dangerous.
> >>
> >> Consider a situation where you find some bug in your current software,
> and
> >> you want to upgrade to new software that fixes the bug. If upgrades and
> IBP
> >> bumps are coupled, you can't do this without also bumping the IBP,
> which is
> >> usually considered a high-risk change. That means that either you have
> to
> >> make a special build that includes only the fix (time-consuming and
> >> error-prone), live with the bug for longer, or be very conservative
> about
> >> ever introducing new IBP/metadata versions. None of those are really
> good
> >> choices.
> >>
> >> > Intuitively, it seems that independent of how a cluster is deployed,
> we
> >> > should always pick the same feature version.
> >>
> >> I think it makes sense to draw a distinction between upgrading an
> existing
> >> cluster and deploying a new one. What most people want out of upgrades
> is
> >> that things should keep working, but with bug fixes. If we change that,
> it
> >> just makes people more reluctant to upgrade (which is always a
> problem...)
> >>
> >> > I think we need to think this through in this KIP. My second concern
> is
> >> > that as a particular version matures, it's inconvenient for a user to
> >> manually
> >> > upgrade every feature version. As long as we have a path to achieve
> that
> >> in
> >> > the future, we don't need to address that in this KIP.
> >>
> >> If people are managing a large number of Kafka clusters, they will want
> to
> >> do some sort of A/B testing with IBP/metadata versions. So if you have
> 1000
> >> Kafka clusters, you roll out the new IBP version to 10 of them and see
> how
> >> it goes. If that goes well, you roll it out to more, etc.
> >>
> >> So, the automation needs to be at the cluster management layer, not at
> the
> >> Kafka layer. Each Kafka cluster doesn't know how well 

[jira] [Resolved] (KAFKA-12257) Consumer mishandles topics deleted and recreated with the same name

2021-11-17 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12257.
-
Resolution: Fixed

>  Consumer mishandles topics deleted and recreated with the same name
> 
>
> Key: KAFKA-12257
> URL: https://issues.apache.org/jira/browse/KAFKA-12257
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.1, 2.7.1, 2.8.1
>Reporter: Ryan Leslie
>Assignee: lqjacklee
>Priority: Blocker
> Fix For: 3.1.0, 2.8.2, 3.0.0
>
> Attachments: KAFKA-12257-1.patch, KAFKA-12257.patch
>
>
> In KAFKA-7738, caching of leader epochs (KIP-320) was added to 
> o.a.k.c.Metadata to ignore metadata responses with epochs smaller than the 
> last seen epoch.
> The current implementation can cause problems in cases where a consumer is 
> subscribed to a topic that has been deleted and then recreated with the same 
> name. This is something seen more often in consumers that subscribe to a 
> multitude of topics using a wildcard.
> Currently, when a topic is deleted and the Fetcher receives 
> UNKNOWN_TOPIC_OR_PARTITION, the leader epoch is not cleared. If at a later 
> time while the consumer is still running a topic is created with the same 
> name, the leader epochs are set to 0 for the new topics partitions, and are 
> likely smaller than those for the previous topic. For example, if a broker 
> had restarted during the lifespan of the previous topic, the leader epoch 
> would be at least 1 or 2. In this case the metadata will be ignored since it 
> is incorrectly considered stale. Of course, the user will sometimes get 
> lucky, and if a topic was only recently created so that the epoch is still 0, 
> no problem will occur on recreation. The issue is also not seen when 
> consumers happen to have been restarted in between deletion and recreation.
> The most common side effect of the new metadata being disregarded is that the 
> new partitions end up assigned but the Fetcher is unable to fetch data 
> because it does not know the leaders. When recreating a topic with the same 
> name it is likely that the partition leaders are not the same as for the 
> previous topic, and the number of partitions may even be different. Besides 
> not being able to retrieve data for the new topic, there is a more sinister 
> side effect of the Fetcher triggering a metadata update after the fetch 
> fails. The subsequent update will again ignore the topic's metadata if the 
> leader epoch is still smaller than the cached value. This metadata refresh 
> loop can continue indefinitely and with a sufficient number of consumers may 
> even put a strain on a cluster since the requests are occurring in a tight 
> loop. This can also be hard for clients to identify since there is nothing 
> logged by default that would indicate what's happening. Both the Metadata 
> class's logging of "_Not replacing existing epoch_", and the Fetcher's 
> logging of "_Leader for partition  is unknown_" are at DEBUG level.
> A second possible side effect was observed where if the consumer is acting as 
> leader of the group and happens to not have any current data for the previous 
> topic, e.g. it was cleared due to a metadata error from a broker failure, 
> then the new topic's partitions may simply end up unassigned within the 
> group. This is because while the subscription list contains the recreated 
> topic the metadata for it was previously ignored due to the leader epochs. In 
> this case the user would see logs such as:
> {noformat}
> WARN o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=myClientId, 
> groupId=myGroup] The following subscribed topics are not assigned to any 
> members: [myTopic]{noformat}
> Interestingly, I believe the Producer is less affected by this problem since 
> o.a.k.c.p.i.ProducerMetadata explicitly clears knowledge of its topics in 
> retainTopics() after each metadata expiration. ConsumerMetadata does no such 
> thing.
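> A simplified illustration of the epoch check described above (this is a 
> sketch, not the actual o.a.k.c.Metadata implementation):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> 
> import org.apache.kafka.common.TopicPartition;
> 
> // Sketch of the stale-epoch check. Because the cached epoch is not cleared
> // when the topic is deleted, the recreated topic (which starts again at
> // epoch 0) looks "older" than the cached value and its metadata is ignored.
> class EpochGateSketch {
>     private final Map<TopicPartition, Integer> lastSeenLeaderEpochs = new HashMap<>();
> 
>     boolean retainMetadata(TopicPartition tp, int newLeaderEpoch) {
>         Integer lastSeen = lastSeenLeaderEpochs.get(tp); // may still hold the deleted topic's epoch
>         if (lastSeen != null && newLeaderEpoch < lastSeen) {
>             return false; // "Not replacing existing epoch": the response is treated as stale
>         }
>         lastSeenLeaderEpochs.put(tp, newLeaderEpoch);
>         return true;
>     }
> }
> {code}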
> To reproduce this issue:
>  # Turn on DEBUG logging, e.g. for org.apache.kafka.clients.consumer and 
> org.apache.kafka.clients.Metadata
>  # Begin a consumer for a topic (or multiple topics)
>  # Restart a broker that happens to be a leader for one of the topic's 
> partitions
>  # Delete the topic
>  # Create another topic with the same name
>  # Publish data for the new topic
>  # The consumer will not receive data for the new topic, and there will be a 
> high rate of metadata requests.
>  # The issue can be corrected by restarting the consumer or restarting 
> brokers until leader epochs are large enough
> I believe KIP-516 (unique topic ids) will likely fix this problem, since 
> after those changes the leader epoch map should be keyed off of 

Re: [DISCUSS] KIP-796: Interactive Query v2

2021-11-17 Thread John Roesler
Thanks for the reply, Sagar,

Thanks for bringing up the point about documentation, I do
think it would be a great idea for us to add a section to
the IQ doc page that's basically a "store extension guide"
that gives an overview of how to implement custom queries
and custom stores. That would help people see how to go
about extending Streams to meet their own needs, and also
how to put together a PR to add new queries to Kafka Streams
if/when they want to contribute their new queries upstream.

I will mention that when I make my next batch of updates to
the KIP (hopefully today).
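
To make the extension story concrete, a user-defined query
under the proposed API could look roughly like this (just a
sketch; the class name is made up and not from the KIP):

  // sketch only: a custom query type for a hypothetical
  // "sum over all values" query
  public class SumQuery implements Query<Long> { }

A store that understands SumQuery would handle it in its
query-execution path and return a QueryResult<Long>; a
store that doesn't recognize the type would report a
failure (e.g. an unknown-query-type failure) for that
partition instead.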



Regarding remote query, the short answer is that, no, this
KIP doesn't include any new remote query capabilities.

I have mulled over remote queries for a while now. On the
one hand, it would be really cool if Streams provided that
functionality natively. On the other hand, it introduces an
entirely new client-to-client communication pattern, which
doesn't exist anywhere in Apache Kafka today. I'm worried
that such an expansion would open Pandora's box in terms of
the complexity of configuring Streams, security models, etc.
It's possible, if IQ becomes a much more significant part of
Streams's capabilities, that the benefits of implementing
remote query could one day overcome the costs, but it
doesn't seem like that day is today.

That's the main reason I've held off from proposing remote
query capabilities in the past. Specifically for this KIP,
it's just outside the scope; this KIP is really focused on
improving the framework for executing local queries.

Thanks again!
-John

On Wed, 2021-11-17 at 22:09 +0530, Sagar wrote:
> Thanks John for answering the 2 questions. Pt #1 makes sense to me now.
> 
> Regarding Pt #2, first of all thanks for bringing up KIP-614 :D I did learn
> about the interfaces the hard way and probably due to that, the PR really
> stretched a lot. Having said that, the point that you mentioned about any
> future implementations needing to worry about the base stores, caching and
> metered stores, would it make sense to add them explicitly to the KIP and
> also to Javadocs if possible? That would guide the future contributors.
> WDYT?
> 
> The other question I have is (may be irrelevant) but with these changes, is
> there going to be any impact on remote state store querying capabilities?
> 
> Thanks!
> Sagar.
> 
> On Tue, Nov 16, 2021 at 4:22 AM John Roesler  wrote:
> 
> > Hi Patrick and Sagar,
> > 
> > Thanks for the feedback! I'll just break out the questions
> > and address them one at a time.
> > 
> > Patrick 1.
> > The default bound that I'm proposing is only to let active
> > tasks answer queries (which is also the default with IQ
> > today). Therefore, calling getPositionBound() would return a
> > PositionBound for which isLatest() is true.
> > 
> > Patrick 2.
> > I might have missed something in revision, but I'm not sure
> > what you're referring to exactly when you say they are
> > different. The IQRequest only has a PositionBound, and the
> > IQResponse only has a (concrete) Position, so I think they
> > are named accordingly (getPositionBound and getPosition). Am
> > I overlooking what you are talking about?
> > 
> > Sagar 1.
> > I think you're talking about the KeyValueStore#get(key)
> > method? This is a really good question. I went ahead and
> > dropped in an addendum to the KeyQuery example to show how
> > you would run the query in today's API. Here's a caricature
> > of the two APIs:
> > 
> > current:
> >   KeyValueStore store = kafkaStreams.store(
> > "mystore",
> > keyValueStore())
> >   int value = store.get(key);
> > 
> > proposed:
> >   int value = kafkaStreams.query(
> > "mystore",
> > KeyQuery.withKey(key));
> > 
> > So, today we first get the store interface and then we
> > invoke the method, and under the proposal, we would instead
> > just ask KafkaStreams to execute the query on the store. In
> > addition to all the other stuff I said in the motivation,
> > one thing I think is neat about this API is that it means we
> > can re-use queries across stores. So, for example, we could
> > also use KeyQuery on WindowStores, even though there's no
> > common interface between WindowStore and KeyValueStore.
> > 
> > In other words, stores can support any queries that make
> > sense and _not_ support any queries that don't make sense.
> > This gets into your second question...
> > 
> > Sagar 2.
> > Very good question. Your experience with your KIP-614
> > contribution was one of the things that made me want to
> > revise IQ to begin with. It seems like there's a really
> > stark gap between how straightforward the proposal is to add
> > a new store operation, and then how hard it is to actually
> > implement a new operation, due to all those intervening
> > wrappers.
> > 
> > There are two categories of wrappers to worry about:
> > - Facades: These only exist to disallow access to write
> > APIs, which are exposed through IQ today but shouldn't be
> > called. These are 

[jira] [Resolved] (KAFKA-13397) MirrorMaker2 shouldn't mirror Connect's internal topics

2021-11-17 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-13397.

Fix Version/s: 3.2.0
   Resolution: Fixed

> MirrorMaker2 shouldn't mirror Connect's internal topics
> ---
>
> Key: KAFKA-13397
> URL: https://issues.apache.org/jira/browse/KAFKA-13397
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Minor
> Fix For: 3.2.0
>
>
> This issue is a follow-up of KAFKA-10777 
> ([KIP-690|https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention]).
> As of present, a user can set custom 'replication.policy.separator' 
> configuration in MirrorMaker 2. It determines the topic name of internal 
> topics like heartbeats, checkpoints, and offset-syncs 
> ([KIP-690|https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention]).
> However, there are some glitches here:
>  # MirrorMaker2 creates internal topics to track the offsets, configs, and 
> status of the MM2 tasks. But these topics are not affected by a custom 
> 'replication.policy.separator' setting - that is, these topics may be 
> replicated against the user's intention.
>  # The internal topic names include a dash in their name (e.g., 
> 'mm2-offsets.{source}.internal'), so a single '-' should be disallowed when 
> configuring 'replication.policy.separator'; see the config sketch below.
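> 
> An illustrative configuration sketch of the above (values are examples only, 
> not taken from this ticket):
> {code}
> # mm2.properties (example values)
> clusters = primary, backup
> primary->backup.enabled = true
> 
> # picked up by the heartbeats/checkpoints/offset-syncs topic names
> replication.policy.separator = _
> 
> # The MM2 bookkeeping topics (e.g. mm2-offsets.{source}.internal and the
> # corresponding configs/status topics) keep a hard-coded '.' and already
> # contain a '-', so they ignore the setting above, and a single '-' should
> # be rejected as a separator value.
> {code}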





Re: Wiki Permissions

2021-11-17 Thread Matthias J. Sax

Done.

On 11/16/21 9:59 PM, Riven Sun wrote:

Hello, team
I'd like to be added to the contributors list, so I can submit a KIP. My
Jira ID is: RivenSun
Wiki ID: rivensun

Best Regards

---

RivenSun



[jira] [Resolved] (KAFKA-13460) Issue reporting

2021-11-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13460.
-
Resolution: Won't Fix

> Issue reporting
> ---
>
> Key: KAFKA-13460
> URL: https://issues.apache.org/jira/browse/KAFKA-13460
> Project: Kafka
>  Issue Type: Wish
>  Components: KafkaConnect
>Affects Versions: 1.1.1
>Reporter: Mikolaj Ryll
>Priority: Critical
> Fix For: 1.0.3
>
>
> I would like to be able to report issue using github. Plx.





[jira] [Created] (KAFKA-13460) Issue reporting

2021-11-17 Thread Mikolaj Ryll (Jira)
Mikolaj Ryll created KAFKA-13460:


 Summary: Issue reporting
 Key: KAFKA-13460
 URL: https://issues.apache.org/jira/browse/KAFKA-13460
 Project: Kafka
  Issue Type: Wish
  Components: KafkaConnect
Affects Versions: 1.1.1
Reporter: Mikolaj Ryll
 Fix For: 1.0.3


I would like to be able to report issues using GitHub, please.





Re: [DISCUSS] KIP-796: Interactive Query v2

2021-11-17 Thread Sagar
Thanks John for answering the 2 questions. Pt #1 makes sense to me now.

Regarding Pt #2, first of all thanks for bringing up KIP-614 :D I did learn
about the interfaces the hard way and probably due to that, the PR really
stretched a lot. Having said that, regarding the point you mentioned about
future implementations needing to worry about the base stores and the caching
and metered stores: would it make sense to call that out explicitly in the KIP
and, if possible, in the Javadocs as well? That would guide future
contributors. WDYT?

The other question I have (which may be irrelevant): with these changes, is
there going to be any impact on remote state store querying capabilities?

Thanks!
Sagar.

On Tue, Nov 16, 2021 at 4:22 AM John Roesler  wrote:

> Hi Patrick and Sagar,
>
> Thanks for the feedback! I'll just break out the questions
> and address them one at a time.
>
> Patrick 1.
> The default bound that I'm proposing is only to let active
> tasks answer queries (which is also the default with IQ
> today). Therefore, calling getPositionBound() would return a
> PositionBound for which isLatest() is true.
>
> Patrick 2.
> I might have missed something in revision, but I'm not sure
> what you're referring to exactly when you say they are
> different. The IQRequest only has a PositionBound, and the
> IQResponse only has a (concrete) Position, so I think they
> are named accordingly (getPositionBound and getPosition). Am
> I overlooking what you are talking about?
>
> Sagar 1.
> I think you're talking about the KeyValueStore#get(key)
> method? This is a really good question. I went ahead and
> dropped in an addendum to the KeyQuery example to show how
> you would run the query in today's API. Here's a caricature
> of the two APIs:
>
> current:
>   KeyValueStore store = kafkaStreams.store(
> "mystore",
> keyValueStore())
>   int value = store.get(key);
>
> proposed:
>   int value = kafkaStreams.query(
> "mystore",
> KeyQuery.withKey(key));
>
> So, today we first get the store interface and then we
> invoke the method, and under the proposal, we would instead
> just ask KafkaStreams to execute the query on the store. In
> addition to all the other stuff I said in the motivation,
> one thing I think is neat about this API is that it means we
> can re-use queries across stores. So, for example, we could
> also use KeyQuery on WindowStores, even though there's no
> common interface between WindowStore and KeyValueStore.
>
> In other words, stores can support any queries that make
> sense and _not_ support any queries that don't make sense.
> This gets into your second question...
>
> Sagar 2.
> Very good question. Your experience with your KIP-614
> contribution was one of the things that made me want to
> revise IQ to begin with. It seems like there's a really
> stark gap between how straightforward the proposal is to add
> a new store operation, and then how hard it is to actually
> implement a new operation, due to all those intervening
> wrappers.
>
> There are two categories of wrappers to worry about:
> - Facades: These only exist to disallow access to write
> APIs, which are exposed through IQ today but shouldn't be
> called. These are simply unnecessary under IQv2, since we
> only run queries instead of returning the whole store.
> - Store Layers: This is what you provided examples of. We
> have store layers that let us compose features like
> de/serialization and metering, changelogging, caching, etc.
> A nice thing about this design is that we mostly don't have
> to worry at all about those wrapper layers at all. Each of
> these stores would simply delegate any query to lower layers
> unless there is something they need to do. In my POC, I
> simply added a delegating implementation to
> WrappedStateStore, which meant that I didn't need to touch
> most of the wrappers when I added a new query.
>
> Here's what I think future contributors will have to worry
> about:
> 1. The basic query execution in the base byte stores
> (RocksDB and InMemory)
> 2. The Caching stores IF they want the query to be served
> from the cache
> 3. The Metered stores IF some serialization needs to be done
> for the query
>
> And that's it! We should be able to add new queries without
> touching any other store layer besides those, and each one
> of those is involved because it has some specific reason to
> be.
>
>
> Thanks again, Patrick and Sagar! Please let me know if I
> failed to address your questions, or if you have any more.
>
> Thanks,
> -John
>
> On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> > Hi John,
> >
> > Thanks for the great writeup! Couple of things I wanted to bring up(may
> or
> > mayn't be relevant):
> >
> > 1) The sample implementation that you have presented for KeyQuery is very
> > helpful. One thing which may be added to it is how it connects to the
> > KeyValue.get(key) method. That's something that at least I couldn't
> totally
> > figure out-not sure about others though. I understand that it is out 

[jira] [Created] (KAFKA-13459) MM2 should be able to add the source offset to the record header

2021-11-17 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13459:


 Summary: MM2 should be able to add the source offset to the record 
header
 Key: KAFKA-13459
 URL: https://issues.apache.org/jira/browse/KAFKA-13459
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban


MM2 could add the source offset to the record header to help with diagnostics 
in some use-cases.
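
A sketch of the kind of change this asks for (illustrative only; this is not 
existing MirrorMaker 2 code, and the header names are made up):
{code:java}
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;

// Copies the source topic/partition/offset into headers on the record that
// MM2 would emit to the target cluster. Header names here are hypothetical.
class SourceOffsetHeadersSketch {
    static SourceRecord withSourceOffsetHeaders(SourceRecord record,
                                                String sourceTopic,
                                                int sourcePartition,
                                                long sourceOffset) {
        Headers headers = record.headers().duplicate();
        headers.addString("mm2.source.topic", sourceTopic);
        headers.addInt("mm2.source.partition", sourcePartition);
        headers.addLong("mm2.source.offset", sourceOffset);
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), record.value(),
                record.timestamp(), headers);
    }
}
{code}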





Wiki Permissions

2021-11-17 Thread Riven Sun
Hello, team
I'd like to be added to the contributors list, so I can submit a KIP. My
Jira ID is: RivenSun
Wiki ID: rivensun

Best Regards

---

RivenSun


Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.1 #17

2021-11-17 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-13458) The Stream is not able to consume from some of the partitions

2021-11-17 Thread Darshan Marathe (Jira)
Darshan Marathe created KAFKA-13458:
---

 Summary: The Stream is not able to consume from some of the 
partitions
 Key: KAFKA-13458
 URL: https://issues.apache.org/jira/browse/KAFKA-13458
 Project: Kafka
  Issue Type: Bug
Reporter: Darshan Marathe


Hi Team

Kafka Streams version: 2.6.0

Some messages are stuck in the following partitions, and the stream is not able 
to consume them from those partitions.

We have restarted the stream multiple times, but the issue is still the same.


We have faced the following issue:
The following partitions still have unstable offsets which are not cleared on 
the broker side: [TASK_STREAM-29], this could be either transactional offsets 
waiting for completion, or normal offsets waiting for replication after 
appending to local log



The following partitions still have unstable offsets which are not cleared on 
the broker side: [TASK_STREAM-0]

The following partitions still have unstable offsets which are not cleared on 
the broker side: [TASK_STREAM-3]

The following partitions still have unstable offsets which are not cleared on 
the broker side: [TASK_STREAM-9]

The following partitions still have unstable offsets which are not cleared on 
the broker side: [TASK_STREAM-14]





Re: Requesting permissions to contribute to Apache Kafka

2021-11-17 Thread Gunnar Morling
Excellent, thank you so much for the quick help, David!

--Gunnar


Am Mi., 17. Nov. 2021 um 10:42 Uhr schrieb David Jacot
:

> Hi Gunnar,
>
> I have granted you the requested permissions.
>
> I am looking forward to your contributions.
>
> Best,
> David
>
> On Wed, Nov 17, 2021 at 10:32 AM Gunnar Morling
>  wrote:
> >
> > Hi,
> >
> > As per the instructions given in [1], I would like to request the
> > permissions for creating a KIP. My ids are:
> >
> > * Wiki: gunnarmorling
> > * Jira: gunnar.morling
> >
> > Thanks a lot,
> >
> > --Gunnar
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
>


Re: Requesting permissions to contribute to Apache Kafka

2021-11-17 Thread David Jacot
Hi Gunnar,

I have granted you the requested permissions.

I am looking forward to your contributions.

Best,
David

On Wed, Nov 17, 2021 at 10:32 AM Gunnar Morling
 wrote:
>
> Hi,
>
> As per the instructions given in [1], I would like to request the
> permissions for creating a KIP. My ids are:
>
> * Wiki: gunnarmorling
> * Jira: gunnar.morling
>
> Thanks a lot,
>
> --Gunnar
>
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals


Requesting permissions to contribute to Apache Kafka

2021-11-17 Thread Gunnar Morling
Hi,

As per the instructions given in [1], I would like to request the
permissions for creating a KIP. My ids are:

* Wiki: gunnarmorling
* Jira: gunnar.morling

Thanks a lot,

--Gunnar

[1]
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals