[jira] [Commented] (KAFKA-15768) StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult

2023-11-28 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17790603#comment-17790603
 ] 

John Roesler commented on KAFKA-15768:
--

Hey [~hanyuzheng], thanks for the bug report!

I agree with you that if there is exactly one partition responding and it 
responds with a FailedQueryResult, then it could make sense to return it 
instead of throwing an exception.

However, I do want to clarify that an expected usage of this method is to 
issue a key-based query to multiple partitions (because you don't know which 
partition hosts the key, if any) and then get the result back from whichever 
partition responded with a non-null result. In other words, it should return 
the result if and only if all queried partitions responded successfully AND at 
most one partition returned a non-null result.

From that perspective, it might actually be *more confusing* to return the 
FailedQueryResult instead of throwing an exception in the 
single-partition-response case, since it means that callers have two error 
paths to handle. I.e., they will have to write code like:

```
try {
  onlyResult = result.getOnlyPartitionResult();
  if (onlyResult.isSuccess()) {
    doSomething(onlyResult);
  } else {
    handleFailureFromResult(onlyResult);
  }
} catch (RuntimeException e) {
  handleFailureFromException(e);
}
```
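
For completeness, here is a rough sketch (not the actual Kafka code) of the 
internal change under discussion: tolerating failed results in the filter while 
keeping the single-result check. It assumes the public IQv2 accessors 
`getPartitionResults()`, `isFailure()`, and `getResult()`.

```
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryResult;

public final class OnlyPartitionResultSketch {
  // Keep failed results instead of calling getResult() on them, so a single
  // FailedQueryResult is returned to the caller rather than throwing here.
  public static <R> QueryResult<R> getOnlyPartitionResult(final StateQueryResult<R> result) {
    final List<QueryResult<R>> nonempty = result.getPartitionResults().values().stream()
        .filter(r -> r.isFailure() || r.getResult() != null)
        .collect(Collectors.toList());
    if (nonempty.size() != 1) {
      throw new IllegalArgumentException(
          "The query did not return exactly one partition result: " + nonempty);
    }
    return nonempty.get(0);
  }
}
```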

 

Answer to the side-note question: Maybe it's a bit philosophical, but the 
object upon which you call an instance method is in some sense an argument to 
the method (e.g., `this` is always an argument to an instance method). I don't 
think IllegalStateException would be better, since the application isn't in an 
illegal state (an illegal state is one which the code author thinks should 
never be reached, i.e., it would indicate a programming error within the 
framework).

I could see an "argument" for choosing another exception type, or maybe 
declaring one for this purpose. E.g., something like an "assertion violated" 
exception might be clearer than IllegalArgumentException.

> StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult
> --
>
> Key: KAFKA-15768
> URL: https://issues.apache.org/jira/browse/KAFKA-15768
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Hanyu Zheng
>Priority: Major
>
> Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
> `IllegalArgumentException` if any result is a `FailedQueryResult` (and even 
> if there is only a single FailedQueryResult).
> The issue is the internal `filter(r -> r.getResult() != null)` step, which 
> blindly (and incorrectly) calls `getResult`.
> Given the semantics of `getOnlyPartitionResult`, we should not care whether 
> the result is a SuccessQueryResult or a FailedQueryResult, but only check 
> whether there is a single result or not. (The user has no means to avoid 
> getting the underlying error otherwise.)
> Side-note: why does `FailedQueryResult#getResult` throw an 
> IllegalArgumentException (there is no argument passed into the method – it 
> should rather be an `IllegalStateException` – but I guess we would need a KIP 
> for this fix?)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13531) Flaky test NamedTopologyIntegrationTest

2023-06-07 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13531:
-
Attachment: 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout

> Flaky test NamedTopologyIntegrationTest
> ---
>
> Key: KAFKA-13531
> URL: https://issues.apache.org/jira/browse/KAFKA-13531
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthew de Detrich
>Priority: Critical
>  Labels: flaky-test
> Attachments: 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout
>
>
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets
> {quote}java.lang.AssertionError: Did not receive all 3 records from topic 
> output-stream-2 within 6 ms, currently accumulated data is [] Expected: 
> is a value equal to or greater than <3> but: <0> was less than <3> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:648)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:644)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:617)
>  at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:439){quote}
> STDERR
> {quote}java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting 
> offsets of a topic is forbidden while the consumer group is actively 
> subscribed to it. at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>  at 
> org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> at 
> org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122)
>  at 
> org.apache.kafka.streams.processor.internals.TopologyMetadata.maybeNotifyTopologyVersionWaiters(TopologyMetadata.java:154)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkForTopologyUpdates(StreamThread.java:916)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575)
>  Caused by: org.apache.kafka.common.errors.GroupSubscribedToTopicException: 
> Deleting offsets of a topic is forbidden while the consumer group is actively 
> subscribed to it. java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting 
> offsets of a topic is forbidden while the consumer group is actively 
> subscribed to it. at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>  at 
> org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> 

[jira] [Commented] (KAFKA-13531) Flaky test NamedTopologyIntegrationTest

2023-06-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730190#comment-17730190
 ] 

John Roesler commented on KAFKA-13531:
--

Seen again while verifying the 3.5.0 release artifacts (this was the only test 
failure):

Gradle Test Run :streams:test > Gradle Test Executor 554 > 
NamedTopologyIntegrationTest > 
shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() FAILED
    java.lang.AssertionError: Did not receive all 3 records from topic 
output-stream-1 within 6 ms,  currently accumulated data is []
    Expected: is a value equal to or greater than <3>
         but: <0> was less than <3>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:730)
        at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
        at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:353)
        at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:726)
        at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:699)
        at 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing(NamedTopologyIntegrationTest.java:563)

 

logs: 
[^org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout]

> Flaky test NamedTopologyIntegrationTest
> ---
>
> Key: KAFKA-13531
> URL: https://issues.apache.org/jira/browse/KAFKA-13531
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthew de Detrich
>Priority: Critical
>  Labels: flaky-test
> Attachments: 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout
>
>
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets
> {quote}java.lang.AssertionError: Did not receive all 3 records from topic 
> output-stream-2 within 6 ms, currently accumulated data is [] Expected: 
> is a value equal to or greater than <3> but: <0> was less than <3> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:648)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:644)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:617)
>  at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:439){quote}
> STDERR
> {quote}java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting 
> offsets of a topic is forbidden while the consumer group is actively 
> subscribed to it. at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>  at 
> org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> at 
> org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122)
>  at 
> 

[jira] [Updated] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-05-12 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-14995:
-
Labels: newbie  (was: )

> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Priority: Minor
>  Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: 
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who 
> are not Kafka committers. Unfortunately, it's not trivial to compute this 
> list.
> Here is the process I followed to generate the list the first time (note that 
> I generated this list on 2023-04-28, so the lookback is one year):
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. Manually filter out the authors who are committers, based on 
> [https://kafka.apache.org/committers]
> 3. Truncate the list to 20 authors
> 4. For each author:
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date:   Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: 
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>  
> This is pretty time-consuming and is very scriptable. Two complications:
>  * To do the filtering, we need to map from Git log "Author" to documented 
> Kafka "Committer" that we can use to perform the filter. Suggestion: just 
> update the structure of the "Committers" page to include their Git "Author" 
> name and email 
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html])
>  * To generate the YAML lists, we need to map from Git log "Author" to Github 
> username. There's presumably some way to do this in the Github REST API (the 
> mapping is based on the email, IIUC), or we could also just update the 
> Committers page to also document each committer's Github username.
>  
> Ideally, we would write this script (to be stored in the Apache Kafka repo) 
> and create a Github Action to run it every three months.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-05-12 Thread John Roesler (Jira)
John Roesler created KAFKA-14995:


 Summary: Automate asf.yaml collaborators refresh
 Key: KAFKA-14995
 URL: https://issues.apache.org/jira/browse/KAFKA-14995
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


We have added a policy to use the asf.yaml Github Collaborators: 
[https://github.com/apache/kafka-site/pull/510]

The policy states that we set this list to be the top 20 commit authors who are 
not Kafka committers. Unfortunately, it's not trivial to compute this list.

Here is the process I followed to generate the list the first time (note that I 
generated this list on 2023-04-28, so the lookback is one year):

1. List authors by commit volume in the last year:
{code:java}
$ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
2. Manually filter out the authors who are committers, based on 
[https://kafka.apache.org/committers]

3. Truncate the list to 20 authors

4. For each author:

4a. Find a commit in the `git log` that they were the author on:
{code:java}
commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
Author: hudeqi <1217150...@qq.com>
Date:   Fri May 12 14:03:17 2023 +0800
...{code}
4b. Look up that commit in Github: 
[https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]

4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
the Collaborators lists.

5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]

 

This is pretty time-consuming and is very scriptable. Two complications:
 * To do the filtering, we need to map from Git log "Author" to documented 
Kafka "Committer" that we can use to perform the filter. Suggestion: just 
update the structure of the "Committers" page to include their Git "Author" 
name and email 
([https://github.com/apache/kafka-site/blob/asf-site/committers.html])
 * To generate the YAML lists, we need to map from Git log "Author" to Github 
username. There's presumably some way to do this in the Github REST API (the 
mapping is based on the email, IIUC), or we could also just update the 
Committers page to also document each committer's Github username.

 

Ideally, we would write this script (to be stored in the Apache Kafka repo) and 
create a Github Action to run it every three months.
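
As a starting point, here is a minimal sketch of step 1 plus the committer 
filter. The committer email set and the cutoff date are placeholders, and the 
author-to-Github-username mapping described above (the hard part) is left out.
{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Set;

public final class CollaboratorListSketch {
    public static void main(final String[] args) throws Exception {
        // Placeholder: would be scraped from the Committers page in a real script.
        final Set<String> committerEmails = Set.of("someone@example.org");
        final Process git = new ProcessBuilder(
            "git", "shortlog", "--email", "--numbered", "--summary",
            "--since=2022-04-28", "HEAD")
            .redirectErrorStream(true)
            .start();
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(git.getInputStream()))) {
            reader.lines()
                  // Drop committers, keep the 20 most prolific remaining authors.
                  .filter(line -> committerEmails.stream().noneMatch(line::contains))
                  .limit(20)
                  .forEach(System.out::println);
        }
        git.waitFor();
    }
}
{code}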

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14364) Support evolving serde with Foreign Key Join

2022-11-07 Thread John Roesler (Jira)
John Roesler created KAFKA-14364:


 Summary: Support evolving serde with Foreign Key Join
 Key: KAFKA-14364
 URL: https://issues.apache.org/jira/browse/KAFKA-14364
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


The current implementation of Foreign-Key join uses a hash comparison to 
determine whether it should emit join results or not. See 
[https://github.com/apache/kafka/blob/807c5b4d282e7a7a16d0bb94aa2cda9566a7cc2d/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java#L94-L110]

As specified in KIP-213 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]
 ), we must do a comparison of this nature in order to get correct results when 
the foreign-key reference changes, as the old reference might emit delayed 
results after the new instance generates its updated results, leading to an 
incorrect final join state.

The hash comparison prevents this race condition by ensuring that any emitted 
results correspond to the _current_ version of the left-hand-side record (and 
therefore that the foreign-key reference itself has not changed).

An undesired side-effect of this is that if users update their serdes (in a 
compatible way), for example to add a new optional field to the record, then 
the resulting hash will change for existing records. This will cause Streams to 
stop emitting results for those records until a new left-hand-side update comes 
in, recording a new hash for those records.

It should be possible to provide a fix. Some ideas:
 * only consider the foreign-key reference itself in the hash function (this 
was the original proposal, but we opted to hash the entire record as an 
optimization to suppress unnecessary updates).
 * provide a user-overridable hash function. This would be more flexible, but 
also pushes a lot of complexity onto users, and opens up the possibility of 
completely breaking semantics.

We will need to design the solution carefully so that we can preserve the 
desired correctness guarantee.
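
A toy illustration of that trade-off (not the Streams code; the JSON payloads 
are made up): hashing the whole serialized record is sensitive to compatible 
serde evolution, while hashing only the foreign-key reference is stable.
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

public final class FkHashSketch {
    public static void main(final String[] args) {
        // The same logical record before and after adding an optional field.
        final byte[] v1 = "{\"fk\":\"A\"}".getBytes(StandardCharsets.UTF_8);
        final byte[] v2 = "{\"fk\":\"A\",\"newField\":null}".getBytes(StandardCharsets.UTF_8);

        // Whole-record hashes diverge, so Streams would stop emitting results...
        System.out.println(Arrays.hashCode(v1) == Arrays.hashCode(v2)); // false

        // ...while a hash of only the foreign-key reference stays stable.
        System.out.println(Objects.hashCode("A") == Objects.hashCode("A")); // true
    }
}
{code}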



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14254) Format timestamps in assignor logs as dates instead of integers

2022-09-21 Thread John Roesler (Jira)
John Roesler created KAFKA-14254:


 Summary: Format timestamps in assignor logs as dates instead of 
integers
 Key: KAFKA-14254
 URL: https://issues.apache.org/jira/browse/KAFKA-14254
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


This is a follow-on task from [https://github.com/apache/kafka/pull/12582]

There is another log line that prints the same timestamp: "Triggering the 
followup rebalance scheduled for ...", which should also be printed as a 
date/time in the same manner as PR 12582.

We should also search the codebase a little to see if we're printing timestamps 
in other log lines that would be better off as date/times.
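
For illustration, a minimal sketch of rendering such an epoch-millis timestamp 
as an ISO-8601 date/time (the example value is made up):
{code:java}
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public final class TimestampLogSketch {
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneOffset.UTC);

    public static void main(final String[] args) {
        final long followupRebalanceMs = 1663776000000L; // made-up example value
        // Before: "Triggering the followup rebalance scheduled for 1663776000000"
        System.out.println("Triggering the followup rebalance scheduled for "
            + FORMAT.format(Instant.ofEpochMilli(followupRebalanceMs)));
    }
}
{code}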



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14254) Format timestamps in assignor logs as dates instead of integers

2022-09-21 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-14254:
-
Labels: newbie newbie++  (was: )

> Format timestamps in assignor logs as dates instead of integers
> ---
>
> Key: KAFKA-14254
> URL: https://issues.apache.org/jira/browse/KAFKA-14254
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: newbie, newbie++
>
> This is a follow-on task from [https://github.com/apache/kafka/pull/12582]
> There is another log line that prints the same timestamp: "Triggering the 
> followup rebalance scheduled for ...", which should also be printed as a 
> date/time in the same manner as PR 12582.
> We should also search the codebase a little to see if we're printing 
> timestamps in other log lines that would be better off as date/times.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14253) StreamsPartitionAssignor should print the member count in assignment logs

2022-09-21 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-14253:
-
Labels: newbie newbie++  (was: newbie++)

> StreamsPartitionAssignor should print the member count in assignment logs
> -
>
> Key: KAFKA-14253
> URL: https://issues.apache.org/jira/browse/KAFKA-14253
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: newbie, newbie++
>
> Debugging rebalance and assignment issues is harder than it needs to be. One 
> simple thing that can help is to print out information in the logs that users 
> have to compute today.
> For example, the StreamsPartitionAssignor prints two messages that contain 
> the newline-delimited group membership:
> {code:java}
> [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread 
> [...-StreamThread-1-consumer] All members participating in this rebalance:
> : []
> : []
> : []{code}
> and
> {code:java}
> [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread 
> [...-StreamThread-1-consumer] Assigned tasks [...] including stateful [...] 
> to clients as:
> =[activeTasks: ([...]) standbyTasks: ([...])]
> =[activeTasks: ([...]) standbyTasks: ([...])]
> =[activeTasks: ([...]) standbyTasks: ([...])
> {code}
>  
> In both of these cases, it would be nice to:
>  # Include the number of members in the group (e.g., "15 members 
> participating" and "to 15 clients as")
>  # sort the member ids (to help compare the membership and assignment across 
> rebalances)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14253) StreamsPartitionAssignor should print the member count in assignment logs

2022-09-21 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-14253:
-
Labels: newbie++  (was: )

> StreamsPartitionAssignor should print the member count in assignment logs
> -
>
> Key: KAFKA-14253
> URL: https://issues.apache.org/jira/browse/KAFKA-14253
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: newbie++
>
> Debugging rebalance and assignment issues is harder than it needs to be. One 
> simple thing that can help is to print out information in the logs that users 
> have to compute today.
> For example, the StreamsPartitionAssignor prints two messages that contain 
> the newline-delimited group membership:
> {code:java}
> [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread 
> [...-StreamThread-1-consumer] All members participating in this rebalance:
> : []
> : []
> : []{code}
> and
> {code:java}
> [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread 
> [...-StreamThread-1-consumer] Assigned tasks [...] including stateful [...] 
> to clients as:
> =[activeTasks: ([...]) standbyTasks: ([...])]
> =[activeTasks: ([...]) standbyTasks: ([...])]
> =[activeTasks: ([...]) standbyTasks: ([...])
> {code}
>  
> In both of these cases, it would be nice to:
>  # Include the number of members in the group (e.g., "15 members 
> participating" and "to 15 clients as")
>  # sort the member ids (to help compare the membership and assignment across 
> rebalances)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14253) StreamsPartitionAssignor should print the member count in assignment logs

2022-09-21 Thread John Roesler (Jira)
John Roesler created KAFKA-14253:


 Summary: StreamsPartitionAssignor should print the member count in 
assignment logs
 Key: KAFKA-14253
 URL: https://issues.apache.org/jira/browse/KAFKA-14253
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Debugging rebalance and assignment issues is harder than it needs to be. One 
simple thing that can help is to print out information in the logs that users 
have to compute today.

For example, the StreamsPartitionAssignor prints two messages that contain the 
newline-delimited group membership:
{code:java}
[StreamsPartitionAssignor] [...-StreamThread-1] stream-thread 
[...-StreamThread-1-consumer] All members participating in this rebalance:

: []

: []

: []{code}
and
{code:java}
[StreamsPartitionAssignor] [...-StreamThread-1] stream-thread 
[...-StreamThread-1-consumer] Assigned tasks [...] including stateful [...] to 
clients as:

=[activeTasks: ([...]) standbyTasks: ([...])]

=[activeTasks: ([...]) standbyTasks: ([...])]

=[activeTasks: ([...]) standbyTasks: ([...])
{code}
 

In both of these cases, it would be nice to:
 # Include the number of members in the group (e.g., "15 members participating" 
and "to 15 clients as")
 # sort the member ids (to help compare the membership and assignment across 
rebalances); see the sketch below
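
A rough sketch of both suggestions (the member ids and assignment strings are 
illustrative):
{code:java}
import java.util.Map;
import java.util.TreeMap;

public final class AssignmentLogSketch {
    public static void main(final String[] args) {
        final Map<String, String> memberToAssignment = Map.of(
            "consumer-b", "[activeTasks: (0_1)]",
            "consumer-a", "[activeTasks: (0_0)]");
        final StringBuilder sb = new StringBuilder(memberToAssignment.size()
            + " members participating in this rebalance:\n");
        // A TreeMap view sorts the member ids, making successive rebalances diffable.
        new TreeMap<>(memberToAssignment).forEach(
            (member, assignment) -> sb.append(member).append('=').append(assignment).append('\n'));
        System.out.print(sb);
    }
}
{code}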



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14202) IQv2: Expose binary store schema to store implementations

2022-09-06 Thread John Roesler (Jira)
John Roesler created KAFKA-14202:


 Summary: IQv2: Expose binary store schema to store implementations
 Key: KAFKA-14202
 URL: https://issues.apache.org/jira/browse/KAFKA-14202
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


One feature of IQv2 is that store implementations can handle custom queries. 
Many custom query handlers will need to process the key or value bytes, for 
example deserializing them to implement some filter or aggregation, or even 
performing binary operations on them.

For the most part, this should be straightforward for users, since they provide 
Streams with the serdes, the store implementation, and the custom queries.

However, Streams will sometimes pack extra data around the data produced by the 
user-provided serdes. For example, the Timestamped store wrappers add a 
timestamp on the beginning of the value byte array. And in Windowed stores, we 
add window timestamps to the key bytes.

It would be nice to have some generic mechanism to communicate those schemas to 
the user-provided inner store layers to support users who need to write custom 
queries. For example, perhaps we can add an extractor class to the state store 
context.
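
For instance, here is a sketch of the kind of unpacking a custom query handler 
must do today, assuming the Timestamped store layout of an 8-byte timestamp 
prefix on the value bytes (verify against the store wrapper in use before 
relying on this):
{code:java}
import java.nio.ByteBuffer;

public final class TimestampedValueSketch {
    // Reads the 8-byte big-endian timestamp that Timestamped stores prepend.
    static long timestamp(final byte[] rawValue) {
        return ByteBuffer.wrap(rawValue).getLong();
    }

    // Returns the user-serde-encoded value that follows the timestamp prefix.
    static byte[] innerValue(final byte[] rawValue) {
        final byte[] value = new byte[rawValue.length - Long.BYTES];
        System.arraycopy(rawValue, Long.BYTES, value, 0, value.length);
        return value;
    }
}
{code}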



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13978) Trigger UncaughtExceptionHandler for IllegalArgument and IllegalState exceptions

2022-08-10 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17578023#comment-17578023
 ] 

John Roesler commented on KAFKA-13978:
--

Hey [~bryves], I just checked trunk and 3.3, and I don't see the code that 
you're removing in your PR. It seems like someone else may have fixed this 
issue in the meantime. Can you confirm whether or not the issue is resolved?

> Trigger UncaughtExceptionHandler for IllegalArgument and IllegalState 
> exceptions
> 
>
> Key: KAFKA-13978
> URL: https://issues.apache.org/jira/browse/KAFKA-13978
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.2.0, 3.1.1
>Reporter: Ben
>Assignee: Ben
>Priority: Blocker
> Fix For: 3.3.0
>
>
> In [KAFKA-12887|https://issues.apache.org/jira/browse/KAFKA-12887] 
> [(PR)|https://github.com/apache/kafka/pull/11228/files] changes were made to 
> prevent thrown IllegalStateException and IllegalArgumentExceptions from being 
> passed to a registered exception handler.
>  
> I believe these changes should be reverted for the following reasons:
>  * Making this change is backwards incompatible with existing applications 
> which may have expected those exceptions to be handled.
>  
>  * Users can (and do!) throw these exceptions, often for legitimate reasons. 
> For instance, IllegalArgumentException is thrown when a method is passed the 
> wrong argument. This is exactly the type of uncaught exception a user would 
> expect to be handled by the uncaught exception handler, rather than by the 
> calling code.
>  
>  * The change is inconsistent. Why only these two exceptions, and not all 
> runtime exceptions?
>  
>  * The change is not well documented. There are even tutorial resources which 
> actually use these exceptions, [for example 
> here|https://developer.confluent.io/tutorials/error-handling/confluent.html]. 
> If we make this change, it should be better communicated. As implemented, it 
> is extremely surprising that this happens.
>  
>  * Finally, what value is the change actually adding to the project? It 
> restricts user freedom, increases complexity, and does not improve safety. We 
> should only make a backwards-incompatible change like this if there is clear 
> value in doing so.
>  
> As a note, reverting this is not (in my view) going to impact users 
> negatively. It is unlikely many people depend on this functionality, and if 
> they do, it should be easy to communicate in the release notes, and for them 
> to adjust their code accordingly.
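
For context, here is a sketch of the kind of handler registration involved 
(using the KafkaStreams#setUncaughtExceptionHandler API; construction of 
{{streams}} is omitted):
{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public final class HandlerRegistrationSketch {
    // Users expect an IllegalArgumentException thrown from their own processing
    // code to reach this handler like any other RuntimeException.
    static void register(final KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(exception -> {
            if (exception instanceof IllegalArgumentException) {
                return StreamThreadExceptionResponse.REPLACE_THREAD;
            }
            return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
    }
}
{code}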



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14020) Performance regression in Producer

2022-07-12 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566017#comment-17566017
 ] 

John Roesler commented on KAFKA-14020:
--

Hey [~alivshits], thanks for your work on 
[https://github.com/apache/kafka/pull/12365].

I've just re-run the same benchmark above and confirmed that your PR fixes the 
perf regression. Thank you!

As a reminder, this was the baseline for "good" performance:
Commit: 
[{{e3202b9}}|https://github.com/apache/kafka/commit/e3202b9ef4c63aab2e5ab049978704282792]
 (the parent of the problematic commit)
TPut: *118k±1k*

And when I ran the same benchmark on 
[{{3a6500b}}|https://github.com/apache/kafka/commit/3a6500bb12b8c5716f7d99b6cec1c521f6f029c2]
 , I got:
TPut: *117k±1k*

> Performance regression in Producer
> --
>
> Key: KAFKA-14020
> URL: https://issues.apache.org/jira/browse/KAFKA-14020
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.3.0
>Reporter: John Roesler
>Assignee: Artem Livshits
>Priority: Blocker
>
> [https://github.com/apache/kafka/commit/f7db6031b84a136ad0e257df722b20faa7c37b8a]
>  introduced a 10% performance regression in the KafkaProducer under a default 
> config.
>  
> The context for this result is a benchmark that we run for Kafka Streams. The 
> benchmark provisions 5 independent AWS clusters, including one broker node on 
> an i3.large and one client node on an i3.large. During a benchmark run, we 
> first run the Producer for 10 minutes to generate test data, and then we run 
> Kafka Streams under a number of configurations to measure its performance.
> Our observation was a 10% regression in throughput under the simplest 
> configuration, in which Streams simply consumes from a topic and does nothing 
> else. That benchmark actually runs faster than the producer that generates 
> the test data, so its throughput is bounded by the data generator's 
> throughput. After investigation, we realized that the regression was in the 
> data generator, not the consumer or Streams.
> We have numerous benchmark runs leading up to the commit in question, and 
> they all show a throughput in the neighborhood of 115,000 records per second. 
> We also have 40 runs including and after that commit, and they all show a 
> throughput in the neighborhood of 105,000 records per second. A test on 
> [trunk with the commit reverted |https://github.com/apache/kafka/pull/12342] 
> shows a return to around 115,000 records per second.
> Config:
> {code:java}
> final Properties properties = new Properties();
> properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
> properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class);
> properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class);
> {code}
> Here's the producer code in the data generator. Our tests were running with 
> three produceThreads.
> {code:java}
>  for (int t = 0; t < produceThreads; t++) {
> futures.add(executorService.submit(() -> {
> int threadTotal = 0;
> long lastPrint = start;
> final long printInterval = Duration.ofSeconds(10).toMillis();
> long now;
> try (final org.apache.kafka.clients.producer.Producer<String, String> 
> producer = new KafkaProducer<>(producerConfig(broker))) {
> while (limit > (now = System.currentTimeMillis()) - start) {
> for (int i = 0; i < 1000; i++) {
> final String key = keys.next();
> final String data = dataGen.generate();
> producer.send(new ProducerRecord<>(topic, key, 
> valueBuilder.apply(key, data)));
> threadTotal++;
> }
> if ((now - lastPrint) > printInterval) {
> System.out.println(Thread.currentThread().getName() + " 
> produced " + numberFormat.format(threadTotal) + " to " + topic + " in " + 
> Duration.ofMillis(now - start));
> lastPrint = now;
> }
> }
> }
> total.addAndGet(threadTotal);
> System.out.println(Thread.currentThread().getName() + " finished (" + 
> numberFormat.format(threadTotal) + ") in " + Duration.ofMillis(now - start));
> }));
> }{code}
> As you can see, this is a very basic usage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14020) Performance regression in Producer

2022-06-25 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17558796#comment-17558796
 ] 

John Roesler commented on KAFKA-14020:
--

FYI, just setting the partitioner back to the {{DefaultPartitioner}} does not 
appear to help. The throughput of that test was 105k±2k records per second.

Code under test: 
[https://github.com/apache/kafka/commit/6c67adb8beedafca0316d1c9ec4a3c219aaec219]
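
Presumably the override looked something like the following (the exact test 
code is not shown in this thread; {{DefaultPartitioner}} is the older 
partitioner implementation):
{code:java}
properties.put(
    ProducerConfig.PARTITIONER_CLASS_CONFIG,
    org.apache.kafka.clients.producer.internals.DefaultPartitioner.class);
{code}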

> Performance regression in Producer
> --
>
> Key: KAFKA-14020
> URL: https://issues.apache.org/jira/browse/KAFKA-14020
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.3.0
>Reporter: John Roesler
>Priority: Blocker
>
> [https://github.com/apache/kafka/commit/f7db6031b84a136ad0e257df722b20faa7c37b8a]
>  introduced a 10% performance regression in the KafkaProducer under a default 
> config.
>  
> The context for this result is a benchmark that we run for Kafka Streams. The 
> benchmark provisions 5 independent AWS clusters, including one broker node on 
> an i3.large and one client node on an i3.large. During a benchmark run, we 
> first run the Producer for 10 minutes to generate test data, and then we run 
> Kafka Streams under a number of configurations to measure its performance.
> Our observation was a 10% regression in throughput under the simplest 
> configuration, in which Streams simply consumes from a topic and does nothing 
> else. That benchmark actually runs faster than the producer that generates 
> the test data, so its throughput is bounded by the data generator's 
> throughput. After investigation, we realized that the regression was in the 
> data generator, not the consumer or Streams.
> We have numerous benchmark runs leading up to the commit in question, and 
> they all show a throughput in the neighborhood of 115,000 records per second. 
> We also have 40 runs including and after that commit, and they all show a 
> throughput in the neighborhood of 105,000 records per second. A test on 
> [trunk with the commit reverted |https://github.com/apache/kafka/pull/12342] 
> shows a return to around 115,000 records per second.
> Config:
> {code:java}
> final Properties properties = new Properties();
> properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
> properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class);
> properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class);
> {code}
> Here's the producer code in the data generator. Our tests were running with 
> three produceThreads.
> {code:java}
>  for (int t = 0; t < produceThreads; t++) {
> futures.add(executorService.submit(() -> {
> int threadTotal = 0;
> long lastPrint = start;
> final long printInterval = Duration.ofSeconds(10).toMillis();
> long now;
> try (final org.apache.kafka.clients.producer.Producer<String, String> 
> producer = new KafkaProducer<>(producerConfig(broker))) {
> while (limit > (now = System.currentTimeMillis()) - start) {
> for (int i = 0; i < 1000; i++) {
> final String key = keys.next();
> final String data = dataGen.generate();
> producer.send(new ProducerRecord<>(topic, key, 
> valueBuilder.apply(key, data)));
> threadTotal++;
> }
> if ((now - lastPrint) > printInterval) {
> System.out.println(Thread.currentThread().getName() + " 
> produced " + numberFormat.format(threadTotal) + " to " + topic + " in " + 
> Duration.ofMillis(now - start));
> lastPrint = now;
> }
> }
> }
> total.addAndGet(threadTotal);
> System.out.println(Thread.currentThread().getName() + " finished (" + 
> numberFormat.format(threadTotal) + ") in " + Duration.ofMillis(now - start));
> }));
> }{code}
> As you can see, this is a very basic usage.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-14020) Performance regression in Producer

2022-06-24 Thread John Roesler (Jira)
John Roesler created KAFKA-14020:


 Summary: Performance regression in Producer
 Key: KAFKA-14020
 URL: https://issues.apache.org/jira/browse/KAFKA-14020
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 3.3.0
Reporter: John Roesler


[https://github.com/apache/kafka/commit/f7db6031b84a136ad0e257df722b20faa7c37b8a]
 introduced a 10% performance regression in the KafkaProducer under a default 
config.

 

The context for this result is a benchmark that we run for Kafka Streams. The 
benchmark provisions 5 independent AWS clusters, including one broker node on 
an i3.large and one client node on an i3.large. During a benchmark run, we 
first run the Producer for 10 minutes to generate test data, and then we run 
Kafka Streams under a number of configurations to measure its performance.

Our observation was a 10% regression in throughput under the simplest 
configuration, in which Streams simply consumes from a topic and does nothing 
else. That benchmark actually runs faster than the producer that generates the 
test data, so its throughput is bounded by the data generator's throughput. 
After investigation, we realized that the regression was in the data generator, 
not the consumer or Streams.

We have numerous benchmark runs leading up to the commit in question, and they 
all show a throughput in the neighborhood of 115,000 records per second. We 
also have 40 runs including and after that commit, and they all show a 
throughput in the neighborhood of 105,000 records per second. A test on [trunk 
with the commit reverted |https://github.com/apache/kafka/pull/12342] shows a 
return to around 115,000 records per second.

Config:
{code:java}
final Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
{code}
Here's the producer code in the data generator. Our tests were running with 
three produceThreads.
{code:java}
 for (int t = 0; t < produceThreads; t++) {
    futures.add(executorService.submit(() -> {
        int threadTotal = 0;
        long lastPrint = start;
        final long printInterval = Duration.ofSeconds(10).toMillis();
        long now;
        try (final org.apache.kafka.clients.producer.Producer<String, String> producer =
                 new KafkaProducer<>(producerConfig(broker))) {
            while (limit > (now = System.currentTimeMillis()) - start) {
                for (int i = 0; i < 1000; i++) {
                    final String key = keys.next();
                    final String data = dataGen.generate();

                    producer.send(new ProducerRecord<>(topic, key,
                        valueBuilder.apply(key, data)));

                    threadTotal++;
                }

                if ((now - lastPrint) > printInterval) {
                    System.out.println(Thread.currentThread().getName()
                        + " produced " + numberFormat.format(threadTotal)
                        + " to " + topic + " in " + Duration.ofMillis(now - start));
                    lastPrint = now;
                }
            }
        }
        total.addAndGet(threadTotal);
        System.out.println(Thread.currentThread().getName() + " finished ("
            + numberFormat.format(threadTotal) + ") in " + Duration.ofMillis(now - start));
    }));
}{code}
As you can see, this is a very basic usage.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13654) Extend KStream process with new Processor API

2022-04-19 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13654.
--
Fix Version/s: 3.3.0
   Resolution: Fixed

> Extend KStream process with new Processor API
> -
>
> Key: KAFKA-13654
> URL: https://issues.apache.org/jira/browse/KAFKA-13654
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: kafka-streams, kip-required, needs-kip, streams
> Fix For: 3.3.0
>
>
> Extending KStream#process to use the latest Processor API adopted here: 
> https://issues.apache.org/jira/browse/KAFKA-8410
> This new API allows a typed returned KStream, which makes it possible to 
> chain results from processors, becoming a new way to transform records with 
> more control over what is forwarded.
> KIP: https://cwiki.apache.org/confluence/x/yKbkCw



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (KAFKA-13654) Extend KStream process with new Processor API

2022-04-19 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-13654:


Assignee: Jorge Esteban Quilcate Otoya

> Extend KStream process with new Processor API
> -
>
> Key: KAFKA-13654
> URL: https://issues.apache.org/jira/browse/KAFKA-13654
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: kafka-streams, kip-required, needs-kip, streams
>
> Extending KStream#process to use the latest Processor API adopted here: 
> https://issues.apache.org/jira/browse/KAFKA-8410
> This new API allows a typed returned KStream, which makes it possible to 
> chain results from processors, becoming a new way to transform records with 
> more control over what is forwarded.
> KIP: https://cwiki.apache.org/confluence/x/yKbkCw



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13820) Add debug-level logs to explain why a store is filtered out during interactive query

2022-04-11 Thread John Roesler (Jira)
John Roesler created KAFKA-13820:


 Summary: Add debug-level logs to explain why a store is filtered 
out during interactive query
 Key: KAFKA-13820
 URL: https://issues.apache.org/jira/browse/KAFKA-13820
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Currently Kafka Streams throws an InvalidStateStoreException when the desired 
store is not present on the local instance. It also throws the same exception 
with the same message when the store is present but not active (and stale 
queries are disabled).

This is an important distinction when debugging store unavailability, and a 
debug-level log is an un-intrusive mechanism to expose the information.
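
A sketch of what the proposed distinction could look like (all names here are 
hypothetical; {{log}} is assumed to be an slf4j Logger on the store-lookup code 
path):
{code:java}
if (!storePresentLocally) {
    log.debug("Store {} unavailable: not present on this instance", storeName);
} else if (!storeActive && !staleQueriesEnabled) {
    log.debug("Store {} unavailable: present but not active, and stale queries are disabled",
        storeName);
}
{code}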



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13819) Add application.server to Streams assignor logs when set

2022-04-11 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13819:
-
Component/s: streams

> Add application.server to Streams assignor logs when set
> 
>
> Key: KAFKA-13819
> URL: https://issues.apache.org/jira/browse/KAFKA-13819
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>
> Currently, Streams assignment logs only include the consumer client id and 
> the streams application id, but those are both randomly generated UUIDs that 
> are not easy to coordinate to users' concept of the name of a host. To help 
> bridge this gap, we can include the application.server (when set) in 
> assignment logs. That way, users will also be able to see which host and port 
> each member is associated with.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13818) Add generation to consumer assignor logs

2022-04-11 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13818:
-
Component/s: consumer

> Add generation to consumer assignor logs
> 
>
> Key: KAFKA-13818
> URL: https://issues.apache.org/jira/browse/KAFKA-13818
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Minor
>
> Reading assignor logs is really confusing in large part because they are
> spread across different layers of abstraction (the ConsumerCoordinator
> and the ConsumerPartitionAssignor, which in Streams consists of several
> layers of its own). Each layer in the abstraction reports useful information
> that only it has access to, but because they are split over multiple lines, 
> with
> multiple members in the cluster, and (often) multiple rebalances taking place
> in rapid succession, it's often hard to understand which logs are part of
> which rebalance.
>  
> One thing we don't want to do is break encapsulation by exposing too much of 
> the ConsumerCoordinator's internal state to components like the pluggable 
> ConsumerPartitionAssignor.
>  
> We can accomplish what we want by adding the concept of a dynamic log 
> context, so that the ConsumerCoordinator can add dynamic information like the 
> generation id to be logged for correlation in other components without 
> exposing any new information or metadata to those components themselves.
> See [https://github.com/apache/kafka/pull/12020] for example.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13820) Add debug-level logs to explain why a store is filtered out during interactive query

2022-04-11 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13820:
-
Component/s: streams

> Add debug-level logs to explain why a store is filtered out during 
> interactive query
> 
>
> Key: KAFKA-13820
> URL: https://issues.apache.org/jira/browse/KAFKA-13820
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>
> Currently Kafka Streams throws an InvalidStateStoreException when the desired 
> store is not present on the local instance. It also throws the same exception 
> with the same message when the store is present but not active (and stale 
> queries are disabled).
> This is an important distinction when debugging store unavailability, and a 
> debug-level log is an un-intrusive mechanism to expose the information.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13819) Add application.server to Streams assignor logs when set

2022-04-11 Thread John Roesler (Jira)
John Roesler created KAFKA-13819:


 Summary: Add application.server to Streams assignor logs when set
 Key: KAFKA-13819
 URL: https://issues.apache.org/jira/browse/KAFKA-13819
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Currently, Streams assignment logs only include the consumer client id and the 
streams application id, but those are both randomly generated UUIDs that are 
not easy to coordinate to users' concept of the name of a host. To help bridge 
this gap, we can include the application.server (when set) in assignment logs. 
That way, users will also be able to see which host and port each member is 
associated with.
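
A sketch of the addition (the config key is the real application.server Streams 
config; {{config}}, {{log}}, and {{memberId}} are illustrative):
{code:java}
final String applicationServer = config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (!applicationServer.isEmpty()) {
    // Ties the opaque member id back to a host:port the user recognizes.
    log.info("Member {} is running on application.server {}", memberId, applicationServer);
}
{code}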



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13818) Add generation to consumer assignor logs

2022-04-11 Thread John Roesler (Jira)
John Roesler created KAFKA-13818:


 Summary: Add generation to consumer assignor logs
 Key: KAFKA-13818
 URL: https://issues.apache.org/jira/browse/KAFKA-13818
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Reading assignor logs is really confusing in large part because they are
spread across different layers of abstraction (the ConsumerCoordinator
and the ConsumerPartitionAssignor, which in Streams consists of several
layers of its own). Each layer in the abstraction reports useful information
that only it has access to, but because they are split over multiple lines, with
multiple members in the cluster, and (often) multiple rebalances taking place
in rapid succession, it's often hard to understand which logs are part of
which rebalance.

 

One thing we don't want to do is break encapsulation by exposing too much of 
the ConsumerCoordinator's internal state to components like the pluggable 
ConsumerPartitionAssignor.

 

We can accomplish what we want by adding the concept of a dynamic log context, 
so that the ConsumerCoordinator can add dynamic information like the generation 
id to be logged for correlation in other components without exposing any new 
information or metadata to those components themselves.

See [https://github.com/apache/kafka/pull/12020] for example.
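
One way to realize such a dynamic log context (a sketch of the concept only, 
not the code in the linked PR): the coordinator owns a mutable prefix and hands 
components just a supplier of it.
{code:java}
import java.util.function.Supplier;

public final class DynamicLogContextSketch {
    private volatile String generationPrefix = "[generation unknown] ";

    // Called by the coordinator when it learns the new generation id.
    public void onNewGeneration(final int generationId) {
        generationPrefix = "[generation " + generationId + "] ";
    }

    // Components prepend this to their log lines without ever seeing the
    // coordinator's internal state directly.
    public Supplier<String> prefixSupplier() {
        return () -> generationPrefix;
    }
}
{code}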



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13818) Add generation to consumer assignor logs

2022-04-11 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-13818:


Assignee: John Roesler

> Add generation to consumer assignor logs
> 
>
> Key: KAFKA-13818
> URL: https://issues.apache.org/jira/browse/KAFKA-13818
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Minor
>
> Reading assignor logs is really confusing in large part because they are
> spread across different layers of abstraction (the ConsumerCoordinator
> and the ConsumerPartitionAssignor, which in Streams consists of several
> layers of its own). Each layer in the abstraction reports useful information
> that only it has access to, but because they are split over multiple lines, 
> with
> multiple members in the cluster, and (often) multiple rebalances taking place
> in rapid succession, it's often hard to understand which logs are part of
> which rebalance.
>  
> One thing we don't want to do is break encapsulation by exposing too much of 
> the ConsumerCoordinator's internal state to components like the pluggable 
> ConsumerPartitionAssignor.
>  
> We can accomplish what we want by adding the concept of a dynamic log 
> context, so that the ConsumerCoordinator can add dynamic information like the 
> generation id to be logged for correlation in other components without 
> exposing any new information or metadata to those components themselves.
> See [https://github.com/apache/kafka/pull/12020] for example.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13479) Interactive Query v2

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13479.
--
Fix Version/s: 3.2.0
   Resolution: Fixed

> Interactive Query v2
> 
>
> Key: KAFKA-13479
> URL: https://issues.apache.org/jira/browse/KAFKA-13479
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 3.2.0
>
>
> Kafka Streams supports an interesting and innovative API for "peeking" into 
> the internal state of running stateful stream processors from outside of the 
> application, called Interactive Query (IQ). This functionality has proven 
> invaluable to users over the years for everything from debugging running 
> applications to serving low latency queries straight from the Streams runtime.
> However, the actual interfaces for IQ were designed in the very early days of 
> Kafka Streams, before the project had gained significant adoption, and in the 
> absence of much precedent for this kind of API in peer projects. With the 
> benefit of hindsight, we can observe several problems with the original 
> design that we hope to address in a revised framework that will serve Streams 
> users well for many years to come.
>  
> This ticket tracks the implementation of KIP-796: 
> https://cwiki.apache.org/confluence/x/34xnCw



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (KAFKA-13479) Interactive Query v2

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler closed KAFKA-13479.


> Interactive Query v2
> 
>
> Key: KAFKA-13479
> URL: https://issues.apache.org/jira/browse/KAFKA-13479
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 3.2.0
>
>
> Kafka Streams supports an interesting and innovative API for "peeking" into 
> the internal state of running stateful stream processors from outside of the 
> application, called Interactive Query (IQ). This functionality has proven 
> invaluable to users over the years for everything from debugging running 
> applications to serving low latency queries straight from the Streams runtime.
> However, the actual interfaces for IQ were designed in the very early days of 
> Kafka Streams, before the project had gained significant adoption, and in the 
> absence of much precedent for this kind of API in peer projects. With the 
> benefit of hindsight, we can observe several problems with the original 
> design that we hope to address in a revised framework that will serve Streams 
> users well for many years to come.
>  
> This ticket tracks the implementation of KIP-796: 
> https://cwiki.apache.org/confluence/x/34xnCw



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13622:
-
Parent: (was: KAFKA-13479)
Issue Type: Improvement  (was: Sub-task)

> Revisit the complexity of position tracking in state stores
> ---
>
> Key: KAFKA-13622
> URL: https://issues.apache.org/jira/browse/KAFKA-13622
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: IQv2
>
> Currently, state store implementers have a significant burden to track 
> position correctly. They have to:
>  * update the position during all puts
>  * implement the RecordBatchingStateRestoreCallback and use the 
> ChangelogRecordDeserializationHelper to update the position 
> based on record headers
>  * implement some mechanism to restore the position after a 
> restart if the store is persistent (such as supplying a CommitCallback to write 
> the position to a local file and then read the file during init)
> [~guozhang] pointed out during review that this is probably 
> too much responsibility (and certainly too much opportunity for error). We 
> should see what we can do to simplify these responsibilities, if not 
> eliminate them entirely from the store implementer's scope of concern.
>  
> See 
> https://github.com/apache/kafka/pull/11676#discussion_r790358058
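>  
> As a concrete illustration of the first bullet, here is a minimal sketch of 
> the put-path half of this burden (the class is hypothetical; Position and 
> StateStoreContext#recordMetadata are the actual APIs involved):
> {code:java}
> import org.apache.kafka.streams.processor.StateStoreContext;
> import org.apache.kafka.streams.query.Position;
> 
> class PositionTrackingStore {
>     private final Position position = Position.emptyPosition();
>     private final StateStoreContext context;
> 
>     PositionTrackingStore(final StateStoreContext context) {
>         this.context = context;
>     }
> 
>     void put(final byte[] key, final byte[] value) {
>         // ... write key/value to the underlying store ...
>         // recordMetadata() is empty outside of active processing
>         // (e.g. during restoration), so the position only advances
>         // for regular writes; restoration must be handled separately.
>         context.recordMetadata().ifPresent(meta ->
>             position.withComponent(meta.topic(), meta.partition(), meta.offset()));
>     }
> }
> {code}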



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13548:
-
Parent: (was: KAFKA-13479)
Issue Type: Improvement  (was: Sub-task)

> IQv2: revisit WindowKeyQuery and WindowRangeQuery
> -
>
> Key: KAFKA-13548
> URL: https://issues.apache.org/jira/browse/KAFKA-13548
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: IQv2
>
> During discussion of KIP-806, there was a suggestion to refactor the queries 
> following a builder pattern so that we can compactly and flexibly specify 
> lower and upper bounds on the keys, window start times, and window end times.
> We should circle back and try to generalize the queries' interfaces before 
> the first release of IQv2.
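>  
> To make the suggestion concrete, a purely illustrative shape for such a 
> builder (not a shipped API) could look like:
> {code:java}
> import java.time.Instant;
> 
> public final class WindowQueryBuilder<K> {
>     private K lowerKeyBound;
>     private K upperKeyBound;
>     private Instant earliestWindowStart;
>     private Instant latestWindowEnd;
> 
>     // Each bound is optional; unset bounds mean "unbounded".
>     public WindowQueryBuilder<K> keyRange(final K lower, final K upper) {
>         this.lowerKeyBound = lower;
>         this.upperKeyBound = upper;
>         return this;
>     }
> 
>     public WindowQueryBuilder<K> windowStartsAfter(final Instant time) {
>         this.earliestWindowStart = time;
>         return this;
>     }
> 
>     public WindowQueryBuilder<K> windowEndsBefore(final Instant time) {
>         this.latestWindowEnd = time;
>         return this;
>     }
> }
> {code}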



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13622:
-
Labels: IQv2  (was: )

> Revisit the complexity of position tracking in state stores
> ---
>
> Key: KAFKA-13622
> URL: https://issues.apache.org/jira/browse/KAFKA-13622
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Minor
>  Labels: IQv2
>
> Currently, state store implementers have a significant burden to track 
> position correctly. They have to:
>  * update the position during all puts
>  * implement the RecordBatchingStateRestoreCallback and use the 
> ChangelogRecordDeserializationHelper to update the position 
> based on record headers
>  * implement some mechanism to restore the position after a 
> restart if the store is persistent (such as supplying a CommitCallback to write 
> the position to a local file and then read the file during init)
> [~guozhang] pointed out during review that this is probably 
> too much responsibility (and certainly too much opportunity for error). We 
> should see what we can do to simplify these responsibilities, if not 
> eliminate them entirely from the store implementer's scope of concern.
>  
> See 
> https://github.com/apache/kafka/pull/11676#discussion_r790358058



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13622:
-
Component/s: streams

> Revisit the complexity of position tracking in state stores
> ---
>
> Key: KAFKA-13622
> URL: https://issues.apache.org/jira/browse/KAFKA-13622
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: IQv2
>
> Currently, state store implementers have a significant burden to track 
> position correctly. They have to:
>  * update the position during all puts
>  * implement the RecordBatchingStateRestoreCallback and use the 
> ChangelogRecordDeserializationHelper to update the position 
> based on record headers
>  * implement some mechanism to restore the position after a 
> restart if the store is persistent (such as supplying a CommitCallback to write 
> the position to a local file and then read the file during init)
> [~guozhang] pointed out during review that this is probably 
> too much responsibility (and certainly too much opportunity for error). We 
> should see what we can do to simplify these responsibilities, if not 
> eliminate them entirely from the store implementer's scope of concern.
>  
> See 
> https://github.com/apache/kafka/pull/11676#discussion_r790358058



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13548:
-
Component/s: streams

> IQv2: revisit WindowKeyQuery and WindowRangeQuery
> -
>
> Key: KAFKA-13548
> URL: https://issues.apache.org/jira/browse/KAFKA-13548
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: IQv2
>
> During discussion of KIP-806, there was a suggestion to refactor the queries 
> following a builder pattern so that we can compactly and flexibly specify 
> lower and upper bounds on the keys, window start times, and window end times.
> We should circle back and try to generalize the queries' interfaces before 
> the first release of IQv2.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13541:
-
Parent: (was: KAFKA-13479)
Issue Type: Improvement  (was: Sub-task)

> Make IQv2 query/store interface type safe
> -
>
> Key: KAFKA-13541
> URL: https://issues.apache.org/jira/browse/KAFKA-13541
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Minor
>  Labels: IQv2
>
> Currently, the new IQv2 interface allows applications to query state stores 
> using subclasses of the Query type. Unfortunately, there is no 
> way to check that the type parameter of the query matches the type of the 
> relevant store the query is executed on. As a consequence, stores have to do a 
> set of unsafe casts.
> This ticket is to explore ways to make the query interface type safe, so that 
> type mismatches are detected at compile time.
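>  
> One purely illustrative direction (not a concrete proposal): tie the query's 
> result type to the store's value type in the store's signature, so the 
> compiler rejects mismatches:
> {code:java}
> interface Query<R> { }
> 
> final class KeyQuery<K, V> implements Query<V> {
>     final K key;
>     KeyQuery(final K key) { this.key = key; }
> }
> 
> interface TypeSafeStore<K, V> {
>     // A KeyQuery<K, String> cannot be executed against a
>     // TypeSafeStore<K, Long>: the mismatch is a compile-time error
>     // rather than an unchecked cast inside the store.
>     V execute(KeyQuery<K, V> query);
> }
> {code}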



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13548:
-
Labels: IQv2  (was: )

> IQv2: revisit WindowKeyQuery and WindowRangeQuery
> -
>
> Key: KAFKA-13548
> URL: https://issues.apache.org/jira/browse/KAFKA-13548
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Minor
>  Labels: IQv2
>
> During discussion of KIP-806, there was a suggestion to refactor the queries 
> following a builder pattern so that we can compactly and flexibly specify 
> lower and upper bounds on the keys, window start times, and window end times.
> We should circle back and try to generalize the queries' interfaces before 
> the first release of IQv2.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13479) Interactive Query v2

2022-04-06 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17518567#comment-17518567
 ] 

John Roesler commented on KAFKA-13479:
--

Sorry about that, [~cadonna]. I'll do it now.

All the follow-on IQv2 work will be available under the label `IQv2`: 
https://issues.apache.org/jira/issues/?jql=labels%20%3D%20IQv2

> Interactive Query v2
> 
>
> Key: KAFKA-13479
> URL: https://issues.apache.org/jira/browse/KAFKA-13479
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Kafka Streams supports an interesting and innovative API for "peeking" into 
> the internal state of running stateful stream processors from outside of the 
> application, called Interactive Query (IQ). This functionality has proven 
> invaluable to users over the years for everything from debugging running 
> applications to serving low latency queries straight from the Streams runtime.
> However, the actual interfaces for IQ were designed in the very early days of 
> Kafka Streams, before the project had gained significant adoption, and in the 
> absence of much precedent for this kind of API in peer projects. With the 
> benefit of hindsight, we can observe several problems with the original 
> design that we hope to address in a revised framework that will serve Streams 
> users well for many years to come.
>  
> This ticket tracks the implementation of KIP-796: 
> https://cwiki.apache.org/confluence/x/34xnCw



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13541:
-
Component/s: streams

> Make IQv2 query/store interface type safe
> -
>
> Key: KAFKA-13541
> URL: https://issues.apache.org/jira/browse/KAFKA-13541
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Minor
>  Labels: IQv2
>
> Currently, the new IQv2 interface allows applications to query state stores 
> using subclasses of the Query type. Unfortunately, there is no 
> way to check that the type parameter of the query matches the type of the 
> relevant store the query is executed on. As a consequence, stores have to do a 
> set of unsafe casts.
> This ticket is to explore ways to make the query interface type safe, so that 
> type mismatches are detected at compile time.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13541:
-
Labels: IQv2  (was: )

> Make IQv2 query/store interface type safe
> -
>
> Key: KAFKA-13541
> URL: https://issues.apache.org/jira/browse/KAFKA-13541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Minor
>  Labels: IQv2
>
> Currently, the new IQv2 interface allows applications to query state stores 
> using subclasses of the Query type. Unfortunately, there is no 
> way to check that the type parameter of the query matches the type of the 
> relevant store the query is executed on. As a consequence, stores have to do a 
> set of unsafe casts.
> This ticket is to explore ways to make the query interface type safe, so that 
> type mismatches are detected at compile time.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13526:
-
Labels: IQv2  (was: )

> IQv2: Consider more generic logic for mapping between binary and typed queries
> --
>
> Key: KAFKA-13526
> URL: https://issues.apache.org/jira/browse/KAFKA-13526
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Minor
>  Labels: IQv2
>
> Right now, typed queries (like KeyQuery) need to be specially handled and 
> translated to their binary counterparts (like RawKeyQuery). This happens in 
> the Metered store layers, where the serdes are known. It is necessary because 
> lower store layers are only able to handle binary data (because they don't 
> know the serdes).
> This situation is not ideal, since the Metered store layers will grow to host 
> quite a bit of query handling and translation logic, because the relationship 
> between typed queries and binary counterparts is not obvious, and because we 
> can only automatically translate known query types. User-supplied queries and 
> stores will have to work things out using their a priori knowledge of the 
> serdes.
>  
> One suggestion (from [~mjsax]) is to come up with some kind of generic 
> "query mapping" API, which the Metered stores would use to translate back and 
> forth between typed and raw keys and values. Users would be able to supply 
> their own mappings along with their custom queries.
> Another option would be to have the Metered stores attach the serdes to the 
> query on the way down and then to the result on the way up. Then, the serdes 
> would be available in the bytes store (as part of the request) and to the 
> users when they get their results (as part of the response).
> Other options may also surface once we start playing with ideas.
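>  
> As a rough illustration of the first option (interface and names are 
> hypothetical):
> {code:java}
> // Supplied by the user alongside a custom query; the Metered layer
> // would call toRawQuery on the way down and fromRawResult on the
> // way back up.
> interface QueryMapping<Q, RawQ, RawR, R> {
>     RawQ toRawQuery(Q typedQuery);    // apply serdes to keys/bounds
>     R fromRawResult(RawR rawResult);  // apply serdes to raw values
> }
> {code}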



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13526:
-
Parent: (was: KAFKA-13479)
Issue Type: Improvement  (was: Sub-task)

> IQv2: Consider more generic logic for mapping between binary and typed queries
> --
>
> Key: KAFKA-13526
> URL: https://issues.apache.org/jira/browse/KAFKA-13526
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: IQv2
>
> Right now, typed queries (like KeyQuery) need to be specially handled and 
> translated to their binary counterparts (like RawKeyQuery). This happens in 
> the Metered store layers, where the serdes are known. It is necessary because 
> lower store layers are only able to handle binary data (because they don't 
> know the serdes).
> This situation is not ideal, since the Metered store layers will grow to host 
> quite a bit of query handling and translation logic, because the relationship 
> between typed queries and binary counterparts is not obvious, and because we 
> can only automatically translate known query types. User-supplied queries and 
> stores will have to work things out using their a priori knowledge of the 
> serdes.
>  
> One suggestion (from [~mjsax]) is to come up with some kind of generic 
> "query mapping" API, which the Metered stores would use to translate back and 
> forth between typed and raw keys and values. Users would be able to supply 
> their own mappings along with their custom queries.
> Another option would be to have the Metered stores attach the serdes to the 
> query on the way down and then to the result on the way up. Then, the serdes 
> would be available in the bytes store (as part of the request) and to the 
> users when they get their results (as part of the response).
> Other options may also surface once we start playing with ideas.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13526:
-
Component/s: streams

> IQv2: Consider more generic logic for mapping between binary and typed queries
> --
>
> Key: KAFKA-13526
> URL: https://issues.apache.org/jira/browse/KAFKA-13526
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: IQv2
>
> Right now, typed queries (like KeyQuery) need to be specially handled and 
> translated to their binary counterparts (like RawKeyQuery). This happens in 
> the Metered store layers, where the serdes are known. It is necessary because 
> lower store layers are only able to handle binary data (because they don't 
> know the serdes).
> This situation is not ideal, since the Metered store layers will grow to host 
> quite a bit of query handling and translation logic, because the relationship 
> between typed queries and binary counterparts is not obvious, and because we 
> can only automatically translate known query types. User-supplied queries and 
> stores will have to work things out using their a priori knowledge of the 
> serdes.
>  
> One suggestion (from [~mjsax]) is to come up with some kind of generic 
> "query mapping" API, which the Metered stores would use to translate back and 
> forth between typed and raw keys and values. Users would be able to supply 
> their own mappings along with their custom queries.
> Another option would be to have the Metered stores attach the serdes to the 
> query on the way down and then to the result on the way up. Then, the serdes 
> would be available in the bytes store (as part of the request) and to the 
> users when they get their results (as part of the response).
> Other options may also surface once we start playing with ideas.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13523:
-
Priority: Major  (was: Blocker)

> Implement IQv2 support in global stores
> ---
>
> Key: KAFKA-13523
> URL: https://issues.apache.org/jira/browse/KAFKA-13523
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Major
>  Labels: IQv2
>
> Global stores pose one significant problem for IQv2: when they start up, they 
> skip the regular ingest pipeline and instead use the "restoration" pipeline 
> to read up until the current end offset. Then, they switch over to the 
> regular ingest pipeline.
> IQv2 position tracking expects to track the position of each record from the 
> input topic through the ingest pipeline and then get the position headers 
> through the restoration pipeline via the changelog topic. The fact that 
> global stores "restore" the input topic instead of ingesting it violates our 
> expectations.
> It has also caused other problems, so we may want to consider switching the 
> global store processing to use the normal paradigm rather than adding 
> special-case logic to track positions in global stores.
>  
> Note: there are two primary reasons that global stores behave this way:
>  # We can write in batches during restoration, so the I/O may be more 
> efficient
>  # The global thread does not transition to RUNNING state until it reaches 
> the (current) end of the input topic, which blocks other threads from joining 
> against it, thereby improving the time synchronization of global KTable joins.
> If we want to propose changing the bootstrapping pipeline for global threads, 
> we should have some kind of answer to these concerns.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13523:
-
Affects Version/s: (was: 3.2.0)

> Implement IQv2 support in global stores
> ---
>
> Key: KAFKA-13523
> URL: https://issues.apache.org/jira/browse/KAFKA-13523
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: IQv2
>
> Global stores pose one significant problem for IQv2: when they start up, they 
> skip the regular ingest pipeline and instead use the "restoration" pipeline 
> to read up until the current end offset. Then, they switch over to the 
> regular ingest pipeline.
> IQv2 position tracking expects to track the position of each record from the 
> input topic through the ingest pipeline and then get the position headers 
> through the restoration pipeline via the changelog topic. The fact that 
> global stores "restore" the input topic instead of ingesting it violates our 
> expectations.
> It has also caused other problems, so we may want to consider switching the 
> global store processing to use the normal paradigm rather than adding 
> special-case logic to track positions in global stores.
>  
> Note: there are two primary reasons that global stores behave this way:
>  # We can write in batches during restoration, so the I/O may be more 
> efficient
>  # The global thread does not transition to RUNNING state until it reaches 
> the (current) end of the input topic, which blocks other threads from joining 
> against it, thereby improving the time synchronization of global KTable joins.
> If we want to propose changing the bootstrapping pipeline for global threads, 
> we should have some kind of answer to these concerns.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13523:
-
Component/s: streams

> Implement IQv2 support in global stores
> ---
>
> Key: KAFKA-13523
> URL: https://issues.apache.org/jira/browse/KAFKA-13523
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>  Labels: IQv2
>
> Global stores pose one significant problem for IQv2: when they start up, they 
> skip the regular ingest pipeline and instead use the "restoration" pipeline 
> to read up until the current end offset. Then, they switch over to the 
> regular ingest pipeline.
> IQv2 position tracking expects to track the position of each record from the 
> input topic through the ingest pipeline and then get the position headers 
> through the restoration pipeline via the changelog topic. The fact that 
> global stores "restore" the input topic instead of ingesting it violates our 
> expectations.
> It has also caused other problems, so we may want to consider switching the 
> global store processing to use the normal paradigm rather than adding 
> special-case logic to track positions in global stores.
>  
> Note: there are two primary reasons that global stores behave this way:
>  # We can write in batches during restoration, so the I/O may be more 
> efficient
>  # The global thread does not transition to RUNNING state until it reaches 
> the (current) end of the input topic, which blocks other threads from joining 
> against it, thereby improving the time synchronization of global KTable joins.
> If we want to propose changing the bootstrapping pipeline for global threads, 
> we should have some kind of answer to these concerns.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13523:
-
Parent: (was: KAFKA-13479)
Issue Type: Improvement  (was: Sub-task)

> Implement IQv2 support in global stores
> ---
>
> Key: KAFKA-13523
> URL: https://issues.apache.org/jira/browse/KAFKA-13523
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>  Labels: IQv2
>
> Global stores pose one significant problem for IQv2: when they start up, they 
> skip the regular ingest pipeline and instead use the "restoration" pipeline 
> to read up until the current end offset. Then, they switch over to the 
> regular ingest pipeline.
> IQv2 position tracking expects to track the position of each record from the 
> input topic through the ingest pipeline and then get the position headers 
> through the restoration pipeline via the changelog topic. The fact that 
> global stores "restore" the input topic instead of ingesting it violates our 
> expectations.
> It has also caused other problems, so we may want to consider switching the 
> global store processing to use the normal paradigm rather than adding 
> special-case logic to track positions in global stores.
>  
> Note: there are two primary reasons that global stores behave this way:
>  # We can write in batches during restoration, so the I/O may be more 
> efficient
>  # The global thread does not transition to RUNNING state until it reaches 
> the (current) end of the input topic, which blocks other threads from joining 
> against it, thereby improving the time synchronization of global KTable joins.
> If we want to propose changing the bootstrapping pipeline for global threads, 
> we should have some kind of answer to these concerns.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13523:
-
Labels: IQv2  (was: )

> Implement IQv2 support in global stores
> ---
>
> Key: KAFKA-13523
> URL: https://issues.apache.org/jira/browse/KAFKA-13523
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>  Labels: IQv2
>
> Global stores pose one significant problem for IQv2: when they start up, they 
> skip the regular ingest pipeline and instead use the "restoration" pipeline 
> to read up until the current end offset. Then, they switch over to the 
> regular ingest pipeline.
> IQv2 position tracking expects to track the position of each record from the 
> input topic through the ingest pipeline and then get the position headers 
> through the restoration pipeline via the changelog topic. The fact that 
> global stores "restore" the input topic instead of ingesting it violates our 
> expectations.
> It has also caused other problems, so we may want to consider switching the 
> global store processing to use the normal paradigm rather than adding 
> special-case logic to track positions in global stores.
>  
> Note: there are two primary reasons that global stores behave this way:
>  # We can write in batches during restoration, so the I/O may be more 
> efficient
>  # The global thread does not transition to RUNNING state until it reaches 
> the (current) end of the input topic, which blocks other threads from joining 
> against it, thereby improving the time synchronization of global KTable joins.
> If we want to propose changing the bootstrapping pipeline for global threads, 
> we should have some kind of answer to these concerns.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13554.
--
Resolution: Won't Fix

> Rename RangeQuery to KeyRangeQuery
> --
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Minor
>
> Just to avoid confusion wrt WindowRangeQuery



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Closed] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler closed KAFKA-13554.


> Rename RangeQuery to KeyRangeQuery
> --
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Minor
>
> Just to avoid confusion wrt WindowRangeQuery



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2022-03-24 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17512096#comment-17512096
 ] 

John Roesler commented on KAFKA-13554:
--

I thought that I filed this because it had been insisted upon as a follow-on in 
the discussion thread or the PR, but I don't see any evidence of that, so I 
think we're better off leaving it alone.

 

[https://lists.apache.org/thread/4clhz43yy9nk6kkggbcn0y3v61b05sp1]

[https://github.com/apache/kafka/pull/11598]

 

> Rename RangeQuery to KeyRangeQuery
> --
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Minor
>
> Just to avoid confusion wrt WindowRangeQuery



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13554:
-
Affects Version/s: (was: 3.2.0)

> Rename RangeQuery to KeyRangeQuery
> --
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>
> Just to avoid confusion wrt WindowRangeQuery



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13554:
-
Priority: Minor  (was: Blocker)

> Rename RangeQuery to KeyRangeQuery
> --
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Minor
>
> Just to avoid confusion wrt WindowRangeQuery



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13622:
-
Priority: Minor  (was: Blocker)

> Revisit the complexity of position tracking in state stores
> ---
>
> Key: KAFKA-13622
> URL: https://issues.apache.org/jira/browse/KAFKA-13622
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Minor
>
> Currently, state store implementers have a significant burden to track 
> position correctly. They have to:
>  * update the position during all puts
>  * implement the RecordBatchingStateRestoreCallback and use the 
> ChangelogRecordDeserializationHelper to update the position 
> based on record headers
>  * implement some mechanism to restore the position after a 
> restart if the store is persistent (such as supplying a CommitCallback to write 
> the position to a local file and then read the file during init)
> [~guozhang] pointed out during review that this is probably 
> too much responsibility (and certainly too much opportunity for error). We 
> should see what we can do to simplify these responsibilities, if not 
> eliminate them entirely from the store implementer's scope of concern.
>  
> See 
> https://github.com/apache/kafka/pull/11676#discussion_r790358058



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13622:
-
Affects Version/s: (was: 3.2.0)

> Revisit the complexity of position tracking in state stores
> ---
>
> Key: KAFKA-13622
> URL: https://issues.apache.org/jira/browse/KAFKA-13622
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Blocker
>
> Currently, state store implementers have a significant burden to track 
> position correctly. They have to:
>  * update the position during all puts
>  * implement the RecordBatchingStateRestoreCallback and use the 
> ChangelogRecordDeserializationHelper to update the position 
> based on record headers
>  * implement some mechanism to restore the position after a 
> restart if the store is persistent (such as supplying a CommitCallback to write 
> the position to a local file and then read the file during init)
> [~guozhang] pointed out during review that this is probably 
> too much responsibility (and certainly too much opportunity for error). We 
> should see what we can do to simplify these responsibilities, if not 
> eliminate them entirely from the store implementer's scope of concern.
>  
> See 
> https://github.com/apache/kafka/pull/11676#discussion_r790358058



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13541:
-
Affects Version/s: (was: 3.2.0)

> Make IQv2 query/store interface type safe
> -
>
> Key: KAFKA-13541
> URL: https://issues.apache.org/jira/browse/KAFKA-13541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Minor
>
> Currently, the new IQv2 interface allows applications to query state stores 
> using subclasses of the Query type. Unfortunately, there is no 
> way to check that the type parameter of the query matches the type of the 
> relevant store the query is executed on. As a consequence, stores have to do a 
> set of unsafe casts.
> This ticket is to explore ways to make the query interface type safe, so that 
> type mismatches are detected at compile time.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-13554:


Assignee: John Roesler  (was: Vicky Papavasileiou)

> Rename RangeQuery to KeyRangeQuery
> --
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>
> Just to avoid confusion wrt WindowRangeQuery



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13541:
-
Priority: Minor  (was: Blocker)

> Make IQv2 query/store interface type safe
> -
>
> Key: KAFKA-13541
> URL: https://issues.apache.org/jira/browse/KAFKA-13541
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Minor
>
> Currently, the new IQv2 interface allows applications to query state stores 
> using subclasses of the Query type. Unfortunately, there is no 
> way to check that the type parameter of the query matches the type of the 
> relevant store the query is executed on. As a consequence, stores have to do a 
> set of unsafe casts.
> This ticket is to explore ways to make the query interface type safe, so that 
> type mismatches are detected at compile time.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13526:
-
Priority: Minor  (was: Blocker)

> IQv2: Consider more generic logic for mapping between binary and typed queries
> --
>
> Key: KAFKA-13526
> URL: https://issues.apache.org/jira/browse/KAFKA-13526
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Minor
>
> Right now, typed queries (like KeyQuery) need to be specially handled and 
> translated to their binary counterparts (like RawKeyQuery). This happens in 
> the Metered store layers, where the serdes are known. It is necessary because 
> lower store layers are only able to handle binary data (because they don't 
> know the serdes).
> This situation is not ideal, since the Metered store layers will grow to host 
> quite a bit of query handling and translation logic, because the relationship 
> between typed queries and binary counterparts is not obvious, and because we 
> can only automatically translate known query types. User-supplied queries and 
> stores will have to work things out using their a priori knowledge of the 
> serdes.
>  
> One suggestion (from [~mjsax]) is to come up with some kind of generic 
> "query mapping" API, which the Metered stores would use to translate back and 
> forth between typed and raw keys and values. Users would be able to supply 
> their own mappings along with their custom queries.
> Another option would be to have the Metered stores attach the serdes to the 
> query on the way down and then to the result on the way up. Then, the serdes 
> would be available in the bytes store (as part of the request) and to the 
> users when they get their results (as part of the response).
> Other options may also surface once we start playing with ideas.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13526:
-
Affects Version/s: (was: 3.2.0)

> IQv2: Consider more generic logic for mapping between binary and typed queries
> --
>
> Key: KAFKA-13526
> URL: https://issues.apache.org/jira/browse/KAFKA-13526
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Minor
>
> Right now, typed queries (like KeyQuery) need to be specially handled and 
> translated to their binary counterparts (like RawKeyQuery). This happens in 
> the Metered store layers, where the serdes are known. It is necessary because 
> lower store layers are only able to handle binary data (because they don't 
> know the serdes).
> This situation is not ideal, since the Metered store layers will grow to host 
> quite a bit of query handling and translation logic, because the relationship 
> between typed queries and binary counterparts is not obvious, and because we 
> can only automatically translate known query types. User-supplied queries and 
> stores will have to work things out using their a priori knowledge of the 
> serdes.
>  
> One suggestion (from [~mjsax]) is to come up with some kind of generic 
> "query mapping" API, which the Metered stores would use to translate back and 
> forth between typed and raw keys and values. Users would be able to supply 
> their own mappings along with their custom queries.
> Another option would be to have the Metered stores attach the serdes to the 
> query on the way down and then to the result on the way up. Then, the serdes 
> would be available in the bytes store (as part of the request) and to the 
> users when they get their results (as part of the response).
> Other options may also surface once we start playing with ideas.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13548:
-
Priority: Minor  (was: Blocker)

> IQv2: revisit WindowKeyQuery and WindowRangeQuery
> -
>
> Key: KAFKA-13548
> URL: https://issues.apache.org/jira/browse/KAFKA-13548
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Minor
>
> During discussion of KIP-806, there was a suggestion to refactor the queries 
> following a builder pattern so that we can compactly and flexibly specify 
> lower and upper bounds on the keys, window start times, and window end times.
> We should circle back and try to generalize the queries' interfaces before 
> the first release of IQv2.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13548:
-
Affects Version/s: (was: 3.2.0)

> IQv2: revisit WindowKeyQuery and WindowRangeQuery
> -
>
> Key: KAFKA-13548
> URL: https://issues.apache.org/jira/browse/KAFKA-13548
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Blocker
>
> During discussion of KIP-806, there was a suggestion to refactor the queries 
> following a builder pattern so that we can compactly and flexibly specify 
> lower and upper bounds on the keys, window start times, and window end times.
> We should circle back and try to generalize the queries' interfaces before 
> the first release of IQv2.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13479) Interactive Query v2

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13479:
-
Fix Version/s: (was: 3.2.0)

> Interactive Query v2
> 
>
> Key: KAFKA-13479
> URL: https://issues.apache.org/jira/browse/KAFKA-13479
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Kafka Streams supports an interesting and innovative API for "peeking" into 
> the internal state of running stateful stream processors from outside of the 
> application, called Interactive Query (IQ). This functionality has proven 
> invaluable to users over the years for everything from debugging running 
> applications to serving low latency queries straight from the Streams runtime.
> However, the actual interfaces for IQ were designed in the very early days of 
> Kafka Streams, before the project had gained significant adoption, and in the 
> absence of much precedent for this kind of API in peer projects. With the 
> benefit of hindsight, we can observe several problems with the original 
> design that we hope to address in a revised framework that will serve Streams 
> users well for many years to come.
>  
> This ticket tracks the implementation of KIP-796: 
> https://cwiki.apache.org/confluence/x/34xnCw



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-23 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13714.
--
Fix Version/s: 3.2.0
 Assignee: John Roesler
   Resolution: Fixed

> Flaky test IQv2StoreIntegrationTest
> ---
>
> Key: KAFKA-13714
> URL: https://issues.apache.org/jira/browse/KAFKA-13714
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 3.2.0
>
>
> I have observed multiple consistency violations in the 
> IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's 
> apparently a major flaw in the feature, we should not release with this bug 
> outstanding. Depending on the time-table, we may want to block the release or 
> pull the feature until the next release.
>  
> The first observation I have is from 23 Feb 2022. So far all observations 
> point to the range query in particular, and all observations have been for 
> RocksDB stores, including RocksDBStore, TimestampedRocksDBStore, and the 
> windowed store built on RocksDB segments.
> For reference, range queries were implemented on 16 Feb 2022: 
> [https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]
> The window-specific range query test has also failed once that I have seen. 
> That feature was implemented on 2 Jan 2022: 
> [https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]
>  
> Here are some stack traces I have seen:
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>  but: was <[1, 2]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
>  {code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>  but: was <[1, 3]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778)
>{code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a,
>  executionInfo=[], position=Position{position={input-topic={0=1, 
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364,
>  executionInfo=[], position=Position{position={input-topic={1=1}, 
> globalResult=null}
> Expected: is <[1, 2, 3]>
>  but: was <[1, 2]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780)
>  {code}
> {code:java}
> verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL] 
>     java.lang.AssertionError: 
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6,
>  executionInfo=[], position=Position{position={input-topic={0=1, 
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165,
>  executionInfo=[], position=Position{position={input-topic={1=1}, 
> globalResult=null}
>     Expected: is <[0, 1, 2, 3]> 
>          but: was <[0, 2, 3]>
>         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>        

[jira] [Commented] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-20 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509543#comment-17509543
 ] 

John Roesler commented on KAFKA-13714:
--

Oh, before I forget, here's how I'm getting repros:
{code:bash}
I=0; while ./gradlew :streams:test -Prerun-tests --tests 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest --fail-fast; do 
(( I=$I+1 )); echo "Completed run: $I"; sleep 1; done{code}
It usually takes about 500 iterations before it shows up. This depends on a 
gradle change that I'm planning to commit to support this use case.

> Flaky test IQv2StoreIntegrationTest
> ---
>
> Key: KAFKA-13714
> URL: https://issues.apache.org/jira/browse/KAFKA-13714
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>
> I have observed multiple consistency violations in the 
> IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's 
> apparently a major flaw in the feature, we should not release with this bug 
> outstanding. Depending on the time-table, we may want to block the release or 
> pull the feature until the next release.
>  
> The first observation I have is from 23 Feb 2022. So far all observations 
> point to the range query in particular, and all observations have been for 
> RocksDB stores, including RocksDBStore, TimestampedRocksDBStore, and the 
> windowed store built on RocksDB segments.
> For reference, range queries were implemented on 16 Feb 2022: 
> [https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]
> The window-specific range query test has also failed once that I have seen. 
> That feature was implemented on 2 Jan 2022: 
> [https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]
>  
> Here are some stack traces I have seen:
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>  but: was <[1, 2]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
>  {code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>  but: was <[1, 3]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778)
>{code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a,
>  executionInfo=[], position=Position{position={input-topic={0=1, 
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364,
>  executionInfo=[], position=Position{position={input-topic={1=1}, 
> globalResult=null}
> Expected: is <[1, 2, 3]>
>  but: was <[1, 2]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780)
>  {code}
> {code:java}
> verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL] 
>     java.lang.AssertionError: 
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6,
>  executionInfo=[], position=Position{position={input-topic={0=1, 
> 

[jira] [Commented] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-20 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509541#comment-17509541
 ] 

John Roesler commented on KAFKA-13714:
--

Added some more logs, and I think I'm onto something
{code:java}
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@1f193686,
 executionInfo=[Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 2302842ns, 
Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 3051842ns, Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 3074902ns, 
Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@58294867 in 3956557ns], 
position=Position{position={input-topic={0=1, 
1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@31e72cbc,
 executionInfo=[Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 415148ns, 
Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 1033935ns, Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 1053899ns, 
Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@67c277a0 in 1106865ns], 
position=Position{position={input-topic={1=1}, globalResult=null}
Expected: is <[1, 2, 3]>
 but: was <[1, 2]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1140)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:818)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:782)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 

[jira] [Commented] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-20 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509379#comment-17509379
 ] 

John Roesler commented on KAFKA-13714:
--

Another:
{code:java}
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@72e789cb,
 executionInfo=[Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 87900ns, 
Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 366097ns, Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 373038ns, 
Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@627d8516 in 400408ns], 
position=Position{position={input-topic={0=1, 
1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@7c1812b3,
 executionInfo=[Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 27551ns, 
Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 406916ns, Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 413227ns, 
Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@5c10285a in 427044ns], 
position=Position{position={input-topic={1=1}, globalResult=null}
Expected: is <[1, 2, 3]>
 but: was <[1, 3]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1140)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:818)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:782)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 

[jira] [Commented] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-19 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509366#comment-17509366
 ] 

John Roesler commented on KAFKA-13714:
--

Continuing debugging:
h3. verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]
{code:java}
java.lang.AssertionError: 
Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@1f193686,
 executionInfo=[Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 2335793ns, 
Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 3045186ns, Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 3068465ns, 
Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@58294867 in 3974765ns], 
position=Position{position={input-topic={0=1, 
1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@31e72cbc,
 executionInfo=[Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 112183ns, 
Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 775416ns, Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 796244ns, 
Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@67c277a0 in 849835ns], 
position=Position{position={input-topic={1=1}, globalResult=null}
Expected: is <[1, 2, 3]>
 but: was <[1, 2]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1140)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:818)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:782)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 

[jira] [Comment Edited] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-17 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508386#comment-17508386
 ] 

John Roesler edited comment on KAFKA-13714 at 3/17/22, 7:32 PM:


Another local repro:

 
{code:java}
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest > 
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] FAILED
    java.lang.AssertionError: Result:StateQueryResult{partitionResults={

0=SucceededQueryResult{

result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@3f702946,
 
executionInfo=[
  Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 1153925ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 1165952ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 1181616ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@278667fd in 1260365ns
], 
position=Position{position={input-topic={0=1, 

1=SucceededQueryResult{

result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@42b6d0cc,

executionInfo=[
  Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 109311ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 116767ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 128961ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@684b31de in 185521ns
],
position=Position{position={input-topic={1=1}, 

globalResult=null}
    Expected: is <[1, 2, 3]>
         but: was <[1, 2]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
 

{code}
 

logs:
{code:java}
[2022-03-17 07:31:56,286] INFO stream-client 
[app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138]
 Kafka Streams version: test-version (org.apache.kafka.streams.KafkaStreams:912)
[2022-03-17 07:31:56,286] INFO stream-client 
[app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138]
 Kafka Streams commit ID: test-commit-ID 
(org.apache.kafka.streams.KafkaStreams:913)
[2022-03-17 07:31:56,288] INFO stream-thread 
[app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1]
 Creating restore consumer client 
(org.apache.kafka.streams.processor.internals.StreamThread:346)
[2022-03-17 07:31:56,295] INFO stream-thread 
[app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1]
 Creating thread producer client 
(org.apache.kafka.streams.processor.internals.StreamThread:105)
[2022-03-17 07:31:56,297] INFO [Producer 
clientId=app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1-producer]
 Instantiated an idempotent producer. 
(org.apache.kafka.clients.producer.KafkaProducer:532)
[2022-03-17 07:31:56,304] INFO stream-thread 
[app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1]
 Creating consumer client 
(org.apache.kafka.streams.processor.internals.StreamThread:397)
[2022-03-17 07:31:56,308] INFO stream-thread 
[app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1-consumer]
 Cooperative rebalancing protocol is enabled now 
(org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration:126)
[2022-03-17 07:31:56,308] INFO [Producer 
clientId=app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1-producer]
 Cluster ID: iZBZzURBQr6rMZEB6oxg7g (org.apache.kafka.clients.Metadata:287)
[2022-03-17 07:31:56,309] INFO 

[jira] [Commented] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-17 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508386#comment-17508386
 ] 

John Roesler commented on KAFKA-13714:
--

Another local repro:

 
{code:java}
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest > 
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] FAILED
    java.lang.AssertionError: Result:StateQueryResult{partitionResults={

0=SucceededQueryResult{

result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@3f702946,
 
executionInfo=[
  Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 1153925ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 1165952ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 1181616ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@278667fd in 1260365ns
], 
position=Position{position={input-topic={0=1, 

1=SucceededQueryResult{

result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@42b6d0cc,

executionInfo=[
  Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 109311ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 116767ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 128961ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@684b31de in 185521ns
],
position=Position{position={input-topic={1=1}, 

globalResult=null}
    Expected: is <[1, 2, 3]>
         but: was <[1, 2]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
 {code}

> Flaky test IQv2StoreIntegrationTest
> ---
>
> Key: KAFKA-13714
> URL: https://issues.apache.org/jira/browse/KAFKA-13714
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>
> I have observed multiple consistency violations in the 
> IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's 
> apparently a major flaw in the feature, we should not release with this bug 
> outstanding. Depending on the time-table, we may want to block the release or 
> pull the feature until the next release.
>  
> The first observation I have is from 23 Feb 2022. So far all observations 
> point to the range query in particular, and all observations have been for 
> RocksDB stores, including RocksDBStore, TimestampedRocksDBStore, and the 
> windowed store built on RocksDB segments.
> For reference, range queries were implemented on 16 Feb 2022: 
> [https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]
> The window-specific range query test has also failed once that I have seen. 
> That feature was implemented on 2 Jan 2022: 
> [https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]
>  
> Here are some stack traces I have seen:
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>  but: was <[1, 2]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
>  {code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>  but: was <[1, 3]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> 

[jira] [Created] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-07 Thread John Roesler (Jira)
John Roesler created KAFKA-13714:


 Summary: Flaky test IQv2StoreIntegrationTest
 Key: KAFKA-13714
 URL: https://issues.apache.org/jira/browse/KAFKA-13714
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.2.0
Reporter: John Roesler


I have observed multiple consistency violations in the 
IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's 
apparently a major flaw in the feature, we should not release with this bug 
outstanding. Depending on the time-table, we may want to block the release or 
pull the feature until the next release.

 

The first observation I have is from 23 Feb 2022. So far all observations point 
to the range query in particular, and all observations have been for RocksDB 
stores, including RocksDBStore, TimestampedRocksDBStore, and the windowed store 
built on RocksDB segments.

For reference, range queries were implemented on 16 Feb 2022: 
[https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]

The window-specific range query test has also failed once that I have seen. 
That feature was implemented on 2 Jan 2022: 
[https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]

 

Here are some stack traces I have seen:
{code:java}
verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Expected: is <[1, 2, 3]>
 but: was <[1, 2]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
 {code}
{code:java}
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Expected: is <[1, 2, 3]>
 but: was <[1, 3]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778)
 {code}
{code:java}
verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a,
 executionInfo=[], position=Position{position={input-topic={0=1, 
1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364,
 executionInfo=[], position=Position{position={input-topic={1=1}, 
globalResult=null}
Expected: is <[1, 2, 3]>
 but: was <[1, 2]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780)
 {code}
{code:java}
verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL] 

    java.lang.AssertionError: 
Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6,
 executionInfo=[], position=Position{position={input-topic={0=1, 
1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165,
 executionInfo=[], position=Position{position={input-topic={1=1}, 
globalResult=null}
    Expected: is <[0, 1, 2, 3]> 
         but: was <[0, 2, 3]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQuery(IQv2StoreIntegrationTest.java:1234)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQueries(IQv2StoreIntegrationTest.java:880)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:793)
 {code}
 

Some observations:
 * After I added the 

[jira] [Commented] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-02-03 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486714#comment-17486714
 ] 

John Roesler commented on KAFKA-13600:
--

Hi [~tim.patterson] , thanks for the report and the patch!

 

It sounds like you're reporting two things here:
 # a bug around the acceptable recovery lag.
 # an improvement on assignment balance

If we can discuss those things independently, then we can definitely merge the 
bugfix immediately. Depending on the impact of the improvement, it might also 
fall into the category of a simple ticket, or it might be more appropriate to 
have a KIP as [~cadonna] suggested.

Regarding the bug, I find it completely plausible that we have a bug, but I 
have to confess that I'm not 100% sure I understand the report. Is the 
situation that there's an active task that happens to be processing quite a bit 
ahead of the replicas, such that when the active goes offline, there's no 
"caught-up" node, and instead of failing the task over to the least-lagging 
node, we just assign it to a fresh node? If that's it, then it is certainly not 
the desired behavior.

The notion of acceptableRecoveryLag was introduced because follower replicas 
will always lag the active task, by definition. We want task ownership to be 
able to swap over from the active to a warm-up when it's caught up, but it will 
never be 100% caught up (because it is a follower until it takes over). 
acceptableRecoveryLag is a way to define a small amount of lag that is 
"acceptable", so that when a warm-up is only lagging by that amount, we can 
consider it to be effectively caught up and move the active to the warm-up node.

As you can see, this has nothing at all to do with which nodes are eligible to 
take over when an active exits the cluster. In that case, it was always the 
intent that the most-caught-up node should take over active processing, 
regardless of its lag.

I've been squinting at our existing code, and also your patch 
([https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1)]
 . It looks to me like the flaw in the existing implementation is essentially 
just here:

[https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-83a301514ee18b410df40a91595f6f1afd51f6152ff813b5789516cf5c3605baL92-L96]
{code:java}
// if the desired client is not caught up, and there is another client that 
_is_ caught up, then
// we schedule a movement, so we can move the active task to the caught-up 
client. We'll try to
// assign a warm-up to the desired client so that we can move it later on.{code}
which should indeed be just like what you described:
{code:java}
// if the desired client is not caught up, and there is another client that 
_is_ more caught up,
// then we schedule a movement [to] move the active task to the [most] 
caught-up client. 
// We'll try to assign a warm-up to the desired client so that we can move it 
later on.{code}
On the other hand, we should not lose this important predicate to determine 
whether a task is considered "caught up":

[https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-e50a755ba2a4d2f7306d1016d079018cba22f9f32993ef5dd64408d1a94d79acL245]
{code:java}
activeRunning(taskLag) || unbounded(acceptableRecoveryLag) || 
acceptable(acceptableRecoveryLag, taskLag) {code}
This captures a couple of subtleties in addition to the obvious "a task is 
caught up if it's under the acceptable recovery lag":
 # A running, active task doesn't have a real lag at all, but instead its "lag" 
is the sentinel value `-2`
 # You can disable the "warm up" phase completely by setting 
acceptableRecoveryLag to `Long.MAX_VALUE`, in which case, we ignore lags 
completely and consider all nodes to be caught up, even if they didn't report a 
lag at all.
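
Expanded into standalone form, that predicate reads roughly as follows (a 
sketch with illustrative names, assuming only the sentinel and Long.MAX_VALUE 
conventions just described; this is not the actual assignor code):
{code:java}
// Sketch of: activeRunning(taskLag) || unbounded(acceptableRecoveryLag)
//            || acceptable(acceptableRecoveryLag, taskLag)
final class CaughtUpSketch {
    // A running active task reports this sentinel instead of a real lag.
    static final long ACTIVE_RUNNING_SENTINEL = -2L;

    static boolean caughtUp(final long taskLag, final long acceptableRecoveryLag) {
        final boolean activeRunning = taskLag == ACTIVE_RUNNING_SENTINEL;
        // Long.MAX_VALUE disables the warm-up phase: every node counts as
        // caught up, even nodes that reported no lag at all.
        final boolean unbounded = acceptableRecoveryLag == Long.MAX_VALUE;
        final boolean acceptable = taskLag >= 0 && taskLag <= acceptableRecoveryLag;
        return activeRunning || unbounded || acceptable;
    }
}
{code}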

 

One extra thing I like about your patch is this:

[https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-83a301514ee18b410df40a91595f6f1afd51f6152ff813b5789516cf5c3605baR54-R56]
{code:java}
// Even if there is a more caught up client, as long as we're within allowable 
lag then
// its best just to stick with what we've got {code}
I agree that, if two nodes are within the acceptableRecoveryLag of each other, 
we should consider their lags to be effectively the same. That's something I 
wanted to do when we wrote this code, but couldn't figure out a good way to do 
it.

 

One thing I'd need more time on is the TaskMovementTest. At first glance, it 
looks like those changes are just about the slightly different method 
signature, but I'd want to be very sure that we're still testing the same 
invariants that we wanted to test.

Would you be willing to submit this bugfix as a PR so that we can formally 
review and merge it?

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 

[jira] [Assigned] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-02-03 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-13600:


Assignee: (was: John Roesler)

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 
>
> Key: KAFKA-13600
> URL: https://issues.apache.org/jira/browse/KAFKA-13600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 2.8.1, 3.0.0
>Reporter: Tim Patterson
>Priority: Major
>
> Consider this scenario:
>  # A node is lost from the cluster.
>  # A rebalance is kicked off with a new "target assignment" (i.e., the 
> rebalance is attempting to move a lot of tasks - see 
> https://issues.apache.org/jira/browse/KAFKA-10121).
>  # The Kafka cluster is now a bit more sluggish from the increased load.
>  # A rolling deploy happens, triggering rebalances; during the rebalances, 
> processing continues but offsets can't be committed (or nodes are restarted 
> but fail to commit offsets).
>  # The most caught-up nodes now aren't within `acceptableRecoveryLag`, and so 
> the task is started in its "target assignment" location, restoring all state 
> from scratch and delaying further processing, instead of using the "almost 
> caught up" node.
> We've hit this a few times, and since we have lots of state (~25TB worth) and are 
> heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get 
> around this, that causes other issues (i.e., a warm-up becoming active when it's 
> still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught 
> up nodes".
> We've got a fork where we do just this, and it's made a huge difference to the 
> reliability of our cluster.
> Our change is simply to use the most caught-up node if the "target node" is 
> more than `acceptableRecoveryLag` behind.
> This gives up some of the load-balancing behaviour of the existing code, but 
> in practice it doesn't seem to matter too much.
> I guess an algorithm that identified candidate nodes as those 
> within `acceptableRecoveryLag` of the most caught-up node might allow the 
> best of both worlds.
>  
> Our fork is
> [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1]
> (We also moved the capacity-constraint code to happen after all the stateful 
> assignment, to prioritise standby tasks over warmup tasks.)
> Ideally we don't want to maintain a fork of Kafka Streams going forward, so we 
> are hoping to get a bit of discussion / agreement on the best way to handle 
> this.
> More than happy to contribute code, test different algos in a production 
> system, or do anything else to help with this issue.
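> A compact sketch of that decision rule (illustrative names; lags assumed 
> non-negative - the fork linked above is the authoritative version):
> {code:java}
> import java.util.Map;
> import java.util.Optional;
> 
> // Keep the balanced "target" node unless it lags the most caught-up node by
> // more than acceptableRecoveryLag.
> final class OwnerChoiceSketch {
>     static String chooseOwner(final Map<String, Long> lagByNode,
>                               final String targetNode,
>                               final long acceptableRecoveryLag) {
>         final Optional<Map.Entry<String, Long>> best =
>                 lagByNode.entrySet().stream().min(Map.Entry.comparingByValue());
>         if (!best.isPresent()) {
>             return targetNode; // no lag reports; nothing better to go on
>         }
>         final long targetLag = lagByNode.getOrDefault(targetNode, Long.MAX_VALUE);
>         // Within the allowable lag of the best candidate, stick with the target.
>         if (targetLag - best.get().getValue() <= acceptableRecoveryLag) {
>             return targetNode;
>         }
>         return best.get().getKey();
>     }
> }
> {code}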



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-02-03 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-13600:


Assignee: John Roesler

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 
>
> Key: KAFKA-13600
> URL: https://issues.apache.org/jira/browse/KAFKA-13600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.1, 3.0.0
>Reporter: Tim Patterson
>Assignee: John Roesler
>Priority: Major
>
> Consider this scenario:
>  # A node is lost from the cluster.
>  # A rebalance is kicked off with a new "target assignment" (i.e., the 
> rebalance is attempting to move a lot of tasks - see 
> https://issues.apache.org/jira/browse/KAFKA-10121).
>  # The Kafka cluster is now a bit more sluggish from the increased load.
>  # A rolling deploy happens, triggering rebalances; during the rebalances, 
> processing continues but offsets can't be committed (or nodes are restarted 
> but fail to commit offsets).
>  # The most caught-up nodes now aren't within `acceptableRecoveryLag`, and so 
> the task is started in its "target assignment" location, restoring all state 
> from scratch and delaying further processing, instead of using the "almost 
> caught up" node.
> We've hit this a few times, and since we have lots of state (~25TB worth) and are 
> heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get 
> around this, that causes other issues (i.e., a warm-up becoming active when it's 
> still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught 
> up nodes".
> We've got a fork where we do just this, and it's made a huge difference to the 
> reliability of our cluster.
> Our change is simply to use the most caught-up node if the "target node" is 
> more than `acceptableRecoveryLag` behind.
> This gives up some of the load-balancing behaviour of the existing code, but 
> in practice it doesn't seem to matter too much.
> I guess an algorithm that identified candidate nodes as those 
> within `acceptableRecoveryLag` of the most caught-up node might allow the 
> best of both worlds.
>  
> Our fork is
> [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1]
> (We also moved the capacity-constraint code to happen after all the stateful 
> assignment, to prioritise standby tasks over warmup tasks.)
> Ideally we don't want to maintain a fork of Kafka Streams going forward, so we 
> are hoping to get a bit of discussion / agreement on the best way to handle 
> this.
> More than happy to contribute code, test different algos in a production 
> system, or do anything else to help with this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-02-03 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13600:
-
Affects Version/s: 3.1.0

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 
>
> Key: KAFKA-13600
> URL: https://issues.apache.org/jira/browse/KAFKA-13600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 2.8.1, 3.0.0
>Reporter: Tim Patterson
>Assignee: John Roesler
>Priority: Major
>
> Consider this scenario:
>  # A node is lost from the cluster.
>  # A rebalance is kicked off with a new "target assignment" (i.e., the 
> rebalance is attempting to move a lot of tasks - see 
> https://issues.apache.org/jira/browse/KAFKA-10121).
>  # The Kafka cluster is now a bit more sluggish from the increased load.
>  # A rolling deploy happens, triggering rebalances; during the rebalances, 
> processing continues but offsets can't be committed (or nodes are restarted 
> but fail to commit offsets).
>  # The most caught-up nodes now aren't within `acceptableRecoveryLag`, and so 
> the task is started in its "target assignment" location, restoring all state 
> from scratch and delaying further processing, instead of using the "almost 
> caught up" node.
> We've hit this a few times, and since we have lots of state (~25TB worth) and are 
> heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get 
> around this, that causes other issues (i.e., a warm-up becoming active when it's 
> still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught 
> up nodes".
> We've got a fork where we do just this, and it's made a huge difference to the 
> reliability of our cluster.
> Our change is simply to use the most caught-up node if the "target node" is 
> more than `acceptableRecoveryLag` behind.
> This gives up some of the load-balancing behaviour of the existing code, but 
> in practice it doesn't seem to matter too much.
> I guess an algorithm that identified candidate nodes as those 
> within `acceptableRecoveryLag` of the most caught-up node might allow the 
> best of both worlds.
>  
> Our fork is
> [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1]
> (We also moved the capacity-constraint code to happen after all the stateful 
> assignment, to prioritise standby tasks over warmup tasks.)
> Ideally we don't want to maintain a fork of Kafka Streams going forward, so we 
> are hoping to get a bit of discussion / agreement on the best way to handle 
> this.
> More than happy to contribute code, test different algos in a production 
> system, or do anything else to help with this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13523:
-
Affects Version/s: 3.2.0

> Implement IQv2 support in global stores
> ---
>
> Key: KAFKA-13523
> URL: https://issues.apache.org/jira/browse/KAFKA-13523
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>
> Global stores pose one significant problem for IQv2: when they start up, they 
> skip the regular ingest pipeline and instead use the "restoration" pipeline 
> to read up until the current end offset. Then, they switch over to the 
> regular ingest pipeline.
> IQv2 position tracking expects to track the position of each record from the 
> input topic through the ingest pipeline and then get the position headers 
> through the restoration pipeline via the changelog topic. The fact that 
> global stores "restore" the input topic instead of ingesting it violates our 
> expectations.
> It has also caused other problems, so we may want to consider switching the 
> global store processing to use the normal paradigm rather than adding 
> special-case logic to track positions in global stores.
>  
> Note: there are two primary reasons that global stores behave this way:
>  # We can write in batches during restoration, so the I/O may be more 
> efficient
>  # The global thread does not transition to RUNNING state until it reaches 
> the (current) end of the input topic, which blocks other threads from joining 
> against it, thereby improving the time synchronization of global KTable joins.
> If we want to propose changing the bootstrapping pipeline for global threads, 
> we should have some kind of answer to these concerns.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13526:
-
Priority: Blocker  (was: Major)

> IQv2: Consider more generic logic for mapping between binary and typed queries
> --
>
> Key: KAFKA-13526
> URL: https://issues.apache.org/jira/browse/KAFKA-13526
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Blocker
>
> Right now, typed queries (like KeyQuery) need to be specially handled and 
> translated to their binary counterparts (like RawKeyQuery). This happens in 
> the Metered store layers, where the serdes are known. It is necessary because 
> lower store layers are only able to handle binary data (because they don't 
> know the serdes).
> This situation is not ideal, since the Metered store layers will grow to host 
> quite a bit of query handling and translation logic, because the relationship 
> between typed queries and binary counterparts is not obvious, and because we 
> can only automatically translate known query types. User-supplied queries and 
> stores will have to work things out using their a priori knowledge of the 
> serdes.
>  
> One suggestion (from [~mjsax] ) is to come up with some kind of generic 
> "query mapping" API, which the Metered stores would use to translate back and 
> forth between typed and raw keys and values. Users would be able to supply 
> their own mappings along with their custom queries.
> Another option would be to have the Metered stores attach the serdes to the 
> query on the way down and then to the result on the way up. Then, the serdes 
> would be available in the bytes store (as part of the request) and to the 
> users when they get their results (as part of the response).
> Other options may also surface once we start playing with ideas.
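> For illustration only, such a "query mapping" might take a shape like the 
> following (no such interface exists today; the names here are invented):
> {code:java}
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.streams.query.Query;
> import org.apache.kafka.streams.query.QueryResult;
> 
> // Invented interface sketching the idea: the Metered layer would use a
> // user-suppliable mapping to translate a typed query into its binary
> // counterpart on the way down, and the raw result back on the way up.
> public interface QueryMapping<K, V> {
> 
>     // Translate a typed query into a binary query the lower layers can run.
>     Query<byte[]> toRawQuery(Query<V> typedQuery, Serde<K> keySerde, Serde<V> valueSerde);
> 
>     // Translate the binary result back into the typed result callers expect.
>     QueryResult<V> fromRawResult(QueryResult<byte[]> rawResult, Serde<V> valueSerde);
> }
> {code}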



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13548:
-
Priority: Blocker  (was: Major)

> IQv2: revisit WindowKeyQuery and WindowRangeQuery
> -
>
> Key: KAFKA-13548
> URL: https://issues.apache.org/jira/browse/KAFKA-13548
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Blocker
>
> During discussion of KIP-806, there was a suggestion to refactor the queries 
> following a builder pattern so that we can compactly and flexibly specify 
> lower and upper bounds on the keys, window start times, and window end times.
> We should circle back and try to generalize the queries' interfaces before 
> the first release of IQv2.
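> For the sake of discussion, a self-contained sketch of what such a builder 
> could look like (purely illustrative; this is not the actual API):
> {code:java}
> import java.time.Instant;
> import java.util.Optional;
> 
> // Illustrative builder: compactly specifies optional lower/upper bounds on
> // keys and window start times. An omitted bound means "unbounded".
> final class WindowQueryBuilderSketch<K> {
>     private Optional<K> keyFrom = Optional.empty();
>     private Optional<K> keyTo = Optional.empty();
>     private Optional<Instant> windowStartFrom = Optional.empty();
>     private Optional<Instant> windowStartTo = Optional.empty();
> 
>     WindowQueryBuilderSketch<K> keyFrom(final K k) { keyFrom = Optional.of(k); return this; }
>     WindowQueryBuilderSketch<K> keyTo(final K k) { keyTo = Optional.of(k); return this; }
>     WindowQueryBuilderSketch<K> windowStartFrom(final Instant t) { windowStartFrom = Optional.of(t); return this; }
>     WindowQueryBuilderSketch<K> windowStartTo(final Instant t) { windowStartTo = Optional.of(t); return this; }
> }
> {code}
> e.g. new WindowQueryBuilderSketch<String>().keyFrom("a").keyTo("z").windowStartTo(Instant.now())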



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13622:
-
Affects Version/s: 3.2.0

> Revisit the complexity of position tracking in state stores
> ---
>
> Key: KAFKA-13622
> URL: https://issues.apache.org/jira/browse/KAFKA-13622
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>
> Currently, state store implementers have a significant burden to track 
> position correctly. They have to:
>  * update the position during all puts
>  * implement the RecordBatchingStateRestoreCallback and use the 
> ChangelogRecordDeserializationHelper to update the position 
> based on record headers
>  * implement some mechanism to restore the position after a 
> restart if the store is persistent (such as supplying a CommitCallback to write 
> the position to a local file and then reading the file during init)
> [~guozhang] pointed out during review that this is probably 
> too much responsibility (and certainly too much opportunity for error). We 
> should see what we can do to simplify these responsibilities, if not 
> eliminate them entirely from the store implementer's scope of concern.
>  
> See 
> https://github.com/apache/kafka/pull/11676#discussion_r790358058



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13541:
-
Affects Version/s: 3.2.0

> Make IQv2 query/store interface type safe
> -
>
> Key: KAFKA-13541
> URL: https://issues.apache.org/jira/browse/KAFKA-13541
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Blocker
>
> Currently the new IQv2 interface allows applications to query state stores 
> using subclasses of the Query type. Unfortunately there is currently no 
> way to check that the template type of the query matches the type of the 
> relevant store the query is executed on. As a consequence stores have to do a 
> set of unsafe casts.
> This ticket is to explore ways to make the query interface type safe, so 
> that type mismatches are detected at compile time.
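> To see the problem concretely, a toy example (deliberately simplified; these 
> are not the real Kafka Streams interfaces):
> {code:java}
> import java.util.List;
> 
> // Toy stand-ins for the IQv2 types, showing why stores must cast blindly.
> interface Query<R> { }
> 
> final class RangeQuery implements Query<List<String>> { }
> 
> final class StoreSketch {
>     @SuppressWarnings("unchecked")
>     <R> R execute(final Query<R> query) {
>         if (query instanceof RangeQuery) {
>             // The compiler cannot verify that R really is List<String> here,
>             // so the store is forced into an unchecked cast.
>             return (R) List.of("a", "b", "c");
>         }
>         throw new IllegalArgumentException("Unknown query type: " + query);
>     }
> }
> {code}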



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13541:
-
Priority: Blocker  (was: Major)

> Make IQv2 query/store interface type safe
> -
>
> Key: KAFKA-13541
> URL: https://issues.apache.org/jira/browse/KAFKA-13541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Blocker
>
> Currently the new IQv2 interface allows applications to query state stores 
> using subclasses of the Query type. Unfortunately there is currently no 
> way to check that the template type of the query matches the type of the 
> relevant store the query is executed on. As a consequence stores have to do a 
> set of unsafe casts.
> This ticket is to explore ways to make the query interface type safe, so 
> that type mismatches are detected at compile time.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13523:
-
Priority: Blocker  (was: Major)

> Implement IQv2 support in global stores
> ---
>
> Key: KAFKA-13523
> URL: https://issues.apache.org/jira/browse/KAFKA-13523
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Blocker
>
> Global stores pose one significant problem for IQv2: when they start up, they 
> skip the regular ingest pipeline and instead use the "restoration" pipeline 
> to read up until the current end offset. Then, they switch over to the 
> regular ingest pipeline.
> IQv2 position tracking expects to track the position of each record from the 
> input topic through the ingest pipeline and then get the position headers 
> through the restoration pipeline via the changelog topic. The fact that 
> global stores "restore" the input topic instead of ingesting it violates our 
> expectations.
> It has also caused other problems, so we may want to consider switching the 
> global store processing to use the normal paradigm rather than adding 
> special-case logic to track positions in global stores.
>  
> Note: there are two primary reasons that global stores behave this way:
>  # We can write in batches during restoration, so the I/O may be more 
> efficient
>  # The global thread does not transition to RUNNING state until it reaches 
> the (current) end of the input topic, which blocks other threads from joining 
> against it, thereby improving the time synchronization of global KTable joins.
> If we want to propose changing the bootstrapping pipeline for global threads, 
> we should have some kind of answer to these concerns.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13548:
-
Affects Version/s: 3.2.0

> IQv2: revisit WindowKeyQuery and WindowRangeQuery
> -
>
> Key: KAFKA-13548
> URL: https://issues.apache.org/jira/browse/KAFKA-13548
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>
> During discussion of KIP-806, there was a suggestion to refactor the queries 
> following a builder pattern so that we can compactly and flexibly specify 
> lower and upper bounds on the keys, window start times, and window end times.
> We should circle back and try to generalize the queries' interfaces before 
> the first release of IQv2.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13526:
-
Affects Version/s: 3.2.0

> IQv2: Consider more generic logic for mapping between binary and typed queries
> --
>
> Key: KAFKA-13526
> URL: https://issues.apache.org/jira/browse/KAFKA-13526
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>
> Right now, typed queries (like KeyQuery) need to be specially handled and 
> translated to their binary counterparts (like RawKeyQuery). This happens in 
> the Metered store layers, where the serdes are known. It is necessary because 
> lower store layers are only able to handle binary data (because they don't 
> know the serdes).
> This situation is not ideal, since the Metered store layers will grow to host 
> quite a bit of query handling and translation logic, because the relationship 
> between typed queries and binary counterparts is not obvious, and because we 
> can only automatically translate known query types. User-supplied queries and 
> stores will have to work things out using their a priori knowledge of the 
> serdes.
>  
> One suggestion (from [~mjsax] ) is to come up with some kind of generic 
> "query mapping" API, which the Metered stores would use to translate back and 
> forth between typed and raw keys and values. Users would be able to supply 
> their own mappings along with their custom queries.
> Another option would be to have the Metered stores attach the serdes to the 
> query on the way down and then to the result on the way up. Then, the serdes 
> would be available in the bytes store (as part of the request) and to the 
> users when they get their results (as part of the response).
> Other options may also surface once we start playing with ideas.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13554:
-
Affects Version/s: 3.2.0

> Rename RangeQuery to KeyRangeQuery
> --
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>
> Just to avoid confusion wrt WindowRangeQuery



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13554:
-
Priority: Blocker  (was: Major)

> Rename RangeQuery to KeyRangeQuery
> --
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Blocker
>
> Just to avoid confusion wrt WindowRangeQuery



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13622:
-
Priority: Blocker  (was: Major)

> Revisit the complexity of position tracking in state stores
> ---
>
> Key: KAFKA-13622
> URL: https://issues.apache.org/jira/browse/KAFKA-13622
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Blocker
>
> Currently, state store implementers have a significant burden to track 
> position correctly. They have to:
>  * update the position during all puts
>  * implement the RecordBatchingStateRestoreCallback and use the 
> ChangelogRecordDeserializationHelper to update the position based on record 
> headers
>  * implement some mechanism to restore the position after a restart if the 
> store is persistent (such as supplying a CommitCallback to write the 
> position to a local file and then reading the file during init)
> [~guozhang] pointed out during review that this is probably too much 
> responsibility (and certainly too much opportunity for error). We should see 
> what we can do to simplify these responsibilities, if not eliminate them 
> entirely from the store implementer's scope of concern.
>  
> See https://github.com/apache/kafka/pull/11676#discussion_r790358058
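
To make the put-path half of that burden concrete, here is a stripped-down 
sketch. Position and StateStoreContext#recordMetadata are real Kafka Streams 
APIs (the latter added by KIP-791); the surrounding store class is 
hypothetical.

```
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.query.Position;

// Sketch of what every store implementer currently has to remember to do on
// the write path: advance the position from the record metadata on each put.
class PositionTrackingPutSketch {
    private final Position position = Position.emptyPosition();
    private StateStoreContext context; // captured during init(...)

    void put(final byte[] key, final byte[] value) {
        // ... write key/value into the underlying structure ...

        // The easy-to-forget part: update the position for this write.
        context.recordMetadata().ifPresent(meta ->
            position.withComponent(meta.topic(), meta.partition(), meta.offset()));
    }
}
```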



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-01-26 Thread John Roesler (Jira)
John Roesler created KAFKA-13622:


 Summary: Revisit the complexity of position tracking in state 
stores
 Key: KAFKA-13622
 URL: https://issues.apache.org/jira/browse/KAFKA-13622
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Currently, state store implementers have a significant burden to track position 
correctly. They have to:
 * update the position during all puts
 * implement the RecordBatchingStateRestoreCallback and use the 
ChangelogRecordDeserializationHelper to update the position based on record 
headers
 * implement some mechanism to restore the position after a restart if the 
store is persistent (such as supplying a CommitCallback to write the position 
to a local file and then reading the file during init)

[~guozhang] pointed out during review that this is probably too much 
responsibility (and certainly too much opportunity for error). We should see 
what we can do to simplify these responsibilities, if not eliminate them 
entirely from the store implementer's scope of concern.

See 
https://github.com/apache/kafka/pull/11676#discussion_r790358058



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13608) Implement Position restoration for all in-memory state stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13608.
--
Resolution: Duplicate

> Implement Position restoration for all in-memory state stores
> -
>
> Key: KAFKA-13608
> URL: https://issues.apache.org/jira/browse/KAFKA-13608
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vicky Papavasileiou
>Priority: Major
>
> In-memory state stores restore their state from the changelog (as opposed to 
> RocksDB stores, which restore from disk). In-memory stores currently do not 
> restore the Position at all.
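
A hedged sketch of what restoring the Position could look like. 
RecordBatchingStateRestoreCallback and ChangelogRecordDeserializationHelper 
are internal classes, so the shapes shown here are assumptions, and the 
helper call is left as a comment:

```
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.query.Position;

// Sketch: during changelog restoration, an in-memory store must rebuild both
// its contents and its Position, the latter from the record headers.
class InMemoryRestoreSketch {
    private final Map<Bytes, byte[]> map = new ConcurrentHashMap<>();
    private final Position position = Position.emptyPosition();

    void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            map.put(Bytes.wrap(record.key()), record.value()); // rebuild contents

            // Rebuild the position from the changelog record headers, e.g.
            // (assumed signature, internal API):
            // ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
            //     record, consistencyEnabled, position);
        }
    }
}
```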



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13608) Implement Position restoration for all in-memory state stores

2022-01-26 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482866#comment-17482866
 ] 

John Roesler commented on KAFKA-13608:
--

Ah, I didn't realize there was a ticket for this. I just happened to take care 
of it while making sure that the testing for 
[https://github.com/apache/kafka/pull/11676] was complete.

> Implement Position restoration for all in-memory state stores
> -
>
> Key: KAFKA-13608
> URL: https://issues.apache.org/jira/browse/KAFKA-13608
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vicky Papavasileiou
>Priority: Major
>
> In-memory state stores restore their state from the changelog (as opposed to 
> RocksDB stores, which restore from disk). In-memory stores currently do not 
> restore the Position at all.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13524) IQv2: Implement KeyQuery from the RecordCache

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13524.
--
Resolution: Fixed

> IQv2: Implement KeyQuery from the RecordCache
> -
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>
> The Record Cache in Kafka Streams is more properly termed a write buffer, 
> since it only caches writes, not reads, and its intent is to buffer the 
> writes before flushing them in bulk into lower store layers.
> Unlike scan-type queries, which require scanning both the record cache and 
> the underlying store and collating the results, the KeyQuery (and any other 
> point lookup) can straightforwardly be served from the record cache if the 
> key is buffered, or fall through to the underlying store if not.
> In contrast to scan-type operations, benchmarks also show that key-based 
> reads served from the cache are faster than reads that always skip it.
> Therefore, it makes sense to implement a handler in the CachingKeyValueStore 
> for the KeyQuery specifically in order to serve fresher key-based lookups. 
> Scan queries may also be useful, but their less flattering performance 
> profile makes it reasonable to leave them for follow-on work.
> We could add an option to disable cache reads on the KeyQuery, but since 
> they seem to always be better, I'm leaning toward just unconditionally 
> serving cached records when they exist.
>  
> I did a quick POC of this: 
> [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]
>  
> The internal code of the caching stores should be refactored to share logic 
> with the regular store methods. Scan queries will be more complicated, since 
> they require merging the cache with the wrapped result.
> There is a bug related to that non-timestamped-store-serde hack (see the 
> failing test when you run IQv2StoreIntegrationTest). Even though the inner 
> store is not timestamped, the cache returns a timestamped value. We'll have 
> to discuss options to fix it.
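
To illustrate the point-lookup flow described above, a minimal sketch with 
plain maps standing in for the real ThreadCache and underlying store 
(CachingKeyValueStore's actual fields and types differ):

```
import java.util.HashMap;
import java.util.Map;

// Sketch of the cache-then-fall-through lookup: a key-based query is served
// from the write buffer if the key has a pending (unflushed) write, and
// otherwise delegates to the lower store layer.
class CachingLookupSketch {
    private final Map<String, byte[]> writeBuffer = new HashMap<>();      // stand-in for the record cache
    private final Map<String, byte[]> underlyingStore = new HashMap<>();  // stand-in for, e.g., RocksDB

    byte[] get(final String key) {
        final byte[] buffered = writeBuffer.get(key);
        if (buffered != null) {
            return buffered; // fresher than the store: not yet flushed
        }
        return underlyingStore.get(key); // fall through to the lower layer
    }
}
```

A real implementation also has to distinguish a buffered tombstone (a deleted 
key) from a cache miss, which this sketch glosses over.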



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13524) IQv2: Implement KeyQuery from the RecordCache

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-13524:


Assignee: Vicky Papavasileiou

> IQv2: Implement KeyQuery from the RecordCache
> -
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>
> The Record Cache in Kafka Streams is more properly termed a write buffer, 
> since it only caches writes, not reads, and its intent is to buffer the 
> writes before flushing them in bulk into lower store layers.
> Unlike scan-type queries, which require scanning both the record cache and 
> the underlying store and collating the results, the KeyQuery (and any other 
> point lookup) can straightforwardly be served from the record cache if the 
> key is buffered, or fall through to the underlying store if not.
> In contrast to scan-type operations, benchmarks also show that key-based 
> reads served from the cache are faster than reads that always skip it.
> Therefore, it makes sense to implement a handler in the CachingKeyValueStore 
> for the KeyQuery specifically in order to serve fresher key-based lookups. 
> Scan queries may also be useful, but their less flattering performance 
> profile makes it reasonable to leave them for follow-on work.
> We could add an option to disable cache reads on the KeyQuery, but since 
> they seem to always be better, I'm leaning toward just unconditionally 
> serving cached records when they exist.
>  
> I did a quick POC of this: 
> [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]
>  
> The internal code of the caching stores should be refactored to share logic 
> with the regular store methods. Scan queries will be more complicated, since 
> they require merging the cache with the wrapped result.
> There is a bug related to that non-timestamped-store-serde hack (see the 
> failing test when you run IQv2StoreIntegrationTest). Even though the inner 
> store is not timestamped, the cache returns a timestamped value. We'll have 
> to discuss options to fix it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13605) Checkpoint position in state stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13605:
-
Parent: KAFKA-13479
Issue Type: Sub-task  (was: Improvement)

> Checkpoint position in state stores
> ---
>
> Key: KAFKA-13605
> URL: https://issues.apache.org/jira/browse/KAFKA-13605
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Critical
>
> There are cases in which a state store neither has an in-memory position 
> built up nor has it gone through the state restoration process. If a store is 
> persistent (e.g., RocksDB), and we stop and restart Streams, we will have 
> neither of those continuity mechanisms available. This ticket is to fill in 
> that gap.
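
A hedged sketch of the gap-filler: persist the position to a small file next 
to the store on commit, and read it back during init. The file name and the 
Properties-based "topic-partition=offset" encoding are illustrative 
assumptions, not the format Streams actually uses.

```
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Sketch: checkpoint the position alongside the persistent (e.g., RocksDB)
// files so that a stopped-and-restarted instance can recover it without
// replaying the changelog.
class PositionCheckpointSketch {
    private final Path checkpointFile;

    PositionCheckpointSketch(final Path stateDir) {
        this.checkpointFile = stateDir.resolve("position.checkpoint");
    }

    // Called from a commit hook: write one "topic-partition=offset" entry
    // per component of the position.
    void write(final Properties positionEntries) throws IOException {
        try (OutputStream out = Files.newOutputStream(checkpointFile)) {
            positionEntries.store(out, "store position at last commit");
        }
    }

    // Called during init: an absent file simply means an empty position.
    Properties read() throws IOException {
        final Properties entries = new Properties();
        if (Files.exists(checkpointFile)) {
            try (InputStream in = Files.newInputStream(checkpointFile)) {
                entries.load(in);
            }
        }
        return entries;
    }
}
```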



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13524) IQv2: Implement KeyQuery from the RecordCache

2021-12-28 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13524:
-
Description: 
The Record Cache in Kafka Streams is more properly termed a write buffer, since 
it only caches writes, not reads, and its intent is to buffer the writes before 
flushing them in bulk into lower store layers.

Unlike scan-type queries, which require scanning both the record cache and the 
underlying store and collating the results, the KeyQuery (and any other point 
lookup) can straightforwardly be served from the record cache if the key is 
buffered, or fall through to the underlying store if not.

In contrast to scan-type operations, benchmarks also show that key-based reads 
served from the cache are faster than reads that always skip it.

Therefore, it makes sense to implement a handler in the CachingKeyValueStore 
for the KeyQuery specifically in order to serve fresher key-based lookups. Scan 
queries may also be useful, but their less flattering performance profile makes 
it reasonable to leave them for follow-on work.

We could add an option to disable cache reads on the KeyQuery, but since they 
seem to always be better, I'm leaning toward just unconditionally serving 
cached records when they exist.

 

I did a quick POC of this: 
[https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]

 

The internal code of the caching stores should be refactored to share logic 
with the regular store methods. Scan queries will be more complicated, since 
they require merging the cache with the wrapped result.

There is a bug related to that non-timestamped-store-serde hack (see the 
failing test when you run IQv2StoreIntegrationTest). Even though the inner 
store is not timestamped, the cache returns a timestamped value. We'll have to 
discuss options to fix it.

  was:
I did a quick POC of this: 
[https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]

 

 

The internal code of the caching stores should be refactored to share logic 
with the regular store methods. Scan queries will be more complicated, since 
they require merging the cache with the wrapped result.

There is a bug related to that non-timestamped-store-serde hack (see the 
failing test when you run IQv2StoreIntegrationTest). Even though the inner 
store is not timestamped, the cache returns a timestamped value. We'll have to 
discuss options to fix it.


> IQv2: Implement KeyQuery from the RecordCache
> -
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Major
>
> The Record Cache in Kafka Streams is more properly termed a write buffer, 
> since it only caches writes, not reads, and its intent is to buffer the 
> writes before flushing them in bulk into lower store layers.
> Unlike scan-type queries, which require scanning both the record cache and 
> the underlying store and collating the results, the KeyQuery (and any other 
> point lookup) can straightforwardly be served from the record cache if the 
> key is buffered, or fall through to the underlying store if not.
> In contrast to scan-type operations, benchmarks also show that key-based 
> reads served from the cache are faster than reads that always skip it.
> Therefore, it makes sense to implement a handler in the CachingKeyValueStore 
> for the KeyQuery specifically in order to serve fresher key-based lookups. 
> Scan queries may also be useful, but their less flattering performance 
> profile makes it reasonable to leave them for follow-on work.
> We could add an option to disable cache reads on the KeyQuery, but since 
> they seem to always be better, I'm leaning toward just unconditionally 
> serving cached records when they exist.
>  
> I did a quick POC of this: 
> [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]
>  
> The internal code of the caching stores should be refactored to share logic 
> with the regular store methods. Scan queries will be more complicated, since 
> they require merging the cache with the wrapped result.
> There is a bug related to that non-timestamped-store-serde hack (see the 
> failing test when you run IQv2StoreIntegrationTest). Even though the inner 
> store is not timestamped, the cache returns a timestamped value. We'll have 
> to discuss options to fix it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13524) IQv2: Add option for KeyQuery from the RecordCache

2021-12-28 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13524:
-
Summary: IQv2: Add option for KeyQuery from the RecordCache  (was: IQv2: 
Add option for KeyQuery from caches)

> IQv2: Add option for KeyQuery from the RecordCache
> --
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Major
>
> I did a quick POC of this: 
> [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]
>  
>  
> The internal code of the caching stores should be refactored to share logic 
> with the regular store methods. Scan queries will be more complicated, since 
> they require merging the cache with the wrapped result.
> There is a bug related to that non-timestamped-store-serde hack (see the 
> failing test when you run IQv2StoreIntegrationTest). Even though the inner 
> store is not timestamped, the cache returns a timestamped value. We'll have 
> to discuss options to fix it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13524) IQv2: Add option for KeyQuery from caches

2021-12-28 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13524:
-
Summary: IQv2: Add option for KeyQuery from caches  (was: IQv2: Add option 
to query KV Stores from caches)

> IQv2: Add option for KeyQuery from caches
> -
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Major
>
> I did a quick POC of this: 
> [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]
>  
> The internal code of the caching stores should be refactored to share logic 
> with the regular store methods. Scan queries will be more complicated, since 
> they require merging the cache with the wrapped result.
> There is a bug related to that non-timestamped-store-serde hack (see the 
> failing test when you run IQv2StoreIntegrationTest). Even though the inner 
> store is not timestamped, the cache returns a timestamped value. We'll have 
> to discuss options to fix it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13524) IQv2: Implement KeyQuery from the RecordCache

2021-12-28 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13524:
-
Summary: IQv2: Implement KeyQuery from the RecordCache  (was: IQv2: Add 
option for KeyQuery from the RecordCache)

> IQv2: Implement KeyQuery from the RecordCache
> -
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Major
>
> I did a quick POC of this: 
> [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]
>  
>  
> The internal code of the caching stores should be refactored to share logic 
> with the regular store methods. Scan queries will be more complicated, since 
> they require merging the cache with the wrapped result.
> There is a bug related to that non-timestamped-store-serde hack (see the 
> failing test when you run IQv2StoreIntegrationTest). Even though the inner 
> store is not timestamped, the cache returns a timestamped value. We'll have 
> to discuss options to fix it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13524) IQv2: Add option for KeyQuery from caches

2021-12-28 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13524:
-
Description: 
I did a quick POC of this: 
[https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]

 

 

The internal code of the caching stores should be refactored to share logic 
with the regular store methods. Scan queries will be more complicated, since 
they require merging the cache with the wrapped result.

There is a bug related to that non-timestamped-store-serde hack (see the 
failing test when you run IQv2StoreIntegrationTest). Even though the inner 
store is not timestamped, the cache returns a timestamped value. We'll have to 
discuss options to fix it.

  was:
I did a quick POC of this: 
[https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]

 

The internal code of the caching stores should be refactored to share logic 
with the regular store methods. Scan queries will be more complicated, since 
they require merging the cache with the wrapped result.

There is a bug related to that non-timestamped-store-serde hack (see the 
failing test when you run IQv2StoreIntegrationTest). Even though the inner 
store is not timestamped, the cache returns a timestamped value. We'll have to 
discuss options to fix it.


> IQv2: Add option for KeyQuery from caches
> -
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Major
>
> I did a quick POC of this: 
> [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]
>  
>  
> The internal code of the caching stores should be refactored to share logic 
> with the regular store methods. Scan queries will be more complicated, since 
> they require merging the cache with the wrapped result.
> There is a bug related to that non-timestamped-store-serde hack (see the 
> failing test when you run IQv2StoreIntegrationTest). Even though the inner 
> store is not timestamped, the cache returns a timestamped value. We'll have 
> to discuss options to fix it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13524) IQv2: Add option to query KV Stores from caches

2021-12-27 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13524:
-
Description: 
I did a quick POC of this: 
[https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]

 

The internal code of the caching stores should be refactored to share logic 
with the regular store methods. Scan queries will be more complicated, since 
they require merging the cache with the wrapped result.

There is a bug related to that non-timestamped-store-serde hack (see the 
failing test when you run IQv2StoreIntegrationTest). Even though the inner 
store is not timestamped, the cache returns a timestamped value. We'll have to 
discuss options to fix it.

> IQv2: Add option to query KV Stores from caches
> ---
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Major
>
> I did a quick POC of this: 
> [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]
>  
> The internal code of the caching stores should be refactored to share logic 
> with the regular store methods. Scan queries will be more complicated, since 
> they require merging the cache with the wrapped result.
> There is a bug related to that non-timestamped-store-serde hack (see the 
> failing test when you run IQv2StoreIntegrationTest). Even though the inner 
> store is not timestamped, the cache returns a timestamped value. We'll have 
> to discuss options to fix it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13524) IQv2: Add option to query KV Stores from caches

2021-12-27 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13524:
-
Summary: IQv2: Add option to query KV Stores from caches  (was: IQv2: Add 
option to query from caches)

> IQv2: Add option to query KV Stores from caches
> ---
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13553) Add PAPI stores to IQv2StoreIntegrationTest

2021-12-27 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13553:
-
Summary: Add PAPI stores to IQv2StoreIntegrationTest  (was: Add DSL stores 
to IQv2StoreIntegrationTest)

> Add PAPI stores to IQv2StoreIntegrationTest
> ---
>
> Key: KAFKA-13553
> URL: https://issues.apache.org/jira/browse/KAFKA-13553
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Major
>
> Right now, we only test stores registered via the DSL. To be truly 
> comprehensive, we must also test stores registered via the PAPI.
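
For context, registering a store via the PAPI goes through 
Topology#addStateStore rather than the DSL's Materialized. A minimal example 
(topic, store, and processor names are placeholders) of the shape such test 
topologies would take:

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class PapiStoreExample {
    public static Topology build() {
        final Topology topology = new Topology();
        topology.addSource("source",
            new StringDeserializer(), new StringDeserializer(), "input-topic");
        topology.addProcessor("proc", () -> new Processor<String, String, Void, Void>() {
            private KeyValueStore<String, String> store;

            @Override
            public void init(final ProcessorContext<Void, Void> context) {
                store = context.getStateStore("iqv2-store");
            }

            @Override
            public void process(final Record<String, String> record) {
                store.put(record.key(), record.value()); // populate the store under test
            }
        }, "source");
        // The PAPI path: register the store on the Topology and connect it
        // to the processor that uses it.
        topology.addStateStore(
            Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("iqv2-store"),
                Serdes.String(),
                Serdes.String()),
            "proc");
        return topology;
    }
}
```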



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13553) Add PAPI stores to IQv2StoreIntegrationTest

2021-12-27 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-13553:


Assignee: John Roesler

> Add PAPI stores to IQv2StoreIntegrationTest
> ---
>
> Key: KAFKA-13553
> URL: https://issues.apache.org/jira/browse/KAFKA-13553
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Right now, we only test stores registered via the DSL. To be truly 
> comprehensive, we must also test stores registered via the PAPI.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2021-12-27 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-13554:


Assignee: Vicky Papavasileiou

> Rename RangeQuery to KeyRangeQuery
> --
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>
> Just to avoid confusion wrt WindowRangeQuery



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


  1   2   3   4   5   6   7   8   9   10   >