Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-08 Thread Patrick D’Addona
> In your case you also delete if the value is not null and if the value
> not-equals "deleteme", right? I.e., you use non-tombstone records as deletes,
> which is just not allowed/supported.

The "deleteme" String was only for testing, the issue also happens without it, 
i.e. if there is a "real" tombstone with `value == null` on the input topic.
I do use the input topic as a changelog for my global table. tombstones are 
sent directly to that topic from a kafka streams operation before the actual 
store.

> I cannot explain why all() and get(key) actually give you different
> results with respect to `key`. If a key is resurrected during a restore,
> both methods should return it. Not sure why `get(key)` returns `null`
> even if `all()` contains the key... I would rather expect that both
> return the resurrected key.

That's why I think this is different from KAFKA-7663.
The **foo.bar.globaltopic** topic currently looks like this
|timestamp|key|value|
|2022-08-10T14:23:51.768|foo|foo|
|2022-08-10T14:23:51.836|foo|foo|
|2022-08-10T14:23:52.126|bar|bar|
|2022-08-10T14:23:52.398|foo|foo|
|2022-08-10T14:23:53.353|bar|bar|
|2022-08-10T14:23:53.098|foo||
|2022-08-10T14:23:54.367|bar|bar|

After I delete the kafka-streams.state.dir and restart the application, I get
store.get("foo") -> null
store.get("bar") -> "bar"
store.all() -> "foo" and "bar"
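For reference, a minimal sketch (not the actual application code) of the check behind the output above, assuming the global store is registered under the name "foobar" as in the addGlobalStore snippet quoted further down in this thread:

```java
// Minimal sketch of the get()/all() comparison; assumes the global store is
// registered as "foobar" and built as a timestamped key-value store.
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class StoreConsistencyCheck {

    static void check(final KafkaStreams streams) {
        final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store =
            streams.store(StoreQueryParameters.fromNameAndType(
                "foobar", QueryableStoreTypes.timestampedKeyValueStore()));

        // Point lookups honour the tombstone: "foo" is gone, "bar" is present.
        System.out.println("get(foo) = " + store.get("foo"));
        System.out.println("get(bar) = " + store.get("bar"));

        // After the restore, the range scan still yields the resurrected "foo".
        try (KeyValueIterator<String, ValueAndTimestamp<String>> it = store.all()) {
            while (it.hasNext()) {
                final KeyValue<String, ValueAndTimestamp<String>> kv = it.next();
                System.out.println("all() -> " + kv.key + " = " + kv.value);
            }
        }
    }
}
```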

Hope that explains it better.

- Patrick




Patrick D’Addona
Senior Lead IT Architect


Mobile: +49 151 544 22 161
patrick.dadd...@maibornwolff.de
Theresienhöhe 13, 80339 München

MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
www.maibornwolff.de, Phone +49 89 544 253 000
USt-ID DE 129 299 525, Munich District Court HRB 98058
Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.



From: Matthias J. Sax 
Sent: Friday, December 9, 2022 01:11
To: dev@kafka.apache.org 
Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores 
previously deleted records

> The way I see it, KAFKA-7663 says, "a global store will be exactly the input 
> topic after restore, regardless of the processor"

Not sure what you mean by this? The issue the ticket describes is that if
you don't do a plain `put(key,value)` in your processor, stuff breaks
right now. (Note that `delete(key)` and `put(key,null)` are the same.)


It's a known issue, bad API, and also bad documentation on our side, and
I guess you can call it a bug if you wish. However, you can only use
tombstones as deletes right now. Thus, what you do "wrong" is

> if (record.value() == null == record.value().equals("deleteme")) {
> store.delete(record.key());
> }

In your case you also delete if the value is not null and if the value
not-equals "deleteme", right? I.e., you use non-tombstone records as
deletes, which is just not allowed/supported.

The issue is that during restore only `null` values, i.e. actual
tombstones, are handled as deletes, and thus, if you delete a key using a
non-tombstone record in your processor, this key can be resurrected
during restore.


I cannot explain why all() and get(key) actually give you different
results with respect to `key`. If a key is resurrected during a restore,
both methods should return it. Not sure why `get(key)` returns `null`
even if `all()` contains the key... I would rather expect that both
return the resurrected key.

Hope this helps.


-Matthias


On 12/8/22 12:00 PM, Patrick D’Addona wrote:
> Hi,
>
> I don't think this issue is exactly the same as KAFKA-7663.
>
> The way I see it, KAFKA-7663 says, "a global store will be exactly the input 
> topic after restore, regardless of the processor"
> My issue here is that the global store after restore is inconsistent with
> the input topic and with the store itself, because it finds records with
> key "foo" using **store.all()** that it cannot find via **store.get("foo")**.
> The **store.get()** is consistent with my input topic, where the tombstone is
> the latest entry for the key "foo", reflecting the **delete("foo")**
> operation on the store.
> But still, looping over the store returns a record with "foo" as a key and a
> non-null value.
>
> If the store acts like a Map, where you can call **get(k)** and **put(k, v)**,
> then looping over it should only find entries that actually exist and
> have a value when using **get(k)**.
> Restoring something that breaks this connection seems wrong, even if that
> restoring ignores the processor and directly writes to the store.
> It should remove keys for which the last entry is a tombstone from the
> **all()** iterator, regardless of whether the restore process uses a custom
> processor as KAFKA-7663 wants, or simply reads the topic as it currently does.
>
> Kind Regards,
> Patrick
>
> 
> From: Colt McNealy 
> Sent: Thursday, December 8, 2022 17:54

Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.1 #137

2022-12-08 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 440141 lines...]
[2022-12-08T22:15:13.639Z] 
[2022-12-08T22:15:13.639Z] You can use '--warning-mode all' to show the 
individual deprecation warnings and determine if they come from your own 
scripts or plugins.
[2022-12-08T22:15:13.639Z] 
[2022-12-08T22:15:13.639Z] See 
https://docs.gradle.org/7.2/userguide/command_line_interface.html#sec:command_line_warnings
[2022-12-08T22:15:13.639Z] 
[2022-12-08T22:15:13.639Z] Execution optimizations have been disabled for 3 
invalid unit(s) of work during this build to ensure correctness.
[2022-12-08T22:15:13.639Z] Please consult deprecation warnings for more details.
[2022-12-08T22:15:13.639Z] 
[2022-12-08T22:15:13.639Z] BUILD SUCCESSFUL in 3m 44s
[2022-12-08T22:15:13.639Z] 77 actionable tasks: 38 executed, 39 up-to-date
[Pipeline] sh
[2022-12-08T22:15:17.067Z] + grep ^version= gradle.properties
[2022-12-08T22:15:17.067Z] + cut -d= -f 2
[Pipeline] dir
[2022-12-08T22:15:18.071Z] Running in 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.1/streams/quickstart
[Pipeline] {
[Pipeline] sh
[2022-12-08T22:15:20.753Z] + mvn clean install -Dgpg.skip
[2022-12-08T22:15:21.754Z] [INFO] Scanning for projects...
[2022-12-08T22:15:21.754Z] [INFO] 

[2022-12-08T22:15:21.754Z] [INFO] Reactor Build Order:
[2022-12-08T22:15:21.754Z] [INFO] 
[2022-12-08T22:15:21.754Z] [INFO] Kafka Streams :: Quickstart   
 [pom]
[2022-12-08T22:15:21.754Z] [INFO] streams-quickstart-java   
 [maven-archetype]
[2022-12-08T22:15:21.754Z] [INFO] 
[2022-12-08T22:15:21.754Z] [INFO] < 
org.apache.kafka:streams-quickstart >-
[2022-12-08T22:15:21.754Z] [INFO] Building Kafka Streams :: Quickstart 
3.1.3-SNAPSHOT[1/2]
[2022-12-08T22:15:21.754Z] [INFO] [ pom 
]-
[2022-12-08T22:15:22.755Z] [INFO] 
[2022-12-08T22:15:22.755Z] [INFO] --- maven-clean-plugin:3.0.0:clean 
(default-clean) @ streams-quickstart ---
[2022-12-08T22:15:22.755Z] [INFO] 
[2022-12-08T22:15:22.755Z] [INFO] --- maven-remote-resources-plugin:1.5:process 
(process-resource-bundles) @ streams-quickstart ---
[2022-12-08T22:15:23.757Z] [INFO] 
[2022-12-08T22:15:23.757Z] [INFO] --- maven-site-plugin:3.5.1:attach-descriptor 
(attach-descriptor) @ streams-quickstart ---
[2022-12-08T22:15:25.762Z] [INFO] 
[2022-12-08T22:15:25.762Z] [INFO] --- maven-gpg-plugin:1.6:sign 
(sign-artifacts) @ streams-quickstart ---
[2022-12-08T22:15:25.762Z] [INFO] 
[2022-12-08T22:15:25.762Z] [INFO] --- maven-install-plugin:2.5.2:install 
(default-install) @ streams-quickstart ---
[2022-12-08T22:15:25.762Z] [INFO] Installing 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.1/streams/quickstart/pom.xml
 to 
/home/jenkins/.m2/repository/org/apache/kafka/streams-quickstart/3.1.3-SNAPSHOT/streams-quickstart-3.1.3-SNAPSHOT.pom
[2022-12-08T22:15:25.762Z] [INFO] 
[2022-12-08T22:15:25.762Z] [INFO] --< 
org.apache.kafka:streams-quickstart-java >--
[2022-12-08T22:15:25.762Z] [INFO] Building streams-quickstart-java 
3.1.3-SNAPSHOT[2/2]
[2022-12-08T22:15:25.762Z] [INFO] --[ maven-archetype 
]---
[2022-12-08T22:15:25.762Z] [INFO] 
[2022-12-08T22:15:25.762Z] [INFO] --- maven-clean-plugin:3.0.0:clean 
(default-clean) @ streams-quickstart-java ---
[2022-12-08T22:15:25.762Z] [INFO] 
[2022-12-08T22:15:25.762Z] [INFO] --- maven-remote-resources-plugin:1.5:process 
(process-resource-bundles) @ streams-quickstart-java ---
[2022-12-08T22:15:25.762Z] [INFO] 
[2022-12-08T22:15:25.762Z] [INFO] --- maven-resources-plugin:2.7:resources 
(default-resources) @ streams-quickstart-java ---
[2022-12-08T22:15:25.762Z] [INFO] Using 'UTF-8' encoding to copy filtered 
resources.
[2022-12-08T22:15:25.762Z] [INFO] Copying 6 resources
[2022-12-08T22:15:25.762Z] [INFO] Copying 3 resources
[2022-12-08T22:15:25.762Z] [INFO] 
[2022-12-08T22:15:25.762Z] [INFO] --- maven-resources-plugin:2.7:testResources 
(default-testResources) @ streams-quickstart-java ---
[2022-12-08T22:15:25.762Z] [INFO] Using 'UTF-8' encoding to copy filtered 
resources.
[2022-12-08T22:15:25.762Z] [INFO] Copying 2 resources
[2022-12-08T22:15:25.762Z] [INFO] Copying 3 resources
[2022-12-08T22:15:25.762Z] [INFO] 
[2022-12-08T22:15:25.762Z] [INFO] --- maven-archetype-plugin:2.2:jar 
(default-jar) @ streams-quickstart-java ---
[2022-12-08T22:15:26.767Z] [INFO] Building archetype jar: 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.1/streams/quickstart/java/target/streams-quickstart-java-3.1.3-SNAPSHOT
[2022-12-08T22:15:26.767Z] [INFO] 
[2022-12-08T22:15:26.767Z] [INFO] --- maven-site-plugin:3.5.1:attach-descriptor 
(attach-descriptor) @ 

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-12-08 Thread Sagar
Thanks Matthias,

Well, as things stand, we did have internal discussions on this, and it
seemed ok to open it up for IQ but, more importantly, not ok to have it
opened up for FK-Join. Also, the PR for this is already merged, and some of
these things came up during that review. Here's the PR link:
https://github.com/apache/kafka/pull/12803.

Thanks!
Sagar.


On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax  wrote:

> Ah. Missed it as it does not have a nice "code block" similar to
> `StreamPartitioner` changes.
>
> I understand the motivation, but I am wondering if we might head into a
> tricky direction? State stores (at least the built-in ones) and IQ are
> kinda built with the idea of having sharded data, and that a multi-cast of
> keys is an anti-pattern?
>
> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
> sure that generalizing the concepts does not cause issues in the future?
>
> Ie, should we claim that the multi-cast feature should be used for
> KStreams only, but not for KTables?
>
> Just want to double check that we are not doing something we regret later.
>
>
> -Matthias
>
>
> On 12/7/22 6:45 PM, Sagar wrote:
> > Hi Mathias,
> >
> > I did save it. The changes are added under Public Interfaces (Pt#2, about
> > enhancing KeyQueryMetadata with a partitions() method), and throwing
> > IllegalArgumentException when the StreamPartitioner#partitions method
> > returns multiple partitions now applies just to FK-Join, instead of the
> > earlier decided FK-Join and IQ.
> >
> > The background is that for IQ, if the users have multicast records to
> > multiple partitions during ingestion but the fetch returns only a single
> > partition, then it would be wrong. That's why the restriction was lifted
> > for IQ, and that's the reason KeyQueryMetadata now has another partitions()
> > method to signify the same.
> >
> > FK-Join also has a similar case, but while reviewing it was felt that
> > FK-Join on its own is fairly complicated and we don't need this feature
> > right away, so the restriction still exists.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax  wrote:
> >
> >> I don't see any update on the wiki about it. Did you forget to hit
> "save"?
> >>
> >> Can you also provide some background? I am not sure right now if I
> >> understand the proposed changes?
> >>
> >>
> >> -Matthias
> >>
> >> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> >>> Thanks Sagar, this makes sense to me -- we clearly need additional
> >>> changes to avoid breaking IQ when using this feature, but I agree with
> >>> continuing to restrict FKJ since they wouldn't stop working without it,
> >>> and would become much harder to reason about (than they already are) if
> >>> we did enable them to use it.
> >>>
> >>> And of course, they can still multicast the final results of a FKJ, they
> >>> just can't mess with the internal workings of it in this way.
> >>>
> >>> On Tue, Dec 6, 2022 at 9:48 AM Sagar 
> wrote:
> >>>
>  Hi All,
> 
>  I made a couple of edits to the KIP which came up during the code review.
> 
>  Changes at a high level are:
> 
>  1) KeyQueryMetadata enhanced to have a new method called partitions().
>  2) Lifting the restriction of a single partition for IQ. Now the
>  restriction holds only for FK Join.
> 
>  Updated KIP:
> 
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> 
>  Thanks!
>  Sagar.
> 
>  On Mon, Sep 12, 2022 at 6:43 PM Sagar 
> >> wrote:
> 
> > Thanks Bruno,
> >
> > Marking this as accepted.
> >
> > Thanks everyone for their comments/feedback.
> >
> > Thanks!
> > Sagar.
> >
> > On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna 
>  wrote:
> >
> >> Hi Sagar,
> >>
> >> Thanks for the update and the PR!
> >>
> >> +1 (binding)
> >>
> >> Best,
> >> Bruno
> >>
> >> On 10.09.22 18:57, Sagar wrote:
> >>> Hi Bruno,
> >>>
> >>> Thanks, I think these changes make sense to me. I have updated the KIP
> >>> accordingly.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna 
> >> wrote:
> >>>
>  Hi Sagar,
> 
>  I would not drop the support for dropping records. I would also not
>  return null from partitions(). Maybe an Optional can help here. An empty
>  Optional would mean to use the default partitioning behavior of the
>  producer. So we would have:
> 
>  - non-empty Optional, non-empty list of integers: partitions to send the
>  record to
>  - non-empty Optional, empty list of integers: drop the record
>  - empty Optional: use default behavior
> 
>  What do others think?
> 
>  Best,
>  Bruno
> 
>  On 02.09.22 

Re: [VOTE] KIP-878: Internal Topic Autoscaling for Kafka Streams

2022-12-08 Thread Sophie Blee-Goldman
Thanks all! That closes the voting out and the KIP is accepted with four +1
(binding) votes
from Matthias, Bruno, and Bill (and myself).

Best,
Sophie

On Thu, Dec 8, 2022 at 8:45 AM Bill Bejeck  wrote:

> Thanks for the KIP, Sophie.
>
> +1(binding)
>
> -Bill
>
> On Thu, Dec 8, 2022 at 11:39 AM Bruno Cadonna  wrote:
>
> > Thanks for the KIP!
> >
> > +1 (binding)
> >
> > Best,
> > Bruno
> >
> > On 07.12.22 17:24, Matthias J. Sax wrote:
> > > +1 (binding)
> > >
> > > On 12/1/22 9:39 PM, Sophie Blee-Goldman wrote:
> > >> Thanks to all who participated for a great discussion on this KIP.
> Seems
> > >> we're ready to kick off the voting on this, but please don't hesitate
> to
> > >> call
> > >> out anything of concern or raise questions over on the voting thread.
> > >>
> > >> Otherwise, please give it a final look over and cast your vote!
> > >>
> > >> KIP-878: Internal Topic Autoscaling for Kafka Streams
> > >> <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Internal+Topic+Autoscaling+for+Kafka+Streams
> > >
> > >> (note the change in name to reflect the decisions in the KIP
> discussion)
> > >>
> > >> Thanks,
> > >> Sophie
> > >>
> >
>


Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #1418

2022-12-08 Thread Apache Jenkins Server
See 




Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.3 #132

2022-12-08 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 339489 lines...]
[2022-12-09T01:18:21.724Z] > Task :connect:api:javadocJar
[2022-12-09T01:18:21.724Z] > Task 
:connect:json:publishMavenJavaPublicationToMavenLocal
[2022-12-09T01:18:21.724Z] > Task :connect:json:publishToMavenLocal
[2022-12-09T01:18:21.724Z] > Task :connect:api:compileTestJava UP-TO-DATE
[2022-12-09T01:18:21.724Z] > Task :connect:api:testClasses UP-TO-DATE
[2022-12-09T01:18:21.724Z] > Task :connect:api:testJar
[2022-12-09T01:18:21.724Z] > Task :connect:api:testSrcJar
[2022-12-09T01:18:21.724Z] > Task 
:connect:api:publishMavenJavaPublicationToMavenLocal
[2022-12-09T01:18:21.724Z] > Task :connect:api:publishToMavenLocal
[2022-12-09T01:18:23.466Z] 
[2022-12-09T01:18:23.466Z] > Task :streams:javadoc
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:890:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:919:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:939:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:854:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:890:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:919:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:939:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java:84:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java:136:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java:147:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/kstream/Repartitioned.java:101:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/kstream/Repartitioned.java:167:
 warning - Tag @link: reference not found: DefaultPartitioner
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:58:
 warning - Tag @link: missing '#': "org.apache.kafka.streams.StreamsBuilder()"
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:58:
 warning - Tag @link: can't find org.apache.kafka.streams.StreamsBuilder() in 
org.apache.kafka.streams.TopologyConfig
[2022-12-09T01:18:23.466Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java:38:
 warning - Tag @link: reference not found: ProcessorContext#forward(Object, 
Object) forwards
[2022-12-09T01:18:24.395Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/query/Position.java:44:
 warning - Tag @link: can't find query(Query,
[2022-12-09T01:18:24.395Z]  PositionBound, boolean) in 
org.apache.kafka.streams.processor.StateStore
[2022-12-09T01:18:24.395Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:44:
 warning - Tag @link: can't find query(Query, PositionBound, boolean) in 
org.apache.kafka.streams.processor.StateStore
[2022-12-09T01:18:24.395Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.3/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:36:
 warning - Tag 

Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-08 Thread Matthias J. Sax

The way I see it, KAFKA-7663 says, "a global store will be exactly the input topic 
after restore, regardless of the processor"


Not sure what you mean by this? The issue the ticket describes is that
if you don't do a plain `put(key,value)` in your processor, stuff breaks
right now. (Note that `delete(key)` and `put(key,null)` are the same.)



It's a known issue, bad API, and also bad documentation on our side, and 
I guess you can call it a bug if you wish. However, you can only use 
tombstones as deletes right now. Thus, what you do "wrong" is



if (record.value() == null == record.value().equals("deleteme")) {
store.delete(record.key());
} 


In your case you also delete if the value is not null and if the value
not-equals "deleteme", right? I.e., you use non-tombstone records as
deletes, which is just not allowed/supported.


The issue is that during restore only `null` values, i.e. actual
tombstones, are handled as deletes, and thus, if you delete a key using a
non-tombstone record in your processor, this key can be resurrected
during restore.
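To make the unsupported pattern concrete, here is a rough sketch (not the actual application code; the exact condition and store type may differ) of a global-store processor that treats a non-tombstone marker value as a delete. Only the `record.value() == null` branch survives a restore, because the restore path writes the topic contents straight into the store:

```java
// Sketch of the unsupported pattern: deleting on a non-tombstone marker value.
// A plain key-value store is used for brevity; the real application may use a
// timestamped store and a different condition.
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class MarkerDeleteProcessor implements Processor<String, String, Void, Void> {

    private KeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        store = context.getStateStore("foobar");
    }

    @Override
    public void process(final Record<String, String> record) {
        if (record.value() == null || "deleteme".equals(record.value())) {
            // Only the null (tombstone) case is replayed as a delete on restore;
            // the "deleteme" marker would be re-inserted as a regular value.
            store.delete(record.key());
        } else {
            store.put(record.key(), record.value());
        }
    }
}
```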



I cannot explain why all() and get(key) actually give you different
results with respect to `key`. If a key is resurrected during a restore,
both methods should return it. Not sure why `get(key)` returns `null`
even if `all()` contains the key... I would rather expect that both
return the resurrected key.


Hope this helps.


-Matthias


On 12/8/22 12:00 PM, Patrick D’Addona wrote:

Hi,

I don't think this issue is exactly the same as KAFKA-7663.

The way I see it, KAFKA-7663 says, "a global store will be exactly the input topic 
after restore, regardless of the processor"
My issue here is that the global store after restore is inconsistent with the
input topic and with the store itself, because it finds records with key "foo"
using **store.all()** that it cannot find via **store.get("foo")**.
The **store.get()** is consistent with my input topic, where the tombstone is
the latest entry for the key "foo", reflecting the **delete("foo")** operation
on the store.
But still, looping over the store returns a record with "foo" as a key and a
non-null value.

If the store acts like a Map, where you can call **get(k)** and **put(k, v)**,
then looping over it should only find entries that actually exist and have a
value when using **get(k)**.
Restoring something that breaks this connection seems wrong, even if that
restoring ignores the processor and directly writes to the store.
It should remove keys for which the last entry is a tombstone from the
**all()** iterator, regardless of whether the restore process uses a custom
processor as KAFKA-7663 wants, or simply reads the topic as it currently does.

Kind Regards,
Patrick


From: Colt McNealy 
Sent: Thursday, December 8, 2022 17:54
To: patrick.dadd...@maibornwolff.de.invalid 

Cc: dev@kafka.apache.org 
Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores 
previously deleted records

Hi Patrick,

Your issue is in fact identical to KAFKA-7663. As per that
issue/bug/discussion, if your processor does anything other than simply
passing records through, the results of initial processing vs. restoration
are different.

Global State Stores don't have a changelog topic (for example, in the
processor API, Global State Stores are only valid if the builder has
.withLoggingDisabled()). That's because the processor for the global store
runs on each of your N streams instances, and if the processor on each
instance published to the changelog, then each put/delete would be written
N times, which is wasteful.

The implication of this is that your input topic should be "like" a
changelog:
- Your input topic should NOT have limited retention, otherwise you'll lose
old data.
- Your input topic should ideally be compacted if possible.
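To make the "changelog-like" shape above concrete, here is a sketch of creating such a topic with the Admin client; the partition and replication counts are placeholders, only the cleanup/retention settings matter here:

```java
// Sketch: create the input topic with changelog-like settings (compacted,
// no time- or size-based retention). Partition and replication counts below
// are illustrative, not taken from this thread.
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CreateChangelogLikeTopic {

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            final NewTopic topic = new NewTopic("foo.bar.globaltopic", 1, (short) 3)
                .configs(Map.of(
                    TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
                    TopicConfig.RETENTION_MS_CONFIG, "-1",
                    TopicConfig.RETENTION_BYTES_CONFIG, "-1"));
            admin.createTopics(Set.of(topic)).all().get();
        }
    }
}
```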

I agree that the API as it stands is highly confusing—why allow users to
provide a processor if it offers a way to "shoot oneself in one's foot?"

Changing that API would probably require a KIP. I don't quite have the
bandwidth to propose + implement such a KIP right now, but if you would
like to, feel free! (perhaps in the spring I may have time)

Your workaround (the init() method) is a good one. Another way to do it
might be to simply have a regular processing step which converts the input
topic into the true "changelog" format before you push it to a global store.
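A sketch of that second workaround, assuming (hypothetically) that deletes arrive on a raw upstream topic as a marker value such as "deleteme"; a small topology rewrites the marker into a real tombstone before the record reaches the topic backing the global store:

```java
// Sketch only: rewrite non-tombstone delete markers into real tombstones
// before they reach the topic backing the global store. The topic name
// "foo.bar.raw" and the "deleteme" marker are hypothetical examples.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class ChangelogShaper {

    static void build(final StreamsBuilder builder) {
        builder.stream("foo.bar.raw",
                       Consumed.with(Serdes.String(), Serdes.String()))
               // a null value is a proper tombstone and survives the restore path
               .mapValues(value -> "deleteme".equals(value) ? null : value)
               .to("foo.bar.globaltopic",
                   Produced.with(Serdes.String(), Serdes.String()));
    }
}
```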

Cheers,
Colt McNealy
*Founder, LittleHorse.io*


On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona
 wrote:


Hello,

I have a quarkus application using
**org.apache.kafka:kafka-streams:3.1.0** and found that
* when creating a global table using a compacted topic as input
* entries that have been deleted at some point
* are then no longer returned when iterating over the store with
**store.all()** - as expected
* but after the pod restarts and its kafka streams state directory is
deleted, after restoring from the topic using

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

2022-12-08 Thread Matthias J. Sax
Ah. Missed it as it does not have a nice "code block" similar to 
`StreamPartitioner` changes.


I understand the motivation, but I am wondering if we might head into a 
tricky direction? State stores (at least the built-in ones) and IQ are
kinda built with the idea of having sharded data, and that a multi-cast of
keys is an anti-pattern?


Maybe it's fine, but I also don't want to open Pandora's Box. Are we 
sure that generalizing the concepts does not cause issues in the future?


Ie, should we claim that the multi-cast feature should be used for 
KStreams only, but not for KTables?


Just want to double check that we are not doing something we regret later.


-Matthias


On 12/7/22 6:45 PM, Sagar wrote:

Hi Mathias,

I did save it. The changes are added under Public Interfaces (Pt#2, about
enhancing KeyQueryMetadata with a partitions() method), and throwing
IllegalArgumentException when the StreamPartitioner#partitions method
returns multiple partitions now applies just to FK-Join, instead of the
earlier decided FK-Join and IQ.

The background is that for IQ, if the users have multicast records to
multiple partitions during ingestion but the fetch returns only a single
partition, then it would be wrong. That's why the restriction was lifted
for IQ, and that's the reason KeyQueryMetadata now has another partitions()
method to signify the same.
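For illustration, a sketch of how an IQ caller could use the new accessor; the exact signature of KeyQueryMetadata#partitions() is defined by the KIP, so the iteration below is an assumption of this sketch:

```java
// Sketch: when a key was multicast to several partitions on ingestion, a
// lookup has to consult every partition returned by the new
// KeyQueryMetadata#partitions() accessor (assumed here to yield an
// Iterable of partition numbers).
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class MultiCastLookup {

    static void lookup(final KafkaStreams streams, final String storeName, final String key) {
        final KeyQueryMetadata metadata =
            streams.queryMetadataForKey(storeName, key, Serdes.String().serializer());

        for (final int partition : metadata.partitions()) {   // new accessor from this KIP
            final ReadOnlyKeyValueStore<String, String> store =
                streams.store(StoreQueryParameters
                    .fromNameAndType(storeName, QueryableStoreTypes.<String, String>keyValueStore())
                    .withPartition(partition));
            System.out.println("partition " + partition + " -> " + store.get(key));
        }
    }
}
```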

FK-Join also has a similar case, but while reviewing it was felt that
FK-Join on its own is fairly complicated and we don't need this feature
right away, so the restriction still exists.

Thanks!
Sagar.


On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax  wrote:


I don't see any update on the wiki about it. Did you forget to hit "save"?

Can you also provide some background? I am not sure right now if I
understand the proposed changes?


-Matthias

On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:

Thanks Sagar, this makes sense to me -- we clearly need additional changes
to avoid breaking IQ when using this feature, but I agree with continuing to
restrict FKJ since they wouldn't stop working without it, and would become
much harder to reason about (than they already are) if we did enable them to
use it.

And of course, they can still multicast the final results of a FKJ, they
just can't mess with the internal workings of it in this way.

On Tue, Dec 6, 2022 at 9:48 AM Sagar  wrote:


Hi All,

I made a couple of edits to the KIP which came up during the code review.

Changes at a high level are:

1) KeyQueryMetadata enhanced to have a new method called partitions().
2) Lifting the restriction of a single partition for IQ. Now the
restriction holds only for FK Join.

Updated KIP:


https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356


Thanks!
Sagar.

On Mon, Sep 12, 2022 at 6:43 PM Sagar 

wrote:



Thanks Bruno,

Marking this as accepted.

Thanks everyone for their comments/feedback.

Thanks!
Sagar.

On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna 

wrote:



Hi Sagar,

Thanks for the update and the PR!

+1 (binding)

Best,
Bruno

On 10.09.22 18:57, Sagar wrote:

Hi Bruno,

Thanks, I think these changes make sense to me. I have updated the KIP
accordingly.

Thanks!
Sagar.

On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna 

wrote:



Hi Sagar,

I would not drop the support for dropping records. I would also not
return null from partitions(). Maybe an Optional can help here. An empty
Optional would mean to use the default partitioning behavior of the
producer. So we would have:

- non-empty Optional, non-empty list of integers: partitions to send the
record to
- non-empty Optional, empty list of integers: drop the record
- empty Optional: use default behavior

What do others think?

Best,
Bruno
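As a sketch of the contract described above (not the final KIP-837 API; the actual change lands on StreamPartitioner and the collection type may differ), the three cases could look like this:

```java
// Illustration of the proposed contract, not the final KIP-837 API:
//   - non-empty Optional, non-empty list -> send to exactly these partitions
//   - non-empty Optional, empty list     -> drop the record
//   - empty Optional                     -> use the default partitioning behavior
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public interface MultiCastPartitioner<K, V> {

    Optional<List<Integer>> partitions(String topic, K key, V value, int numPartitions);

    /** Example implementation: broadcast every record to all partitions. */
    static <K, V> MultiCastPartitioner<K, V> broadcast() {
        return (topic, key, value, numPartitions) ->
            Optional.of(IntStream.range(0, numPartitions)
                .boxed()
                .collect(Collectors.toList()));
    }
}
```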

On 02.09.22 13:53, Sagar wrote:

Hello Bruno/Chris,

Since these are the last set of changes(I am assuming haha), it would be
great if you could review the 2 options from above so that we can close the
voting. Of course I am happy to incorporate any other requisite changes.

Thanks!
Sagar.

On Wed, Aug 31, 2022 at 10:07 PM Sagar  wrote:

Thanks Bruno for the great points.

I see 2 options here =>

1) As Chris suggested, drop the support for dropping records in the
partitioner. That way, an empty list could signify the usage of a default
partitioner. Also, if the deprecated partition() method returns null
thereby signifying the default partitioner, the partitions() can return an
empty list i.e default partitioner.

2) OR we treat a null return type of partitions() method to signify the
usage of the default partitioner. In the default implementation of
partitions() method, if partition() returns null, then even partitions()
can return null(instead of an empty list). The RecordCollectorImpl code can
also be modified accordingly. @Chris, to your point, we can even drop the
support of dropping of records. It came up during KIP discussion, and I
thought it might be a useful feature. Let me know what you think.

3) Lastly about the partition number 

Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.4 #5

2022-12-08 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 521506 lines...]
[2022-12-08T23:23:47.001Z] 
[2022-12-08T23:23:47.001Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > testGetOffsetsForUnknownTopic(String) > 
kafka.server.LogOffsetTest.testGetOffsetsForUnknownTopic(String)[1] STARTED
[2022-12-08T23:23:48.002Z] 
[2022-12-08T23:23:48.002Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > testGetOffsetsForUnknownTopic(String) > 
kafka.server.LogOffsetTest.testGetOffsetsForUnknownTopic(String)[1] PASSED
[2022-12-08T23:23:48.002Z] 
[2022-12-08T23:23:48.002Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > testGetOffsetsForUnknownTopic(String) > 
kafka.server.LogOffsetTest.testGetOffsetsForUnknownTopic(String)[2] STARTED
[2022-12-08T23:23:52.649Z] 
[2022-12-08T23:23:52.649Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > testGetOffsetsForUnknownTopic(String) > 
kafka.server.LogOffsetTest.testGetOffsetsForUnknownTopic(String)[2] PASSED
[2022-12-08T23:23:52.649Z] 
[2022-12-08T23:23:52.649Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > 
testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(String) > 
kafka.server.LogOffsetTest.testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(String)[1]
 STARTED
[2022-12-08T23:23:54.621Z] 
[2022-12-08T23:23:54.621Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > 
testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(String) > 
kafka.server.LogOffsetTest.testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(String)[1]
 PASSED
[2022-12-08T23:23:54.621Z] 
[2022-12-08T23:23:54.621Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > 
testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(String) > 
kafka.server.LogOffsetTest.testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(String)[2]
 STARTED
[2022-12-08T23:23:58.307Z] 
[2022-12-08T23:23:58.307Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > 
testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(String) > 
kafka.server.LogOffsetTest.testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(String)[2]
 PASSED
[2022-12-08T23:23:58.307Z] 
[2022-12-08T23:23:58.307Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > testEmptyLogsGetOffsets(String) > 
kafka.server.LogOffsetTest.testEmptyLogsGetOffsets(String)[1] STARTED
[2022-12-08T23:24:01.352Z] 
[2022-12-08T23:24:01.352Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > testEmptyLogsGetOffsets(String) > 
kafka.server.LogOffsetTest.testEmptyLogsGetOffsets(String)[1] PASSED
[2022-12-08T23:24:01.352Z] 
[2022-12-08T23:24:01.352Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > testEmptyLogsGetOffsets(String) > 
kafka.server.LogOffsetTest.testEmptyLogsGetOffsets(String)[2] STARTED
[2022-12-08T23:24:05.038Z] 
[2022-12-08T23:24:05.038Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > testEmptyLogsGetOffsets(String) > 
kafka.server.LogOffsetTest.testEmptyLogsGetOffsets(String)[2] PASSED
[2022-12-08T23:24:05.038Z] 
[2022-12-08T23:24:05.038Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > 
testFetchOffsetsBeforeWithChangingSegments(String) > 
kafka.server.LogOffsetTest.testFetchOffsetsBeforeWithChangingSegments(String)[1]
 STARTED
[2022-12-08T23:24:06.960Z] 
[2022-12-08T23:24:06.960Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > 
testFetchOffsetsBeforeWithChangingSegments(String) > 
kafka.server.LogOffsetTest.testFetchOffsetsBeforeWithChangingSegments(String)[1]
 PASSED
[2022-12-08T23:24:06.960Z] 
[2022-12-08T23:24:06.960Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > 
testFetchOffsetsBeforeWithChangingSegments(String) > 
kafka.server.LogOffsetTest.testFetchOffsetsBeforeWithChangingSegments(String)[2]
 STARTED
[2022-12-08T23:24:12.069Z] 
[2022-12-08T23:24:12.069Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > 
testFetchOffsetsBeforeWithChangingSegments(String) > 
kafka.server.LogOffsetTest.testFetchOffsetsBeforeWithChangingSegments(String)[2]
 PASSED
[2022-12-08T23:24:12.069Z] 
[2022-12-08T23:24:12.069Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > testGetOffsetsBeforeLatestTime(String) > 
kafka.server.LogOffsetTest.testGetOffsetsBeforeLatestTime(String)[1] STARTED
[2022-12-08T23:24:16.750Z] 
[2022-12-08T23:24:16.750Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > LogOffsetTest > testGetOffsetsBeforeLatestTime(String) > 

Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.0 #214

2022-12-08 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 355232 lines...]
[2022-12-08T23:18:58.448Z] [INFO] --- maven-remote-resources-plugin:1.5:process 
(process-resource-bundles) @ streams-quickstart ---
[2022-12-08T23:18:59.489Z] [INFO] 
[2022-12-08T23:18:59.489Z] [INFO] --- maven-site-plugin:3.5.1:attach-descriptor 
(attach-descriptor) @ streams-quickstart ---
[2022-12-08T23:19:00.406Z] [INFO] 
[2022-12-08T23:19:00.406Z] [INFO] --- maven-gpg-plugin:1.6:sign 
(sign-artifacts) @ streams-quickstart ---
[2022-12-08T23:19:00.406Z] [INFO] 
[2022-12-08T23:19:00.406Z] [INFO] --- maven-install-plugin:2.5.2:install 
(default-install) @ streams-quickstart ---
[2022-12-08T23:19:00.406Z] [INFO] Installing 
/home/jenkins/workspace/Kafka_kafka_3.0/streams/quickstart/pom.xml to 
/home/jenkins/.m2/repository/org/apache/kafka/streams-quickstart/3.0.3-SNAPSHOT/streams-quickstart-3.0.3-SNAPSHOT.pom
[2022-12-08T23:19:00.406Z] [INFO] 
[2022-12-08T23:19:00.406Z] [INFO] --< 
org.apache.kafka:streams-quickstart-java >--
[2022-12-08T23:19:00.406Z] [INFO] Building streams-quickstart-java 
3.0.3-SNAPSHOT[2/2]
[2022-12-08T23:19:00.406Z] [INFO] --[ maven-archetype 
]---
[2022-12-08T23:19:01.404Z] [INFO] 
[2022-12-08T23:19:01.404Z] [INFO] --- maven-clean-plugin:3.0.0:clean 
(default-clean) @ streams-quickstart-java ---
[2022-12-08T23:19:01.404Z] [INFO] 
[2022-12-08T23:19:01.404Z] [INFO] --- maven-remote-resources-plugin:1.5:process 
(process-resource-bundles) @ streams-quickstart-java ---
[2022-12-08T23:19:01.404Z] [INFO] 
[2022-12-08T23:19:01.404Z] [INFO] --- maven-resources-plugin:2.7:resources 
(default-resources) @ streams-quickstart-java ---
[2022-12-08T23:19:01.404Z] [INFO] Using 'UTF-8' encoding to copy filtered 
resources.
[2022-12-08T23:19:01.404Z] [INFO] Copying 6 resources
[2022-12-08T23:19:01.404Z] [INFO] Copying 3 resources
[2022-12-08T23:19:01.404Z] [INFO] 
[2022-12-08T23:19:01.404Z] [INFO] --- maven-resources-plugin:2.7:testResources 
(default-testResources) @ streams-quickstart-java ---
[2022-12-08T23:19:01.404Z] [INFO] Using 'UTF-8' encoding to copy filtered 
resources.
[2022-12-08T23:19:01.404Z] [INFO] Copying 2 resources
[2022-12-08T23:19:01.404Z] [INFO] Copying 3 resources
[2022-12-08T23:19:01.404Z] [INFO] 
[2022-12-08T23:19:01.404Z] [INFO] --- maven-archetype-plugin:2.2:jar 
(default-jar) @ streams-quickstart-java ---
[2022-12-08T23:19:02.651Z] [INFO] Building archetype jar: 
/home/jenkins/workspace/Kafka_kafka_3.0/streams/quickstart/java/target/streams-quickstart-java-3.0.3-SNAPSHOT
[2022-12-08T23:19:02.651Z] [INFO] 
[2022-12-08T23:19:02.651Z] [INFO] --- maven-site-plugin:3.5.1:attach-descriptor 
(attach-descriptor) @ streams-quickstart-java ---
[2022-12-08T23:19:02.651Z] [INFO] 
[2022-12-08T23:19:02.651Z] [INFO] --- 
maven-archetype-plugin:2.2:integration-test (default-integration-test) @ 
streams-quickstart-java ---
[2022-12-08T23:19:02.651Z] [INFO] 
[2022-12-08T23:19:02.651Z] [INFO] --- maven-gpg-plugin:1.6:sign 
(sign-artifacts) @ streams-quickstart-java ---
[2022-12-08T23:19:02.651Z] [INFO] 
[2022-12-08T23:19:02.651Z] [INFO] --- maven-install-plugin:2.5.2:install 
(default-install) @ streams-quickstart-java ---
[2022-12-08T23:19:02.651Z] [INFO] Installing 
/home/jenkins/workspace/Kafka_kafka_3.0/streams/quickstart/java/target/streams-quickstart-java-3.0.3-SNAPSHOT.jar
 to 
/home/jenkins/.m2/repository/org/apache/kafka/streams-quickstart-java/3.0.3-SNAPSHOT/streams-quickstart-java-3.0.3-SNAPSHOT.jar
[2022-12-08T23:19:02.651Z] [INFO] Installing 
/home/jenkins/workspace/Kafka_kafka_3.0/streams/quickstart/java/pom.xml to 
/home/jenkins/.m2/repository/org/apache/kafka/streams-quickstart-java/3.0.3-SNAPSHOT/streams-quickstart-java-3.0.3-SNAPSHOT.pom
[2022-12-08T23:19:02.651Z] [INFO] 
[2022-12-08T23:19:02.651Z] [INFO] --- 
maven-archetype-plugin:2.2:update-local-catalog (default-update-local-catalog) 
@ streams-quickstart-java ---
[2022-12-08T23:19:02.651Z] [INFO] 

[2022-12-08T23:19:02.651Z] [INFO] Reactor Summary for Kafka Streams :: 
Quickstart 3.0.3-SNAPSHOT:
[2022-12-08T23:19:02.651Z] [INFO] 
[2022-12-08T23:19:02.651Z] [INFO] Kafka Streams :: Quickstart 
 SUCCESS [  3.147 s]
[2022-12-08T23:19:02.651Z] [INFO] streams-quickstart-java 
 SUCCESS [  2.051 s]
[2022-12-08T23:19:02.651Z] [INFO] 

[2022-12-08T23:19:02.651Z] [INFO] BUILD SUCCESS
[2022-12-08T23:19:02.651Z] [INFO] 

[2022-12-08T23:19:02.651Z] [INFO] Total time:  5.549 s
[2022-12-08T23:19:02.651Z] [INFO] Finished at: 2022-12-08T23:19:02Z
[2022-12-08T23:19:02.651Z] [INFO] 

Re: [VOTE] KIP-884: Add config to configure KafkaClientSupplier in Kafka Streams

2022-12-08 Thread Matthias J. Sax

Thanks Hao. I think it makes sense to extend `StreamsConfig` as proposed.

-Matthias

On 12/7/22 10:26 AM, Hao Li wrote:

Hi all,

I updated the KIP to add a `getKafkaClientSupplier` method in
`StreamsConfig`. Let me know if you have any concerns.
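For readers following along, a sketch of the kind of supplier this KIP makes configurable. KafkaClientSupplier itself is the existing public interface; how it gets wired in through StreamsConfig (config key, exact getKafkaClientSupplier signature) is defined by the KIP and not shown here, so treat the wiring as an assumption:

```java
// Sketch of a custom KafkaClientSupplier. Only getProducer is customized;
// the other methods create plain clients.
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.KafkaClientSupplier;

public class LoggingClientSupplier implements KafkaClientSupplier {

    @Override
    public Admin getAdmin(final Map<String, Object> config) {
        return Admin.create(config);
    }

    @Override
    public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
        // e.g. wrap or instrument the producer before handing it to Streams
        System.out.println("Creating producer with config keys: " + config.keySet());
        return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
    }

    @Override
    public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
        return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    @Override
    public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
        return getConsumer(config);
    }

    @Override
    public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
        return getConsumer(config);
    }
}
```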

Thanks,
Hao

On Wed, Nov 30, 2022 at 10:26 AM Hao Li  wrote:


Hi all,

Thanks for the vote. The vote passed with 4 binding votes (John, Matthias,
Sophie and Bruno).

I'll update KIP and submit a PR for this.

Thanks,
Hao

On Tue, Nov 22, 2022 at 11:08 PM Bruno Cadonna  wrote:


Hi Hao,

Thanks for the KIP!

+1 (binding)

Best,
Bruno

On 22.11.22 10:08, Sophie Blee-Goldman wrote:

Hey Hao, thanks for the KIP -- I'm +1 (binding)

On Mon, Nov 21, 2022 at 12:57 PM Matthias J. Sax 

wrote:



+1 (binding)

On 11/21/22 7:39 AM, John Roesler wrote:

I'm +1 (binding)

Thanks for the KIP!
-John

On 2022/11/17 21:06:29 Hao Li wrote:

Hi all,

I would like to start a vote on KIP-884:





https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams



Thanks,
Hao










--
Thanks,
Hao






Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1417

2022-12-08 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 521006 lines...]
[2022-12-08T21:51:11.606Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > StopReplicaRequestTest > testStopReplicaRequest() PASSED
[2022-12-08T21:51:11.606Z] 
[2022-12-08T21:51:11.606Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > TopicIdWithOldInterBrokerProtocolTest > 
testFetchTopicIdsWithOldIBPWrongFetchVersion() STARTED
[2022-12-08T21:51:13.798Z] 
[2022-12-08T21:51:13.798Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > TopicIdWithOldInterBrokerProtocolTest > 
testFetchTopicIdsWithOldIBPWrongFetchVersion() PASSED
[2022-12-08T21:51:13.798Z] 
[2022-12-08T21:51:13.798Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > TopicIdWithOldInterBrokerProtocolTest > 
testDeleteTopicsWithOldIBP() STARTED
[2022-12-08T21:51:16.762Z] 
[2022-12-08T21:51:16.762Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > TopicIdWithOldInterBrokerProtocolTest > 
testDeleteTopicsWithOldIBP() PASSED
[2022-12-08T21:51:16.762Z] 
[2022-12-08T21:51:16.762Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > TopicIdWithOldInterBrokerProtocolTest > 
testFetchTopicIdsWithOldIBPCorrectFetchVersion() STARTED
[2022-12-08T21:51:18.840Z] 
[2022-12-08T21:51:18.840Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > TopicIdWithOldInterBrokerProtocolTest > 
testFetchTopicIdsWithOldIBPCorrectFetchVersion() PASSED
[2022-12-08T21:51:18.840Z] 
[2022-12-08T21:51:18.840Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > TopicIdWithOldInterBrokerProtocolTest > 
testMetadataTopicIdsWithOldIBP() STARTED
[2022-12-08T21:51:20.818Z] 
[2022-12-08T21:51:20.818Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > TopicIdWithOldInterBrokerProtocolTest > 
testMetadataTopicIdsWithOldIBP() PASSED
[2022-12-08T21:51:20.818Z] 
[2022-12-08T21:51:20.818Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > TopicIdWithOldInterBrokerProtocolTest > 
testDeleteTopicsWithOldIBPUsingIDs() STARTED
[2022-12-08T21:51:24.073Z] 
[2022-12-08T21:51:24.073Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > TopicIdWithOldInterBrokerProtocolTest > 
testDeleteTopicsWithOldIBPUsingIDs() PASSED
[2022-12-08T21:51:24.073Z] 
[2022-12-08T21:51:24.073Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > EpochDrivenReplicationProtocolAcceptanceWithIbp26Test > 
shouldSurviveFastLeaderChange() STARTED
[2022-12-08T21:51:42.419Z] 
[2022-12-08T21:51:42.419Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > EpochDrivenReplicationProtocolAcceptanceWithIbp26Test > 
shouldSurviveFastLeaderChange() PASSED
[2022-12-08T21:51:42.419Z] 
[2022-12-08T21:51:42.419Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > EpochDrivenReplicationProtocolAcceptanceWithIbp26Test > 
offsetsShouldNotGoBackwards() STARTED
[2022-12-08T21:52:52.899Z] 
[2022-12-08T21:52:52.899Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > EpochDrivenReplicationProtocolAcceptanceWithIbp26Test > 
offsetsShouldNotGoBackwards() PASSED
[2022-12-08T21:52:52.899Z] 
[2022-12-08T21:52:52.899Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > EpochDrivenReplicationProtocolAcceptanceWithIbp26Test > 
shouldFollowLeaderEpochBasicWorkflow() STARTED
[2022-12-08T21:52:56.244Z] 
[2022-12-08T21:52:56.244Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > EpochDrivenReplicationProtocolAcceptanceWithIbp26Test > 
shouldFollowLeaderEpochBasicWorkflow() PASSED
[2022-12-08T21:52:56.244Z] 
[2022-12-08T21:52:56.244Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > EpochDrivenReplicationProtocolAcceptanceWithIbp26Test > 
shouldNotAllowDivergentLogs() STARTED
[2022-12-08T21:53:07.530Z] 
[2022-12-08T21:53:07.530Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > EpochDrivenReplicationProtocolAcceptanceWithIbp26Test > 
shouldNotAllowDivergentLogs() PASSED
[2022-12-08T21:53:07.530Z] 
[2022-12-08T21:53:07.530Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > EpochDrivenReplicationProtocolAcceptanceWithIbp26Test > 
logsShouldNotDivergeOnUncleanLeaderElections() STARTED
[2022-12-08T21:53:17.419Z] 
[2022-12-08T21:53:17.419Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > EpochDrivenReplicationProtocolAcceptanceWithIbp26Test > 
logsShouldNotDivergeOnUncleanLeaderElections() PASSED
[2022-12-08T21:53:17.419Z] 
[2022-12-08T21:53:17.419Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > ReplicationUtilsTest > testUpdateLeaderAndIsr() STARTED
[2022-12-08T21:53:17.419Z] 
[2022-12-08T21:53:17.419Z] Gradle Test Run :core:integrationTest > Gradle Test 
Executor 163 > ReplicationUtilsTest > 

Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.2 #93

2022-12-08 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 456642 lines...]
[2022-12-08T21:08:00.656Z] > Task 
:connect:api:generateMetadataFileForMavenJavaPublication
[2022-12-08T21:08:00.656Z] > Task :connect:json:jar UP-TO-DATE
[2022-12-08T21:08:00.656Z] > Task 
:connect:json:generateMetadataFileForMavenJavaPublication
[2022-12-08T21:08:00.656Z] > Task 
:connect:json:publishMavenJavaPublicationToMavenLocal
[2022-12-08T21:08:00.656Z] > Task :connect:json:publishToMavenLocal
[2022-12-08T21:08:00.656Z] > Task :connect:api:javadocJar
[2022-12-08T21:08:00.656Z] > Task :connect:api:compileTestJava UP-TO-DATE
[2022-12-08T21:08:00.656Z] > Task :connect:api:testClasses UP-TO-DATE
[2022-12-08T21:08:00.656Z] > Task :connect:api:testJar
[2022-12-08T21:08:00.656Z] > Task :connect:api:testSrcJar
[2022-12-08T21:08:00.656Z] > Task 
:connect:api:publishMavenJavaPublicationToMavenLocal
[2022-12-08T21:08:00.656Z] > Task :connect:api:publishToMavenLocal
[2022-12-08T21:08:04.356Z] 
[2022-12-08T21:08:04.356Z] > Task :streams:javadoc
[2022-12-08T21:08:04.356Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:58:
 warning - Tag @link: missing '#': "org.apache.kafka.streams.StreamsBuilder()"
[2022-12-08T21:08:04.356Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:58:
 warning - Tag @link: can't find org.apache.kafka.streams.StreamsBuilder() in 
org.apache.kafka.streams.TopologyConfig
[2022-12-08T21:08:04.356Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/streams/src/main/java/org/apache/kafka/streams/query/Position.java:44:
 warning - Tag @link: can't find query(Query,
[2022-12-08T21:08:04.356Z]  PositionBound, boolean) in 
org.apache.kafka.streams.processor.StateStore
[2022-12-08T21:08:04.356Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:44:
 warning - Tag @link: can't find query(Query, PositionBound, boolean) in 
org.apache.kafka.streams.processor.StateStore
[2022-12-08T21:08:04.356Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:36:
 warning - Tag @link: can't find query(Query, PositionBound, boolean) in 
org.apache.kafka.streams.processor.StateStore
[2022-12-08T21:08:04.356Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:57:
 warning - Tag @link: can't find query(Query, PositionBound, boolean) in 
org.apache.kafka.streams.processor.StateStore
[2022-12-08T21:08:04.356Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:74:
 warning - Tag @link: can't find query(Query, PositionBound, boolean) in 
org.apache.kafka.streams.processor.StateStore
[2022-12-08T21:08:04.356Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:110:
 warning - Tag @link: reference not found: this#getResult()
[2022-12-08T21:08:04.356Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:117:
 warning - Tag @link: reference not found: this#getFailureReason()
[2022-12-08T21:08:04.356Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:117:
 warning - Tag @link: reference not found: this#getFailureMessage()
[2022-12-08T21:08:04.356Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:155:
 warning - Tag @link: reference not found: this#isSuccess()
[2022-12-08T21:08:04.356Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java:155:
 warning - Tag @link: reference not found: this#isFailure()
[2022-12-08T21:08:05.277Z] 12 warnings
[2022-12-08T21:08:05.277Z] 
[2022-12-08T21:08:05.277Z] > Task :streams:javadocJar
[2022-12-08T21:08:05.277Z] > Task :streams:processTestResources UP-TO-DATE
[2022-12-08T21:08:06.280Z] 
[2022-12-08T21:08:06.280Z] > Task :clients:javadoc
[2022-12-08T21:08:06.280Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.2/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java:147:
 warning - Tag @link: reference not found: 
[2022-12-08T21:08:07.200Z] 1 warning
[2022-12-08T21:08:08.121Z] 
[2022-12-08T21:08:08.121Z] > Task :clients:javadocJar
[2022-12-08T21:08:09.208Z] 
[2022-12-08T21:08:09.208Z] > Task :clients:srcJar
[2022-12-08T21:08:09.208Z] Execution optimizations have been disabled for task 
':clients:srcJar' to ensure correctness due to the following reasons:

Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.3 #131

2022-12-08 Thread Apache Jenkins Server
See 




Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-08 Thread Patrick D’Addona
Hi,

I don't think this issue is exactly the same as KAFKA-7663.

The way I see it, KAFKA-7663 says, "a global store will be exactly the input 
topic after restore, regardless of the processor"
My issue here is that the global store after restore is inconsistent with the
input topic and with the store itself, because it finds records with key "foo"
using **store.all()** that it cannot find via **store.get("foo")**.
The **store.get()** is consistent with my input topic, where the tombstone is
the latest entry for the key "foo", reflecting the **delete("foo")** operation
on the store.
But still, looping over the store returns a record with "foo" as a key and a
non-null value.

If the store acts like a Map, where you can call **get(k)** and **put(k, v)**,
then looping over it should only find entries that actually exist and have a
value when using **get(k)**.
Restoring something that breaks this connection seems wrong, even if that
restoring ignores the processor and directly writes to the store.
It should remove keys for which the last entry is a tombstone from the
**all()** iterator, regardless of whether the restore process uses a custom
processor as KAFKA-7663 wants, or simply reads the topic as it currently does.

Kind Regards,
Patrick


From: Colt McNealy 
Sent: Thursday, December 8, 2022 17:54
To: patrick.dadd...@maibornwolff.de.invalid 

Cc: dev@kafka.apache.org 
Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores 
previously deleted records

Hi Patrick,

Your issue is in fact identical to KAFKA-7663. As per that
issue/bug/discussion, if your processor does anything other than simply
passing records through, the results of initial processing vs. restoration
are different.

Global State Stores don't have a changelog topic (for example, in the
processor API, Global State Stores are only valid if the builder has
.withLoggingDisabled()). That's because the processor for the global store
runs on each of your N streams instances, and if the processor on each
instance published to the changelog, then each put/delete would be written
N times, which is wasteful.

The implication of this is that your input topic should be "like" a
changelog:
- Your input topic should NOT have limited retention, otherwise you'll lose
old data.
- Your input topic should ideally be compacted if possible.

I agree that the API as it stands is highly confusing—why allow users to
provide a processor if it offers a way to "shoot oneself in one's foot?"

Changing that API would probably require a KIP. I don't quite have the
bandwidth to propose + implement such a KIP right now, but if you would
like to, feel free! (perhaps in the spring I may have time)

Your workaround (the init() method) is a good one. Another way to do it
might be to simply have a regular processing step which converts the input
topic into the true "changelog" format before you push it to a global store.

Cheers,
Colt McNealy
*Founder, LittleHorse.io*


On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona
 wrote:

> Hello,
>
> I have a quarkus application using
> **org.apache.kafka:kafka-streams:3.1.0** and found that
> * when creating a global table using a compacted topic as input
> * entries that have been deleted at some point
> * are then no longer returned when iterating over the store with
> **store.all()** - as expected
> * but after the pod restarts and its kafka streams state directory is
> deleted, after restoring from the topic using
> **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**
> * those formerly deleted records are once again returned by that store
> when using **store.all()** - not expected
> * however they return null, using **store.get("foo")** - as expected
>
> This is somewhat similar to
> https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like to
> be able to modify this restore behaviour.
> However it is also different, because I think it is not documented
> anywhere and it is unintuitive (to me) - since it changes how the
> application behaves after restarting it even if the kafka cluster itself
> was not changed - so I think it's more of a bug than missing documentation.
>
> Some more information, the topic is configured like this
> ```java
> cleanup.policy: compact
> compression.type: producer
> delete.retention.ms: 8640
> max.compaction.lag.ms: 9223372036854776000
> min.compaction.lag.ms: 0
> retention.bytes: -1
> retention.ms: 8640
> ```
>
> I am adding the global store like so
> ```java
> streamsBuilder.addGlobalStore(
> Stores.timestampedKeyValueStoreBuilder(
> Stores.persistentTimestampedKeyValueStore("foobar"),
> Serdes.String(),
> Serdes.String()),
> "foo.bar.globaltopic",
> Consumed.with(Serdes.String(), Serdes.String()),
> () -> new FooBarUpdateHandler(timeService)
> );
> ```
>
> and here is the definition of 'FooBarUpdateHandler'
> 

Re: [VOTE] KIP-893: The Kafka protocol should support nullable structs

2022-12-08 Thread David Jacot
The KIP is accepted with 3 binding votes (Colin, John, Ziming) and 1
non-binding vote (Kirk).

Thank you all!

Best,
David

On Tue, Dec 6, 2022 at 3:59 AM deng ziming  wrote:
>
> +1 (binding)
>
> --
> Thanks,
> Ziming
>
>
> > On Dec 6, 2022, at 10:48, John Roesler  wrote:
> >
> > +1 (binding)
> >
> > Thanks,
> > -John
> >
> > On Mon, Dec 5, 2022, at 16:57, Kirk True wrote:
> >> +1 (non-binding)
> >>
> >> On Mon, Dec 5, 2022, at 10:05 AM, Colin McCabe wrote:
> >>> +1 (binding)
> >>>
> >>> best,
> >>> Colin
> >>>
> >>> On Mon, Dec 5, 2022, at 10:03, David Jacot wrote:
>  Hi all,
> 
>  As this KIP-893 is trivial and non-controversial, I would like to
>  start the vote on it. The KIP is here:
>  https://cwiki.apache.org/confluence/x/YJIODg
> 
>  Thanks,
>  David
> >>>
>


[jira] [Resolved] (KAFKA-14417) Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-12-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-14417.
-
Fix Version/s: 4.0.0
   3.3.2
   Resolution: Fixed

> Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats 
> as fatal error
> --
>
> Key: KAFKA-14417
> URL: https://issues.apache.org/jira/browse/KAFKA-14417
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0, 3.4.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 4.0.0, 3.3.2
>
>
> In TransactionManager we have a handler for InitProducerIdRequests 
> [https://github.com/apache/kafka/blob/19286449ee20f85cc81860e13df14467d4ce287c/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#LL1276C14-L1276C14]
> However, we have the potential to return a REQUEST_TIMED_OUT error in 
> RPCProducerIdManager when the BrokerToControllerChannel manager times out: 
> [https://github.com/apache/kafka/blob/19286449ee20f85cc81860e13df14467d4ce287c/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala#L236]
>  
> or when the poll returns null: 
> [https://github.com/apache/kafka/blob/19286449ee20f85cc81860e13df14467d4ce287c/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala#L170]
> Since REQUEST_TIMED_OUT is not handled by the producer, we treat it as a 
> fatal error and the producer fails. With idempotent producers now enabled by 
> default, this can cause more issues.
> See this stack trace from 3.0:
> {code:java}
> ERROR [Producer clientId=console-producer] Aborting producer batches due to 
> fatal error (org.apache.kafka.clients.producer.internals.Sender)
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The request timed out.
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1390)
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1294)
>   at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:658)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:650)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:418)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:256)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> Seems like the commit that introduced the changes was this one: 
> [https://github.com/apache/kafka/commit/72d108274c98dca44514007254552481c731c958]
>  so we are vulnerable when the server code is at IBP 3.0 and beyond.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-878: Internal Topic Autoscaling for Kafka Streams

2022-12-08 Thread Bill Bejeck
Thanks for the KIP, Sophie.

+1(binding)

-Bill

On Thu, Dec 8, 2022 at 11:39 AM Bruno Cadonna  wrote:

> Thanks for the KIP!
>
> +1 (binding)
>
> Best,
> Bruno
>
> On 07.12.22 17:24, Matthias J. Sax wrote:
> > +1 (binding)
> >
> > On 12/1/22 9:39 PM, Sophie Blee-Goldman wrote:
> >> Thanks to all who participated for a great discussion on this KIP. Seems
> >> we're ready to kick off the voting on this, but please don't hesitate to
> >> call
> >> out anything of concern or raise questions over on the voting thread.
> >>
> >> Otherwise, please give it a final look over and cast your vote!
> >>
> >> KIP-878: Internal Topic Autoscaling for Kafka Streams
> >> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Internal+Topic+Autoscaling+for+Kafka+Streams
> >
> >> (note the change in name to reflect the decisions in the KIP discussion)
> >>
> >> Thanks,
> >> Sophie
> >>
>


[Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-08 Thread Patrick D’Addona
Hello,

I have a Quarkus application using **org.apache.kafka:kafka-streams:3.1.0** and 
found that
* when creating a global table using a compacted topic as input
* entries that have been deleted at some point
* are then no longer returned when iterating over the store with 
**store.all()** - as expected
* but after the pod restarts and its Kafka Streams state directory is deleted, 
after restoring from the topic using 
**org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**
* those formerly deleted records are once again returned by that store when 
using **store.all()** - not expected
* however they return null, using **store.get("foo")** - as expected

This is somewhat similar to https://issues.apache.org/jira/browse/KAFKA-7663, 
in that I would like to be able to modify this restore behaviour.
However, it is also different, because I think it is not documented anywhere and 
it is unintuitive (to me) - since it changes how the application behaves after 
restarting it even if the Kafka cluster itself was not changed - so I think 
it's more of a bug than missing documentation.

Some more information, the topic is configured like this
```java
cleanup.policy: compact
compression.type: producer
delete.retention.ms: 8640
max.compaction.lag.ms: 9223372036854776000
min.compaction.lag.ms: 0
retention.bytes: -1
retention.ms: 8640
```

I am adding the global store like so
```java
streamsBuilder.addGlobalStore(
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentTimestampedKeyValueStore("foobar"),
        Serdes.String(),
        Serdes.String()),
    "foo.bar.globaltopic",
    Consumed.with(Serdes.String(), Serdes.String()),
    () -> new FooBarUpdateHandler(timeService)
);
```

and here is the definition of 'FooBarUpdateHandler'
```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Internal class handling partFamily updates.
 */
public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {

    private static final Logger logger =
            LoggerFactory.getLogger(FooBarUpdateHandler.class);
    private TimestampedKeyValueStore<String, String> store;

    @Override
    public void init(final
            org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
        store = context.getStateStore("foobar");
    }

    @Override
    public void process(final Record<String, String> record) {

        // handle tombstones from input topic
        if (record.value() == null == record.value().equals("deleteme")) {
            store.delete(record.key());
        } else {
            store.put(
                    record.key(),
                    ValueAndTimestamp.make(
                            record.key(),
                            Instant.now().toEpochMilli()
                    )
            );
        }

        // this is not relevant
        // it's only to show the issue when restarting and restoring the
        final List<String> existingKeys = new ArrayList<>();
        try (final KeyValueIterator<String, ValueAndTimestamp<String>> all =
                store.all()) {
            all.forEachRemaining((r) -> {
                existingKeys.add(r.key);
            });
        }
        logger.info("Got {} records in the store, with keys {}",
                existingKeys.size(), String.join(",", existingKeys));
    }
}
```

My workaround is to add this to the 'init' method of the 'FooBarUpdateHandler'
```java
try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
    if (all == null) {
        return;
    }
    logger.info("Removing already deleted records from rocksdb representing the global store {}", storeName);
    all.forEachRemaining(r -> {
        if (r != null && r.key != null && store.get(r.key) == null) {
            store.delete(r.key);
        }
    });
}
```
Now it is again consistent across restarts.

Kind Regards,
Patrick


Patrick D’Addona
Senior Lead IT Architect


Mobile: +49 151 544 22 161
patrick.dadd...@maibornwolff.de
Theresienhöhe 13, 80339 München

MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
www.maibornwolff.de, Phone +49 89 544 253 000
USt-ID DE 129 299 525, Munich District Court HRB 98058
Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.





Re: [VOTE] KIP-878: Internal Topic Autoscaling for Kafka Streams

2022-12-08 Thread Bruno Cadonna

Thanks for the KIP!

+1 (binding)

Best,
Bruno

On 07.12.22 17:24, Matthias J. Sax wrote:

+1 (binding)

On 12/1/22 9:39 PM, Sophie Blee-Goldman wrote:

Thanks to all who participated for a great discussion on this KIP. Seems
we're ready to kick off the voting on this, but please don't hesitate to
call
out anything of concern or raise questions over on the voting thread.

Otherwise, please give it a final look over and cast your vote!

KIP-878: Internal Topic Autoscaling for Kafka Streams

(note the change in name to reflect the decisions in the KIP discussion)

Thanks,
Sophie



[jira] [Resolved] (KAFKA-14352) Support rack-aware partition assignment for Kafka consumers

2022-12-08 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-14352.

  Reviewer: David Jacot
Resolution: Fixed

This includes protocol changes for rack-aware assignment. Default assignors 
will be made rack-aware under follow-on tickets in the next release.

> Support rack-aware partition assignment for Kafka consumers
> ---
>
> Key: KAFKA-14352
> URL: https://issues.apache.org/jira/browse/KAFKA-14352
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.4.0
>
>
> KIP-392 added support for consumers to fetch from the replica in their local 
> rack. To benefit from locality, consumers need to be assigned partitions 
> which have a replica in the same rack. This works well when the replication 
> factor is the same as the number of racks, since with rack-aware replica 
> assignment every rack would then have a replica. If the number of racks is 
> higher, some racks may not have replicas of some partitions and hence 
> consumers in these racks will have to fetch from another rack. It will be 
> useful to propagate the rack in the subscription metadata for consumers and 
> provide a rack-aware partition assignor for consumers.
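
For context, the KIP-392 locality plumbing that this assignor work builds on
is configured roughly as in the sketch below; the bootstrap address, group
id, rack id and topic name are placeholders.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RackAwareConsumerExample {
    public static void main(String[] args) {
        // Broker side (server.properties), for reference:
        //   broker.rack=use1-az1
        //   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rack-aware-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // KIP-392: advertise the consumer's rack so fetches can go to a
        // replica in the same rack when one exists.
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "use1-az1");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("some-topic"));
            // poll loop omitted
        }
    }
}
```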



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14456) Fix AdminUtils startIndex for rack aware partition creations

2022-12-08 Thread Andrew Grant (Jira)
Andrew Grant created KAFKA-14456:


 Summary: Fix AdminUtils startIndex for rack aware partition 
creations 
 Key: KAFKA-14456
 URL: https://issues.apache.org/jira/browse/KAFKA-14456
 Project: Kafka
  Issue Type: Improvement
Reporter: Andrew Grant


When new partitions are added/created we calculate a start index based on all 
the brokers here 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zk/AdminZkClient.scala#L270.]
 That start index is passed through to AdminUtils and is used to find a 
starting position in the list of brokers for making assignments. However, when 
we make rack-aware assignments we use that index into a rack-alternating list 
here 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminUtils.scala#L160.]
 The meaning of the index gets lost: the index into the full list of brokers 
doesn't seem to have the same meaning as the index into a rack-alternating list. 

 

I discovered this when I published 
[https://github.com/apache/kafka/pull/12943/files.] In that PR I added a test, 
testRackAwarePartitionAssignment, which does not work in ZK mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-879: Multi-level Rack Awareness

2022-12-08 Thread Andrew Otto
FWIW, the Wikimedia Foundation would find this change really helpful.  We
are going to soon experiment with a stretched Kafka cluster, and it would
be nice to be able to target datacenter AND racks for replica placement.

On Thu, Dec 8, 2022 at 3:37 AM ziming deng  wrote:

> Hi Viktor,
>
> As far as I know, we haven't made ReplicaPlacer a public interface, and we
> have no plan to make it public. I think you can submit a discussion or
> create a JIRA ticket directly without a KIP if you have ideas on improving
> it, right?
>
> --
> Best,
> Ziming
>
> > On Nov 29, 2022, at 21:52, Viktor Somogyi-Vass <
> viktor.somo...@cloudera.com.INVALID> wrote:
> >
> > Hi All,
> >
> > I'd like to bump this. I've also updated the KIP to incorporate the new
> > KRaft changes (ReplicaPlacer). Luckily my proposals were quite similar to
> > that, so mostly I've made some minor rewording, naming changes, etc.
> >
> > Again, the brief summary of the KIP:
> > - expose replica placement strategies with a new config
> > - create an admin API and protocol to expose replica placement
> > functionality (mainly for the reassignment tool)
> > - create a new multi-level rack awareness strategy which improves
> > availability on stretch clusters
> >
> > I'm happy for any feedback.
> >
> > Best,
> > Viktor
> >
> > On Fri, Oct 28, 2022 at 4:14 PM Viktor Somogyi-Vass <
> > viktor.somo...@cloudera.com> wrote:
> >
> >> Hey all,
> >>
> >> I'd like to propose a new broker side replica assignment strategy and an
> >> interface that generalizes replica assignment on brokers and makes them
> >> pluggable.
> >>
> >> Briefly, the motivation for the new replica assignment strategy is that
> >> more and more of our customers would want to run their clusters in a
> >> stretched environment, where for instance a cluster is running over
> >> multiple regions (and multiple racks inside a region). Since this seems
> >> like a more common need, we'd like to contribute back our implementation
> >> and also make a generalized interface, so that new strategies that
> people
> >> may come up with could be served better.
> >>
> >> I welcome any feedback on this KIP.
> >>
> >> The link:
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-879%3A+Multi-level+Rack+Awareness
> >>
> >> Best to all,
> >> Viktor
> >>
>
>


[jira] [Created] (KAFKA-14455) Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2022-12-08 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14455:
--

 Summary: Kafka Connect create and update REST APIs should surface 
failures while writing to the config topic
 Key: KAFKA-14455
 URL: https://issues.apache.org/jira/browse/KAFKA-14455
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


Kafka Connect's `POST /connectors` and `PUT /connectors/{connector}/config` 
REST APIs internally simply write a message to the Connect cluster's internal 
config topic (which is then processed asynchronously by the herder). However, 
no callback is passed to the producer's send method and there is no error 
handling in place for producer send failures (see 
[here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716]
 / 
[here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]).

Consider one such case where the Connect worker's principal doesn't have a 
WRITE ACL on the cluster's config topic. Now suppose the user submits a 
connector's configs via one of the above two APIs. The producer send 
[here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716]
 / 
[here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]
 won't succeed (due to a TopicAuthorizationException) but the API responses 
will be `201 Created` success responses anyway. This is a very poor UX because 
the connector will actually never be created even though the API response 
indicated success. Furthermore, this failure would only be detectable if TRACE 
logs are enabled (via [this 
log|https://github.com/apache/kafka/blob/df29b17fc40f7c15460988d58bc652c3d66b60f8/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java]),
 making it nearly impossible for users to debug. Producer callbacks should be 
used to surface write failures back to the user via the API response.
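
For illustration only (generic producer usage, not the actual
KafkaConfigBackingStore code; topic name, key and value are placeholders),
surfacing such failures typically means passing a callback to send() or
waiting on the returned future:

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ConfigTopicWriteExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        ProducerRecord<byte[], byte[]> record =
            new ProducerRecord<>("connect-configs", "connector-foo".getBytes(), new byte[0]);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Option 1: a callback sees failures such as TopicAuthorizationException
            // and could be used to fail the pending REST request.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Config topic write failed: " + exception);
                }
            });
            // Option 2: block on the future so the caller sees the error synchronously.
            try {
                producer.send(record).get();
            } catch (InterruptedException | ExecutionException e) {
                System.err.println("Config topic write failed: " + e.getCause());
            }
        }
    }
}
```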



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #1416

2022-12-08 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.4 #4

2022-12-08 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-879: Multi-level Rack Awareness

2022-12-08 Thread ziming deng
Hi Viktor,

As far as I know, we haven't made ReplicaPlacer a public interface, and we have 
no plan to make it public. I think you can submit a discussion or create a JIRA 
ticket directly without a KIP if you have ideas on improving it, right?

--
Best,
Ziming

> On Nov 29, 2022, at 21:52, Viktor Somogyi-Vass 
>  wrote:
> 
> Hi All,
> 
> I'd like to bump this. I've also updated the KIP to incorporate the new
> KRaft changes (ReplicaPlacer). Luckily my proposals were quite similar to
> that, so mostly I've made some minor rewording, naming changes, etc.
> 
> Again, the brief summary of the KIP:
> - expose replica placement strategies with a new config
> - create an admin API and protocol to expose replica placement
> functionality (mainly for the reassignment tool)
> - create a new multi-level rack awareness strategy which improves
> availability on stretch clusters
> 
> I'm happy for any feedback.
> 
> Best,
> Viktor
> 
> On Fri, Oct 28, 2022 at 4:14 PM Viktor Somogyi-Vass <
> viktor.somo...@cloudera.com> wrote:
> 
>> Hey all,
>> 
>> I'd like to propose a new broker side replica assignment strategy and an
>> interface that generalizes replica assignment on brokers and makes them
>> pluggable.
>> 
>> Briefly, the motivation for the new replica assignment strategy is that
>> more and more of our customers would want to run their clusters in a
>> stretched environment, where for instance a cluster is running over
>> multiple regions (and multiple racks inside a region). Since this seems
>> like a more common need, we'd like to contribute back our implementation
>> and also make a generalized interface, so that new strategies that people
>> may come up with could be served better.
>> 
>> I welcome any feedback on this KIP.
>> 
>> The link:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-879%3A+Multi-level+Rack+Awareness
>> 
>> Best to all,
>> Viktor
>>