[jira] [Created] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-05-12 Thread John Roesler (Jira)
John Roesler created KAFKA-14995:


 Summary: Automate asf.yaml collaborators refresh
 Key: KAFKA-14995
 URL: https://issues.apache.org/jira/browse/KAFKA-14995
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


We have added a policy to use the asf.yaml Github Collaborators: 
[https://github.com/apache/kafka-site/pull/510]

The policy states that we set this list to be the top 20 commit authors who are 
not Kafka committers. Unfortunately, it's not trivial to compute this list.

Here is the process I followed to generate the list the first time (note that I 
generated this list on 2023-04-28, so the lookback is one year):

1. List authors by commit volume in the last year:
{code:bash}
$ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
2. Manually filter out the authors who are committers, based on 
[https://kafka.apache.org/committers]

3. Truncate the list to 20 authors.

4. For each author:

4a. Find a commit in the `git log` that they were the author on:
{code:java}
commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
Author: hudeqi <1217150...@qq.com>
Date:   Fri May 12 14:03:17 2023 +0800
...{code}
4b. Look up that commit in Github: 
[https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]

4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
the Collaborators lists.

5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]

 

This is pretty time-consuming and very scriptable. Two complications:
 * To do the filtering, we need to map from the Git log "Author" to the 
documented Kafka "Committer" that we can use to perform the filter. Suggestion: 
just update the structure of the "Committers" page to include each committer's 
Git "Author" name and email 
([https://github.com/apache/kafka-site/blob/asf-site/committers.html])
 * To generate the YAML lists, we need to map from Git log "Author" to Github 
username. There's presumably some way to do this in the Github REST API (the 
mapping is based on the email, IIUC), or we could also just update the 
Committers page to also document each committer's Github username.

 

Ideally, we would write this script (to be stored in the Apache Kafka repo) and 
create a Github Action to run it every three months.
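As a starting point, the filtering and truncation steps above can be scripted. Here is a minimal sketch (the class name, helper, and sample data are hypothetical) that parses the output of {{git shortlog --email --numbered --summary}} and drops known committer emails:

```java
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopCollaborators {
    // Parses lines like "   42\tJane Doe <jane@example.com>" (the format of
    // `git shortlog --email --numbered --summary`), drops committer emails,
    // and keeps the top `limit` remaining authors.
    static List<String> topNonCommitters(String shortlog, Set<String> committerEmails, int limit) {
        Pattern line = Pattern.compile("\\s*(\\d+)\\t(.+) <(.+)>");
        return shortlog.lines()
                .map(line::matcher)
                .filter(Matcher::matches)
                .filter(m -> !committerEmails.contains(m.group(3)))
                .limit(limit)
                .map(m -> m.group(2) + " <" + m.group(3) + ">")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String sample = "   42\tAlice <alice@example.com>\n    7\tBob <bob@example.com>";
        // Alice is a committer in this example, so only Bob remains.
        System.out.println(topNonCommitters(sample, Set.of("alice@example.com"), 20));
    }
}
```

The remaining manual step, mapping each author's email to a GitHub username, could use the GitHub REST API as suggested above.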

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14364) Support evolving serde with Foreign Key Join

2022-11-07 Thread John Roesler (Jira)
John Roesler created KAFKA-14364:


 Summary: Support evolving serde with Foreign Key Join
 Key: KAFKA-14364
 URL: https://issues.apache.org/jira/browse/KAFKA-14364
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


The current implementation of Foreign-Key join uses a hash comparison to 
determine whether it should emit join results or not. See 
[https://github.com/apache/kafka/blob/807c5b4d282e7a7a16d0bb94aa2cda9566a7cc2d/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java#L94-L110]

As specified in KIP-213 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]), 
we must do a comparison of this nature in order to get correct results when 
the foreign-key reference changes, as the old reference might emit delayed 
results after the new instance generates its updated results, leading to an 
incorrect final join state.

The hash comparison prevents this race condition by ensuring that any emitted 
results correspond to the _current_ version of the left-hand-side record (and 
therefore that the foreign-key reference itself has not changed).

An undesired side-effect of this is that if users update their serdes (in a 
compatible way), for example to add a new optional field to the record, then 
the resulting hash will change for existing records. This will cause Streams to 
stop emitting results for those records until a new left-hand-side update comes 
in, recording a new hash for those records.

It should be possible to provide a fix. Some ideas:
 * only consider the foreign-key reference itself in the hash function (this 
was the original proposal, but we opted to hash the entire record as an 
optimization to suppress unnecessary updates).
 * provide a user-overridable hash function. This would be more flexible, but 
also pushes a lot of complexity onto users, and opens up the possibility to 
completely break semantics.

We will need to design the solution carefully so that we can preserve the 
desired correctness guarantee.
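To make the trade-off concrete, here is a toy sketch (the record layout and helper names are invented for illustration; this is not the Streams serialization format) showing that a whole-record hash changes under a compatible serde upgrade, while a hash of only the foreign-key reference stays stable:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class FkHashSketch {
    // Toy record layout: pipe-delimited fields, foreign key first.
    static long wholeRecordHash(String serialized) {
        return Arrays.hashCode(serialized.getBytes(StandardCharsets.UTF_8));
    }

    static long fkOnlyHash(String serialized) {
        String fk = serialized.split("\\|")[0];
        return Arrays.hashCode(fk.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        String beforeUpgrade = "fk=order-1|name=x";
        String afterUpgrade  = "fk=order-1|name=x|newOptionalField=";

        // Whole-record hash changes, so Streams would stop emitting results...
        System.out.println(wholeRecordHash(beforeUpgrade) == wholeRecordHash(afterUpgrade)); // false
        // ...but a hash over only the foreign-key reference is unaffected.
        System.out.println(fkOnlyHash(beforeUpgrade) == fkOnlyHash(afterUpgrade));           // true
    }
}
```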





[jira] [Created] (KAFKA-14254) Format timestamps in assignor logs as dates instead of integers

2022-09-21 Thread John Roesler (Jira)
John Roesler created KAFKA-14254:


 Summary: Format timestamps in assignor logs as dates instead of 
integers
 Key: KAFKA-14254
 URL: https://issues.apache.org/jira/browse/KAFKA-14254
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


This is a follow-on task from [https://github.com/apache/kafka/pull/12582]

There is another log line that prints the same timestamp: "Triggering the 
followup rebalance scheduled for ...", which should also be printed as a 
date/time in the same manner as PR 12582.

We should also search the codebase a little to see if we're printing timestamps 
in other log lines that would be better off as date/times.
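For reference, a minimal sketch of the kind of change intended (method and variable names here are illustrative, not from the Kafka codebase):

```java
import java.time.Instant;

public class TimestampLogging {
    // Renders an epoch-millis timestamp as a readable UTC instant instead of a raw long.
    static String asDate(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        long followupRebalanceMs = 1663718400000L;
        // Before: "... scheduled for 1663718400000"
        // After:  "... scheduled for 2022-09-21T00:00:00Z"
        System.out.println("Triggering the followup rebalance scheduled for "
                + asDate(followupRebalanceMs));
    }
}
```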





[jira] [Created] (KAFKA-14253) StreamsPartitionAssignor should print the member count in assignment logs

2022-09-21 Thread John Roesler (Jira)
John Roesler created KAFKA-14253:


 Summary: StreamsPartitionAssignor should print the member count in 
assignment logs
 Key: KAFKA-14253
 URL: https://issues.apache.org/jira/browse/KAFKA-14253
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Debugging rebalance and assignment issues is harder than it needs to be. One 
simple thing that can help is to print out information in the logs that users 
have to compute today.

For example, the StreamsPartitionAssignor prints two messages that contain the 
newline-delimited group membership:
{code:java}
[StreamsPartitionAssignor] [...-StreamThread-1] stream-thread 
[...-StreamThread-1-consumer] All members participating in this rebalance:

: []

: []

: []{code}
and
{code:java}
[StreamsPartitionAssignor] [...-StreamThread-1] stream-thread 
[...-StreamThread-1-consumer] Assigned tasks [...] including stateful [...] to 
clients as:

=[activeTasks: ([...]) standbyTasks: ([...])]

=[activeTasks: ([...]) standbyTasks: ([...])]

=[activeTasks: ([...]) standbyTasks: ([...])
{code}
 

In both of these cases, it would be nice to:
 # Include the number of members in the group (i.e., "15 members participating" 
and "to 15 clients as")
 # Sort the member ids (to help compare the membership and assignment across 
rebalances)
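A sketch of what the improved log line might look like (the formatting helper below is hypothetical, not the actual StreamsPartitionAssignor code):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class AssignmentLogging {
    // Counts the members and sorts the member ids before printing.
    static String membershipLine(Map<String, String> memberSubscriptions) {
        String sorted = new TreeMap<>(memberSubscriptions).entrySet().stream()
                .map(e -> e.getKey() + ": " + e.getValue())
                .collect(Collectors.joining("\n"));
        return memberSubscriptions.size()
                + " members participating in this rebalance:\n" + sorted;
    }

    public static void main(String[] args) {
        Map<String, String> members = Map.of("consumer-b", "[task_0_1]", "consumer-a", "[task_0_0]");
        System.out.println(membershipLine(members));
        // 2 members participating in this rebalance:
        // consumer-a: [task_0_0]
        // consumer-b: [task_0_1]
    }
}
```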





[jira] [Created] (KAFKA-14202) IQv2: Expose binary store schema to store implementations

2022-09-06 Thread John Roesler (Jira)
John Roesler created KAFKA-14202:


 Summary: IQv2: Expose binary store schema to store implementations
 Key: KAFKA-14202
 URL: https://issues.apache.org/jira/browse/KAFKA-14202
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


One feature of IQv2 is that store implementations can handle custom queries. 
Many custom query handlers will need to process the key or value bytes, for 
example deserializing them to implement filters or aggregations, or even 
performing binary operations on them.

For the most part, this should be straightforward for users, since they provide 
Streams with the serdes, the store implementation, and the custom queries.

However, Streams will sometimes pack extra data around the data produced by the 
user-provided serdes. For example, the Timestamped store wrappers add a 
timestamp at the beginning of the value byte array. And in Windowed stores, we 
add window timestamps to the key bytes.

It would be nice to have some generic mechanism to communicate those schemas to 
the user-provided inner store layers to support users who need to write custom 
queries. For example, perhaps we can add an extractor class to the state store 
context.
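As one concrete illustration of the kind of schema that would need to be communicated: the Timestamped wrappers prepend a fixed-width timestamp to the value bytes, so a hypothetical extractor might look roughly like this (the exact internal byte layout is an assumption for illustration):

```java
import java.nio.ByteBuffer;

public class TimestampedSchemaSketch {
    // Assumed layout: an 8-byte big-endian timestamp followed by the user value bytes.
    static long timestampOf(byte[] timestampedValue) {
        return ByteBuffer.wrap(timestampedValue).getLong();
    }

    static byte[] innerValueOf(byte[] timestampedValue) {
        byte[] inner = new byte[timestampedValue.length - Long.BYTES];
        System.arraycopy(timestampedValue, Long.BYTES, inner, 0, inner.length);
        return inner;
    }

    public static void main(String[] args) {
        byte[] packed = ByteBuffer.allocate(Long.BYTES + 3)
                .putLong(1692000000000L)
                .put(new byte[] {1, 2, 3})
                .array();
        System.out.println(timestampOf(packed));          // 1692000000000
        System.out.println(innerValueOf(packed).length);  // 3
    }
}
```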





[jira] [Created] (KAFKA-14020) Performance regression in Producer

2022-06-24 Thread John Roesler (Jira)
John Roesler created KAFKA-14020:


 Summary: Performance regression in Producer
 Key: KAFKA-14020
 URL: https://issues.apache.org/jira/browse/KAFKA-14020
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 3.3.0
Reporter: John Roesler


[https://github.com/apache/kafka/commit/f7db6031b84a136ad0e257df722b20faa7c37b8a]
 introduced a 10% performance regression in the KafkaProducer under a default 
config.

 

The context for this result is a benchmark that we run for Kafka Streams. The 
benchmark provisions 5 independent AWS clusters, including one broker node on 
an i3.large and one client node on an i3.large. During a benchmark run, we 
first run the Producer for 10 minutes to generate test data, and then we run 
Kafka Streams under a number of configurations to measure its performance.

Our observation was a 10% regression in throughput under the simplest 
configuration, in which Streams simply consumes from a topic and does nothing 
else. That benchmark actually runs faster than the producer that generates the 
test data, so its throughput is bounded by the data generator's throughput. 
After investigation, we realized that the regression was in the data generator, 
not the consumer or Streams.

We have numerous benchmark runs leading up to the commit in question, and they 
all show a throughput in the neighborhood of 115,000 records per second. We 
also have 40 runs including and after that commit, and they all show a 
throughput in the neighborhood of 105,000 records per second. A test on [trunk 
with the commit reverted |https://github.com/apache/kafka/pull/12342] shows a 
return to around 115,000 records per second.

Config:
{code:java}
final Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
{code}
Here's the producer code in the data generator. Our tests were running with 
three produceThreads.
{code:java}
for (int t = 0; t < produceThreads; t++) {
    futures.add(executorService.submit(() -> {
        int threadTotal = 0;
        long lastPrint = start;
        final long printInterval = Duration.ofSeconds(10).toMillis();
        long now;
        try (final org.apache.kafka.clients.producer.Producer<String, String> producer =
                 new KafkaProducer<>(producerConfig(broker))) {
            while (limit > (now = System.currentTimeMillis()) - start) {
                for (int i = 0; i < 1000; i++) {
                    final String key = keys.next();
                    final String data = dataGen.generate();

                    producer.send(new ProducerRecord<>(topic, key, valueBuilder.apply(key, data)));

                    threadTotal++;
                }

                if ((now - lastPrint) > printInterval) {
                    System.out.println(Thread.currentThread().getName() + " produced "
                        + numberFormat.format(threadTotal) + " to " + topic
                        + " in " + Duration.ofMillis(now - start));
                    lastPrint = now;
                }
            }
        }
        total.addAndGet(threadTotal);
        System.out.println(Thread.currentThread().getName() + " finished ("
            + numberFormat.format(threadTotal) + ") in " + Duration.ofMillis(now - start));
    }));
}{code}
As you can see, this is a very basic usage.





[jira] [Resolved] (KAFKA-13654) Extend KStream process with new Processor API

2022-04-19 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13654.
--
Fix Version/s: 3.3.0
   Resolution: Fixed

> Extend KStream process with new Processor API
> -
>
> Key: KAFKA-13654
> URL: https://issues.apache.org/jira/browse/KAFKA-13654
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: kafka-streams, kip-required, needs-kip, streams
> Fix For: 3.3.0
>
>
> Extending KStream#process to use the latest Processor API adopted here: 
> https://issues.apache.org/jira/browse/KAFKA-8410
> This new API allows a typed returned KStream, which makes it possible to 
> chain results from processors, becoming a new way to transform records with 
> more control over what's forwarded.
> KIP: https://cwiki.apache.org/confluence/x/yKbkCw





[jira] [Created] (KAFKA-13820) Add debug-level logs to explain why a store is filtered out during interactive query

2022-04-11 Thread John Roesler (Jira)
John Roesler created KAFKA-13820:


 Summary: Add debug-level logs to explain why a store is filtered 
out during interactive query
 Key: KAFKA-13820
 URL: https://issues.apache.org/jira/browse/KAFKA-13820
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Currently Kafka Streams throws an InvalidStateStoreException when the desired 
store is not present on the local instance. It also throws the same exception 
with the same message when the store is present but not active (and stale 
queries are disabled).

This is an important distinction when debugging store unavailability, and a 
debug-level log is an un-intrusive mechanism to expose the information.





[jira] [Created] (KAFKA-13819) Add application.server to Streams assignor logs when set

2022-04-11 Thread John Roesler (Jira)
John Roesler created KAFKA-13819:


 Summary: Add application.server to Streams assignor logs when set
 Key: KAFKA-13819
 URL: https://issues.apache.org/jira/browse/KAFKA-13819
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Currently, Streams assignment logs only include the consumer client id and the 
streams application id, but those are both randomly generated UUIDs that are 
not easy to correlate with users' concept of the name of a host. To help bridge 
this gap, we can include the application.server (when set) in assignment logs. 
That way, users will also be able to see which host and port each member is 
associated with.





[jira] [Created] (KAFKA-13818) Add generation to consumer assignor logs

2022-04-11 Thread John Roesler (Jira)
John Roesler created KAFKA-13818:


 Summary: Add generation to consumer assignor logs
 Key: KAFKA-13818
 URL: https://issues.apache.org/jira/browse/KAFKA-13818
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Reading assignor logs is really confusing in large part because they are
spread across different layers of abstraction (the ConsumerCoordinator
and the ConsumerPartitionAssignor, which in Streams consists of several
layers of its own). Each layer in the abstraction reports useful information
that only it has access to, but because they are split over multiple lines, with
multiple members in the cluster, and (often) multiple rebalances taking place
in rapid succession, it's often hard to understand which logs are part of
which rebalance.

 

One thing we don't want to do is break encapsulation by exposing too much of 
the ConsumerCoordinator's internal state to components like the pluggable 
ConsumerPartitionAssignor.

 

We can accomplish what we want by adding the concept of a dynamic log context, 
so that the ConsumerCoordinator can add dynamic information like the generation 
id to be logged for correlation in other components without exposing any new 
information or metadata to those components themselves.

See [https://github.com/apache/kafka/pull/12020] for example.
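A minimal sketch of the dynamic-log-context idea (class and method names invented for illustration; see the linked PR for the real approach):

```java
public class DynamicLogContext {
    // The coordinator updates the generation; every component that logs through
    // this context picks up the new prefix without seeing coordinator internals.
    private volatile int generation = -1;

    void setGeneration(int generation) { this.generation = generation; }

    String prefix() { return "[generation=" + generation + "] "; }

    void log(String message) { System.out.println(prefix() + message); }

    public static void main(String[] args) {
        DynamicLogContext ctx = new DynamicLogContext();
        ctx.setGeneration(7);
        ctx.log("Assigned partitions");  // [generation=7] Assigned partitions
    }
}
```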





[jira] [Resolved] (KAFKA-13479) Interactive Query v2

2022-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13479.
--
Fix Version/s: 3.2.0
   Resolution: Fixed

> Interactive Query v2
> 
>
> Key: KAFKA-13479
> URL: https://issues.apache.org/jira/browse/KAFKA-13479
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 3.2.0
>
>
> Kafka Streams supports an interesting and innovative API for "peeking" into 
> the internal state of running stateful stream processors from outside of the 
> application, called Interactive Query (IQ). This functionality has proven 
> invaluable to users over the years for everything from debugging running 
> applications to serving low latency queries straight from the Streams runtime.
> However, the actual interfaces for IQ were designed in the very early days of 
> Kafka Streams, before the project had gained significant adoption, and in the 
> absence of much precedent for this kind of API in peer projects. With the 
> benefit of hindsight, we can observe several problems with the original 
> design that we hope to address in a revised framework that will serve Streams 
> users well for many years to come.
>  
> This ticket tracks the implementation of KIP-796: 
> https://cwiki.apache.org/confluence/x/34xnCw





[jira] [Resolved] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2022-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13554.
--
Resolution: Won't Fix

> Rename RangeQuery to KeyRangeQuery
> --
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Minor
>
> Just to avoid confusion wrt WindowRangeQuery





[jira] [Resolved] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-23 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13714.
--
Fix Version/s: 3.2.0
 Assignee: John Roesler
   Resolution: Fixed

> Flaky test IQv2StoreIntegrationTest
> ---
>
> Key: KAFKA-13714
> URL: https://issues.apache.org/jira/browse/KAFKA-13714
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 3.2.0
>
>
> I have observed multiple consistency violations in the 
> IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's 
> apparently a major flaw in the feature, we should not release with this bug 
> outstanding. Depending on the time-table, we may want to block the release or 
> pull the feature until the next release.
>  
> The first observation I have is from 23 Feb 2022. So far all observations 
> point to the range query in particular, and all observations have been for 
> RocksDB stores, including RocksDBStore, TimestampedRocksDBStore, and the 
> windowed store built on RocksDB segments.
> For reference, range queries were implemented on 16 Feb 2022: 
> [https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]
> The window-specific range query test has also failed once that I have seen. 
> That feature was implemented on 2 Jan 2022: 
> [https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]
>  
> Here are some stack traces I have seen:
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>  but: was <[1, 2]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
>  {code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>  but: was <[1, 3]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778)
>{code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a,
>  executionInfo=[], position=Position{position={input-topic={0=1, 
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364,
>  executionInfo=[], position=Position{position={input-topic={1=1}, 
> globalResult=null}
> Expected: is <[1, 2, 3]>
>  but: was <[1, 2]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780)
>  {code}
> {code:java}
> verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL] 
>     java.lang.AssertionError: 
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6,
>  executionInfo=[], position=Position{position={input-topic={0=1, 
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165,
>  executionInfo=[], position=Position{position={input-topic={1=1}, 
> globalResult=null}
>     Expected: is <[0, 1, 2, 3]> 
>          but: was <[0, 2, 3]>
>         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>        

[jira] [Created] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-07 Thread John Roesler (Jira)
John Roesler created KAFKA-13714:


 Summary: Flaky test IQv2StoreIntegrationTest
 Key: KAFKA-13714
 URL: https://issues.apache.org/jira/browse/KAFKA-13714
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.2.0
Reporter: John Roesler


I have observed multiple consistency violations in the 
IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's 
apparently a major flaw in the feature, we should not release with this bug 
outstanding. Depending on the time-table, we may want to block the release or 
pull the feature until the next release.

 

The first observation I have is from 23 Feb 2022. So far all observations point 
to the range query in particular, and all observations have been for RocksDB 
stores, including RocksDBStore, TimestampedRocksDBStore, and the windowed store 
built on RocksDB segments.

For reference, range queries were implemented on 16 Feb 2022: 
[https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]

The window-specific range query test has also failed once that I have seen. 
That feature was implemented on 2 Jan 2022: 
[https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]

 

Here are some stack traces I have seen:
{code:java}
verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Expected: is <[1, 2, 3]>
 but: was <[1, 2]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
 {code}
{code:java}
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Expected: is <[1, 2, 3]>
 but: was <[1, 3]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778)
 {code}
{code:java}
verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]

java.lang.AssertionError: 
Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a,
 executionInfo=[], position=Position{position={input-topic={0=1, 
1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364,
 executionInfo=[], position=Position{position={input-topic={1=1}, 
globalResult=null}
Expected: is <[1, 2, 3]>
 but: was <[1, 2]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780)
 {code}
{code:java}
verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL] 

    java.lang.AssertionError: 
Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6,
 executionInfo=[], position=Position{position={input-topic={0=1, 
1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165,
 executionInfo=[], position=Position{position={input-topic={1=1}, 
globalResult=null}
    Expected: is <[0, 1, 2, 3]> 
         but: was <[0, 2, 3]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQuery(IQv2StoreIntegrationTest.java:1234)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQueries(IQv2StoreIntegrationTest.java:880)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:793)
 {code}
 

Some observations:
 * After I added the 

[jira] [Created] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-01-26 Thread John Roesler (Jira)
John Roesler created KAFKA-13622:


 Summary: Revisit the complexity of position tracking in state 
stores
 Key: KAFKA-13622
 URL: https://issues.apache.org/jira/browse/KAFKA-13622
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Currently, state store implementers have a significant burden to track position 
correctly. They have to:
 * update the position during all puts
 * implement the RecordBatchingStateRestoreCallback and use the 
ChangelogRecordDeserializationHelper to update the position based on record 
headers
 * implement some mechanism to restore the position after a restart if the 
store is persistent (such as supplying a CommitCallback to write the position 
to a local file and then reading the file during init)

[~guozhang] pointed out during review that this is probably too much 
responsibility (and certainly too much opportunity for error). We should see 
what we can do to simplify these responsibilities, if not eliminate them 
entirely from the store implementer's scope of concern.

See 
https://github.com/apache/kafka/pull/11676#discussion_r790358058
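For context, the position bookkeeping a store implementer currently shoulders amounts to roughly the following (a simplified stand-in for the real Position class, not the actual Streams code):

```java
import java.util.HashMap;
import java.util.Map;

public class PositionSketch {
    // A position is a map of topic -> partition -> highest offset processed.
    private final Map<String, Map<Integer, Long>> components = new HashMap<>();

    // Must be called on every put, merging by max so stale offsets are ignored.
    void update(String topic, int partition, long offset) {
        components.computeIfAbsent(topic, t -> new HashMap<>())
                  .merge(partition, offset, Math::max);
    }

    long offsetOf(String topic, int partition) {
        return components.get(topic).get(partition);
    }

    public static void main(String[] args) {
        PositionSketch position = new PositionSketch();
        position.update("input-topic", 0, 5L);
        position.update("input-topic", 0, 3L); // out-of-order update is ignored
        System.out.println(position.offsetOf("input-topic", 0)); // 5
    }
}
```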





[jira] [Resolved] (KAFKA-13608) Implement Position restoration for all in-memory state stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13608.
--
Resolution: Duplicate

> Implement Position restoration for all in-memory state stores
> -
>
> Key: KAFKA-13608
> URL: https://issues.apache.org/jira/browse/KAFKA-13608
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vicky Papavasileiou
>Priority: Major
>
> In-memory state stores restore their state from the changelog (as opposed to 
> RocksDB stores, which restore from disk). In-memory stores currently don't 
> handle restoring the Position.





[jira] [Resolved] (KAFKA-13524) IQv2: Implement KeyQuery from the RecordCache

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13524.
--
Resolution: Fixed

> IQv2: Implement KeyQuery from the RecordCache
> -
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>
> The Record Cache in Kafka Streams is more properly termed a write buffer, 
> since it only caches writes, not reads, and its intent is to buffer the 
> writes before flushing them in bulk into lower store layers.
> Unlike scan-type queries, which require scanning both the record cache and 
> the underlying store and collating the results, the KeyQuery (and any other 
> point lookup) can straightforwardly be served from the record cache if it is 
> buffered or fall through to the underlying store if not.
> Benchmarks also reveal that, in contrast to scan-type operations, key-based 
> cache reads are faster than always skipping the cache.
> Therefore, it makes sense to implement a handler in the CachingKeyValueStore 
> for the KeyQuery specifically in order to serve fresher key-based lookups. 
> Scan queries may also be useful, but their less flattering performance 
> profile makes it reasonable to leave them for follow-on work.
> We could add an option to disable cache reads on the KeyQuery, but since they 
> seem to be always better, I'm leaning toward just unilaterally serving cached 
> records if they exist.
>  
> I did a quick POC of this: 
> [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]
>  
> The internal code of the caching stores should be refactored to share logic 
> with the regular store methods. Scan queries will be more complicated, since 
> they require merging the cache with the wrapped result.
> There is a bug related to that non-timestamped-store-serde hack (see the 
> failing test when you run IQv2StoreIntegrationTest). Even though the inner 
> store is not timestamped, the cache returns a timestamped value. We'll have 
> to discuss options to fix it.
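The cache-first point lookup described above can be sketched in plain Java. This is a minimal illustration of the idea, not Kafka's actual CachingKeyValueStore internals; all class and method names here are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a cache-first key lookup: serve the query from the
// write buffer ("record cache") if the key is buffered, otherwise fall
// through to the underlying store.
class CacheFirstLookup {
    private final Map<String, String> writeBuffer = new HashMap<>();     // dirty, unflushed writes
    private final Map<String, String> underlyingStore = new HashMap<>(); // e.g. the RocksDB layer

    void put(String key, String value) {
        writeBuffer.put(key, value); // buffered until the next flush
    }

    void flush() {
        underlyingStore.putAll(writeBuffer); // bulk-write into the lower store layer
        writeBuffer.clear();
    }

    String query(String key) {
        // Point lookups can be served straight from the buffer when present,
        // which returns fresher data than always skipping the cache.
        String buffered = writeBuffer.get(key);
        return buffered != null ? buffered : underlyingStore.get(key);
    }
}
```

Scan-type queries would additionally have to collate buffer and store results, which is why the ticket leaves them for follow-on work.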





[jira] [Resolved] (KAFKA-13525) IQv2: Implement KeyQuery from the KIP

2021-12-20 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13525.
--
Resolution: Fixed

> IQv2: Implement KeyQuery from the KIP
> -
>
> Key: KAFKA-13525
> URL: https://issues.apache.org/jira/browse/KAFKA-13525
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>






[jira] [Created] (KAFKA-13557) IQv2: Remove swapResult from the public API

2021-12-20 Thread John Roesler (Jira)
John Roesler created KAFKA-13557:


 Summary: IQv2: Remove swapResult from the public API
 Key: KAFKA-13557
 URL: https://issues.apache.org/jira/browse/KAFKA-13557
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


During the review for [https://github.com/apache/kafka/pull/11582], it was 
pointed out that QueryResult#swapResult doesn't really need to be in the public 
API, since it only supports internal code.





[jira] [Created] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2021-12-17 Thread John Roesler (Jira)
John Roesler created KAFKA-13554:


 Summary: Rename RangeQuery to KeyRangeQuery
 Key: KAFKA-13554
 URL: https://issues.apache.org/jira/browse/KAFKA-13554
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Just to avoid confusion with WindowRangeQuery.





[jira] [Created] (KAFKA-13553) Add DSL stores to IQv2StoreIntegrationTest

2021-12-17 Thread John Roesler (Jira)
John Roesler created KAFKA-13553:


 Summary: Add DSL stores to IQv2StoreIntegrationTest
 Key: KAFKA-13553
 URL: https://issues.apache.org/jira/browse/KAFKA-13553
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Right now, we only test stores registered via the PAPI. To be truly 
comprehensive, we must also test stores registered via the DSL.





[jira] [Resolved] (KAFKA-13492) IQ Parity: queries for key/value store range and scan

2021-12-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13492.
--
Resolution: Fixed

> IQ Parity: queries for key/value store range and scan
> -
>
> Key: KAFKA-13492
> URL: https://issues.apache.org/jira/browse/KAFKA-13492
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>






[jira] [Created] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery

2021-12-15 Thread John Roesler (Jira)
John Roesler created KAFKA-13548:


 Summary: IQv2: revisit WindowKeyQuery and WindowRangeQuery
 Key: KAFKA-13548
 URL: https://issues.apache.org/jira/browse/KAFKA-13548
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


During discussion of KIP-806, there was a suggestion to refactor the queries 
following a builder pattern so that we can compactly and flexibly specify lower 
and upper bounds on the keys, window start times, and window end times.

We should circle back and try to generalize the queries' interfaces before the 
first release of IQv2.
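The builder-pattern idea above can be sketched as follows. This is an illustrative shape only, assuming names like keyFrom/windowStartTo that are not the final Kafka API: each bound is optional, so callers state only the constraints they need.

```java
import java.util.Optional;

// Hypothetical builder-style window query: unset bounds mean "unbounded".
class WindowQuerySketch {
    private Optional<String> keyFrom = Optional.empty();
    private Optional<String> keyTo = Optional.empty();
    private Optional<Long> windowStartFrom = Optional.empty();
    private Optional<Long> windowStartTo = Optional.empty();

    WindowQuerySketch keyFrom(String k) { keyFrom = Optional.of(k); return this; }
    WindowQuerySketch keyTo(String k) { keyTo = Optional.of(k); return this; }
    WindowQuerySketch windowStartFrom(long t) { windowStartFrom = Optional.of(t); return this; }
    WindowQuerySketch windowStartTo(long t) { windowStartTo = Optional.of(t); return this; }

    // A store scanning its data would consult the bounds like this.
    boolean matches(String key, long windowStart) {
        return keyFrom.map(f -> key.compareTo(f) >= 0).orElse(true)
            && keyTo.map(t -> key.compareTo(t) <= 0).orElse(true)
            && windowStartFrom.map(f -> windowStart >= f).orElse(true)
            && windowStartTo.map(t -> windowStart <= t).orElse(true);
    }
}
```

A single builder like this could subsume both WindowKeyQuery and WindowRangeQuery, which is the generalization the ticket asks to consider before the first release.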





[jira] [Resolved] (KAFKA-13532) Flaky test KafkaStreamsTest.testInitializesAndDestroysMetricsReporters

2021-12-14 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13532.
--
Resolution: Cannot Reproduce

I looked in every branch build I could find, and didn't see any failures for 
this test. I think it's more likely that there was actually something wrong 
with whatever PR this was on (i.e., the failure was legit and not flaky).

 

Rather than leaving this hanging around, I'll go ahead and close it. We can 
re-open if it does start to pop up in a flaky fashion.

> Flaky test KafkaStreamsTest.testInitializesAndDestroysMetricsReporters
> --
>
> Key: KAFKA-13532
> URL: https://issues.apache.org/jira/browse/KAFKA-13532
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> {quote}java.lang.AssertionError: expected:<26> but was:<27> at 
> org.junit.Assert.fail(Assert.java:89) at 
> org.junit.Assert.failNotEquals(Assert.java:835) at 
> org.junit.Assert.assertEquals(Assert.java:647) at 
> org.junit.Assert.assertEquals(Assert.java:633) at 
> org.apache.kafka.streams.KafkaStreamsTest.testInitializesAndDestroysMetricsReporters(KafkaStreamsTest.java:556){quote}





[jira] [Created] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries

2021-12-08 Thread John Roesler (Jira)
John Roesler created KAFKA-13526:


 Summary: IQv2: Consider more generic logic for mapping between 
binary and typed queries
 Key: KAFKA-13526
 URL: https://issues.apache.org/jira/browse/KAFKA-13526
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Right now, typed queries (like KeyQuery) need to be specially handled and 
translated to their binary counterparts (like RawKeyQuery). This happens in the 
Metered store layers, where the serdes are known. It is necessary because lower 
store layers are only able to handle binary data (because they don't know the 
serdes).

This situation is not ideal, since the Metered store layers will grow to host 
quite a bit of query handling and translation logic, because the relationship 
between typed queries and binary counterparts is not obvious, and because we 
can only automatically translate known query types. User-supplied queries and 
stores will have to work things out using their a priori knowledge of the 
serdes.

 

One suggestion (from [~mjsax] ) is to come up with some kind of generic "query 
mapping" API, which the Metered stores would use to translate back and forth 
between typed and raw keys and values. Users would be able to supply their own 
mappings along with their custom queries.

Another option would be to have the Metered stores attach the serdes to the 
query on the way down and then to the result on the way up. Then, the serdes 
would be available in the bytes store (as part of the request) and to the users 
when they get their results (as part of the response).

Other options may also surface once we start playing with ideas.
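The translation the Metered layer performs today can be sketched like this. The names are illustrative stand-ins (the real code involves KeyQuery and internal raw-query plumbing); the point is that the typed-to-raw round trip must happen at the only layer that knows the serdes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of typed <-> binary query translation in the Metered layer.
class MeteredTranslation {
    // stand-in "serdes" for String keys/values
    static byte[] serialize(String s) { return s.getBytes(StandardCharsets.UTF_8); }
    static String deserialize(byte[] b) { return b == null ? null : new String(b, StandardCharsets.UTF_8); }

    interface BytesStore { byte[] get(byte[] rawKey); }

    // The metered layer translates the typed query down to bytes, delegates,
    // and deserializes the raw result on the way back up.
    static String typedKeyQuery(BytesStore inner, String key) {
        byte[] rawKey = serialize(key);       // typed query -> raw query
        byte[] rawResult = inner.get(rawKey); // lower layers see only binary data
        return deserialize(rawResult);        // raw result -> typed result
    }
}
```

A generic "query mapping" API, as suggested in the ticket, would let user-supplied queries plug their own version of this translation into the Metered layer instead of hard-coding it per known query type.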





[jira] [Created] (KAFKA-13525) IQv2: Implement KeyQuery from the KIP

2021-12-08 Thread John Roesler (Jira)
John Roesler created KAFKA-13525:


 Summary: IQv2: Implement KeyQuery from the KIP
 Key: KAFKA-13525
 URL: https://issues.apache.org/jira/browse/KAFKA-13525
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Created] (KAFKA-13524) IQv2: Add option to query from caches

2021-12-08 Thread John Roesler (Jira)
John Roesler created KAFKA-13524:


 Summary: IQv2: Add option to query from caches
 Key: KAFKA-13524
 URL: https://issues.apache.org/jira/browse/KAFKA-13524
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Created] (KAFKA-13523) Implement IQv2 support in global stores

2021-12-08 Thread John Roesler (Jira)
John Roesler created KAFKA-13523:


 Summary: Implement IQv2 support in global stores
 Key: KAFKA-13523
 URL: https://issues.apache.org/jira/browse/KAFKA-13523
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Global stores pose one significant problem for IQv2: when they start up, they 
skip the regular ingest pipeline and instead use the "restoration" pipeline to 
read up until the current end offset. Then, they switch over to the regular 
ingest pipeline.

IQv2 position tracking expects to track the position of each record from the 
input topic through the ingest pipeline and then get the position headers 
through the restoration pipeline via the changelog topic. The fact that global 
stores "restore" the input topic instead of ingesting it violates our 
expectations.

It has also caused other problems, so we may want to consider switching the 
global store processing to use the normal paradigm rather than adding 
special-case logic to track positions in global stores.

 

Note: there are two primary reasons that global stores behave this way:
 # We can write in batches during restoration, so the I/O may be more efficient
 # The global thread does not transition to RUNNING state until it reaches the 
(current) end of the input topic, which blocks other threads from joining 
against it, thereby improving the time synchronization of global KTable joins.

If we want to propose changing the bootstrapping pipeline for global threads, 
we should have some kind of answer to these concerns.





[jira] [Resolved] (KAFKA-13506) IQv2: Transmit position to standbys

2021-12-08 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13506.
--
Resolution: Fixed

> IQv2: Transmit position to standbys
> ---
>
> Key: KAFKA-13506
> URL: https://issues.apache.org/jira/browse/KAFKA-13506
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>






[jira] [Resolved] (KAFKA-13491) Implement IQv2 Framework

2021-12-08 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13491.
--
Resolution: Fixed

> Implement IQv2 Framework
> 
>
> Key: KAFKA-13491
> URL: https://issues.apache.org/jira/browse/KAFKA-13491
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> See https://cwiki.apache.org/confluence/x/85OqCw





[jira] [Created] (KAFKA-13522) IQv2: Implement position tracking and bounding in API

2021-12-08 Thread John Roesler (Jira)
John Roesler created KAFKA-13522:


 Summary: IQv2: Implement position tracking and bounding in API
 Key: KAFKA-13522
 URL: https://issues.apache.org/jira/browse/KAFKA-13522
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Created] (KAFKA-13506) IQv2: Transmit position to standbys

2021-12-03 Thread John Roesler (Jira)
John Roesler created KAFKA-13506:


 Summary: IQv2: Transmit position to standbys
 Key: KAFKA-13506
 URL: https://issues.apache.org/jira/browse/KAFKA-13506
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Resolved] (KAFKA-13498) IQv2: Track Position in remaining stores

2021-12-01 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13498.
--
Resolution: Fixed

> IQv2: Track Position in remaining stores
> 
>
> Key: KAFKA-13498
> URL: https://issues.apache.org/jira/browse/KAFKA-13498
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Patrick Stuedi
>Priority: Major
>






[jira] [Created] (KAFKA-13498) IQv2: Track Position in remaining stores

2021-12-01 Thread John Roesler (Jira)
John Roesler created KAFKA-13498:


 Summary: IQv2: Track Position in remaining stores
 Key: KAFKA-13498
 URL: https://issues.apache.org/jira/browse/KAFKA-13498
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Created] (KAFKA-13492) IQ Parity: queries for key/value store range and scan

2021-11-30 Thread John Roesler (Jira)
John Roesler created KAFKA-13492:


 Summary: IQ Parity: queries for key/value store range and scan
 Key: KAFKA-13492
 URL: https://issues.apache.org/jira/browse/KAFKA-13492
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Created] (KAFKA-13491) Implement IQv2 Framework

2021-11-30 Thread John Roesler (Jira)
John Roesler created KAFKA-13491:


 Summary: Implement IQv2 Framework
 Key: KAFKA-13491
 URL: https://issues.apache.org/jira/browse/KAFKA-13491
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


See https://cwiki.apache.org/confluence/x/85OqCw





[jira] [Resolved] (KAFKA-13480) IQv2: Track Position in KeyValue stores

2021-11-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13480.
--
Resolution: Fixed

> IQv2: Track Position in KeyValue stores
> ---
>
> Key: KAFKA-13480
> URL: https://issues.apache.org/jira/browse/KAFKA-13480
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Patrick Stuedi
>Priority: Major
>
> Book-keep the latest seen position in key-value stores to support IQv2





[jira] [Created] (KAFKA-13480) IQv2: Track Position in KeyValue stores

2021-11-24 Thread John Roesler (Jira)
John Roesler created KAFKA-13480:


 Summary: IQv2: Track Position in KeyValue stores
 Key: KAFKA-13480
 URL: https://issues.apache.org/jira/browse/KAFKA-13480
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Book-keep the latest seen position in key-value stores to support IQv2
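The book-keeping can be sketched with a minimal model of a position (assumed semantics, modeled on Kafka's Position vector: topic, then partition, then the max offset seen). A store would update this on every record it ingests.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of position book-keeping: topic -> partition -> max offset seen.
class PositionSketch {
    private final Map<String, Map<Integer, Long>> components = new HashMap<>();

    void update(String topic, int partition, long offset) {
        components.computeIfAbsent(topic, t -> new HashMap<>())
                  .merge(partition, offset, Math::max); // positions only move forward
    }

    long offset(String topic, int partition) {
        return components.getOrDefault(topic, Map.of()).getOrDefault(partition, -1L);
    }
}
```

IQv2 query results can then report the position the store had reached when it served the query, which is what position bounding compares against.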





[jira] [Created] (KAFKA-13479) Interactive Query v2

2021-11-24 Thread John Roesler (Jira)
John Roesler created KAFKA-13479:


 Summary: Interactive Query v2
 Key: KAFKA-13479
 URL: https://issues.apache.org/jira/browse/KAFKA-13479
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: John Roesler


Kafka Streams supports an interesting and innovative API for "peeking" into the 
internal state of running stateful stream processors from outside of the 
application, called Interactive Query (IQ). This functionality has proven 
invaluable to users over the years for everything from debugging running 
applications to serving low latency queries straight from the Streams runtime.

However, the actual interfaces for IQ were designed in the very early days of 
Kafka Streams, before the project had gained significant adoption, and in the 
absence of much precedent for this kind of API in peer projects. With the 
benefit of hindsight, we can observe several problems with the original design 
that we hope to address in a revised framework that will serve Streams users 
well for many years to come.

 

This ticket tracks the implementation of KIP-796: 
https://cwiki.apache.org/confluence/x/34xnCw





[jira] [Resolved] (KAFKA-13426) Add recordMetadata to StateStoreContext

2021-11-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13426.
--
Resolution: Fixed

> Add recordMetadata to StateStoreContext
> ---
>
> Key: KAFKA-13426
> URL: https://issues.apache.org/jira/browse/KAFKA-13426
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Minor
>  Labels: kip
>
> KIP-791: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-791%3A+Add+Record+Metadata+to+State+Store+Context]
> In order for state stores to provide stronger consistency in the future 
> (e.g., RYW consistency) they need to be able to collect record metadata 
> (e.g., offset information).
> Today, we already make record metadata available in the 
> AbstractProcessContext (recordMetadata()), but the call is not currently 
> exposed through the StateStoreContext interface that is used by the state 
> store. 
> The task of this ticket is to expose recordMetadata in the StateStoreContext. 
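A sketch of why a store wants this metadata follows. The interfaces here are simplified stand-ins for illustration (not Kafka's StateStoreContext or RecordMetadata classes): on each write, the store can record where the write came from, which is the raw material for consistency bookkeeping.

```java
import java.util.Optional;

// Hypothetical sketch: a store consuming record metadata on each write.
class MetadataAwareStore {
    interface RecordMetadata { String topic(); int partition(); long offset(); }

    private long lastSeenOffset = -1L;

    // The context would supply Optional.empty() outside of active processing,
    // so the store must tolerate missing metadata.
    void onPut(Optional<RecordMetadata> metadata) {
        metadata.ifPresent(m -> lastSeenOffset = Math.max(lastSeenOffset, m.offset()));
    }

    long lastSeenOffset() { return lastSeenOffset; }
}
```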





[jira] [Resolved] (KAFKA-13429) Update gitignore to include new modules

2021-11-10 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13429.
--
Resolution: Fixed

> Update gitignore to include new modules
> ---
>
> Key: KAFKA-13429
> URL: https://issues.apache.org/jira/browse/KAFKA-13429
> Project: Kafka
>  Issue Type: Task
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Trivial
>
> Add `/bin/` to `.gitignore` for the following modules:
>  * connect/basic-auth-extension
>  * metadata
>  * raft
>  * tools
>  * trogdor
>  





[jira] [Resolved] (KAFKA-4064) Add support for infinite endpoints for range queries in Kafka Streams KV stores

2021-07-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-4064.
-
Fix Version/s: 3.1.0
   Resolution: Fixed

> Add support for infinite endpoints for range queries in Kafka Streams KV 
> stores
> ---
>
> Key: KAFKA-4064
> URL: https://issues.apache.org/jira/browse/KAFKA-4064
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0
>Reporter: Roger Hoover
>Assignee: Patrick Stuedi
>Priority: Minor
>  Labels: needs-kip
> Fix For: 3.1.0
>
>
> In some applications, it's useful to iterate over the key-value store either:
> 1. from the beginning up to a certain key
> 2. from a certain key to the end
> We can add two new methods rangeUntil() and rangeFrom() easily to support this.
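The two one-sided scans can be sketched over a sorted map. Method names follow the ticket's suggestion; the shipped Kafka API may express open endpoints differently, so treat this as an illustration of the semantics only.

```java
import java.util.TreeMap;

// Sketch of one-sided range scans over a sorted KV store, modeled with a TreeMap.
class OneSidedRanges {
    private final TreeMap<String, String> store = new TreeMap<>();

    void put(String k, String v) { store.put(k, v); }

    // from the beginning up to (and including) `to`
    Iterable<String> rangeUntil(String to) {
        return store.headMap(to, true).values();
    }

    // from `from` (inclusive) to the end
    Iterable<String> rangeFrom(String from) {
        return store.tailMap(from, true).values();
    }
}
```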



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13117) After processors, migrate TupleForwarder and CacheFlushListener

2021-07-21 Thread John Roesler (Jira)
John Roesler created KAFKA-13117:


 Summary: After processors, migrate TupleForwarder and 
CacheFlushListener
 Key: KAFKA-13117
 URL: https://issues.apache.org/jira/browse/KAFKA-13117
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Currently, both of these interfaces take plain values in combination with 
timestamps:

CacheFlushListener:
{code:java}
void apply(K key, V newValue, V oldValue, long timestamp)
{code}
TimestampedTupleForwarder
{code:java}
 void maybeForward(K key,
   V newValue,
   V oldValue,
   long timestamp){code}
These are internally translated to the new PAPI, but after the processors are 
migrated, there won't be a need to have this translation. We should update both 
of these APIs to just accept {{Record<K, Change<V>>}}.
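The shape of the proposed unified signature can be sketched with minimal stand-in types (hypothetical, not Kafka's classes): instead of four positional arguments, both callbacks would take a single record whose value is a new/old pair.

```java
// Hypothetical stand-ins showing the unified callback signature.
class RecordChangeSketch {
    record Change<V>(V newValue, V oldValue) {}
    record Rec<K, V>(K key, V value, long timestamp) {}

    interface FlushListener<K, V> {
        void apply(Rec<K, Change<V>> record); // one argument carries all four fields
    }

    static String describe(Rec<String, Change<Integer>> r) {
        return r.key() + ": " + r.value().oldValue() + " -> " + r.value().newValue()
            + " @" + r.timestamp();
    }
}
```

Collapsing the four parameters into one record keeps the listener signatures in sync with the new PAPI and removes the translation layer the ticket describes.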





[jira] [Resolved] (KAFKA-10546) KIP-478: Deprecate the old PAPI interfaces

2021-07-21 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10546.
--
Resolution: Fixed

> KIP-478: Deprecate the old PAPI interfaces
> --
>
> Key: KAFKA-10546
> URL: https://issues.apache.org/jira/browse/KAFKA-10546
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Can't be done until after the DSL internals are migrated.





[jira] [Reopened] (KAFKA-8410) Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as well

2021-07-20 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reopened KAFKA-8410:
-

> Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as 
> well
> --
>
> Key: KAFKA-8410
> URL: https://issues.apache.org/jira/browse/KAFKA-8410
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>  Labels: tech-debt
>
> Presently, it's very difficult to have confidence when adding to or modifying 
> processors in the DSL. There's a lot of raw types, duck-typing, and casting 
> that contribute to this problem.
> The root, though, is that the generic types on `Processor` refer only to 
> the _input_ key and value types. No information is captured or verified about 
> what the _output_ types of a processor are. For example, this leads to 
> widespread confusion in the code base about whether a processor produces `V`s 
> or `Change`s. The type system actually makes matters worse, since we use 
> casts to make the processors conform to declared types that are in fact 
> wrong, but are never checked due to erasure.
> We can start to make some headway on this tech debt by adding some types to 
> the ProcessorContext that bound the `<K, V>` that may be passed to 
> `context.forward`. Then, we can build on this by fully specifying the input 
> and output types of the Processors, which in turn would let us eliminate the 
> majority of unchecked casts in the DSL operators.
> I'm not sure whether adding these generic types to the existing 
> ProcessorContext and Processor interfaces, which would also affect the PAPI 
> has any utility, or whether we should make this purely an internal change by 
> introducing GenericProcessorContext and GenericProcessor peer interfaces for 
> the DSL to use.
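The fully typed processor shape described above can be sketched with simplified stand-in interfaces (modeled on the KIP-478 direction, not Kafka's actual classes): once output key/value types appear in the signature, forwarding the wrong type becomes a compile error instead of an unchecked cast.

```java
// Hypothetical stand-ins for a processor typed on both input and output.
class TypedProcessorSketch {
    interface Context<KOut, VOut> { void forward(KOut key, VOut value); }

    interface Processor<KIn, VIn, KOut, VOut> {
        void process(KIn key, VIn value, Context<KOut, VOut> context);
    }

    // A processor that parses String values into Integers: the declared
    // output types make it impossible to forward the raw String downstream.
    static final Processor<String, String, String, Integer> PARSE =
        (key, value, ctx) -> ctx.forward(key, Integer.parseInt(value));
}
```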





[jira] [Resolved] (KAFKA-10091) Improve task idling

2021-07-12 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10091.
--
Resolution: Fixed

> Improve task idling
> ---
>
> Key: KAFKA-10091
> URL: https://issues.apache.org/jira/browse/KAFKA-10091
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> When Streams is processing a task with multiple inputs, each time it is ready 
> to process a record, it has to choose which input to process next. It always 
> takes from the input for which the next record has the least timestamp. The 
> result of this is that Streams processes data in timestamp order. However, if 
> the buffer for one of the inputs is empty, Streams doesn't know what 
> timestamp the next record for that input will be.
> Streams introduced a configuration "max.task.idle.ms" in KIP-353 to address 
> this issue.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization]
> The config allows Streams to wait some amount of time for data to arrive on 
> the empty input, so that it can make a timestamp-ordered decision about which 
> input to pull from next.
> However, this config can be hard to use reliably and efficiently, since what 
> we're really waiting for is the next poll that _would_ return data from the 
> empty input's partition, and this guarantee is a function of the poll 
> interval, the max poll interval, and the internal logic that governs when 
> Streams will poll again.
> The ideal case is that _any_ amount of idling would guarantee, at a minimum, 
> that we poll data from the empty partition if there's data to fetch.





[jira] [Resolved] (KAFKA-12360) Improve documentation of max.task.idle.ms (kafka-streams)

2021-07-12 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-12360.
--
Resolution: Fixed

> Improve documentation of max.task.idle.ms (kafka-streams)
> -
>
> Key: KAFKA-12360
> URL: https://issues.apache.org/jira/browse/KAFKA-12360
> Project: Kafka
>  Issue Type: Sub-task
>  Components: docs, streams
>Reporter: Domenico Delle Side
>Assignee: John Roesler
>Priority: Minor
>  Labels: beginner, newbie, trivial
>
> _max.task.idle.ms_ is a handy way to pause processing in a *_kafka-streams_* 
> application. This is very useful when you need to join two topics that are 
> out of sync, i.e. when data in one topic may be produced _before_ you receive 
> join information in the other topic.
> In the documentation, however, it is not specified that the value of 
> _max.task.idle.ms_ *must* be lower than _max.poll.interval.ms_, otherwise 
> you'll run into an endless rebalancing problem.
> I think it is better to clearly state this in the documentation.
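The constraint the ticket asks the docs to state can be shown with a small config example. The values below are illustrative only; the invariant is simply that the idle timeout must stay below the poll interval.

```java
import java.util.Properties;

// Illustrative Streams config honoring the constraint described above.
class IdleConfigExample {
    static Properties streamsConfig() {
        Properties p = new Properties();
        p.put("max.task.idle.ms", "1000");       // wait up to 1s for lagging inputs
        p.put("max.poll.interval.ms", "300000"); // default 5 minutes; must stay larger
        return p;
    }

    static boolean valid(Properties p) {
        long idle = Long.parseLong(p.getProperty("max.task.idle.ms"));
        long poll = Long.parseLong(p.getProperty("max.poll.interval.ms"));
        return idle < poll; // the invariant the ticket asks the docs to state
    }
}
```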





[jira] [Resolved] (KAFKA-8478) Poll for more records before forced processing

2021-07-08 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-8478.
-
Resolution: Fixed

> Poll for more records before forced processing
> --
>
> Key: KAFKA-8478
> URL: https://issues.apache.org/jira/browse/KAFKA-8478
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 3.0.0
>
>
> While analyzing the algorithm of Streams's poll/process loop, I noticed the 
> following:
> The algorithm of runOnce is:
> {code}
> loop0:
>   long poll for records (100ms)
>   loop1:
> loop2: for BATCH_SIZE iterations:
>   process one record in each task that has data enqueued
> adjust BATCH_SIZE
> if loop2 processed any records, repeat loop 1
> else, break loop1 and repeat loop0
> {code}
> There's potentially an unwanted interaction between "keep processing as long 
> as any record is processed" and forcing processing after `max.task.idle.ms`.
> If there are two tasks, A and B, and A runs out of records on one input 
> before B, then B could keep the processing loop running, and hence prevent A 
> from getting any new records, until max.task.idle.ms expires, at which point 
> A will force processing on its other input partition. The intent of idling is 
> to at least give A a chance of getting more records on the empty input, but 
> under this situation, we'd never even check for more records before forcing 
> processing.
> I'm thinking we should only enforce processing if there was a completed poll 
> since we noticed the task was missing inputs (otherwise, we may as well not 
> bother idling at all).





[jira] [Resolved] (KAFKA-8315) Historical join issues

2021-07-08 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-8315.
-
Resolution: Fixed

> Historical join issues
> --
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple 
> joins over pre-populated kafka topics. This seems more apparent where one 
> topic has records at less frequent record timestamp intervals than the other.
>  An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>  
> The only way to increase the period of historically joined records is to 
> increase the grace period for the join windows, and this has repercussions 
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related slack conversations : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>  
>  Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the 
> retention period for a join window via {{Materialized: i.e.}}
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; only the group operation seems to support it.
> This was considered to be a problem with the documentation not with the API 
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the 
> partition that is selected to deliver the next record to the join. This would 
> only be a problem for data that is out-of-order, and join-example uses data 
> that is in order of timestamp in both topics. So this fix is thought not to 
> affect join-example.
> This was considered to be an issue and is being addressed in 
> [https://github.com/apache/kafka/pull/6719]
>  3) Further investigation using a crafted unit test seems to show that the 
> partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) the current assumption is that the issue is rooted in the way records are 
> consumed from the topics :
> We have tried to set various options to suppress reads from the source topics 
> but it doesn't seem to make any difference: 
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>  





[jira] [Resolved] (KAFKA-7458) Avoid enforced processing during bootstrap phase

2021-07-07 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7458.
-
Resolution: Fixed

> Avoid enforced processing during bootstrap phase
> 
>
> Key: KAFKA-7458
> URL: https://issues.apache.org/jira/browse/KAFKA-7458
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: John Roesler
>Priority: Major
> Fix For: 3.0.0
>
>
> In KAFKA-3514, we introduced a new config allowing users to delay enforced 
> processing when not all input topic partitions have data. This config's 
> default value is 0, which means that if the first fetch does not contain 
> records for all partitions, the task falls into enforced processing 
> immediately. This is especially risky during the bootstrap phase.
> We should consider leveraging pause / resume to verify, upon starting, that 
> some partition indeed has no data before we fall into enforced processing.
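A minimal sketch of the enforced-processing decision described above (the names `shouldProcess`, `allPartitionsBuffered`, and `maxTaskIdleMs` are illustrative stand-ins, not the actual Kafka Streams internals):

```java
public class EnforcedProcessing {
    // Process only when every input partition has buffered data, or when
    // the task has already waited longer than the configured idle time.
    static boolean shouldProcess(boolean allPartitionsBuffered,
                                 long idleMs, long maxTaskIdleMs) {
        return allPartitionsBuffered || idleMs >= maxTaskIdleMs;
    }

    public static void main(String[] args) {
        // With the default max.task.idle.ms = 0, a first fetch that misses
        // even one partition immediately falls into enforced processing.
        System.out.println(shouldProcess(false, 0, 0));    // enforced right away
        System.out.println(shouldProcess(false, 10, 100)); // still waiting
    }
}
```

Raising `max.task.idle.ms` above 0 gives the consumer time to fetch from lagging partitions before the task resorts to enforced processing, which is the behavior the ticket wants during bootstrap.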



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8410) Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as well

2021-07-07 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-8410.
-
Resolution: Fixed

> Strengthen the types of Processors, at least in the DSL, maybe in the PAPI as 
> well
> --
>
> Key: KAFKA-8410
> URL: https://issues.apache.org/jira/browse/KAFKA-8410
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>  Labels: tech-debt
>
> Presently, it's very difficult to have confidence when adding to or modifying 
> processors in the DSL. There's a lot of raw types, duck-typing, and casting 
> that contribute to this problem.
> The root, though, is that the generic types on `Processor` refer only to 
> the _input_ key and value types. No information is captured or verified about 
> what the _output_ types of a processor are. For example, this leads to 
> widespread confusion in the code base about whether a processor produces `V`s 
> or `Change`s. The type system actually makes matters worse, since we use 
> casts to make the processors conform to declared types that are in fact 
> wrong, but are never checked due to erasure.
> We can start to make some headway on this tech debt by adding some types to 
> the ProcessorContext that bound the `<K, V>` types that may be passed to 
> `context.forward`. Then, we can build on this by fully specifying the input 
> and output types of the Processors, which in turn would let us eliminate the 
> majority of unchecked casts in the DSL operators.
> I'm not sure whether adding these generic types to the existing 
> ProcessorContext and Processor interfaces, which would also affect the PAPI, 
> has any utility, or whether we should make this purely an internal change by 
> introducing GenericProcessorContext and GenericProcessor peer interfaces for 
> the DSL to use.
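As a sketch of the direction proposed here, fully-typed processor and context interfaces might look like the following. All names (`TypedProcessor`, `TypedContext`) are hypothetical illustrations, not the actual Kafka API:

```java
public class TypedProcessors {
    // A processor parameterized on both its input and output types.
    interface TypedProcessor<KIn, VIn, KOut, VOut> {
        void process(TypedContext<KOut, VOut> context, KIn key, VIn value);
    }

    // The context bounds what may be forwarded, so a processor can no longer
    // forward a Change<V> where a plain V was declared: the cast that erasure
    // used to hide becomes a compile-time error.
    interface TypedContext<KOut, VOut> {
        void forward(KOut key, VOut value);
    }

    // Example: maps Integer values to Strings; forwarding anything other
    // than <String, String> would fail to compile.
    static class Stringify implements TypedProcessor<String, Integer, String, String> {
        public void process(TypedContext<String, String> ctx, String key, Integer value) {
            ctx.forward(key, String.valueOf(value));
        }
    }

    public static void main(String[] args) {
        StringBuilder out = new StringBuilder();
        new Stringify().process((k, v) -> out.append(k).append('=').append(v), "a", 1);
        System.out.println(out); // a=1
    }
}
```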





[jira] [Created] (KAFKA-13002) dev branch Streams not able to fetch end offsets from pre-3.0 brokers

2021-06-28 Thread John Roesler (Jira)
John Roesler created KAFKA-13002:


 Summary: dev branch Streams not able to fetch end offsets from 
pre-3.0 brokers
 Key: KAFKA-13002
 URL: https://issues.apache.org/jira/browse/KAFKA-13002
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: John Roesler
 Fix For: 3.0.0
 Attachments: soaks.png

Note: this is not a report against a released version of AK. It seems to be a 
problem on the trunk development branch only.

After deploying our soak test against `trunk/HEAD` on Friday, I noticed that 
Streams is no longer processing:

!soaks.png!

I found this stacktrace in the logs during startup:
{code:java}
5075 [2021-06-25T16:50:44-05:00] 
(streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) 
[2021-06-25 21:50:44,499] WARN [i-0691913411e8c77c3-StreamThread-1] The 
listOffsets request failed. 
(org.apache.kafka.streams.processor.internals.ClientUtils)
 5076 [2021-06-25T16:50:44-05:00] 
(streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) 
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not 
support LIST_OFFSETS with version in range [7,7].   The supported range is 
[0,6].
 5077 at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
 5078 at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
 5079 at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
 5080 at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
 5081 at 
org.apache.kafka.streams.processor.internals.ClientUtils.getEndOffsets(ClientUtils.java:147)
 5082 at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.populateClientStatesMap(StreamsPartitionAssignor.java:643)
 5083 at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:579)
 5084 at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:387)
 5085 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
 5086 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
 5087 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
 5088 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:593)
 5089 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:556)
 5090 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1178)
 5091 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1153)
 5092 at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
 5093 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
 5094 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
 5095 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
 5096 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
 5097 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
 5098 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 5099 at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
 5100 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
 5101 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
 5102 at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:932)
 5103 at 
org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885)
 5104 at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
 5105 at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
 5106 at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
 {code}
Just 
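The failure in the stacktrace above boils down to an empty intersection of supported version ranges: the dev-branch client requires LIST_OFFSETS in [7,7] while the pre-3.0 broker supports [0,6]. A hypothetical sketch of the negotiation (not the actual client internals):

```java
public class ApiVersionNegotiation {
    // Pick the highest version both sides support; if the ranges do not
    // intersect, the request cannot be built at all.
    static int chooseVersion(int clientMin, int clientMax, int brokerMin, int brokerMax) {
        int lo = Math.max(clientMin, brokerMin);
        int hi = Math.min(clientMax, brokerMax);
        if (lo > hi)
            throw new IllegalStateException("UnsupportedVersion: client [" + clientMin + ","
                    + clientMax + "] vs broker [" + brokerMin + "," + brokerMax + "]");
        return hi;
    }

    public static void main(String[] args) {
        System.out.println(chooseVersion(0, 6, 0, 6)); // 6: highest common version
        try {
            chooseVersion(7, 7, 0, 6); // the failing case from the log
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```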

[jira] [Created] (KAFKA-12939) After migrating processors, search the codebase for missed migrations

2021-06-11 Thread John Roesler (Jira)
John Roesler created KAFKA-12939:


 Summary: After migrating processors, search the codebase for 
missed migrations
 Key: KAFKA-12939
 URL: https://issues.apache.org/jira/browse/KAFKA-12939
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


This has to be done after all the other child tickets of KAFKA-8410. All the 
processors to be migrated were marked with:

`@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.`

As a final pass, we should search the codebase to make sure we didn't miss 
anything. A good, broad search would be for the string "Old PAPI" and then 
scrutinize anything that matches.





[jira] [Created] (KAFKA-12842) Failing test: org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic

2021-05-24 Thread John Roesler (Jira)
John Roesler created KAFKA-12842:


 Summary: Failing test: 
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic
 Key: KAFKA-12842
 URL: https://issues.apache.org/jira/browse/KAFKA-12842
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: John Roesler
 Fix For: 3.0.0


This test failed during a PR build, which means that it failed twice in a row, 
due to the test-retry logic in PR builds.

 

[https://github.com/apache/kafka/pull/10744/checks?check_run_id=2643417209]

 
{noformat}
java.lang.NullPointerException
at 
java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
at org.reflections.Store.getAllIncluding(Store.java:82)
at org.reflections.Store.getAll(Store.java:93)
at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:352)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:337)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
at 
org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61)
at 
org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:93)
at 
org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:174)
at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.startConnect(EmbeddedConnectCluster.java:260)
at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.start(EmbeddedConnectCluster.java:141)
at 
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic(ConnectWorkerIntegrationTest.java:303)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at 

[jira] [Created] (KAFKA-12767) Properly set Streams system test runtime classpath

2021-05-10 Thread John Roesler (Jira)
John Roesler created KAFKA-12767:


 Summary: Properly set Streams system test runtime classpath
 Key: KAFKA-12767
 URL: https://issues.apache.org/jira/browse/KAFKA-12767
 Project: Kafka
  Issue Type: Task
  Components: streams, system tests
Reporter: John Roesler


Some of the streams system tests started to fail recently when we stopped 
exporting our transitive dependencies in the test jar.

[~lct45] was kind enough to submit [https://github.com/apache/kafka/pull/10631] 
to get the system tests running again, but that PR is only a stop-gap.

The real solution is to properly package the transitive dependencies and make 
them available to the system test runtime.

Here is the reason: PR#10631 gets past the issue by removing runtime usages of 
Hamcrest, but Hamcrest is still present in the compiletime classpath. Tomorrow, 
a new PR could add a reference to Hamcrest, and all the unit and integration 
tests would pass, but we would again see the mysterious system test failures. 
Only after another round of costly investigation would we realize the root 
cause, and we might again decide to just patch the test to remove the reference.

It would be far better in the long run to fix the underlying condition: the 
difference between the compiletime and runtime classpaths for the system tests.

 

To help get you started, I'll share some of the groundwork for this task, which 
I put together while trying to understand the nature of the problem.

The first step is to actually copy the transitive dependencies. We had to stop 
placing these dependencies in the `dependant-libs` build directory because that 
results in us actually shipping those dependencies with our releases. Copying a 
similar mechanism from the `:core` project, we can add a new build directory 
(arbitrarily: `dependant-testlibs`), and again copy the runtime dependencies 
there. Here is a commit in my fork that does just that: 
[https://github.com/vvcephei/kafka/commit/8d4552dee05f2a963b8072b86aae756415ea2482]

The next step is to place those jars on the classpath of the system test code. 
The mechanism for that is `kafka-run-class.sh`: 
[https://github.com/apache/kafka/blob/trunk/bin/kafka-run-class.sh]

A specific example of this is the special logic for upgrade tests:
 # If we are running upgrade tests, then we set the artifact directories to the 
relevant version. Otherwise, we use the current build artifacts. 
[https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/bin/kafka-run-class.sh#L77-L85]
 # Then, here's where we specifically pull in Hamcrest from those build artifact 
directories: 
[https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/bin/kafka-run-class.sh#L128-L136]

It seems to me that since Hamcrest is actually a more general dependency of the 
tests, we might as well pack it up in `dependant-testlibs` and then pull it 
into the classpath from there any time we're running tests. It looks like we 
ought to set `streams_dependant_clients_lib_dir` to `dependant-testlibs` any 
time `INCLUDE_TEST_JARS` is `true`. But if we do have 
`UPGRADE_KAFKA_STREAMS_TEST_VERSION` set, then it should override the lib dir, 
since those artifacts already include the transitive dependencies for those 
older versions.
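The selection rule proposed above could be sketched as follows; the variable name mirrors `kafka-run-class.sh`, but the logic and the upgrade-artifact path are illustrative placeholders only:

```java
public class LibDirSelection {
    // Sketch: test jars pull from dependant-testlibs, but an upgrade-test
    // version overrides that, since the versioned artifacts already carry
    // their own transitive dependencies. The upgrade path is made up.
    static String libDir(boolean includeTestJars, String upgradeVersion) {
        if (upgradeVersion != null && !upgradeVersion.isEmpty())
            return "streams/upgrade-system-tests-" + upgradeVersion + "/build/libs";
        if (includeTestJars)
            return "dependant-testlibs";
        return "dependant-libs";
    }

    public static void main(String[] args) {
        System.out.println(libDir(true, null));  // dependant-testlibs
        System.out.println(libDir(true, "2.6")); // upgrade artifacts win
    }
}
```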

 

Although the proposed fix itself is pretty small, I think the testing will take 
a few days. You might want to just put some `echo` statements in 
kafka-run-class.sh to see what jars are included on the classpath while you run 
different tests, both locally using Docker, and remotely using Jenkins.

I marked this ticket as `newbie++` because you don't need deep knowledge of the 
codebase to tackle this ticket, just a high pain tolerance for digging through 
gradle/docker/bash script debugging to make sure the right bits make it into 
the system tests.





[jira] [Resolved] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should

2021-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-12602.
--
Resolution: Fixed

> The LICENSE and NOTICE files don't list everything they should
> --
>
> Key: KAFKA-12602
> URL: https://issues.apache.org/jira/browse/KAFKA-12602
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0
>
>
> [~jmclean] raised this on the mailing list: 
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>  
> We need to make the license file match what we are actually shipping in 
> source and binary distributions.





[jira] [Created] (KAFKA-12625) Fix the NOTICE file

2021-04-06 Thread John Roesler (Jira)
John Roesler created KAFKA-12625:


 Summary: Fix the NOTICE file
 Key: KAFKA-12625
 URL: https://issues.apache.org/jira/browse/KAFKA-12625
 Project: Kafka
  Issue Type: Task
Reporter: John Roesler
 Fix For: 3.0.0, 2.8.1


In https://issues.apache.org/jira/browse/KAFKA-12602, we fixed the license 
file, and in the comments, Justin noted that we really should fix the NOTICE 
file as well.

Basically, we need to look through each of the packaged dependencies and 
transmit each of their NOTICEs (for Apache2 deps) or, otherwise, any copyright 
notices they assert.

It would be good to consider automating a check for this as well (see 
https://issues.apache.org/jira/browse/KAFKA-12622)





[jira] [Created] (KAFKA-12624) Fix LICENSE in 2.6

2021-04-06 Thread John Roesler (Jira)
John Roesler created KAFKA-12624:


 Summary: Fix LICENSE in 2.6
 Key: KAFKA-12624
 URL: https://issues.apache.org/jira/browse/KAFKA-12624
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: A. Sophie Blee-Goldman
 Fix For: 2.6.2


Just splitting this out as a sub-task.

I've fixed the parent ticket on trunk and 2.8.

You'll need to cherry-pick the fix from 2.8 (see 
[https://github.com/apache/kafka/pull/10474])

Then, you can follow the manual verification steps I detailed here: 
https://issues.apache.org/jira/browse/KAFKA-12622





[jira] [Created] (KAFKA-12623) Fix LICENSE in 2.7

2021-04-06 Thread John Roesler (Jira)
John Roesler created KAFKA-12623:


 Summary: Fix LICENSE in 2.7
 Key: KAFKA-12623
 URL: https://issues.apache.org/jira/browse/KAFKA-12623
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: Mickael Maison
 Fix For: 2.7.1


Just splitting this out as a sub-task.

I've fixed the parent ticket on trunk and 2.8.

You'll need to cherry-pick the fix from 2.8 (see 
[https://github.com/apache/kafka/pull/10474])

Then, you can follow the manual verification steps I detailed here: 
https://issues.apache.org/jira/browse/KAFKA-12622





[jira] [Created] (KAFKA-12622) Automate LICENSE file validation

2021-04-06 Thread John Roesler (Jira)
John Roesler created KAFKA-12622:


 Summary: Automate LICENSE file validation
 Key: KAFKA-12622
 URL: https://issues.apache.org/jira/browse/KAFKA-12622
 Project: Kafka
  Issue Type: Task
Reporter: John Roesler
 Fix For: 3.0.0, 2.8.1


In https://issues.apache.org/jira/browse/KAFKA-12602, we manually constructed a 
correct license file for 2.8.0. This file will certainly become wrong again in 
later releases, so we need to write some kind of script to automate a check.

It crossed my mind to automate the generation of the file, but it seems to be 
an intractable problem, considering that each dependency may change licenses, 
may package license files, link to them from their poms, link to them from 
their repos, etc. I've also found multiple URLs listed with various delimiters, 
broken links that I have to chase down, etc.

Therefore, it seems like the solution to aim for is simply: list all the jars 
that we package, and print out a report of each jar that's extra or missing vs. 
the ones in our `LICENSE-binary` file.

Here's how I do this manually right now:
{code:java}
// build the binary artifacts
$ ./gradlewAll releaseTarGz

// unpack the binary artifact
$ cd core/build/distributions/
$ tar xf kafka_2.13-X.Y.Z.tgz
$ cd kafka_2.13-X.Y.Z

// list the packaged jars 
// (you can ignore the jars for our own modules, like kafka, kafka-clients, 
etc.)
$ ls libs/

// cross check the jars with the packaged LICENSE
// make sure all dependencies are listed with the right versions
$ cat LICENSE

// also double check all the mentioned license files are present
$ ls licenses {code}
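A sketch of the proposed report, comparing packaged jar names against `LICENSE-binary` entries. The jar and license names below are made-up stand-ins, and a real check would read `ls libs/` output and the license file rather than hard-coded lists:

```java
import java.util.*;
import java.util.stream.*;

public class LicenseCheck {
    // Strip ".jar" so jar names can be compared against LICENSE lines
    // like "zstd-jni-1.4.9" verbatim, then report the set difference in
    // either direction (extra jars, or missing license entries).
    static Set<String> report(Collection<String> packagedJars,
                              Collection<String> licenseEntries,
                              boolean extras) {
        Set<String> jars = packagedJars.stream()
                .map(j -> j.replaceAll("\\.jar$", ""))
                .collect(Collectors.toSet());
        Set<String> listed = new HashSet<>(licenseEntries);
        Set<String> result = new TreeSet<>(extras ? jars : listed);
        result.removeAll(extras ? listed : jars);
        return result;
    }

    public static void main(String[] args) {
        List<String> jars = List.of("zstd-jni-1.4.9.jar", "lz4-java-1.7.1.jar");
        List<String> license = List.of("zstd-jni-1.4.9", "snappy-java-1.1.8");
        System.out.println("extra jars:   " + report(jars, license, true));  // [lz4-java-1.7.1]
        System.out.println("missing jars: " + report(jars, license, false)); // [snappy-java-1.1.8]
    }
}
```

An empty report in both directions would mean `LICENSE-binary` matches what is actually shipped.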





[jira] [Resolved] (KAFKA-12593) Some Scala, Python, and Gradle files contain the wrong license header

2021-04-01 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-12593.
--
Resolution: Fixed

> Some Scala, Python, and Gradle files contain the wrong license header
> -
>
> Key: KAFKA-12593
> URL: https://issues.apache.org/jira/browse/KAFKA-12593
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> Thanks to [~jmclean] for raising this issue in the mailing list thread:
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>  





[jira] [Created] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should

2021-04-01 Thread John Roesler (Jira)
John Roesler created KAFKA-12602:


 Summary: The LICENSE and NOTICE files don't list everything they 
should
 Key: KAFKA-12602
 URL: https://issues.apache.org/jira/browse/KAFKA-12602
 Project: Kafka
  Issue Type: Bug
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.8.0, 2.7.1, 2.6.2


[~jmclean] raised this on the mailing list: 
[https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]

 

We need to make the license file match what we are actually shipping in source 
and binary distributions.





[jira] [Created] (KAFKA-12593) The Streams-Scala contains the wrong license header

2021-03-31 Thread John Roesler (Jira)
John Roesler created KAFKA-12593:


 Summary: The Streams-Scala contains the wrong license header
 Key: KAFKA-12593
 URL: https://issues.apache.org/jira/browse/KAFKA-12593
 Project: Kafka
  Issue Type: Bug
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.8.0, 2.7.1, 2.6.2


Thanks to [~jmclean] for raising this issue in the mailing list thread:

[https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]

 





[jira] [Resolved] (KAFKA-12557) org.apache.kafka.clients.admin.KafkaAdminClientTest#testClientSideTimeoutAfterFailureToReceiveResponse intermittently hangs indefinitely

2021-03-30 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-12557.
--
Resolution: Fixed

> org.apache.kafka.clients.admin.KafkaAdminClientTest#testClientSideTimeoutAfterFailureToReceiveResponse
>  intermittently hangs indefinitely
> 
>
> Key: KAFKA-12557
> URL: https://issues.apache.org/jira/browse/KAFKA-12557
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 3.0.0, 2.8.0
>
>
> While running tests for [https://github.com/apache/kafka/pull/10397], I got a 
> test timeout under Java 8.
> I ran it locally via `./gradlew clean -PscalaVersion=2.12 :clients:unitTest 
> --profile --no-daemon --continue 
> -PtestLoggingEvents=started,passed,skipped,failed -PignoreFailures=true 
> -PmaxTestRetries=1 -PmaxTestRetryFailures=5` (copied from the Jenkins log) 
> and was able to determine that the hanging test is:
> org.apache.kafka.clients.admin.KafkaAdminClientTest#testClientSideTimeoutAfterFailureToReceiveResponse
> It's odd, but it hangs most times on my branch, and I haven't seen it hang on 
> trunk, despite the fact that my PR doesn't touch the client or core code at 
> all.
> Some debugging reveals that when the client is hanging, it's because the 
> listTopics request is still sitting in its pendingRequests queue, and if I 
> understand the test setup correctly, it would never be completed, since we 
> will never advance time or queue up a metadata response for it.
> I figure a reasonable blanket response to this is just to make sure that the 
> test harness will close the admin client eagerly instead of lazily.





[jira] [Resolved] (KAFKA-12435) Several streams-test-utils classes missing from javadoc

2021-03-30 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-12435.
--
Resolution: Fixed

> Several streams-test-utils classes missing from javadoc
> ---
>
> Key: KAFKA-12435
> URL: https://issues.apache.org/jira/browse/KAFKA-12435
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams-test-utils
>Reporter: Ismael Juma
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0
>
> Attachments: image-2021-03-05-14-22-45-891.png
>
>
> !image-2021-03-05-14-22-45-891.png!
> Only 3 of them show up currently ^. Source: 
> https://kafka.apache.org/27/javadoc/index.html





[jira] [Resolved] (KAFKA-12508) Emit-on-change tables may lose updates on error or restart in at_least_once

2021-03-25 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-12508.
--
Resolution: Fixed

Disabled KIP-557 entirely.

> Emit-on-change tables may lose updates on error or restart in at_least_once
> ---
>
> Key: KAFKA-12508
> URL: https://issues.apache.org/jira/browse/KAFKA-12508
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0, 2.6.1
>Reporter: Nico Habermann
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> [KIP-557|https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams]
>  added emit-on-change semantics to KTables that suppress updates for 
> duplicate values.
> However, this may cause data loss in at_least_once topologies when records 
> are retried from the last commit due to an error / restart / etc.
>  
> Consider the following example:
> {code:java}
> streams.table(source, materialized)
> .toStream()
> .map(mayThrow())
> .to(output){code}
>  
>  # Record A gets read
>  # Record A is stored in the table
>  # The update for record A is forwarded through the topology
>  # Map() throws (or alternatively, any restart while the forwarded update was 
> still being processed and not yet produced to the output topic)
>  # The stream is restarted and "retries" from the last commit
>  # Record A gets read again
>  # The table will discard the update for record A because
>  ## The value is the same
>  ## The timestamp is the same
>  # Eventually the stream will commit
>  # There is absolutely no output for Record A even though we're running in 
> at_least_once
>  
> This behaviour does not seem intentional. [The emit-on-change logic 
> explicitly forwards records that have the same value and an older 
> timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50]
> This logic should probably be changed to also forward updates that have an 
> older *or equal* timestamp.
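The proposed rule can be sketched as a single predicate. This illustrates the ticket's suggestion only; it is not the actual `ValueAndTimestampSerializer` code:

```java
public class EmitOnChange {
    // Proposed fix: drop an update only when the value is unchanged AND the
    // new timestamp is strictly newer than the stored one. Same-value records
    // with an older *or equal* timestamp are forwarded, so a retried record
    // (same value, same timestamp) is not lost under at_least_once.
    static boolean shouldForward(Object oldValue, long oldTs, Object newValue, long newTs) {
        boolean sameValue = java.util.Objects.equals(oldValue, newValue);
        return !sameValue || newTs <= oldTs;
    }

    public static void main(String[] args) {
        System.out.println(shouldForward("A", 10, "A", 10)); // retry is re-emitted
        System.out.println(shouldForward("A", 10, "A", 11)); // genuine duplicate, dropped
        System.out.println(shouldForward("A", 10, "B", 11)); // value changed, forwarded
    }
}
```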





[jira] [Created] (KAFKA-12557) org.apache.kafka.clients.admin.KafkaAdminClientTest#testClientSideTimeoutAfterFailureToReceiveResponse intermittently hangs indefinitely

2021-03-25 Thread John Roesler (Jira)
John Roesler created KAFKA-12557:


 Summary: 
org.apache.kafka.clients.admin.KafkaAdminClientTest#testClientSideTimeoutAfterFailureToReceiveResponse
 intermittently hangs indefinitely
 Key: KAFKA-12557
 URL: https://issues.apache.org/jira/browse/KAFKA-12557
 Project: Kafka
  Issue Type: Bug
  Components: clients, core
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 3.0.0, 2.8.0


While running tests for [https://github.com/apache/kafka/pull/10397], I got a 
test timeout under Java 8.

I ran it locally via `./gradlew clean -PscalaVersion=2.12 :clients:unitTest 
--profile --no-daemon --continue 
-PtestLoggingEvents=started,passed,skipped,failed -PignoreFailures=true 
-PmaxTestRetries=1 -PmaxTestRetryFailures=5` (copied from the Jenkins log) and 
was able to determine that the hanging test is:

org.apache.kafka.clients.admin.KafkaAdminClientTest#testClientSideTimeoutAfterFailureToReceiveResponse

It's odd, but it hangs most times on my branch, and I haven't seen it hang on 
trunk, despite the fact that my PR doesn't touch the client or core code at all.

Some debugging reveals that when the client is hanging, it's because the 
listTopics request is still sitting in its pendingRequests queue, and if I 
understand the test setup correctly, it would never be completed, since we will 
never advance time or queue up a metadata response for it.

I figure a reasonable blanket response to this is just to make sure that the 
test harness will close the admin client eagerly instead of lazily.
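A sketch of the proposed harness change, using try-with-resources as a stand-in for eager cleanup. `FakeAdminClient` is a hypothetical placeholder, not the real test fixture:

```java
public class EagerClose {
    // Stand-in for an admin client whose close() would abort any request
    // still sitting in pendingRequests.
    static class FakeAdminClient implements AutoCloseable {
        boolean closed = false;
        public void close() { closed = true; }
    }

    static FakeAdminClient runScenario() {
        FakeAdminClient client = new FakeAdminClient();
        try (FakeAdminClient c = client) {
            // test body: issue listTopics etc.; even if a request never
            // completes, the client is closed when this block exits, so
            // the test cannot hang waiting on it
        }
        return client;
    }

    public static void main(String[] args) {
        System.out.println(runScenario().closed); // true
    }
}
```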





[jira] [Reopened] (KAFKA-12508) Emit-on-change tables may lose updates on error or restart in at_least_once

2021-03-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reopened KAFKA-12508:
--
  Assignee: John Roesler  (was: Bruno Cadonna)

> Emit-on-change tables may lose updates on error or restart in at_least_once
> ---
>
> Key: KAFKA-12508
> URL: https://issues.apache.org/jira/browse/KAFKA-12508
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0, 2.6.1
>Reporter: Nico Habermann
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> [KIP-557|https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams]
>  added emit-on-change semantics to KTables that suppress updates for 
> duplicate values.
> However, this may cause data loss in at_least_once topologies when records 
> are retried from the last commit due to an error / restart / etc.
>  
> Consider the following example:
> {code:java}
> streams.table(source, materialized)
> .toStream()
> .map(mayThrow())
> .to(output){code}
>  
>  # Record A gets read
>  # Record A is stored in the table
>  # The update for record A is forwarded through the topology
>  # Map() throws (or alternatively, any restart while the forwarded update was 
> still being processed and not yet produced to the output topic)
>  # The stream is restarted and "retries" from the last commit
>  # Record A gets read again
>  # The table will discard the update for record A because
>  ## The value is the same
>  ## The timestamp is the same
>  # Eventually the stream will commit
>  # There is absolutely no output for Record A even though we're running in 
> at_least_once
>  
> This behaviour does not seem intentional. [The emit-on-change logic 
> explicitly forwards records that have the same value and an older 
> timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50]
> This logic should probably be changed to also forward updates that have an 
> older *or equal* timestamp.
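The proposed comparison rule can be sketched in plain Java. This is a hedged illustration, not the actual ValueAndTimestampSerializer code: forward when the value changed OR when the new timestamp is older *or equal*, so a retried record with an identical (value, timestamp) pair is re-forwarded instead of silently dropped.

```java
public class EmitOnChangeSketch {
    // Decide whether a table update should be forwarded downstream.
    static boolean shouldForward(String oldValue, long oldTs, String newValue, long newTs) {
        if (!newValue.equals(oldValue)) {
            return true;        // value actually changed: always forward
        }
        return newTs <= oldTs;  // equal timestamp now forwards too (the proposed fix)
    }

    public static void main(String[] args) {
        // Retry after restart: same value, same timestamp -> must still forward.
        System.out.println(shouldForward("a", 5L, "a", 5L)); // true under the fix
        // Genuine duplicate with a strictly newer timestamp stays suppressed.
        System.out.println(shouldForward("a", 5L, "a", 6L)); // false
    }
}
```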





[jira] [Resolved] (KAFKA-12366) Performance regression in stream-table joins on trunk

2021-03-22 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-12366.
--
Resolution: Fixed

> Performance regression in stream-table joins on trunk
> -
>
> Key: KAFKA-12366
> URL: https://issues.apache.org/jira/browse/KAFKA-12366
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Victoria Xia
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Stream-table join benchmarks have revealed a significant performance 
> regression on trunk as compared to the latest release version. We should 
> investigate as a blocker prior to the 2.8 release.





[jira] [Created] (KAFKA-12514) NPE in SubscriptionState

2021-03-20 Thread John Roesler (Jira)
John Roesler created KAFKA-12514:


 Summary: NPE in SubscriptionState
 Key: KAFKA-12514
 URL: https://issues.apache.org/jira/browse/KAFKA-12514
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.0.0
Reporter: John Roesler
Assignee: John Roesler


In a soak test, we got this exception:

 
{code:java}
java.lang.NullPointerException
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.partitionLag(SubscriptionState.java:545)
	at org.apache.kafka.clients.consumer.KafkaConsumer.currentLag(KafkaConsumer.java:2241)
	at org.apache.kafka.streams.processor.internals.PartitionGroup.readyToProcess(PartitionGroup.java:143)
	at org.apache.kafka.streams.processor.internals.StreamTask.isProcessable(StreamTask.java:650)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:661)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1114)
{code}
This is related to the implementation of:

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization]

aka

https://issues.apache.org/jira/browse/KAFKA-10091

 

Luckily, the stack trace is pretty unambiguous. I'll open a PR shortly.
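The shape of a null-safe fix can be sketched with stdlib types only. Everything here ("LagSketch", "update", the Map-backed state) is a made-up stand-in for SubscriptionState, not the real Kafka code: the point is returning OptionalLong.empty() when either the position or the end offset is not yet known, instead of dereferencing a null and throwing the NPE seen above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

public class LagSketch {
    private final Map<String, Long> endOffsets = new HashMap<>();
    private final Map<String, Long> positions = new HashMap<>();

    void update(String partition, long endOffset, long position) {
        endOffsets.put(partition, endOffset);
        positions.put(partition, position);
    }

    OptionalLong lagFor(String partition) {
        Long end = endOffsets.get(partition);
        Long pos = positions.get(partition);
        if (end == null || pos == null) {
            return OptionalLong.empty(); // unknown lag, no NullPointerException
        }
        return OptionalLong.of(end - pos);
    }

    public static void main(String[] args) {
        LagSketch s = new LagSketch();
        System.out.println(s.lagFor("t-0")); // OptionalLong.empty
        s.update("t-0", 10L, 7L);
        System.out.println(s.lagFor("t-0")); // OptionalLong[3]
    }
}
```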





[jira] [Resolved] (KAFKA-10866) Add fetched metadata to ConsumerRecords

2021-02-08 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10866.
--
Resolution: Fixed

> Add fetched metadata to ConsumerRecords
> ---
>
> Key: KAFKA-10866
> URL: https://issues.apache.org/jira/browse/KAFKA-10866
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0
>
>
> Consumer-side changes for KIP-695





[jira] [Reopened] (KAFKA-10866) Add fetched metadata to ConsumerRecords

2021-02-08 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reopened KAFKA-10866:
--

> Add fetched metadata to ConsumerRecords
> ---
>
> Key: KAFKA-10866
> URL: https://issues.apache.org/jira/browse/KAFKA-10866
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.8.0
>
>
> Consumer-side changes for KIP-695





[jira] [Reopened] (KAFKA-10091) Improve task idling

2021-02-02 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reopened KAFKA-10091:
--

> Improve task idling
> ---
>
> Key: KAFKA-10091
> URL: https://issues.apache.org/jira/browse/KAFKA-10091
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.8.0
>
>
> When Streams is processing a task with multiple inputs, each time it is ready 
> to process a record, it has to choose which input to process next. It always 
> takes from the input for which the next record has the least timestamp. The 
> result of this is that Streams processes data in timestamp order. However, if 
> the buffer for one of the inputs is empty, Streams doesn't know what 
> timestamp the next record for that input will be.
> Streams introduced a configuration "max.task.idle.ms" in KIP-353 to address 
> this issue.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization]
> The config allows Streams to wait some amount of time for data to arrive on 
> the empty input, so that it can make a timestamp-ordered decision about which 
> input to pull from next.
> However, this config can be hard to use reliably and efficiently, since what 
> we're really waiting for is the next poll that _would_ return data from the 
> empty input's partition, and this guarantee is a function of the poll 
> interval, the max poll interval, and the internal logic that governs when 
> Streams will poll again.
> The ideal case is you'd be able to guarantee at a minimum that _any_ amount 
> of idling would guarantee you poll data from the empty partition if there's 
> data to fetch.
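Setting the config discussed above looks like this. This is a minimal sketch: "max.task.idle.ms" is the config key introduced by KIP-353, but the StreamsConfig constant and the rest of a real Streams configuration are omitted to keep it stdlib-only.

```java
import java.util.Properties;

public class IdleConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Wait up to 100 ms for data to arrive on an empty input before
        // making an out-of-timestamp-order processing choice.
        props.setProperty("max.task.idle.ms", "100");
        System.out.println(props.getProperty("max.task.idle.ms")); // prints "100"
    }
}
```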





[jira] [Created] (KAFKA-10867) Implement improved semantics using the ConsumerRecords meta

2020-12-18 Thread John Roesler (Jira)
John Roesler created KAFKA-10867:


 Summary: Implement improved semantics using the ConsumerRecords 
meta
 Key: KAFKA-10867
 URL: https://issues.apache.org/jira/browse/KAFKA-10867
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.8.0








[jira] [Created] (KAFKA-10866) Add fetched metadata to ConsumerRecords

2020-12-18 Thread John Roesler (Jira)
John Roesler created KAFKA-10866:


 Summary: Add fetched metadata to ConsumerRecords
 Key: KAFKA-10866
 URL: https://issues.apache.org/jira/browse/KAFKA-10866
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.8.0


Consumer-side changes for KIP-695





[jira] [Created] (KAFKA-10662) Possible hanging test in 2.6 on JDK 11

2020-10-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10662:


 Summary: Possible hanging test in 2.6 on JDK 11
 Key: KAFKA-10662
 URL: https://issues.apache.org/jira/browse/KAFKA-10662
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.6.1
Reporter: John Roesler
 Attachments: timeout-1.txt, timeout-2.txt, timeout-4.txt

While adding a Jenkinsfile to the 2.6 branch 
([https://github.com/apache/kafka/pull/9471]),

I observed that the JDK 11 build specifically hung 3 out of 5 times (and passed 
within the normal timeframe of 2.5 hours the other two times).

I haven't seen similar behavior on any other branch, so there may be something 
about the 2.6 codebase or the 2.6 tests themselves that interact poorly with 
Java 11.

 

I did some analysis on the failing results, and found that in all three hanging 
cases, all the tests that "STARTED" either "PASSED" or were "SKIPPED". So, I 
was not able to identify a specific culprit. I've attached the logs for these 
runs, in case they aid any investigation.

 

 





[jira] [Resolved] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking

2020-10-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10638.
--
Resolution: Fixed

> QueryableStateIntegrationTest fails due to stricter store checking
> --
>
> Key: KAFKA-10638
> URL: https://issues.apache.org/jira/browse/KAFKA-10638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>
> Observed:
> {code:java}
> org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state 
> store source-table because the stream thread is PARTITIONS_ASSIGNED, not 
> RUNNING
>   at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
>   at 
> org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
>   at 
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:52)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:200)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
>   at 

[jira] [Created] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking

2020-10-23 Thread John Roesler (Jira)
John Roesler created KAFKA-10638:


 Summary: QueryableStateIntegrationTest fails due to stricter store 
checking
 Key: KAFKA-10638
 URL: https://issues.apache.org/jira/browse/KAFKA-10638
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.7.0
Reporter: John Roesler
Assignee: John Roesler


Observed:
{code:java}
org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state 
store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING
at 
org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
at 
org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
at 
org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:52)
at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:200)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 

[jira] [Resolved] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-10-22 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10284.
--
Resolution: Fixed

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Critical
>  Labels: help-wanted
> Fix For: 2.7.0
>
> Attachments: How to reproduce the issue in KAFKA-10284.md
>
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 





[jira] [Created] (KAFKA-10630) State Directory config could be improved

2020-10-22 Thread John Roesler (Jira)
John Roesler created KAFKA-10630:


 Summary: State Directory config could be improved
 Key: KAFKA-10630
 URL: https://issues.apache.org/jira/browse/KAFKA-10630
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: John Roesler


During [https://github.com/apache/kafka/pull/9477], I noticed that many tests 
wind up providing a state directory config purely to ensure a unique temp 
directory for the test. Since TopologyTestDriver and MockProcessorContext tests 
are typically unit tests, it would be more convenient to initialize those 
components with their own unique temp state directory, following the universal 
pattern from such tests:
{code:java}
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath()); {code}
Note that this literal setting is not ideal, since it actually creates a 
directory regardless of whether the application needs one. Instead, we should 
create a new TestUtils method to lazily generate a temp directory _name_ and 
then register a shutdown handler to delete it if it exists. Then, Streams would 
only create the directory if it actually needs persistence.

Also, the default value for that config is not platform independent. It is 
simply: {color:#067d17}"/tmp/kafka-streams"{color}. Perhaps instead we should 
set the default to something like "unset" or "" or "none". Then, instead of 
reading the property directly, when Streams actually needs the state directory, 
it could log a warning that the state directory config is not set and call the 
platform-independent Java api for creating a temporary directory.
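The lazy-name-plus-shutdown-hook proposal can be sketched as follows. The helper name is made up, and this is not the real TestUtils code: the method generates only a directory *name* (nothing is created on disk), and the shutdown hook deletes the directory only if something later created it.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;

public class LazyStateDirSketch {
    static Path lazyStateDirName() {
        Path dir = Paths.get(System.getProperty("java.io.tmpdir"),
                "kafka-streams-" + System.nanoTime());
        // Delete the directory at JVM exit, but only if it was created.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Files.exists(dir)) {
                try {
                    Files.walk(dir)
                         .sorted(Comparator.reverseOrder()) // children first
                         .map(Path::toFile)
                         .forEach(File::delete);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }));
        return dir; // a name only; no directory exists yet
    }

    public static void main(String[] args) {
        Path dir = lazyStateDirName();
        System.out.println(Files.exists(dir)); // prints "false": name only
    }
}
```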





[jira] [Created] (KAFKA-10629) TopologyTestDriver should not require a Properties arg

2020-10-22 Thread John Roesler (Jira)
John Roesler created KAFKA-10629:


 Summary: TopologyTestDriver should not require a Properties arg
 Key: KAFKA-10629
 URL: https://issues.apache.org/jira/browse/KAFKA-10629
 Project: Kafka
  Issue Type: Task
  Components: streams, streams-test-utils
Reporter: John Roesler


As of [https://github.com/apache/kafka/pull/9477], many TopologyTestDriver 
usages will have no configurations at all to specify, so we should provide a 
constructor that doesn't take a Properties argument. Right now, such 
configuration-free usages have to provide an empty Properties object.





[jira] [Created] (KAFKA-10628) Follow-up: Remove all unnecessary dummy TopologyTestDriver configs

2020-10-22 Thread John Roesler (Jira)
John Roesler created KAFKA-10628:


 Summary: Follow-up: Remove all unnecessary dummy 
TopologyTestDriver configs
 Key: KAFKA-10628
 URL: https://issues.apache.org/jira/browse/KAFKA-10628
 Project: Kafka
  Issue Type: Task
Reporter: John Roesler


After [https://github.com/apache/kafka/pull/9477], we no longer need to specify 
dummy values for bootstrap servers and application id when creating a 
TopologyTestDriver.

 

This task is to track down all those unnecessary parameters and delete them. 
You can consult the above pull request for some examples of this.

 

Note that there are times when the application id is actually significant, 
since it is used in conjunction with the state directory to give the driver a 
unique place to store local state. On the other hand, it would be sufficient to 
just set a unique state directory and not bother with the app id in that case.

 

During review, [~chia7712] pointed out that this comment 
([https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java#L138])
 can be removed, since it is no longer necessary. (It is the mention of the 
dummy params in the TopologyTestDriver javadoc.)





[jira] [Resolved] (KAFKA-10605) KIP-478: deprecate the replaced Processor API members

2020-10-19 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10605.
--
Resolution: Fixed

> KIP-478: deprecate the replaced Processor API members
> -
>
> Key: KAFKA-10605
> URL: https://issues.apache.org/jira/browse/KAFKA-10605
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.7.0
>
>
> This is a minor task, but we shouldn't do the release without it.





[jira] [Resolved] (KAFKA-10455) Probing rebalances are not guaranteed to be triggered by non-leader members

2020-10-19 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10455.
--
Resolution: Fixed

> Probing rebalances are not guaranteed to be triggered by non-leader members
> ---
>
> Key: KAFKA-10455
> URL: https://issues.apache.org/jira/browse/KAFKA-10455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: Leah Thomas
>Priority: Blocker
> Fix For: 2.7.0, 2.6.1
>
>
> Apparently, if a consumer rejoins the group with the same subscription 
> userdata that it previously sent, it will not trigger a rebalance. The one 
> exception here is that the group leader will always trigger a rebalance when 
> it rejoins the group.
> This has implications for KIP-441, where we rely on asking an arbitrary 
> thread to enforce the followup probing rebalances. Technically we do ask a 
> thread living on the same instance as the leader, so the odds that the leader 
> will be chosen aren't completely abysmal, but for any multithreaded 
> application they are still at best only 50%.
> Of course in general the userdata will have changed within a span of 10 
> minutes, so the actual likelihood of hitting this is much lower –  it can 
> only happen if the member's task offset sums remained unchanged. 
> Realistically, this probably requires that the member only have 
> fully-restored active tasks (encoded with the constant sentinel -2) and that 
> no tasks be added or removed.
>  
> One solution would be to make sure the leader is responsible for the probing 
> rebalance. To do this, we would need to somehow expose the memberId of the 
> thread's main consumer to the partition assignor. I'm actually not sure if 
> that's currently possible to figure out or not. If not, we could just assign 
> the probing rebalance to every thread on the leader's instance. This 
> shouldn't result in multiple followup rebalances as the rebalance schedule 
> will be updated/reset on the first followup rebalance.
> Another solution would be to make sure the userdata is always different. We 
> could encode an extra bit that flip-flops, but then we'd have to persist the 
> latest value somewhere/somehow. Alternatively we could just encode the next 
> probing rebalance time in the subscription userdata, since that is guaranteed 
> to always be different from the previous rebalance. This might get tricky 
> though, and certainly wastes space in the subscription userdata. Also, this 
> would only solve the problem for KIP-441 probing rebalances, meaning we'd 
> have to individually ensure the userdata has changed for every type of 
> followup rebalance (see related issue below). So the first proposal, 
> requiring the leader trigger the rebalance, would be preferable.
> Note that, imho, we should just allow anyone to trigger a rebalance by 
> rejoining the group. But this would presumably require a broker-side change 
> and thus we would still need a workaround for KIP-441 to work with brokers.
>  
> Related issue:
> This also means the Streams workaround for [KAFKA-9821|http://example.com] is 
> not airtight, as we encode the followup rebalance in the member who is 
> supposed to _receive_ a revoked partition, rather than the member who is 
> actually revoking said partition. While the member doing the revoking will be 
> guaranteed to have different userdata, the member receiving the partition may 
> not. Making it the responsibility of the leader to trigger _any_ type of 
> followup rebalance would solve this issue as well.
> Note that other types of followup rebalance (version probing, static 
> membership with host info change) are guaranteed to have a change in the 
> subscription userdata, and will not hit this bug





[jira] [Resolved] (KAFKA-10215) MockProcessorContext doesn't work with SessionStores

2020-10-13 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10215.
--
Resolution: Fixed

Fixed in the new processor API.

> MockProcessorContext doesn't work with SessionStores
> 
>
> Key: KAFKA-10215
> URL: https://issues.apache.org/jira/browse/KAFKA-10215
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>
> The recommended pattern for testing custom Processor implementations is to 
> use the test-utils MockProcessorContext. If a Processor implementation needs 
> a store, the store also has to be initialized with the same context. However, 
> the existing (in-memory and persistent) Session store implementations perform 
> internal casts that result in class cast exceptions if you attempt to 
> initialize them with the MockProcessorContext.
> A workaround is to instead embed the processor in an application and use the 
> TopologyTestDriver instead.
> The fix is the same as for KAFKA-10200





[jira] [Created] (KAFKA-10605) KIP-478: deprecate the replaced Processor API members

2020-10-13 Thread John Roesler (Jira)
John Roesler created KAFKA-10605:


 Summary: KIP-478: deprecate the replaced Processor API members
 Key: KAFKA-10605
 URL: https://issues.apache.org/jira/browse/KAFKA-10605
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.7.0


This is a minor task, but we shouldn't do the release without it.





[jira] [Resolved] (KAFKA-10562) KIP-478: Delegate the store wrappers to the new init method

2020-10-13 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10562.
--
Resolution: Fixed

> KIP-478: Delegate the store wrappers to the new init method
> ---
>
> Key: KAFKA-10562
> URL: https://issues.apache.org/jira/browse/KAFKA-10562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.7.0
>
>






[jira] [Resolved] (KAFKA-10437) KIP-478: Implement test-utils changes

2020-10-13 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10437.
--
Resolution: Fixed

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>
> In addition to implementing the KIP, search for and resolve these todos:
> {color:#008dde}TODO will be fixed in KAFKA-10437{color}
> Also, add unit tests in test-utils making sure we can initialize _all_ the 
> kinds of store with the MPC and MPC.getSSC.
>  





[jira] [Resolved] (KAFKA-10536) KIP-478: Implement KStream changes

2020-10-12 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10536.
--
Resolution: Fixed

> KIP-478: Implement KStream changes
> --
>
> Key: KAFKA-10536
> URL: https://issues.apache.org/jira/browse/KAFKA-10536
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>






[jira] [Created] (KAFKA-10603) Re-design

2020-10-12 Thread John Roesler (Jira)
John Roesler created KAFKA-10603:


 Summary: Re-design 
 Key: KAFKA-10603
 URL: https://issues.apache.org/jira/browse/KAFKA-10603
 Project: Kafka
  Issue Type: New Feature
Reporter: John Roesler








[jira] [Resolved] (KAFKA-10598) KafkaStreams reports inappropriate error message for IQ

2020-10-12 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10598.
--
Resolution: Fixed

> KafkaStreams reports inappropriate error message for IQ
> ---
>
> Key: KAFKA-10598
> URL: https://issues.apache.org/jira/browse/KAFKA-10598
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0, 2.6.1
>
>
> Presently, KafkaStreams#store, or calling methods on the returned store, 
> will throw an InvalidStateStoreException if the store name or type is wrong. 
> However, the message in the exception is "The state store may have migrated 
> to another instance", which is incorrect.





[jira] [Created] (KAFKA-10598) KafkaStreams reports inappropriate error message for IQ

2020-10-10 Thread John Roesler (Jira)
John Roesler created KAFKA-10598:


 Summary: KafkaStreams reports inappropriate error message for IQ
 Key: KAFKA-10598
 URL: https://issues.apache.org/jira/browse/KAFKA-10598
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.7.0, 2.6.1


Presently, KafkaStreams#store, or calling methods on the returned store, will 
throw an InvalidStateStoreException if the store name or type is wrong. 
However, the message in the exception is "The state store may have migrated to 
another instance", which is incorrect.
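The misleading behavior can be illustrated with a minimal sketch (hypothetical names and a simplified lookup; this is not the actual KafkaStreams internals): whatever the real cause, a typo'd store name or a genuine migration, the caller sees the same "may have migrated" message.

```java
import java.util.Set;

// Hypothetical sketch of the reported behavior; the exception type and the
// lookup are simplified stand-ins for the real KafkaStreams#store path.
public class StoreLookupSketch {
    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(String message) { super(message); }
    }

    static final Set<String> LOCAL_STORES = Set.of("counts-store");

    // Even when the name is simply a typo (the store never existed anywhere),
    // the exception blames migration, which is the misleading part.
    static void store(String storeName) {
        if (!LOCAL_STORES.contains(storeName)) {
            throw new InvalidStateStoreException(
                    "The state store may have migrated to another instance");
        }
    }

    public static void main(String[] args) {
        try {
            store("conts-store"); // typo'd store name
        } catch (InvalidStateStoreException e) {
            System.out.println(e.getMessage()); // the migration message, despite a mere typo
        }
    }
}
```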





[jira] [Resolved] (KAFKA-10530) kafka-streams-application-reset misses some internal topics

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10530.
--
Resolution: Duplicate

Closing now, since this seems like a duplicate report, and visual code 
inspection indicates it should have been fixed.

If you do still see this, [~oweiler], please feel free to re-open the ticket.

> kafka-streams-application-reset misses some internal topics
> ---
>
> Key: KAFKA-10530
> URL: https://issues.apache.org/jira/browse/KAFKA-10530
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.6.0
>Reporter: Oliver Weiler
>Priority: Major
>
> While the {{kafka-streams-application-reset}} tool works in most cases, it 
> misses some internal topics when using {{Foreign Key Table-Table Joins}}.
> After execution, there are still internal topics left which were not 
> deleted:
> {code}
> bv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic
> bbv4-indexer-717e6cc5-acb2-498d-9d08-4814aaa71c81-StreamThread-1-consumer 
> bbv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-14-topic
> {code}
> The reason seems to be {{StreamsResetter.isInternalTopic}}, which requires 
> an internal topic to end with {{-changelog}} or {{-repartition}} (which the 
> topics above don't).
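The suffix check described above can be sketched in plain Java (a simplified, assumed version of the logic; the real {{StreamsResetter}} does more): only the two known suffixes are recognized, so the FK-join subscription/response topics slip through.

```java
// Simplified sketch (an assumption, not the actual StreamsResetter source):
// a topic counts as internal only if it carries the application-id prefix
// AND ends with "-changelog" or "-repartition".
public class InternalTopicCheck {
    public static boolean isInternalTopic(String topic, String applicationId) {
        return topic.startsWith(applicationId + "-")
                && (topic.endsWith("-changelog") || topic.endsWith("-repartition"));
    }

    public static void main(String[] args) {
        String appId = "bbv4-indexer";
        // A changelog topic is recognized as internal...
        System.out.println(isInternalTopic(appId + "-some-store-changelog", appId)); // true
        // ...but an FK-join response topic is not, matching the report above.
        System.out.println(isInternalTopic(
                appId + "-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-14-topic", appId)); // false
    }
}
```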





[jira] [Resolved] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10535.
--
Resolution: Fixed

> KIP-478: Implement StateStoreContext and Record
> ---
>
> Key: KAFKA-10535
> URL: https://issues.apache.org/jira/browse/KAFKA-10535
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>






[jira] [Created] (KAFKA-10562) Delegate the store wrappers to the new init method

2020-10-01 Thread John Roesler (Jira)
John Roesler created KAFKA-10562:


 Summary: Delegate the store wrappers to the new init method
 Key: KAFKA-10562
 URL: https://issues.apache.org/jira/browse/KAFKA-10562
 Project: Kafka
  Issue Type: Sub-task
Affects Versions: 2.7.0
Reporter: John Roesler
Assignee: John Roesler








[jira] [Created] (KAFKA-10546) KIP-478: Deprecate old PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10546:


 Summary: KIP-478: Deprecate old PAPI
 Key: KAFKA-10546
 URL: https://issues.apache.org/jira/browse/KAFKA-10546
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler


Can't be done until after the DSL internals are migrated.





[jira] [Created] (KAFKA-10544) Convert KTable aggregations to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10544:


 Summary: Convert KTable aggregations to new PAPI
 Key: KAFKA-10544
 URL: https://issues.apache.org/jira/browse/KAFKA-10544
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Created] (KAFKA-10541) Convert KTable filters to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10541:


 Summary: Convert KTable filters to new PAPI
 Key: KAFKA-10541
 URL: https://issues.apache.org/jira/browse/KAFKA-10541
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler








[jira] [Created] (KAFKA-10542) Convert KTable maps to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10542:


 Summary: Convert KTable maps to new PAPI
 Key: KAFKA-10542
 URL: https://issues.apache.org/jira/browse/KAFKA-10542
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Created] (KAFKA-10543) Convert KTable joins to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10543:


 Summary: Convert KTable joins to new PAPI
 Key: KAFKA-10543
 URL: https://issues.apache.org/jira/browse/KAFKA-10543
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler







