[jira] [Resolved] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15242.
-
  Assignee: (was: Alexander Aghili)
Resolution: Duplicate

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Priority: Major
>
> Using a mock processor context to get the forwarded message doesn't work.
> Also, there is no well-documented way to test FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16644.
-
Resolution: Duplicate

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in the 3.7.0 release via KAFKA-14748. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255]).
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  
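A minimal, self-contained illustration (plain Java, not Kafka Streams code; class and method names are hypothetical) of why verifying output records with a `Map` masks the duplicate tombstone that a `List`-based check would reveal:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapVsListCheck {
    // Collapse records by key, as a Map-based verification effectively does:
    // the second tombstone for "pk1" silently overwrites the first.
    public static int countViaMap(List<Map.Entry<String, String>> records) {
        Map<String, String> byKey = new HashMap<>();
        for (Map.Entry<String, String> r : records) {
            byKey.put(r.getKey(), r.getValue());
        }
        return byKey.size();
    }

    // Keep every record, as a List-based check would: duplicates stay visible.
    public static int countViaList(List<Map.Entry<String, String>> records) {
        return new ArrayList<>(records).size();
    }

    public static void main(String[] args) {
        // Two tombstones for the same key, as emitted by the bug described above.
        List<Map.Entry<String, String>> output = List.of(
                Map.entry("pk1", "<tombstone>"),
                Map.entry("pk1", "<tombstone>"));
        System.out.println("Map sees " + countViaMap(output)
                + " record(s), List sees " + countViaList(output));
    }
}
```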



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-29 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16644:
---

 Summary: FK join emits duplicate tombstone on left-side delete
 Key: KAFKA-16644
 URL: https://issues.apache.org/jira/browse/KAFKA-16644
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Matthias J. Sax


We introduced a regression bug in the 3.7.0 release via KAFKA-14748. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the tests use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255]).

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduced bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16486) Integrate metric measurability changes in metrics collector

2024-04-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16486.
-
Fix Version/s: 3.8.0
   Resolution: Done

> Integrate metric measurability changes in metrics collector
> ---
>
> Key: KAFKA-16486
> URL: https://issues.apache.org/jira/browse/KAFKA-16486
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-04-18 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16586:
---

 Summary: Test TaskAssignorConvergenceTest failing
 Key: KAFKA-16586
 URL: https://issues.apache.org/jira/browse/KAFKA-16586
 Project: Kafka
  Issue Type: Test
  Components: streams, unit tests
Reporter: Matthias J. Sax


{code:java}
java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-538095696758490522)`.
    at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
    at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code}
This might expose an actual bug (or incorrect test setup) and should be 
reproducible (did not try it myself yet).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16280) Expose method to determine Metric Measurability

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16280.
-
Resolution: Done

> Expose method to determine Metric Measurability
> ---
>
> Key: KAFKA-16280
> URL: https://issues.apache.org/jira/browse/KAFKA-16280
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 3.8.0
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: kip
> Fix For: 3.8.0
>
>
> The Jira is to track the development of KIP-1019: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16575) Automatically remove KTable aggregation result when group becomes empty

2024-04-17 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16575:
---

 Summary: Automatically remove KTable aggregation result when group 
becomes empty
 Key: KAFKA-16575
 URL: https://issues.apache.org/jira/browse/KAFKA-16575
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Using `KTable.groupBy(...).aggregate(...)` can handle updates (inserts, 
deletes, actual updates) of the input KTable, by calling the provided `Adder` 
and `Subtractor`. However, when all records from the input table (which map to 
the same group/row in the result table) get removed, the result entry is not 
removed automatically.

For example, if we implement a "count", the count would go to zero for a group 
by default, instead of removing the row from the result, if all input records 
for this group got deleted.

Users can let their `Subtractor` return `null` for this case, to actually 
delete the row, but it's not well documented and it seems it should be a 
built-in feature of the table-aggregation to remove "empty groups" from the 
result, instead of relying on "correct" behavior of user-code.

(Also the built-in `count()` does not return `null`, but actually zero...)

An internal counter of how many elements are in a group should be sufficient. Of 
course, there are backward compatibility questions we need to answer.
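A tiny sketch (plain Java with hypothetical names, not the Kafka Streams API) of the workaround described above, where the subtractor deletes the row once the group becomes empty instead of leaving a zero count:

```java
import java.util.HashMap;
import java.util.Map;

public class GroupAggregate {
    private final Map<String, Integer> table = new HashMap<>();

    // "Adder": a record joins the group, so the count goes up.
    public void add(String group) {
        table.merge(group, 1, Integer::sum);
    }

    // "Subtractor": returning null from the remapping function removes the
    // entry, i.e. the row is deleted instead of being kept with count zero.
    public void subtract(String group) {
        table.computeIfPresent(group, (g, count) -> count > 1 ? count - 1 : null);
    }

    public Integer count(String group) {
        return table.get(group);
    }

    public static void main(String[] args) {
        GroupAggregate agg = new GroupAggregate();
        agg.add("g1");
        agg.add("g1");
        agg.subtract("g1");
        agg.subtract("g1");
        System.out.println(agg.count("g1")); // null: the empty group was removed
    }
}
```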



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16508) Infinite loop if output topic does not exist

2024-04-10 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16508:
---

 Summary: Infinite loop if output topic does not exist
 Key: KAFKA-16508
 URL: https://issues.apache.org/jira/browse/KAFKA-16508
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams supports `ProductionExceptionHandler` to drop records on error 
when writing into an output topic.

However, if the output topic does not exist, the corresponding error cannot be 
skipped over because the handler is not called.

The issue is that the producer internally retries to fetch the output topic 
metadata until it times out, and a `TimeoutException` (which is a 
`RetriableException`) is returned via the registered `Callback`. However, for 
`RetriableException` there is a different code path and the 
`ProductionExceptionHandler` is not called.

In general, Kafka Streams correctly tries to handle as many errors as possible 
internally, and a `RetriableException` falls into this category (and thus there is 
no need to call the handler). However, for this particular case, just retrying 
does not solve the issue – it's unclear if throwing a retryable 
`TimeoutException` is actually the right thing to do for the Producer. It is also 
not clear what the right way to address this ticket would be (currently, we 
cannot really detect this case, except by doing some nasty error-message String 
comparison, which sounds hacky...)
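The code-path split described above can be sketched as follows (plain Java with hypothetical names, not the actual producer/Streams classes): a retriable error takes the internal retry path and the handler is never consulted, so a permanently missing topic keeps looping.

```java
public class SendErrorDispatch {
    static class RetriableException extends RuntimeException {}
    static class TimeoutException extends RetriableException {}

    enum Action { RETRY, ASK_HANDLER }

    // Retriable errors are handled internally; only non-retriable errors
    // reach the ProductionExceptionHandler-style callback.
    static Action dispatch(RuntimeException error) {
        if (error instanceof RetriableException) {
            return Action.RETRY;
        }
        return Action.ASK_HANDLER;
    }

    public static void main(String[] args) {
        // The metadata timeout for a missing topic surfaces as a retriable
        // TimeoutException, so it loops on RETRY instead of asking the handler.
        System.out.println(dispatch(new TimeoutException()));
        System.out.println(dispatch(new RuntimeException()));
    }
}
```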



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16357) Kafka Client JAR manifest breaks javac linting

2024-03-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16357.
-
Resolution: Duplicate

> Kafka Client JAR manifest breaks javac linting
> --
>
> Key: KAFKA-16357
> URL: https://issues.apache.org/jira/browse/KAFKA-16357
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
> Environment: Linux, JDK 21 (Docker image eclipse-temurin:21-jdk-jammy)
>Reporter: Jacek Wojciechowski
>Priority: Critical
>
> I upgraded kafka-clients from 3.6.1 to 3.7.0 and discovered that my project 
> is not building anymore.
> The reason is that kafka-clients-3.7.0.jar contains the following entry in 
> its JAR manifest file:
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
>  .5.jar slf4j-api-1.7.36.jar
> I'm using a Maven repo to keep my dependencies and those files are not in the 
> same directory as kafka-clients-3.7.0.jar, so the paths in the manifest's 
> Class-Path are not correct. It fails my build because we build with javac 
> with all linting options on, in particular -Xlint:path. It produces the 
> following warnings coming from javac:
> [WARNING] COMPILATION WARNING : 
> [INFO] -
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> Since we have also {{-Werror}} option enabled, it turns warnings into errors 
> and fails our build.
> I think our setup is quite typical: using a Maven repo to store dependencies, 
> having linting on and -Werror. Unfortunately, it doesn't work with the 
> latest kafka-clients because of the entries in the manifest's Class-Path. 
> And I think it might affect quite a lot of projects set up in a similar way.
> I don't know what the reason was to add the Class-Path entry in the JAR manifest 
> file - but perhaps this effect was not considered.
> It would be great if you removed the Class-Path entry from the JAR manifest 
> file.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16360) Release plan of 3.x kafka releases.

2024-03-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16360.
-
Resolution: Invalid

Please don't use Jira to ask questions. Jira tickets are for bug reports and 
features only.

Question should be asked on the user and/or dev mailing lists: 
https://kafka.apache.org/contact

> Release plan of 3.x kafka releases.
> ---
>
> Key: KAFKA-16360
> URL: https://issues.apache.org/jira/browse/KAFKA-16360
> Project: Kafka
>  Issue Type: Improvement
>Reporter: kaushik srinivas
>Priority: Major
>
> KIP 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready#KIP833:MarkKRaftasProductionReady-ReleaseTimeline]
>  mentions:
> h2. Kafka 3.7
>  * January 2024
>  * Final release with ZK mode
> But we see in Jira that some tickets are marked for the 3.8 release. Does Apache 
> continue to make 3.x releases having ZooKeeper and KRaft supported, 
> independent of the pure-KRaft 4.x releases?
> If yes, how many more releases can be expected on the 3.x release line?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16366) Refactor KTable source optimization

2024-03-11 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16366:
---

 Summary: Refactor KTable source optimization
 Key: KAFKA-16366
 URL: https://issues.apache.org/jira/browse/KAFKA-16366
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams DSL offers an optimization to re-use an input topic as the table 
changelog, instead of creating a dedicated changelog topic.

So far, the Processor API did not support any such feature, and thus when the 
DSL is compiled down into a Topology, we needed to access topology internals 
to allow for this optimization.

With KIP-813 (merged for AK 3.8), we added `Topology#addReadOnlyStateStore` as 
public API, and thus we should refactor the DSL compilation code, to use this 
public API to build the `Topology` instead of internal APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15576) Add 3.6.0 to broker/client and streams upgrade/compatibility tests

2024-03-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15576.
-
Resolution: Fixed

> Add 3.6.0 to broker/client and streams upgrade/compatibility tests
> --
>
> Key: KAFKA-15576
> URL: https://issues.apache.org/jira/browse/KAFKA-15576
> Project: Kafka
>  Issue Type: Task
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16350) StateUpdater does not init transaction after canceling task close action

2024-03-06 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16350:
---

 Summary: StateUpdater does not init transaction after canceling 
task close action
 Key: KAFKA-16350
 URL: https://issues.apache.org/jira/browse/KAFKA-16350
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


With EOSv2, we use a thread producer shared across all tasks. We init tx on the 
producer with each _task_ (due to EOSv1 which uses a producer per task), and 
have a guard in place to only init tx a single time.

If we hit an error, we close the producer and create a new one, which is still 
not initialized for transactions. At the same time, with the state updater, we 
schedule a "close task" action on error.

For each task we get back, we cancel the "close task" action, to actually 
keep the task. If this happens for _all_ tasks, we don't have any task in state 
CREATED at hand, and thus we never init the producer for transactions, because 
we assume this was already done.

On the first `send` request, we crash with an IllegalStateException:
{code:java}
Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
{code}
This bug is exposed via EOSIntegrationTest (logs attached).
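The faulty guard can be modeled in a few lines (plain Java with hypothetical names, not Kafka code): the "init only once" flag is not reset when the producer is replaced after an error, so the new producer is never initialized and the next send fails.

```java
public class TxInitGuard {
    private boolean txInitialized = false; // guard: init transactions only once
    private boolean producerReady = false; // state of the current producer

    // Called per task in state CREATED; the guard makes this a no-op after the
    // first call, even though the producer may have been replaced meanwhile.
    public void maybeInitTransactions() {
        if (!txInitialized) {
            producerReady = true;
            txInitialized = true;
        }
    }

    // Error path: the producer is recreated, but the guard is not reset (the bug).
    public void resetProducerAfterError() {
        producerReady = false;
    }

    public void send() {
        if (!producerReady) {
            throw new IllegalStateException(
                "Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION");
        }
    }
}
```

Resetting `txInitialized` whenever the producer is recreated would let the next init call actually initialize the new producer.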



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2024-03-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15417.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> JoinWindow does not seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Assignee: Victor van den Hoven
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: Afbeelding 1-1.png, Afbeelding 1.png, 
> SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka Streams 3.4.0:
> According to the javadoc of JoinWindows:
> _There are three different window configurations supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However, if I use a JoinWindows with *before = time-difference and after = 0* 
> on a KStream-KStream leftJoin, the *after=0* part does not seem to work.
> When using _stream1.leftJoin(stream2, joinWindow)_ with 
> _joinWindow.after=0_ and _joinWindow.before=30s_, any new message on 
> stream1 that cannot be joined with any message on stream2 should be joined 
> with a null-record after _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ is the 
> previous message joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period had expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period had expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made, the 
> before period had expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made, the 
> before period had expired.
>  
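The expected window semantics reduce to a one-line predicate (plain Java sketch, hypothetical names): with before = 5000 ms and after = 0, a right-side record at `rightTs` should join a left-side record at `leftTs` iff `leftTs - before <= rightTs <= leftTs + after`.

```java
public class JoinWindowCheck {
    // A left/right pair is joinable iff the right timestamp falls inside
    // [leftTs - before, leftTs + after].
    static boolean inWindow(long leftTs, long rightTs, long beforeMs, long afterMs) {
        return rightTs >= leftTs - beforeMs && rightTs <= leftTs + afterMs;
    }

    public static void main(String[] args) {
        long before = 5000, after = 0;
        // message1 at t=0; a record arriving 4000 ms later is already outside
        // the window (after=0), so a null-record join should be emitted then.
        System.out.println(inWindow(0, 4000, before, after));  // false: no real join possible
        System.out.println(inWindow(0, -4000, before, after)); // true: within the before window
    }
}
```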



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14747) FK join should record discarded subscription responses

2024-03-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14747.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Ayoub Omari
>Priority: Minor
>  Labels: beginner, newbie
> Fix For: 3.8.0
>
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, that might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> add a new sensor.
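A compact model (plain Java, hypothetical names; hashes simplified to longs) of the staleness check and the drop-counting this ticket asks for:

```java
public class SubscriptionResponseFilter {
    private long droppedRecords = 0; // stand-in for the "dropped record" sensor

    // Process a join response only if the hash it carries still matches the
    // current left-hand side record's hash; otherwise count and drop it.
    public boolean process(long currentLeftHash, long responseHash) {
        if (currentLeftHash != responseHash) {
            droppedRecords++; // the recording this ticket proposes to add
            return false;     // stale: the left-hand record changed meanwhile
        }
        return true;
    }

    public long droppedRecords() {
        return droppedRecords;
    }

    public static void main(String[] args) {
        SubscriptionResponseFilter filter = new SubscriptionResponseFilter();
        filter.process(42L, 42L); // hashes match: response is joined
        filter.process(43L, 42L); // left side was updated: response dropped
        System.out.println(filter.droppedRecords()); // 1
    }
}
```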



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10603) Re-design KStream.process() and K*.transform*() operations

2024-03-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10603.
-
Resolution: Fixed

> Re-design KStream.process() and K*.transform*() operations
> --
>
> Key: KAFKA-10603
> URL: https://issues.apache.org/jira/browse/KAFKA-10603
> Project: Kafka
>  Issue Type: New Feature
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> After the implementation of KIP-478, we have the ability to reconsider all 
> these APIs, and maybe just replace them with
> {code:java}
> // KStream
> KStream process(ProcessorSupplier) 
> // KTable
> KTable process(ProcessorSupplier){code}
>  
> but it needs more thought and a KIP for sure.
>  
> This ticket probably supersedes 
> https://issues.apache.org/jira/browse/KAFKA-8396



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16339) Remove Deprecated "transformer" methods and classes

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16339:
---

 Summary: Remove Deprecated "transformer" methods and classes
 Key: KAFKA-16339
 URL: https://issues.apache.org/jira/browse/KAFKA-16339
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax
 Fix For: 4.0.0


Cf 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API]
 * KStream#transform
 * KStream#flatTransform
 * KStream#transformValues
 * KStream#flatTransformValues
 * and the corresponding Scala methods

Related to https://issues.apache.org/jira/browse/KAFKA-12829, and both tickets 
should be worked on together.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16338) Remove Deprecated configs from StreamsConfig

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16338:
---

 Summary: Remove Deprecated configs from StreamsConfig
 Key: KAFKA-16338
 URL: https://issues.apache.org/jira/browse/KAFKA-16338
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax
 Fix For: 5.0.0


* "buffered.records.per.partition" was deprecated via 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390] 
(KIP not fully implemented yet, so this was moved from the 4.0 into this 5.0 ticket)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16337) Remove Deprecated APIs of Kafka Streams in 5.0

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16337:
---

 Summary: Remove Deprecated APIs of Kafka Streams in 5.0
 Key: KAFKA-16337
 URL: https://issues.apache.org/jira/browse/KAFKA-16337
 Project: Kafka
  Issue Type: Task
  Components: streams, streams-test-utils
Reporter: Matthias J. Sax
 Fix For: 5.0.0


This is an umbrella ticket that tries to collect all APIs under Kafka Streams 
that were deprecated in 3.6 or later. When the release schedule for 5.0 is 
set, we might need to remove sub-tasks if they don't hit the 1-year 
threshold.

Each subtask will focus on a specific API, so it's easy to discuss if it 
should be removed by 5.0.0 or maybe even at a later point.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16336) Remove Deprecated metric standby-process-ratio

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16336:
---

 Summary: Remove Deprecated metric standby-process-ratio
 Key: KAFKA-16336
 URL: https://issues.apache.org/jira/browse/KAFKA-16336
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax
 Fix For: 4.0.0


Metric "standby-process-ratio" was deprecated in 3.5 release via 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16335) Remove Deprecated method on StreamPartitioner

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16335:
---

 Summary: Remove Deprecated method on StreamPartitioner
 Key: KAFKA-16335
 URL: https://issues.apache.org/jira/browse/KAFKA-16335
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax
 Fix For: 4.0.0


Deprecated in 3.4 release via 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356]
 * StreamPartitioner#partition (singular)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16334) Remove Deprecated command line option from reset tool

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16334:
---

 Summary: Remove Deprecated command line option from reset tool
 Key: KAFKA-16334
 URL: https://issues.apache.org/jira/browse/KAFKA-16334
 Project: Kafka
  Issue Type: Sub-task
  Components: streams, tools
Reporter: Matthias J. Sax
 Fix For: 4.0.0


--bootstrap-server (singular) was deprecated in 3.4 release via 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-865%3A+Support+--bootstrap-server+in+kafka-streams-application-reset]
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16333) Remove Deprecated methods KTable#join

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16333:
---

 Summary: Remove Deprecated methods KTable#join
 Key: KAFKA-16333
 URL: https://issues.apache.org/jira/browse/KAFKA-16333
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax
 Fix For: 4.0.0


KTable#join() methods taking a `Named` parameter got deprecated in 3.1 release 
via https://issues.apache.org/jira/browse/KAFKA-13813 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16332) Remove Deprecated builder methods for Time/Session/Join/SlidingWindows

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16332:
---

 Summary: Remove Deprecated builder methods for 
Time/Session/Join/SlidingWindows
 Key: KAFKA-16332
 URL: https://issues.apache.org/jira/browse/KAFKA-16332
 Project: Kafka
  Issue Type: Sub-task
Reporter: Matthias J. Sax


Deprecated in 3.0: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Deprecate+24-hour+Default+Grace+Period+for+Windowed+Operations+in+Streams]
 
 * TimeWindows#of
 * TimeWindows#grace
 * SessionWindows#with
 * SessionWindows#grace
 * JoinWindows#of
 * JoinWindows#grace
 * SlidingWindows#withTimeDifferenceAndGrace

We might want to hold off on cleaning up JoinWindows due to 
https://issues.apache.org/jira/browse/KAFKA-13813 (open for discussion)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16331) Remove Deprecated EOSv1

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16331:
---

 Summary: Remove Deprecated EOSv1
 Key: KAFKA-16331
 URL: https://issues.apache.org/jira/browse/KAFKA-16331
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax
 Fix For: 4.0.0


EOSv1 was deprecated in AK 3.0 via 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2]
 * remove config
 * remove Producer#sendOffsetsToTransaction
 * cleanup code



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16330) Remove Deprecated methods/variables from TaskId

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16330:
---

 Summary: Remove Deprecated methods/variables from TaskId
 Key: KAFKA-16330
 URL: https://issues.apache.org/jira/browse/KAFKA-16330
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax
 Fix For: 4.0.0


Cf [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16329) Remove Deprecated Task/ThreadMetadata classes and related methods

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16329:
---

 Summary: Remove Deprecated Task/ThreadMetadata classes and related 
methods
 Key: KAFKA-16329
 URL: https://issues.apache.org/jira/browse/KAFKA-16329
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax
 Fix For: 4.0.0


Deprecated in AK 3.0 via 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-744%3A+Migrate+TaskMetadata+and+ThreadMetadata+to+an+interface+with+internal+implementation]
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12831) Remove Deprecated method StateStore#init

2024-03-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-12831.
-
Resolution: Fixed

> Remove Deprecated method StateStore#init
> 
>
> Key: KAFKA-12831
> URL: https://issues.apache.org/jira/browse/KAFKA-12831
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Josep Prat
>Priority: Blocker
> Fix For: 4.0.0
>
>
> The method 
> org.apache.kafka.streams.processor.StateStore#init(org.apache.kafka.streams.processor.ProcessorContext,
>  org.apache.kafka.streams.processor.StateStore) was deprecated in version 2.7
>  
> See KAFKA-10562 and KIP-478
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12825) Remove Deprecated method StreamsBuilder#addGlobalStore

2024-03-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-12825.
-
Resolution: Fixed

> Remove Deprecated method StreamsBuilder#addGlobalStore
> --
>
> Key: KAFKA-12825
> URL: https://issues.apache.org/jira/browse/KAFKA-12825
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Josep Prat
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Methods:
> org.apache.kafka.streams.scala.StreamsBuilder#addGlobalStore
> org.apache.kafka.streams.StreamsBuilder#addGlobalStore(org.apache.kafka.streams.state.StoreBuilder,
>  java.lang.String, org.apache.kafka.streams.kstream.Consumed, 
> org.apache.kafka.streams.processor.ProcessorSupplier)
> were deprecated in 2.7
>  
> See KAFKA-10379 and KIP-478



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16328) Remove deprecated config StreamsConfig#retries

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16328:
---

 Summary: Remove deprecated config StreamsConfig#retries
 Key: KAFKA-16328
 URL: https://issues.apache.org/jira/browse/KAFKA-16328
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax
 Fix For: 4.0.0


Deprecated in AK 2.7 – already unused – via 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams





[jira] [Created] (KAFKA-16327) Remove Deprecated variable StreamsConfig#TOPOLOGY_OPTIMIZATION

2024-03-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16327:
---

 Summary: Remove Deprecated variable 
StreamsConfig#TOPOLOGY_OPTIMIZATION 
 Key: KAFKA-16327
 URL: https://issues.apache.org/jira/browse/KAFKA-16327
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Matthias J. Sax


Deprecated in 2.7 release via 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name]
 





[jira] [Resolved] (KAFKA-12549) Allow state stores to opt-in transactional support

2024-02-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-12549.
-
Resolution: Duplicate

Closing this ticket in favor of KAFKA-14412.

> Allow state stores to opt-in transactional support
> --
>
> Key: KAFKA-12549
> URL: https://issues.apache.org/jira/browse/KAFKA-12549
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Right now Kafka Stream's EOS implementation does not make any assumptions 
> about the state store's transactional support. Allowing the state stores to 
> optionally provide transactional support can have multiple benefits. E.g., if 
> we add some APIs into the {{StateStore}} interface, like {{beginTxn}}, 
> {{commitTxn}} and {{abortTxn}}. The Streams library can determine if these are 
> supported via an additional {{boolean transactional()}} API, and if yes, these 
> APIs can be used under both ALOS and EOS like the following (otherwise just 
> fall back to the normal processing logic):
> Within thread processing loops:
> 1. store.beginTxn
> 2. store.put // during processing
> 3. streams commit // either through eos protocol or not
> 4. store.commitTxn
> 5. start the next txn by store.beginTxn
> If the state stores allow Streams to do something like above, we can have the 
> following benefits:
> * Reduce the duplicated records upon crashes for ALOS (note this is not EOS 
> still, but some middle-ground where uncommitted data within a state store 
> would not be retained if store.commitTxn failed).
> * No need to wipe the state store and re-bootstrap from scratch upon crashes 
> for EOS. E.g., if a crash-failure happened between streams commit completes 
> and store.commitTxn. We can instead just roll-forward the transaction by 
> replaying the changelog from the second recent streams committed offset 
> towards the most recent committed offset.
> * Remote stores that support txn then do not need to support wiping 
> (https://issues.apache.org/jira/browse/KAFKA-12475).
> * We can fix the known issues of emit-on-change 
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams).
> * We can support "query committed data only" for interactive queries (see 
> below for reasons).
> As for the implementation of these APIs, there are several options:
> * The state store itself have natural transaction features (e.g. RocksDB).
> * Use an in-memory buffer for all puts within a transaction, and upon 
> `commitTxn` write the whole buffer as a batch to the underlying state store, 
> or just drop the whole buffer upon aborting. Then for interactive queries, 
> one can optionally only query the underlying store for committed data only.
> * Use a separate store as the transient persistent buffer. Upon `beginTxn` 
> create a new empty transient store, and upon `commitTxn` merge the store into 
> the underlying store. Same applies for interactive querying committed-only 
> data. This has a benefit compared with the one above that there's no memory 
> pressure even with long transactions, but incurs more complexity / 
> performance overhead with the separate persistent store.
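The in-memory-buffer option from the ticket can be illustrated with a minimal sketch. All names here (`InMemoryTxnStore`, `TxnStoreDemo`) are hypothetical stand-ins that only mirror the proposed `beginTxn`/`commitTxn`/`abortTxn` API, not actual Kafka Streams code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the opt-in transactional StateStore API: puts are
// buffered and only become visible in the committed map on commitTxn();
// abortTxn() simply drops the buffer.
class InMemoryTxnStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> txnBuffer = new HashMap<>();
    private boolean inTxn = false;

    boolean transactional() { return true; } // the opt-in flag from the ticket

    void beginTxn() {
        if (inTxn) throw new IllegalStateException("transaction already open");
        inTxn = true;
    }

    void put(String key, String value) {
        if (!inTxn) throw new IllegalStateException("no open transaction");
        txnBuffer.put(key, value); // buffered, not yet visible to readers
    }

    // "query committed data only": reads ignore the uncommitted buffer
    String get(String key) { return committed.get(key); }

    void commitTxn() {
        committed.putAll(txnBuffer); // merge the whole batch at once
        txnBuffer.clear();
        inTxn = false;
    }

    void abortTxn() {
        txnBuffer.clear(); // uncommitted writes are simply dropped
        inTxn = false;
    }
}

class TxnStoreDemo {
    public static void main(String[] args) {
        InMemoryTxnStore store = new InMemoryTxnStore();
        store.beginTxn();
        store.put("k", "v1"); // invisible until commit
        store.commitTxn();
        System.out.println(store.get("k")); // prints v1
        store.beginTxn();
        store.put("k", "v2");
        store.abortTxn(); // "v2" is discarded
        System.out.println(store.get("k")); // still prints v1
    }
}
```

This directly shows the "drop the whole buffer upon aborting" behavior and why interactive queries against the underlying store see committed data only.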





[jira] [Created] (KAFKA-16295) Align RocksDB and in-memory store init() sequence

2024-02-21 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16295:
---

 Summary: Align RocksDB and in-memory store init() sequence
 Key: KAFKA-16295
 URL: https://issues.apache.org/jira/browse/KAFKA-16295
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Cf [https://lists.apache.org/thread/f4z1vmpb21xhyxl6966xtcb3958fyx5d] 
{quote}For RocksDB stores, we open the store first and then call #register, 
[...] However the in-memory store actually registers itself *first*, before 
marking itself as open,[..].

I suppose it would make sense to align the two store implementations to have 
the same behavior, and the in-memory store is probably technically more correct.
{quote}





[jira] [Created] (KAFKA-16279) Avoid leaking abstractions of `StateStore`

2024-02-19 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16279:
---

 Summary: Avoid leaking abstractions of `StateStore`
 Key: KAFKA-16279
 URL: https://issues.apache.org/jira/browse/KAFKA-16279
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


The `StateStore` interface is user facing and contains a few life-cycle 
management methods (like `init()` and `close()`) – those methods are exposed 
for users to develop custom state stores.

However, we also use `StateStore` as base interface for store-handles in the 
PAPI, and thus life-cycle management methods are leaking into the PAPI (maybe 
also others – would need a dedicated discussion which one we consider useful 
for PAPI users and which not).

We should consider changing what we expose in the PAPI (atm, we only document 
via JavaDocs that e.g. `close()` should never be called; but that's of course not 
ideal, and it would be better if `close()` et al. were not exposed to PAPI 
users to begin with).
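One way to picture the separation under discussion: lifecycle methods live on a framework-facing interface, while the PAPI handle type exposes only data access, so calling `close()` from user code would not even compile. All interface and class names below are hypothetical illustrations, not the actual Kafka Streams types:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the PAPI would hand out ReadableStore, while the
// framework keeps the ManagedStore reference with init()/close().
interface ReadableStore<K, V> {          // what a PAPI store handle could expose
    V get(K key);
    void put(K key, V value);
}

interface ManagedStore<K, V> extends ReadableStore<K, V> {
    void init();                         // lifecycle: framework-only
    void close();                        // lifecycle: framework-only
}

class SimpleStore<K, V> implements ManagedStore<K, V> {
    private Map<K, V> data;
    private boolean open = false;

    public void init() { data = new HashMap<>(); open = true; }
    public void close() { open = false; data = null; }

    public V get(K key) {
        if (!open) throw new IllegalStateException("store not open");
        return data.get(key);
    }

    public void put(K key, V value) {
        if (!open) throw new IllegalStateException("store not open");
        data.put(key, value);
    }
}
```

A processor would receive the store typed as `ReadableStore`, making the JavaDoc warning ("never call `close()`") unnecessary because the method is simply not reachable.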





[jira] [Created] (KAFKA-16263) Add Kafka Streams docs about available listeners/callback

2024-02-15 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16263:
---

 Summary: Add Kafka Streams docs about available listeners/callback
 Key: KAFKA-16263
 URL: https://issues.apache.org/jira/browse/KAFKA-16263
 Project: Kafka
  Issue Type: Task
  Components: docs, streams
Reporter: Matthias J. Sax


Kafka Streams allows registering all kinds of listeners and callbacks (e.g., 
uncaught-exception handlers, restore listeners, etc.), but those are not covered 
in the documentation.

A good place might be 
[https://kafka.apache.org/documentation/streams/developer-guide/running-app.html]
 





[jira] [Created] (KAFKA-16262) Add IQv2 to Kafka Streams documentation

2024-02-15 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16262:
---

 Summary: Add IQv2 to Kafka Streams documentation
 Key: KAFKA-16262
 URL: https://issues.apache.org/jira/browse/KAFKA-16262
 Project: Kafka
  Issue Type: Task
  Components: docs, streams
Reporter: Matthias J. Sax


The new IQv2 API was added many releases ago. While it is still not feature 
complete, we should add it to the docs 
([https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html])
 to make users aware of the new API so they can start to try it out, report 
issues, and provide feedback / feature requests.

We might still state that IQv2 is not yet feature complete, but we should change 
the docs to position it as the "new API", and include code examples.





[jira] [Resolved] (KAFKA-14957) Default value for state.dir is confusing

2024-02-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14957.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: Mickael Maison
>Assignee: Owen C.H. Leung
>Priority: Minor
>  Labels: beginner, newbie
> Fix For: 3.8.0
>
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading: the value will be different in each environment, as it is 
> computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.
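The computed default can be reproduced directly; the snippet below only illustrates the lookup the description refers to (the `StateDirDefault` class name is an arbitrary choice for this sketch):

```java
import java.io.File;

// The documented default is not a fixed path: it is derived from the JVM's
// temp directory at runtime, which is why it differs across environments.
// The "kafka-streams" suffix matches the default described in the ticket.
class StateDirDefault {
    static String defaultStateDir() {
        return new File(System.getProperty("java.io.tmpdir"), "kafka-streams").getPath();
    }

    public static void main(String[] args) {
        System.out.println(defaultStateDir()); // e.g. /tmp/kafka-streams on Linux
    }
}
```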





[jira] [Created] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16241:
---

 Summary: Kafka Streams hits IllegalStateException trying to 
recycle a task
 Key: KAFKA-16241
 URL: https://issues.apache.org/jira/browse/KAFKA-16241
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


Running with EOS-v2 (not sure if relevant or not) and hitting:
{code:java}
[2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] 
stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 
cleanly. Attempting to close remaining tasks before re-throwing: 
(org.apache.kafka.streams.processor.internals.TaskManager)
java.lang.IllegalStateException: Illegal state RESTORING while recycling active 
task 1_0
    at 
org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
    at 
org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
    at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
 {code}
Logs of all three KS instances attached.





[jira] [Resolved] (KAFKA-16236) Interactive Query v2 does not support Global KTables

2024-02-08 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16236.
-
Resolution: Duplicate

Thanks for filing this ticket. It's a duplicate of 
https://issues.apache.org/jira/browse/KAFKA-13523 – so yes, we want to close 
this gap, but I don't think it's on the roadmap yet...

> Interactive Query v2 does not support Global KTables
> 
>
> Key: KAFKA-16236
> URL: https://issues.apache.org/jira/browse/KAFKA-16236
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Christian Zügner
>Priority: Major
>
> Querying a Global KTable using the IQv2 API is currently not supported:
> java.lang.IllegalArgumentException: Cannot get result for failed query. 
> Failure is UNKNOWN_QUERY_TYPE: Global stores do not yet support the 
> KafkaStreams#query API. Use KafkaStreams#store instead.
> I would kindly ask if this feature could be implemented for GlobalKTable 
> as well?





[jira] [Resolved] (KAFKA-16221) IllegalStateException from Producer

2024-02-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16221.
-
Resolution: Fixed

> IllegalStateException from Producer
> ---
>
> Key: KAFKA-16221
> URL: https://issues.apache.org/jira/browse/KAFKA-16221
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Matthias J. Sax
>Priority: Critical
> Fix For: 3.7.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug about 
> internal TX state transition and the producer is now throwing an 
> IllegalStateException in situations it did swallow an internal error before.
> This change surfaces a bug in Kafka Streams: Kafka Streams calls 
> `abortTransaction()` blindly when a task is closed dirty, even if the 
> Producer is already in an internal fatal state. However, if the Producer is 
> in a fatal state, Kafka Streams should skip `abortTransaction` and only 
> `close()` the Producer when closing a task dirty.
> The bug is surfaced after `commitTransaction()` times out or after an 
> `InvalidProducerEpochException` from a `send()` call, leading to the call to 
> `abortTransaction()` – Kafka Streams does not currently track whether a 
> commit-TX is in progress.
> {code:java}
> java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
> because the previous call to `commitTransaction` timed out and must be retried
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1203)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:326)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) {code}
> and
> {code:java}
> [2024-01-16 04:19:32,584] ERROR [kafka-producer-network-thread | 
> i-01aea6907970b1bf6-StreamThread-1-producer] stream-thread 
> [i-01aea6907970b1bf6-StreamThread-1] stream-task [1_2] Error encountered 
> sending r   ecord to topic joined-counts for task 1_2 due to:
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
> attempted to produce with an old epoch.
> Written offsets would not be recorded and no more records would be sent since 
> the producer is fenced, indicating the task may be migrated out 
> (org.apache.kafka.streams.processor.internals.RecordCollectorImp   l)
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
> attempted to produce with an old epoch.
> // followed by
> [2024-01-16 04:19:32,587] ERROR [kafka-producer-network-thread | 
> i-01aea6907970b1bf6-StreamThread-1-producer] [Producer 
> clientId=i-01aea6907970b1bf6-StreamThread-1-producer, 
> transactionalId=stream-soak-test   
> -bbb995dc-1ba2-41ed-8791-0512ab4b904d-1] Aborting producer batches due to 
> fatal error (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:996)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:451)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:664)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:669)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:835)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:819)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:771)
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
> at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:627)
> at java.util.ArrayList.forEach(ArrayList.java:1259)
> at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:612)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
> at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$8(Sender.java:917)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
> at 

[jira] [Created] (KAFKA-16221) IllegalStateException from Producer

2024-02-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16221:
---

 Summary: IllegalStateException from Producer
 Key: KAFKA-16221
 URL: https://issues.apache.org/jira/browse/KAFKA-16221
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.6.0
Reporter: Matthias J. Sax


https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug about 
internal TX state transition and the producer is now throwing an 
IllegalStateException in situations it did swallow an internal error before.

This change surfaces a bug in Kafka Streams: Kafka Streams calls 
`abortTransaction()` blindly when a task is closed dirty, even if the Producer 
is already in an internal fatal state. However, if the Producer is in a fatal 
state, Kafka Streams should skip `abortTransaction` and only `close()` the 
Producer when closing a task dirty.

If the Producer throws an IllegalStateException on `abortTransaction()`, Kafka 
Streams treats this exception ("correctly") as fatal, and the StreamThread dies. 
However, Kafka Streams is actually in a state it can recover from, and thus 
should not let the StreamThread die but instead carry forward (by not calling 
`abortTransaction()` and proceeding with the dirty close of the task).
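The fix described above amounts to a guard around the abort call during dirty close. The sketch below is illustrative only: `TxnProducer` and its `isInFatalState()` probe are hypothetical stand-ins (the real `KafkaProducer` does not expose such a method); the point is purely the control flow of skipping the abort while still closing:

```java
// Hypothetical sketch of the dirty-close guard: skip abortTransaction()
// when the producer is already fatally failed, but always close() it,
// so the StreamThread is not killed needlessly.
interface TxnProducer {
    boolean isInFatalState();
    void abortTransaction();
    void close();
}

class DirtyTaskCloser {
    static String closeDirty(TxnProducer producer) {
        StringBuilder actions = new StringBuilder();
        if (!producer.isInFatalState()) {
            producer.abortTransaction();      // safe: producer can still abort
            actions.append("aborted;");
        } else {
            actions.append("skipped-abort;"); // aborting would throw IllegalStateException
        }
        producer.close();                     // always release resources
        actions.append("closed");
        return actions.toString();
    }
}

class DirtyCloseDemo {
    public static void main(String[] args) {
        TxnProducer healthy = new TxnProducer() {
            public boolean isInFatalState() { return false; }
            public void abortTransaction() { }
            public void close() { }
        };
        System.out.println(DirtyTaskCloser.closeDirty(healthy)); // prints aborted;closed
    }
}
```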





[jira] [Resolved] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2024-01-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10184.
-
  Assignee: (was: John Roesler)
Resolution: Cannot Reproduce

This ticket is pretty old, and looking into Gradle Enterprise, the test seems to 
be stable. Closing for now.

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Critical
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> 

[jira] [Resolved] (KAFKA-16139) StreamsUpgradeTest fails consistently in 3.7.0

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16139.
-
Fix Version/s: 3.7.0
   3.6.1
   Resolution: Fixed

> StreamsUpgradeTest fails consistently in 3.7.0
> --
>
> Key: KAFKA-16139
> URL: https://issues.apache.org/jira/browse/KAFKA-16139
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> h1. 
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest#test_rolling_upgrade_with_2_bounces
> Arguments: {"from_version": "3.5.1", "to_version": "3.7.0-SNAPSHOT"}
>  
> {{TimeoutError('Could not detect Kafka Streams version 3.7.0-SNAPSHOT on 
> ubuntu@worker2')}}
>  





[jira] [Created] (KAFKA-16158) Cleanup usage of `TimestampedBytesStore` interface

2024-01-17 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16158:
---

 Summary: Cleanup usage of `TimestampedBytesStore` interface
 Key: KAFKA-16158
 URL: https://issues.apache.org/jira/browse/KAFKA-16158
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


We added the `TimestampedBytesStore` interface many releases ago. Its purpose is to 
indicate if a byte-store's binary value contains a "plain value" or a 
"" format. Stores with "" format should implement the 
interface; however, not all stores which use this format do.

We tried to fix one occurrence via 
https://issues.apache.org/jira/browse/KAFKA-15629 by adding 
`TimestampedBytesStore` to `KeyValueToTimestampedKeyValueByteStoreAdapter`; 
however, this change broke the restore code path (cf. 
https://issues.apache.org/jira/browse/KAFKA-16141) and thus we reverted the 
change.

During the investigation, we also noticed that 
`InMemoryTimestampedKeyValueStoreMarker` implements `TimestampedBytesStore` but 
does not do a byte-array translation (it's unclear why no byte-array 
translation happens) – and it's also unclear if the in-memory store is tested 
properly.

We should try to clean this all up, adding `TimestampedBytesStore` to 
`KeyValueToTimestampedKeyValueByteStoreAdapter` and figure out how to avoid 
breaking the restore code path. In addition, we should verify if 
`InMemoryTimestampedKeyValueStoreMarker` is correct or not, and if the restore 
code path (and maybe also IQv2 code path) is tested properly and correct.
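For context on what the interface marks: timestamped stores in Kafka Streams prefix the plain value with an 8-byte timestamp, and adapters such as the one above translate between the two layouts. The helper below is a hypothetical illustration of that translation (the class and method names are invented); only the byte layout is taken from Kafka Streams' timestamped value format:

```java
import java.nio.ByteBuffer;

// Illustration of the byte-layout translation a TimestampedBytesStore
// adapter performs: a "timestamped" value is the plain value prefixed
// with an 8-byte (big-endian) record timestamp.
class TimestampedFormat {
    static byte[] toTimestamped(long timestamp, byte[] plainValue) {
        return ByteBuffer.allocate(8 + plainValue.length)
                .putLong(timestamp)   // 8-byte timestamp prefix
                .put(plainValue)      // followed by the plain value bytes
                .array();
    }

    static long timestampOf(byte[] timestampedValue) {
        return ByteBuffer.wrap(timestampedValue).getLong();
    }

    static byte[] plainValueOf(byte[] timestampedValue) {
        byte[] plain = new byte[timestampedValue.length - 8];
        ByteBuffer.wrap(timestampedValue, 8, plain.length).get(plain);
        return plain;
    }
}
```

A store that holds values in this prefixed layout but does not implement `TimestampedBytesStore` is exactly the inconsistency the ticket wants cleaned up, since the restore path relies on the marker to know whether translation is needed.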





[jira] [Resolved] (KAFKA-14949) Add Streams upgrade tests from AK 3.4

2024-01-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14949.
-
Resolution: Fixed

> Add Streams upgrade tests from AK 3.4
> -
>
> Key: KAFKA-14949
> URL: https://issues.apache.org/jira/browse/KAFKA-14949
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Victoria Xia
>Assignee: Mickael Maison
>Priority: Critical
>
> Streams upgrade tests currently only test upgrading from 3.3 and earlier 
> versions 
> ([link|https://github.com/apache/kafka/blob/056657d84d84e116ffc9460872945b4d2b479ff3/tests/kafkatest/tests/streams/streams_application_upgrade_test.py#L30]).
>  We should add 3.4 as an "upgrade_from" version into these tests, in light of 
> the upcoming 3.5 release.





[jira] [Resolved] (KAFKA-15448) Streams StandbyTaskUpdateListener

2023-12-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15448.
-
Fix Version/s: 3.7.0
 Assignee: Colt McNealy
   Resolution: Fixed

> Streams StandbyTaskUpdateListener
> -
>
> Key: KAFKA-15448
> URL: https://issues.apache.org/jira/browse/KAFKA-15448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Colt McNealy
>Assignee: Colt McNealy
>Priority: Minor
>  Labels: kip
> Fix For: 3.7.0
>
>
> KIP-988: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Update+Listener]
> In addition to the new metrics in KIP-869, it would be great to have a 
> callback that allows for monitoring of Standby Task status. The 
> StateRestoreListener is currently not called for Standby Tasks for good 
> reasons (the API wouldn't make sense for Standby). I've attached an interface 
> which would be nice to have:
>  
> ```
> public interface StandbyTaskUpdateListener {
>
>     enum SuspendReason { MIGRATED, PROMOTED; }
>
>     /**
>      * Method called upon the creation of the Standby Task.
>      *
>      * @param topicPartition the TopicPartition of the Standby Task.
>      * @param storeName the name of the store being watched by this Standby Task.
>      * @param earliestOffset the earliest offset available on the changelog topic.
>      * @param startingOffset the offset from which the Standby Task starts watching.
>      * @param currentEndOffset the current latest offset on the associated changelog partition.
>      */
>     void onTaskCreated(final TopicPartition topicPartition,
>                        final String storeName,
>                        final long earliestOffset,
>                        final long startingOffset,
>                        final long currentEndOffset);
>
>     /**
>      * Method called after restoring a batch of records. In this case the maximum
>      * size of the batch is whatever the value of MAX_POLL_RECORDS is set to.
>      *
>      * This method is called after restoring each batch and it is advised to keep
>      * processing to a minimum. Any heavy processing will hold up recovering the
>      * next batch, hence slowing down the restore process as a whole.
>      *
>      * If you need to do any extended processing or connect to an external
>      * service, consider doing so asynchronously.
>      *
>      * @param topicPartition the TopicPartition containing the values to restore
>      * @param storeName the name of the store undergoing restoration
>      * @param batchEndOffset the inclusive ending offset for the current restored batch for this TopicPartition
>      * @param numRestored the total number of records restored in this batch for this TopicPartition
>      * @param currentEndOffset the current end offset of the changelog topic partition.
>      */
>     void onBatchRestored(final TopicPartition topicPartition,
>                          final String storeName,
>                          final long batchEndOffset,
>                          final long numRestored,
>                          final long currentEndOffset);
>
>     /**
>      * Method called after a Standby Task is closed, either because the task
>      * migrated to a new instance or because the task was promoted to an Active task.
>      */
>     void onTaskSuspended(final TopicPartition topicPartition,
>                          final String storeName,
>                          final long storeOffset,
>                          final long currentEndOffset,
>                          final SuspendReason reason);
> }
> ```





[jira] [Resolved] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2023-12-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9545.

Resolution: Fixed

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Ashwin Pankaj
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream 
> tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}





[jira] [Resolved] (KAFKA-15026) Implement min-cost flow balancing tasks for same subtopology

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15026.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Implement min-cost flow balancing tasks for same subtopology
> 
>
> Key: KAFKA-15026
> URL: https://issues.apache.org/jira/browse/KAFKA-15026
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
> Fix For: 3.7.0
>
>






[jira] [Resolved] (KAFKA-15022) Support rack aware task assignment in Kafka streams

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15022.
-
Fix Version/s: 3.7.0
   3.6.0
   Resolution: Fixed

> Support rack aware task assignment in Kafka streams 
> 
>
> Key: KAFKA-15022
> URL: https://issues.apache.org/jira/browse/KAFKA-15022
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip, kip-925
> Fix For: 3.7.0, 3.6.0
>
>
> For KIP-925: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams





[jira] [Resolved] (KAFKA-15347) Single-Key_multi-timestamp IQs with versioned state stores

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15347.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Single-Key_multi-timestamp IQs with versioned state stores
> --
>
> Key: KAFKA-15347
> URL: https://issues.apache.org/jira/browse/KAFKA-15347
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
> Fix For: 3.7.0
>
>
> [KIP-968|https://cwiki.apache.org/confluence/display/KAFKA/KIP-968%3A+Support+single-key_multi-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just four query types:
> *Key Queries with multiple timestamps:*
>  # single-key query with upper bound timestamp
>  # single-key query with lower bound timestamp
>  # single-key query with timestamp range
>  # single-key all versions query





[jira] [Resolved] (KAFKA-15957) ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken

2023-12-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15957.
-
Resolution: Fixed

> ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy 
> broken
> ---
>
> Key: KAFKA-15957
> URL: https://issues.apache.org/jira/browse/KAFKA-15957
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Lucas Brutschy
>Priority: Major
>






[jira] [Resolved] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2

2023-11-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15527.
-
Resolution: Fixed

> Add reverseRange and reverseAll query over kv-store in IQv2
> ---
>
> Key: KAFKA-15527
> URL: https://issues.apache.org/jira/browse/KAFKA-15527
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> Add reverseRange and reverseAll query over kv-store in IQv2
> Update the implementation of the Query interface, introduced in [KIP-796:
> Interactive Query
> v2|https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2],
> to support reverseRange and reverseAll.
> Use a bounded query to achieve reverseRange and an unbounded query to achieve
> reverseAll.





[jira] [Created] (KAFKA-15951) MissingSourceTopicException should include topic names

2023-11-30 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15951:
---

 Summary: MissingSourceTopicException should include topic names
 Key: KAFKA-15951
 URL: https://issues.apache.org/jira/browse/KAFKA-15951
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


As the title says – we don't include topic names in all cases, which makes it hard
for users to identify the root cause.





[jira] [Resolved] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery

2023-11-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15629.
-
Resolution: Fixed

> proposal to introduce IQv2 Query Types: TimestampedKeyQuery and 
> TimestampedRangeQuery
> -
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> KIP-992: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery
> In the current IQv2 code, there are noticeable differences when interfacing 
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
> simple value for plain-kv-store but evolves into ValueAndTimestamp for 
> ts-kv-store, which presents type safety issues in the API.
> Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
> compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct 
> query types. One that returns a plain value, another for values accompanied 
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is 
> feasible, it doesn't make sense to query a plain-kv-store for a 
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should 
> return ValueAndTimestamp.





[jira] [Resolved] (KAFKA-15346) Single-Key_single-timestamp IQs with versioned state stores

2023-11-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15346.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Single-Key_single-timestamp IQs with versioned state stores
> ---
>
> Key: KAFKA-15346
> URL: https://issues.apache.org/jira/browse/KAFKA-15346
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
> Fix For: 3.7.0
>
>
> [KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just two query types:
> *Key Queries with single timestamp:*
>  # single-key latest-value lookup
>  # single-key lookup with timestamp (upper) bound





[jira] [Resolved] (KAFKA-15669) Implement telemetry naming strategy

2023-11-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15669.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Implement telemetry naming strategy
> ---
>
> Key: KAFKA-15669
> URL: https://issues.apache.org/jira/browse/KAFKA-15669
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.7.0
>
>
> Define classes and implement the telemetry metrics naming strategy for
> KIP-714 as defined here:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat]
>
> The naming strategy must also support delta temporality metrics with a suffix
> in the original metric name.





[jira] [Resolved] (KAFKA-15668) Add Opentelemetry Proto library with shadowed classes

2023-11-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15668.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Add Opentelemetry Proto library with shadowed classes
> -
>
> Key: KAFKA-15668
> URL: https://issues.apache.org/jira/browse/KAFKA-15668
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.7.0
>
>
> KIP-714 requires the addition of the [Java client
> dependency|https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Javaclientdependencies]
> {{opentelemetry-proto}}, which also brings a transitive dependency on
> {{protobuf-java}}. The dependencies should be shadowed to avoid JVM
> versioning conflicts.





[jira] [Created] (KAFKA-15768) StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult

2023-10-31 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15768:
---

 Summary: StateQueryResult#getOnlyPartitionResult should not throw 
for FailedQueryResult
 Key: KAFKA-15768
 URL: https://issues.apache.org/jira/browse/KAFKA-15768
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
`IllegalArgumentException` if the only result is a `FailedQueryResult`.

The issue is the internal `filter(r -> r.getResult() != 0)` step, which blindly
(and incorrectly) calls `getResult`.

Given the semantics of `getOnlyPartitionResult` we should not care if the
result is a SuccessQueryResult or a FailedQueryResult, but only check if there is a
single result or not. (The user has no means to avoid getting an exception
otherwise.)

Side-note: why does `FailedQueryResult#getResult` throw an 
IllegalArgumentException (there is no argument passed into the method – it 
should rather be an `IllegalStateException` – but I guess we would need a KIP 
for this fix?)
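The failure mode is easy to reproduce with a stripped-down model of the result types. The classes below only mimic the relevant behavior of the real `org.apache.kafka.streams.query` API and are not Kafka code; the buggy filter is written here with `!= null` (the ticket quotes it as `!= 0`) since either form must call `getResult`:

```java
import java.util.List;

public class OnlyPartitionResultSketch {
    interface QueryResult<R> { R getResult(); }

    static final class FailedQueryResult<R> implements QueryResult<R> {
        public R getResult() {
            // Mimics the real class: accessing the result of a failure throws.
            throw new IllegalArgumentException("Cannot get result for failed query.");
        }
    }

    // Buggy variant: filtering on getResult() blindly calls it, so a lone
    // FailedQueryResult makes the filter itself throw.
    static <R> QueryResult<R> buggyOnlyPartitionResult(List<QueryResult<R>> results) {
        List<QueryResult<R>> nonNull =
                results.stream().filter(r -> r.getResult() != null).toList();
        if (nonNull.size() != 1) throw new IllegalArgumentException("not exactly one result");
        return nonNull.get(0);
    }

    // Fixed variant: only check *how many* partition results there are; whether
    // the single result is a success or a failure is the caller's business.
    static <R> QueryResult<R> fixedOnlyPartitionResult(List<QueryResult<R>> results) {
        if (results.size() != 1) throw new IllegalArgumentException("not exactly one result");
        return results.get(0);
    }

    public static void main(String[] args) {
        QueryResult<String> failed = new FailedQueryResult<>();
        try {
            buggyOnlyPartitionResult(List.of(failed));
        } catch (IllegalArgumentException e) {
            System.out.println("buggy variant threw: " + e.getMessage());
        }
        System.out.println("fixed variant returns the failed result: "
                + (fixedOnlyPartitionResult(List.of(failed)) == failed));
    }
}
```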





[jira] [Created] (KAFKA-15765) Remove task level metric "commit-latency"

2023-10-31 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15765:
---

 Summary: Remove task level metric "commit-latency"
 Key: KAFKA-15765
 URL: https://issues.apache.org/jira/browse/KAFKA-15765
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Matthias J. Sax
 Fix For: 4.0.0


We stopped tracking this metric with KIP-447, but kept it for backward
compatibility reasons. It's time to remove it completely with the 4.0 release.

Cf 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics]

And [https://github.com/apache/kafka/pull/8218/files#r390712211]

 





[jira] [Resolved] (KAFKA-15672) Add 3.6 to streams system tests

2023-10-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15672.
-
Resolution: Duplicate

> Add 3.6 to streams system tests
> ---
>
> Key: KAFKA-15672
> URL: https://issues.apache.org/jira/browse/KAFKA-15672
> Project: Kafka
>  Issue Type: Test
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Critical
>
> 3.6.0 was released recently. We need to add `3.6.0` to the system tests (in
> particular upgrade and broker compatibility tests).





[jira] [Resolved] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15602.
-
Fix Version/s: 3.4.2
   3.5.2
   3.7.0
   3.6.1
 Assignee: Matthias J. Sax
   Resolution: Fixed

As discussed, reverted this in all applicable branches.

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Assignee: Matthias J. Sax
>Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 
> val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 
> val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
> buff.limit(buff.position());|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers start have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limit to the current position so that 
> whoever reads the buffer knows when to stop reading, and sets the position to 
> zero so it knows where to start reading from */ 
> producer.send(bb); {code}
> Technically, you wouldn't even need to use flip() there, since position 
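The quoted description is cut off by the archive, but the two serializer contracts can be demonstrated with plain JDK code. The methods below are simplified re-statements of the pre-3.4.0 (rewind-based) and 3.4.0+ (flip-based) logic, not the actual Kafka source; both keep the special case that returns a wrapped array directly, which is why plain `ByteBuffer.wrap(bytes)` inputs behave the same under both versions:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferSerializerContracts {
    // Simplified pre-3.4.0 contract: rewind, then read everything up to limit.
    static byte[] serializePre34(ByteBuffer data) {
        data = data.duplicate();          // don't mutate the caller's buffer
        data.rewind();
        if (data.hasArray() && data.arrayOffset() == 0
                && data.array().length == data.remaining()) {
            return data.array();          // special case: plain wrapped array
        }
        byte[] out = new byte[data.remaining()];
        data.get(out);
        return out;
    }

    // Simplified 3.4.0+ contract: flip first, i.e. assume the buffer is still
    // in "write mode" with position marking the end of the payload.
    static byte[] serializePost34(ByteBuffer data) {
        data = data.duplicate();
        if (data.hasArray() && data.arrayOffset() == 0
                && data.array().length == data.remaining()) {
            return data.array();          // special case: plain wrapped array
        }
        data.flip();
        byte[] out = new byte[data.remaining()];
        data.get(out);
        return out;
    }

    public static void main(String[] args) {
        byte[] test = "test".getBytes(StandardCharsets.UTF_8);
        // A buffer the caller already flip()ed: 4 bytes pre-3.4.0, 0 bytes after,
        // matching the second row of the table in the ticket.
        ByteBuffer flipped = ByteBuffer.allocate(8).put(test).flip();
        System.out.println(new String(serializePre34(flipped), StandardCharsets.UTF_8));
        System.out.println(serializePost34(flipped).length);
    }
}
```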

[jira] [Reopened] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2023-10-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4852:

  Assignee: (was: LinShunkang)

> ByteBufferSerializer not compatible with offsets
> 
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: all
>Reporter: Werner Daehn
>Priority: Minor
> Fix For: 3.4.0
>
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the
> ByteBuffer was created with an offset, e.g. ByteBuffer.wrap(data, 3, 10)? The
> ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
> Solution: No rewind() but flip() for reading a ByteBuffer. That's what the 
> flip is meant for.
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream 
> containing topicname, partition, key, value, ... and you want to create a new 
> ProducerRecord for that. As the constructor of ProducerRecord requires 
> (topic, partition, key, value) you have to copy from above byte[] the key and 
> value. That means there is a memcopy taking place. Since the payload can be 
> potentially large, that introduces a lot of overhead. Twice the memory.
> A nice solution to this problem is to simply wrap the network byte[] into new 
> ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
> But that does not work as the ByteBufferSerializer does a rewind(), hence 
> both, key and value, will start at position=0 of the data[].
> public class ByteBufferSerializer implements Serializer {
> public byte[] serialize(String topic, ByteBuffer data) {
>  data.rewind();





[jira] [Resolved] (KAFKA-15666) Добавить функцию поиска по почте (Add a search-by-mail function)

2023-10-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15666.
-
Resolution: Invalid

[~noraverba] – We can only take tickets in English. – Also piped the title
through a translator and it did not really make sense to me, especially as the
description is empty.

Closing as invalid.

> Добавить функцию поиска по почте (Add a search-by-mail function)
> 
>
> Key: KAFKA-15666
> URL: https://issues.apache.org/jira/browse/KAFKA-15666
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Eleonora
>Priority: Minor
>






[jira] [Created] (KAFKA-15672) Add 3.6 to streams system tests

2023-10-23 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15672:
---

 Summary: Add 3.6 to streams system tests
 Key: KAFKA-15672
 URL: https://issues.apache.org/jira/browse/KAFKA-15672
 Project: Kafka
  Issue Type: Test
  Components: streams, system tests
Reporter: Matthias J. Sax


3.6.0 was released recently. We need to add `3.6.0` to the system tests (in
particular upgrade and broker compatibility tests).





[jira] [Resolved] (KAFKA-15378) Rolling upgrade system tests are failing

2023-10-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15378.
-
Fix Version/s: 3.4.2
   3.5.2
   3.7.0
   3.6.1
   Resolution: Fixed

> Rolling upgrade system tests are failing
> 
>
> Key: KAFKA-15378
> URL: https://issues.apache.org/jira/browse/KAFKA-15378
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Affects Versions: 3.5.1
>Reporter: Lucas Brutschy
>Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> The system tests are having failures for these tests:
> {noformat}
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.1.2.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.2.3.to_version=3.6.0-SNAPSHOT
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.6.0-SNAPSHOT
> {noformat}
> See 
> [https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5801/console]
>  for logs and other test data.
> Note that system tests currently only run with [this
> fix](https://github.com/apache/kafka/commit/24d1780061a645bb2fbeefd8b8f50123c28ca94e);
> I think some CVE-related Python library update broke the system tests...





[jira] [Resolved] (KAFKA-15437) Add metrics about open iterators

2023-10-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15437.
-
Resolution: Duplicate

> Add metrics about open iterators
> 
>
> Key: KAFKA-15437
> URL: https://issues.apache.org/jira/browse/KAFKA-15437
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: need-kip
>
> Kafka Streams allows users to create iterators over state stores. Those iterators
> must get closed to free up resources (especially for RocksDB). – We regularly
> get user reports of "resource leaks" that can be pinned down to leaking (i.e.,
> not-closed) iterators.
> To simplify monitoring, it would be helpful to add a metric about open
> iterators to allow users to alert and pin-point the issue directly (and
> before the actual resource leak is observed).
> We might want to have a DEBUG level per-store metric (to allow identifying 
> the store in question quickly), but an already rolled up INFO level metric 
> for the whole application.





[jira] [Resolved] (KAFKA-15491) RackId doesn't exist error while running WordCountDemo

2023-09-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15491.
-
Fix Version/s: 3.6.1
   3.7.0
 Assignee: Hao Li
   Resolution: Fixed

> RackId doesn't exist error while running WordCountDemo
> --
>
> Key: KAFKA-15491
> URL: https://issues.apache.org/jira/browse/KAFKA-15491
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Luke Chen
>Assignee: Hao Li
>Priority: Major
> Fix For: 3.6.1, 3.7.0
>
>
> While running the WordCountDemo following the
> [docs|https://kafka.apache.org/documentation/streams/quickstart], I saw the
> following error logs in the stream application output. Though everything
> still works fine, it'd be better if there were no ERROR logs in the demo app.
> {code:java}
> [2023-09-24 14:15:11,723] ERROR RackId doesn't exist for process 
> e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer 
> streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8
>  
> (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor)
> [2023-09-24 14:15:11,757] ERROR RackId doesn't exist for process 
> e2391098-23e8-47eb-8d5e-ff6e697c33f5 and consumer 
> streams-wordcount-e2391098-23e8-47eb-8d5e-ff6e697c33f5-StreamThread-1-consumer-adae58be-f5f5-429b-a2b4-67bf732726e8
>  
> (org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor)
> {code}





[jira] [Created] (KAFKA-15443) Upgrade RocksDB dependency

2023-09-07 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15443:
---

 Summary: Upgrade RocksDB dependency
 Key: KAFKA-15443
 URL: https://issues.apache.org/jira/browse/KAFKA-15443
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams currently depends on RocksDB 7.9.2

However, the latest version of RocksDB is already 8.5.3. We should check the
RocksDB release notes to see what benefits upgrading to the latest version
brings (and file corresponding tickets to exploit improvements in newer
releases as applicable).





[jira] [Created] (KAFKA-15437) Add metrics about open iterators

2023-09-05 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15437:
---

 Summary: Add metrics about open iterators
 Key: KAFKA-15437
 URL: https://issues.apache.org/jira/browse/KAFKA-15437
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams allows users to create iterators over state stores. Those iterators
must get closed to free up resources (especially for RocksDB). – We regularly get
user reports of "resource leaks" that can be pinned down to leaking (i.e.,
not-closed) iterators.

To simplify monitoring, it would be helpful to add a metric about open
iterators to allow users to alert and pin-point the issue directly (and before
the actual resource leak is observed).

We might want to have a DEBUG level per-store metric (to allow identifying the 
store in question quickly), but an already rolled up INFO level metric for the 
whole application.
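The kind of metric proposed here can be sketched with a plain counter that a store bumps when it hands out an iterator and decrements again on close. In Kafka Streams the stores and the metrics registry would do this; the types below (`CloseableIterator`, the `openIterators` gauge) are stand-ins for illustration:

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class OpenIteratorMetricSketch {
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        void close(); // no checked exception, so try-with-resources stays tidy
    }

    // Stand-in for a per-store (or rolled-up, per-application) gauge.
    static final AtomicLong openIterators = new AtomicLong();

    // Wraps a plain iterator; closing it decrements the gauge again, so a
    // non-zero steady-state value flags leaked (never-closed) iterators.
    static <T> CloseableIterator<T> trackedIterator(Iterator<T> delegate) {
        openIterators.incrementAndGet();
        return new CloseableIterator<>() {
            public boolean hasNext() { return delegate.hasNext(); }
            public T next() { return delegate.next(); }
            public void close() { openIterators.decrementAndGet(); }
        };
    }

    public static void main(String[] args) {
        // try-with-resources guarantees the decrement, which is exactly the
        // discipline the proposed metric would help users verify.
        try (CloseableIterator<String> it = trackedIterator(List.of("a", "b").iterator())) {
            while (it.hasNext()) System.out.println(it.next());
        }
        System.out.println("open iterators: " + openIterators.get());
    }
}
```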





[jira] [Resolved] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation

2023-08-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13197.
-
Fix Version/s: 3.6.0
   3.5.2
   Resolution: Fixed

> KStream-GlobalKTable join semantics don't match documentation
> -
>
> Key: KAFKA-13197
> URL: https://issues.apache.org/jira/browse/KAFKA-13197
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 2.7.0
>Reporter: Tommy Becker
>Assignee: Florin Akermann
>Priority: Major
> Fix For: 3.6.0, 3.5.2
>
>
> As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was 
> changed. It appears the change was intended to merely relax a requirement but 
> it actually broke backwards compatibility. Although it does allow {{null}} 
> keys and values in the KStream to be joined, it now excludes {{null}} results 
> of the {{KeyValueMapper}}. We have an application which can return {{null}} 
> from the {{KeyValueMapper}} for non-null keys in the KStream, and relies on 
> these nulls being passed to the {{ValueJoiner}}. Indeed the javadoc still 
> explicitly says this is done:
> {quote}If a KStream input record key or value is null the record will not be 
> included in the join operation and thus no output record will be added to the 
> resulting KStream.
>  If keyValueMapper returns null implying no match exists, a null value will 
> be provided to ValueJoiner.
> {quote}
> Both these statements are incorrect.
> I think the new behavior is worse than the previous/documented behavior. It 
> feels more reasonable to have a non-null stream record map to a null join key 
> (our use-case is event-enhancement where the incoming record doesn't have the 
> join field), than the reverse.
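The two documented rules can be captured in a few lines. This is a plain-Java simulation of the *documented* (pre-KAFKA-10277) left-join semantics, not Streams code — a `null` stream key or value drops the record, while a `null` `KeyValueMapper` result still reaches the `ValueJoiner`:

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

public class GlobalKTableJoinSemantics {
    // Simulates one stream record flowing through a KStream-GlobalKTable
    // left join under the documented semantics; empty means "record dropped".
    static <K, V, GK, GV, R> Optional<R> join(K key, V value,
                                              BiFunction<K, V, GK> keyValueMapper,
                                              Map<GK, GV> globalTable,
                                              BiFunction<V, GV, R> joiner) {
        if (key == null || value == null) {
            return Optional.empty();      // rule 1: null key/value drops the record
        }
        GK joinKey = keyValueMapper.apply(key, value);
        GV lookup = joinKey == null ? null : globalTable.get(joinKey);
        // rule 2 (as documented): a null join key still calls the joiner,
        // passing null as the table-side value
        return Optional.ofNullable(joiner.apply(value, lookup));
    }

    public static void main(String[] args) {
        Map<String, String> table = Map.of("user-1", "profile-1");
        System.out.println(join("k", "event", (k, v) -> "user-1", table, (v, p) -> v + "/" + p));
        System.out.println(join("k", "event", (k, v) -> null, table, (v, p) -> v + "/" + p));
        System.out.println(join(null, "event", (k, v) -> "user-1", table, (v, p) -> v + "/" + p));
    }
}
```

The reported regression is that the post-KAFKA-10277 code drops the record in the middle case instead of invoking the joiner with `null`.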





[jira] [Created] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15309:
---

 Summary: Add custom error handler to Producer
 Key: KAFKA-15309
 URL: https://issues.apache.org/jira/browse/KAFKA-15309
 Project: Kafka
  Issue Type: New Feature
  Components: producer 
Reporter: Matthias J. Sax


The producer groups multiple records into batches, and a single record-specific
error might fail the whole batch.

This ticket suggests adding a per-record error handler that allows users to opt
into skipping bad records without failing the whole batch (similar to the Kafka
Streams `ProductionExceptionHandler`).
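No such handler exists in the producer API today; the shape the ticket hints at might look like the following sketch. All names are hypothetical (`RecordErrorHandler`, `HandlerResponse`, `sendBatch`), modeled loosely on the Streams `ProductionExceptionHandler`, and the "batch send" is a toy stand-in for the real producer internals:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class PerRecordErrorHandlerSketch {
    enum HandlerResponse { CONTINUE, FAIL }           // hypothetical

    // Hypothetical per-record handler: decide whether a bad record should
    // fail the whole batch or just be skipped.
    interface RecordErrorHandler<R> {
        HandlerResponse handle(R record, Exception error);
    }

    // Toy "batch send": serialize every record, consulting the handler on
    // failure instead of aborting the batch outright.
    static <R> List<String> sendBatch(List<R> batch, Function<R, String> serialize,
                                      RecordErrorHandler<R> handler) {
        List<String> sent = new ArrayList<>();
        for (R record : batch) {
            try {
                sent.add(serialize.apply(record));
            } catch (Exception e) {
                if (handler.handle(record, e) == HandlerResponse.FAIL) {
                    throw new IllegalStateException("batch failed at record " + record, e);
                }
                // CONTINUE: skip just this record, keep the batch alive
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> sent = sendBatch(
                List.of("a", "bad", "c"),
                r -> {
                    if (r.equals("bad")) throw new IllegalArgumentException("unserializable");
                    return r.toUpperCase();
                },
                (record, error) -> HandlerResponse.CONTINUE);   // skip bad records
        System.out.println(sent);
    }
}
```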





[jira] [Created] (KAFKA-15307) Kafka Streams configuration docs outdated

2023-08-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15307:
---

 Summary: Kafka Streams configuration docs outdated
 Key: KAFKA-15307
 URL: https://issues.apache.org/jira/browse/KAFKA-15307
 Project: Kafka
  Issue Type: Task
  Components: docs, streams
Reporter: Matthias J. Sax


[https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html]
 needs to be updated.

It's missing a lot of newly added configs, and still lists already-removed
configs.

For deprecated configs, we could consider also removing them, or add a
"deprecated configs" section and keep them for the time being.





[jira] [Created] (KAFKA-15251) Upgrade system test to use 3.5.1

2023-07-25 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15251:
---

 Summary: Upgrade system test to use 3.5.1
 Key: KAFKA-15251
 URL: https://issues.apache.org/jira/browse/KAFKA-15251
 Project: Kafka
  Issue Type: Test
  Components: streams, system tests
Reporter: Matthias J. Sax


3.5.1 was released, and we should update the upgrade system tests accordingly to 
use the new version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2023-07-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13295.
-
Resolution: Fixed

With the new restore thread, this issue should be resolved implicitly.

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.
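The timeout relationship described above can be summarized as a config fragment; the values below are illustrative only, not recommendations:

```properties
# The txn is begun on the first send() and stays open until the next commit,
# so a restoration phase longer than transaction.timeout.ms will cause the
# broker to abort the open transaction and fence the producer.
transaction.timeout.ms=60000   # producer config: broker aborts the txn after 60s
commit.interval.ms=100         # Streams commit interval under EOS (default 100ms)
```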



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-7497) Kafka Streams should support self-join on streams

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7497.

Resolution: Fixed

> Kafka Streams should support self-join on streams
> -
>
> Key: KAFKA-7497
> URL: https://issues.apache.org/jira/browse/KAFKA-7497
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Robin Moffatt
>Priority: Major
>  Labels: needs-kip
>
> There are valid reasons to want to join a stream to itself, but Kafka Streams 
> does not currently support this ({{Invalid topology: Topic foo has already 
> been registered by another source.}}).  Performing the join requires creating 
> a second stream as a clone of the first, and then doing a join between the 
> two. This is a clunky workaround and results in unnecessary duplication of 
> data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14173) TopologyTestDriver does not use mock wall clock time when sending test records

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14173.
-
Resolution: Not A Problem

> TopologyTestDriver does not use mock wall clock time when sending test records
> --
>
> Key: KAFKA-14173
> URL: https://issues.apache.org/jira/browse/KAFKA-14173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.1
>Reporter: Guido Josquin
>Priority: Minor
>
> I am trying to test a stream-stream join with `TopologyTestDriver`. My goal 
> is to confirm that my topology performs the following left join correctly.
> {code:java}
> bills
>   .leftJoin(payments)(
> {
>   case (billValue, null) => billValue
>   case (billValue, paymentValue) => (billValue.toInt - 
> paymentValue.toInt).toString
> },
> JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
>   )
>   .to("debt")
> {code}
>  
> In other words, if we see a `bill` and a `payment` within 100ms, the payment 
> should be subtracted from the bill. If we do not see a payment, the debt is 
> simply the bill.
> Here is the test code.
> {code:java}
> val simpleLeftJoinTopology = new SimpleLeftJoinTopology
> val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
> val serde = Serdes.stringSerde
> val bills = driver.createInputTopic("bills", serde.serializer, 
> serde.serializer)
> val payments = driver.createInputTopic("payments", serde.serializer, 
> serde.serializer)
> val debt = driver.createOutputTopic("debt", serde.deserializer, 
> serde.deserializer)
> bills.pipeInput("fred", "100")
> bills.pipeInput("george", "20")
> payments.pipeInput("fred", "95")
> // When in doubt, sleep twice
> driver.advanceWallClockTime(Duration.ofMillis(500))
> Thread.sleep(500)
> // Send a new record to cause the previous window to be closed
> payments.pipeInput("percy", "0")
> val keyValues = debt.readKeyValuesToList()
> keyValues should contain theSameElementsAs Seq(
>   // This record is present
>   new KeyValue[String, String]("fred", "5"),
>   // This record is missing
>   new KeyValue[String, String]("george", "20")
> )
> {code}
> Full code available at [https://github.com/Oduig/kstreams-left-join-example]
> It seems that advancing the wall clock time, sleeping, or sending an extra 
> record, never triggers the join condition when data only arrives on the left 
> side. It is possible to circumvent this by passing an explicit event time 
> with each test record. (See 
> https://stackoverflow.com/questions/73443812/using-kafka-streams-topologytestdriver-how-to-test-left-join-between-two-strea/73540161#73540161)
>  
> However, the behavior deviates from a real Kafka broker. With a real broker, 
> if we do not send an event, it uses the wall clock time of the broker 
> instead. The behavior under test should be the same: 
> `driver.advanceWallClockTime` should provide the default time to be used for 
> `TestTopic.pipeInput`, when no other time is specified.
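A self-contained sketch of the proposed default, assuming hypothetical stand-in classes (not the real `TopologyTestDriver`/`TestInputTopic` API): `pipeInput` without an explicit timestamp falls back to the driver's mocked wall clock.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class MockWallClockTopicSketch {
    // Stand-in for the test driver: only tracks a mocked wall clock.
    static class MockDriver {
        Instant wallClock = Instant.ofEpochMilli(0);
        void advanceWallClockTime(Duration d) { wallClock = wallClock.plus(d); }
    }

    // Stand-in for a test input topic that records piped-in timestamps.
    static class TestInputTopic {
        final MockDriver driver;
        final List<Long> pipedTimestamps = new ArrayList<>();
        TestInputTopic(MockDriver driver) { this.driver = driver; }

        // Proposed behavior: default to the driver's wall clock
        // instead of a fixed time when no timestamp is given.
        void pipeInput(String key, String value) {
            pipeInput(key, value, driver.wallClock);
        }
        void pipeInput(String key, String value, Instant ts) {
            pipedTimestamps.add(ts.toEpochMilli());
        }
    }

    public static void main(String[] args) {
        MockDriver driver = new MockDriver();
        TestInputTopic bills = new TestInputTopic(driver);
        bills.pipeInput("fred", "100");                    // piped at t=0
        driver.advanceWallClockTime(Duration.ofMillis(500));
        bills.pipeInput("percy", "0");                     // piped at t=500
        System.out.println(bills.pipedTimestamps);         // [0, 500]
    }
}
```

Under this behavior, the join window from the example above would close once the wall clock is advanced past the window plus grace, without requiring an explicit timestamp on every record.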



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10575.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 3.5.0
>
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transition it to the running state. 
> However, the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback, indicating its progress, when the restoration is 
> stopped in any scenario.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-4327) Move Reset Tool from core to streams

2023-04-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4327.

Fix Version/s: (was: 4.0.0)
   Resolution: Fixed

This was resolved via https://issues.apache.org/jira/browse/KAFKA-14586.

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: kip
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the ResetTool in an exception message:
>  {{"Use 'kafka.tools.StreamsResetter' tool"}}
>  -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility – not all brokers support "topic delete request" and 
> thus, the reset tool will not be backward compatible with all broker versions.
> KIP-756: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-756%3A+Move+StreamsResetter+tool+outside+of+core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions

2023-04-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7499.

Fix Version/s: 3.5.0
   Resolution: Fixed

> Extend ProductionExceptionHandler to cover serialization exceptions
> ---
>
> Key: KAFKA-7499
> URL: https://issues.apache.org/jira/browse/KAFKA-7499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Philip Nee
>Priority: Major
>  Labels: beginner, kip, newbie, newbie++
> Fix For: 3.5.0
>
>
> In 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce],
>  an exception handler for the write path was introduced. This exception 
> handler covers exceptions that are raised in the producer callback.
> However, serialization happens inside Kafka Streams itself, before the data 
> is handed to the producer, and the producer only sees `byte[]/byte[]` 
> key-value pairs.
> Thus, we might want to extend the ProductionExceptionHandler to cover 
> serialization exceptions, too, to skip over corrupted output messages. An 
> example could be a "String" message that contains invalid JSON and should be 
> serialized as JSON.
> KIP-399 (not voted yet; feel free to pick it up): 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions]
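A self-contained sketch of the extension, with stand-in types (the actual method name and signature Kafka ships may differ): a serialization hook on the handler decides per record whether to drop the corrupted message or fail.

```java
import java.nio.charset.StandardCharsets;

public class SerializationHandlerSketch {
    enum Response { CONTINUE, FAIL }

    // Stand-in for an extended ProductionExceptionHandler with a
    // serialization hook; hypothetical interface, illustrative only.
    interface ProductionExceptionHandler {
        Response handleSerializationException(String record, Exception e);
    }

    // Toy serializer that rejects records that are not "JSON-ish".
    static byte[] serialize(String value) {
        if (!value.startsWith("{")) {
            throw new IllegalArgumentException("not JSON: " + value);
        }
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Toy produce loop: serialization failures are routed to the handler
    // instead of always killing the application.
    static int produce(String[] records, ProductionExceptionHandler handler) {
        int sent = 0;
        for (String r : records) {
            try {
                serialize(r);
                sent++;
            } catch (IllegalArgumentException e) {
                if (handler.handleSerializationException(r, e) == Response.FAIL) {
                    throw e; // preserve today's fail-fast behavior
                }
                // CONTINUE: drop the corrupted record and keep going
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        int sent = produce(new String[]{"{\"a\":1}", "oops", "{}"},
                (record, e) -> Response.CONTINUE);
        System.out.println(sent); // 2 -- the invalid record was skipped
    }
}
```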



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14834) Improved processor semantics for versioned stores

2023-04-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14834.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

> Improved processor semantics for versioned stores
> -
>
> Key: KAFKA-14834
> URL: https://issues.apache.org/jira/browse/KAFKA-14834
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Victoria Xia
>Assignee: Victoria Xia
>Priority: Major
>  Labels: kip, streams
> Fix For: 3.5.0
>
>
> With the introduction of versioned state stores in 
> [KIP-889|https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores],
>  we should leverage them to provide improved join semantics. 
> As described in 
> [KIP-914|https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores],
>  we will make the following four improvements:
>  * stream-table joins will perform a timestamped lookup (using the 
> stream-side record timestamp) if the table is versioned
>  * table-table joins, including foreign key joins, will not produce new join 
> results on out-of-order records (by key) from versioned tables
>  * table filters will disable the existing optimization to not send duplicate 
> tombstones when applied to a versioned table
>  * table aggregations will ignore out-of-order records when aggregating a 
> versioned table



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-14318) KIP-878: Autoscaling for Statically Partitioned Streams

2023-04-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-14318:
-

> KIP-878: Autoscaling for Statically Partitioned Streams
> ---
>
> Key: KAFKA-14318
> URL: https://issues.apache.org/jira/browse/KAFKA-14318
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: kip
>
> [KIP-878: Autoscaling for Statically Partitioned 
> Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14318) KIP-878: Autoscaling for Statically Partitioned Streams

2023-04-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14318.
-
Resolution: Fixed

> KIP-878: Autoscaling for Statically Partitioned Streams
> ---
>
> Key: KAFKA-14318
> URL: https://issues.apache.org/jira/browse/KAFKA-14318
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: kip
>
> [KIP-878: Autoscaling for Statically Partitioned 
> Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14491) Introduce Versioned Key-Value Stores to Kafka Streams

2023-04-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14491.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

> Introduce Versioned Key-Value Stores to Kafka Streams
> -
>
> Key: KAFKA-14491
> URL: https://issues.apache.org/jira/browse/KAFKA-14491
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Victoria Xia
>Assignee: Victoria Xia
>Priority: Major
>  Labels: kip, streams
> Fix For: 3.5.0
>
>
> The key-value state stores used by Kafka Streams today maintain only the 
> latest value associated with each key. In order to support applications which 
> require access to older record versions, Kafka Streams should have versioned 
> state stores. Versioned state stores are similar to key-value stores except 
> they can store multiple record versions for a single key. An example use case 
> for versioned key-value stores is in providing proper temporal join semantics 
> for stream-table joins with regard to out-of-order data.
> See KIP for more: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
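A minimal in-memory sketch of the versioned-store idea, assuming nothing beyond the JDK: each key maps to a timestamp-ordered history, and lookups can be "as of" a timestamp. The real store from KIP-889 additionally bounds history by a retention period and is RocksDB-backed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class VersionedStoreSketch {
    static class VersionedStore<K, V> {
        // Per key: timestamp -> value, ordered so we can do floor lookups.
        private final Map<K, TreeMap<Long, V>> history = new HashMap<>();

        void put(K key, V value, long timestamp) {
            history.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }

        // Latest value with timestamp <= asOfTimestamp, or null if none.
        V get(K key, long asOfTimestamp) {
            TreeMap<Long, V> versions = history.get(key);
            if (versions == null) return null;
            Map.Entry<Long, V> e = versions.floorEntry(asOfTimestamp);
            return e == null ? null : e.getValue();
        }
    }

    public static void main(String[] args) {
        VersionedStore<String, String> store = new VersionedStore<>();
        store.put("k", "v1", 10L);
        store.put("k", "v2", 20L);
        // Timestamped lookup: a stream record at t=15 joins against "v1",
        // even though "v2" is the latest version.
        System.out.println(store.get("k", 15L)); // v1
        System.out.println(store.get("k", 25L)); // v2
    }
}
```

This floor-lookup is exactly what gives stream-table joins proper temporal semantics in the presence of out-of-order data.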



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14864) Memory leak in KStreamWindowAggregate with ON_WINDOW_CLOSE emit strategy

2023-04-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14864.
-
Fix Version/s: 3.4.1
   3.3.3
   Resolution: Fixed

> Memory leak in KStreamWindowAggregate with ON_WINDOW_CLOSE emit strategy
> 
>
> Key: KAFKA-14864
> URL: https://issues.apache.org/jira/browse/KAFKA-14864
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Victoria Xia
>Assignee: Victoria Xia
>Priority: Major
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> The Streams DSL processor implementation for the ON_WINDOW_CLOSE emit 
> strategy during KStream windowed aggregations opens a key-value iterator but 
> does not call `close()` on it 
> ([link|https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java#L203]),
>  despite the Javadocs for the iterator making clear that users must do so in 
> order to release resources 
> ([link|https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java#L27]).
>   
> I discovered this bug while running load testing benchmarks and noticed that 
> some runs were sporadically hitting OOMs, so it is definitely possible to hit 
> this in practice.
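The fix pattern is the usual one for `Closeable` iterators: try-with-resources guarantees `close()` even if the emit loop throws. A self-contained sketch with a stand-in iterator type:

```java
public class IteratorCloseSketch {
    // Stand-in for Streams' KeyValueIterator, which extends Closeable:
    // callers must close it to release underlying (e.g. RocksDB) resources.
    static class KeyValueIterator implements AutoCloseable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    static KeyValueIterator openIterator() { return new KeyValueIterator(); }

    public static void main(String[] args) {
        KeyValueIterator observed;
        // try-with-resources closes the iterator even on exceptions,
        // avoiding the leak described above.
        try (KeyValueIterator it = openIterator()) {
            observed = it;
            // ... iterate and emit closed windows ...
        }
        System.out.println(observed.closed); // true
    }
}
```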



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14839) Exclude protected variable from JavaDocs

2023-03-23 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-14839:
---

 Summary: Exclude protected variable from JavaDocs
 Key: KAFKA-14839
 URL: https://issues.apache.org/jira/browse/KAFKA-14839
 Project: Kafka
  Issue Type: Bug
  Components: documentation, streams
Reporter: Matthias J. Sax


Cf 
[https://kafka.apache.org/31/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#enableSpuriousResultFix]

The variable `enableSpuriousResultFix` is protected and not part of the public 
API, and thus should not show up in the JavaDocs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-4106) Consumer / add configure method to PartitionAssignor interface

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4106.

Resolution: Fixed

> Consumer / add configure method to PartitionAssignor interface
> --
>
> Key: KAFKA-4106
> URL: https://issues.apache.org/jira/browse/KAFKA-4106
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 0.10.0.1
>Reporter: Florian Hussonnois
>Assignee: Jason Gustafson
>Priority: Minor
>
> Currently, we can implement a custom PartitionAssignor which will forward 
> user data that will be used during the assignments protocol. For example, 
> data can be used to implement a rack-aware assignor
> However, currently we cannot dynamically configure a PartitionAssignor 
> instance.
> It would be nice to add a method configure(Map<String, ?> configs) to the 
> PartitionAssignor interface. Then, this method would be invoked by the 
> KafkaConsumer on each assignor, as is done for deserializers.
> The code modifications are pretty straightforward but involve modifying the 
> public interface PartitionAssignor. Does that mean this JIRA needs a KIP?
> I can contribute to that improvement.
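A self-contained sketch of the proposal, mirroring Kafka's `Configurable` pattern (the stand-in interface and the `client.rack` key are illustrative, not the actual API):

```java
import java.util.Map;

public class ConfigurableAssignorSketch {
    // Stand-in for the PartitionAssignor interface with the proposed
    // configure(Map) hook; the consumer would invoke it on each assignor
    // after instantiation, the way it already does for deserializers.
    interface PartitionAssignor {
        default void configure(Map<String, ?> configs) {}
        String name();
    }

    // Hypothetical rack-aware assignor that reads its rack from the configs.
    static class RackAwareAssignor implements PartitionAssignor {
        private String rack = "unknown";

        @Override public void configure(Map<String, ?> configs) {
            Object r = configs.get("client.rack");
            if (r != null) rack = r.toString();
        }

        @Override public String name() { return "rack-aware(" + rack + ")"; }
    }

    public static void main(String[] args) {
        PartitionAssignor assignor = new RackAwareAssignor();
        assignor.configure(Map.of("client.rack", "us-east-1a"));
        System.out.println(assignor.name()); // rack-aware(us-east-1a)
    }
}
```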



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-8177) Allow for separate connect instances to have sink connectors with the same name

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8177.

Resolution: Fixed

> Allow for separate connect instances to have sink connectors with the same 
> name
> ---
>
> Key: KAFKA-8177
> URL: https://issues.apache.org/jira/browse/KAFKA-8177
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Paul Whalen
>Priority: Minor
>  Labels: connect
>
> If you have multiple Connect instances (either a single standalone or 
> distributed group of workers) running against the same Kafka cluster, the 
> connect instances cannot each have a sink connector with the same name and 
> still operate independently. This is because the consumer group ID used 
> internally for reading from the source topic(s) is entirely derived from the 
> connector's name: 
> [https://github.com/apache/kafka/blob/d0e436c471ba4122ddcc0f7a1624546f97c4a517/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java#L24]
> The documentation of Connect implies to me that it supports "multi-tenancy," 
> that is, as long as...
>  * In standalone mode, the {{offset.storage.file.filename}} is not shared 
> between instances
>  * In distributed mode, {{group.id}} and {{config.storage.topic}}, 
> {{offset.storage.topic}}, and {{status.storage.topic}} are not the same 
> between instances
> ... then the connect instances can operate completely independently without 
> fear of conflict.  But the sink connector consumer group naming policy makes 
> this untrue. Obviously this can be achieved by uniquely naming connectors 
> across instances, but in some environments that could be a bit of a nuisance, 
> or a challenging policy to enforce. For instance, imagine a large group of 
> developers or data analysts all running their own standalone Connect to load 
> into a SQL database for their own analysis, or mirroring to 
> their own local cluster for testing.
> The obvious solution is to allow supplying config that gives a Connect instance 
> some notion of identity, and to use that when creating the sink task consumer 
> group. Distributed mode already has this obviously ({{group.id}}), but it 
> would need to be added for standalone mode. Maybe {{instance.id}}? Given that 
> solution it seems like this would need a small KIP.
> I could also imagine this solving this problem through better documentation 
> ("ensure your connector names are unique!"), but having that subtlety doesn't 
> seem worth it to me. (Optionally) assigning identity to every Connect 
> instance seems strictly more clear, without any downside.
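A sketch of the collision and the proposed fix. The `"connect-" + name` derivation matches the SinkUtils link above; the `instance.id`-scoped variant is the hypothetical change, not existing behavior:

```java
public class SinkGroupIdSketch {
    // Current behavior (cf. SinkUtils): the consumer group id is derived
    // solely from the connector name, so two Connect instances running a
    // same-named sink connector share one group and conflict.
    static String currentGroupId(String connectorName) {
        return "connect-" + connectorName;
    }

    // Proposal sketch: fold an instance identity (a hypothetical instance.id
    // for standalone mode, group.id for distributed) into the group id.
    static String scopedGroupId(String instanceId, String connectorName) {
        return "connect-" + instanceId + "-" + connectorName;
    }

    public static void main(String[] args) {
        // Two analysts, same connector name: today they collide...
        System.out.println(currentGroupId("jdbc-sink"));               // connect-jdbc-sink
        // ...with per-instance identity, they do not.
        System.out.println(scopedGroupId("analyst-7", "jdbc-sink"));   // connect-analyst-7-jdbc-sink
        System.out.println(scopedGroupId("analyst-8", "jdbc-sink"));   // connect-analyst-8-jdbc-sink
    }
}
```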



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-8177) Allow for separate connect instances to have sink connectors with the same name

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-8177:


> Allow for separate connect instances to have sink connectors with the same 
> name
> ---
>
> Key: KAFKA-8177
> URL: https://issues.apache.org/jira/browse/KAFKA-8177
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Paul Whalen
>Priority: Minor
>  Labels: connect
>
> If you have multiple Connect instances (either a single standalone or 
> distributed group of workers) running against the same Kafka cluster, the 
> connect instances cannot each have a sink connector with the same name and 
> still operate independently. This is because the consumer group ID used 
> internally for reading from the source topic(s) is entirely derived from the 
> connector's name: 
> [https://github.com/apache/kafka/blob/d0e436c471ba4122ddcc0f7a1624546f97c4a517/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java#L24]
> The documentation of Connect implies to me that it supports "multi-tenancy," 
> that is, as long as...
>  * In standalone mode, the {{offset.storage.file.filename}} is not shared 
> between instances
>  * In distributed mode, {{group.id}} and {{config.storage.topic}}, 
> {{offset.storage.topic}}, and {{status.storage.topic}} are not the same 
> between instances
> ... then the connect instances can operate completely independently without 
> fear of conflict.  But the sink connector consumer group naming policy makes 
> this untrue. Obviously this can be achieved by uniquely naming connectors 
> across instances, but in some environments that could be a bit of a nuisance, 
> or a challenging policy to enforce. For instance, imagine a large group of 
> developers or data analysts all running their own standalone Connect to load 
> into a SQL database for their own analysis, or mirroring to 
> their own local cluster for testing.
> The obvious solution is to allow supplying config that gives a Connect instance 
> some notion of identity, and to use that when creating the sink task consumer 
> group. Distributed mode already has this obviously ({{group.id}}), but it 
> would need to be added for standalone mode. Maybe {{instance.id}}? Given that 
> solution it seems like this would need a small KIP.
> I could also imagine this solving this problem through better documentation 
> ("ensure your connector names are unique!"), but having that subtlety doesn't 
> seem worth it to me. (Optionally) assigning identity to every Connect 
> instance seems strictly more clear, without any downside.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-5452) Aggressive log compaction ratio appears to have no negative effect on log-compacted topics

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5452.

Resolution: Fixed

> Aggressive log compaction ratio appears to have no negative effect on 
> log-compacted topics
> --
>
> Key: KAFKA-5452
> URL: https://issues.apache.org/jira/browse/KAFKA-5452
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, core, log
>Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu Trusty (14.04.5), Oracle JDK 8
>Reporter: Jeff Chao
>Priority: Major
>  Labels: performance
> Attachments: 200mbs-dirty0-dirty-1-dirty05.png, 
> flame-graph-200mbs-dirty0.png, flame-graph-200mbs-dirty0.svg
>
>
> Some of our users are seeing unintuitive/unexpected behavior with 
> log-compacted topics where they receive multiple records for the same key 
> when consuming. This is a result of low throughput on log-compacted topics 
> such that conditions ({{min.cleanable.dirty.ratio = 0.5}}, default) aren't 
> met for compaction to kick in.
> This prompted us to test and tune {{min.cleanable.dirty.ratio}} in our 
> clusters. It appears that having more aggressive log compaction ratios don't 
> have negative effects on CPU and memory utilization. If this is truly the 
> case, we should consider changing the default from {{0.5}} to something more 
> aggressive.
> Setup:
> # 8 brokers
> # 5 zk nodes
> # 32 partitions on a topic
> # replication factor 3
> # log roll 3 hours
> # log segment bytes 1 GB
> # log retention 24 hours
> # all messages to a single key
> # all messages to a unique key
> # all messages to a bounded key range [0, 999]
> # {{min.cleanable.dirty.ratio}} per topic = {{0}}, {{0.5}}, and {{1}}
> # 200 MB/s sustained, produce and consume traffic
> Observations:
> We were able to verify log cleaner threads were performing work by checking 
> the logs and verifying the {{cleaner-offset-checkpoint}} file for all topics. 
> We also observed the log cleaner's {{time-since-last-run-ms}} metric was 
> normal, never going above the default of 15 seconds.
> Under-replicated partitions stayed steady, same for replication lag.
> Here's an example test run where we try out {{min.cleanable.dirty.ratio = 
> 0}}, {{min.cleanable.dirty.ratio = 1}}, and {{min.cleanable.dirty.ratio = 
> 0.5}}. Troughs in between the peaks represent zero traffic and reconfiguring 
> of topics.
> (200mbs-dirty-0-dirty1-dirty05.png attached)
> !200mbs-dirty0-dirty-1-dirty05.png|thumbnail!
> Memory utilization is fine, but more interestingly, CPU doesn't appear to 
> have much difference.
> To get more detail, here is a flame graph (raw svg attached) of the run for 
> {{min.cleanable.dirty.ratio = 0}}. The conservative and default ratio flame 
> graphs are equivalent.
> (flame-graph-200mbs-dirty0.png attached)
> !flame-graph-200mbs-dirty0.png|thumbnail!
> Notice that the majority of CPU is coming from:
> # SSL operations (on reads/writes)
> # KafkaApis::handleFetchRequest (ReplicaManager::fetchMessages)
> # KafkaApis::handleOffsetFetchRequest
> We also have examples from small scale test runs which show similar behavior 
> but with scaled down CPU usage.
> It seems counterintuitive that there's no apparent difference in CPU whether 
> it be aggressive or conservative compaction ratios, so we'd like to get some 
> thoughts from the community.
> We're looking for feedback on whether anyone else has experienced this 
> behavior or, if CPU isn't affected, whether anyone has seen something 
> related instead.
> If this is true, then we'd be happy to discuss further and provide a patch.
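The per-topic knob exercised in these runs can be expressed as a topic config fragment; the values are the ones the test sweeps over, not recommendations:

```properties
# Topic-level settings used in the test runs above.
cleanup.policy=compact
# Swept per run: 0 (most aggressive), 0.5 (default), 1 (most conservative).
min.cleanable.dirty.ratio=0
```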



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-4106) Consumer / add configure method to PartitionAssignor interface

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4106:


> Consumer / add configure method to PartitionAssignor interface
> --
>
> Key: KAFKA-4106
> URL: https://issues.apache.org/jira/browse/KAFKA-4106
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 0.10.0.1
>Reporter: Florian Hussonnois
>Assignee: Jason Gustafson
>Priority: Minor
>
> Currently, we can implement a custom PartitionAssignor which will forward 
> user data that will be used during the assignments protocol. For example, 
> data can be used to implement a rack-aware assignor
> However, currently we cannot dynamically configure a PartitionAssignor 
> instance.
> It would be nice to add a method configure(Map<String, ?> configs) to the 
> PartitionAssignor interface. Then, this method would be invoked by the 
> KafkaConsumer on each assignor, as is done for deserializers.
> The code modifications are pretty straightforward but involve modifying the 
> public interface PartitionAssignor. Does that mean this JIRA needs a KIP?
> I can contribute to that improvement.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-3117) Fail test at: PlaintextConsumerTest. testAutoCommitOnRebalance

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-3117:


> Fail test at: PlaintextConsumerTest. testAutoCommitOnRebalance 
> ---
>
> Key: KAFKA-3117
> URL: https://issues.apache.org/jira/browse/KAFKA-3117
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
> Environment: oracle java764bit
> ubuntu 13.10 
>Reporter: edwardt
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: newbie, test, transient-unit-test-failure
>
> java.lang.AssertionError: Expected partitions [topic-0, topic-1, topic2-0, 
> topic2-1] but actually got [topic-0, topic-1]
>   at org.junit.Assert.fail(Assert.java:88)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:730)
>   at 
> kafka.api.BaseConsumerTest.testAutoCommitOnRebalance(BaseConsumerTest.scala:125)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:22





[jira] [Resolved] (KAFKA-6014) new consumer mirror maker halts after committing offsets to a deleted topic

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6014.

Resolution: Fixed

> new consumer mirror maker halts after committing offsets to a deleted topic
> ---
>
> Key: KAFKA-6014
> URL: https://issues.apache.org/jira/browse/KAFKA-6014
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Jason Gustafson
>Priority: Major
>
> New consumer throws an unexpected KafkaException when trying to commit to a 
> topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to 
> catch the KafkaException and just kills the process. We didn't see this in 
> the old consumer because the old consumer just silently drops failed offset 
> commits.
> I ran a quick experiment locally to prove the behavior. The experiment:
> 1. start up a single broker
> 2. create a single-partition topic t
> 3. create a new consumer that consumes topic t
> 4. make the consumer commit every few seconds
> 5. delete topic t
> 6. expect: KafkaException that kills the process.
> Here's my script:
> {code}
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.TopicPartition;
> import java.util.Collections;
> import java.util.List;
> import java.util.Properties;
> public class OffsetCommitTopicDeletionTest {
> public static void main(String[] args) throws InterruptedException {
> Properties props = new Properties();
> props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9090");
> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
> props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> KafkaConsumer<byte[], byte[]> kafkaConsumer = new 
> KafkaConsumer<>(props);
> TopicPartition partition = new TopicPartition("t", 0);
> List<TopicPartition> partitions = 
> Collections.singletonList(partition);
> kafkaConsumer.assign(partitions);
> while (true) {
> kafkaConsumer.commitSync(Collections.singletonMap(partition, new 
> OffsetAndMetadata(0, "")));
> Thread.sleep(1000);
> }
> }
> }
> {code}
> Here are the other commands:
> {code}
> > rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> > ./gradlew clean jar
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh 
> > config/server0.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t 
> > --partitions 1 --replication-factor 1
> > ./bin/kafka-run-class.sh 
> > org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
> {code}
> Here is the output:
> {code}
> [2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] 
> Offset commit failed on partition t-0 at offset 0: This server does not host 
> this topic-partition. 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> Exception in thread "main" org.apache.kafka.common.KafkaException: Partition 
> t-0 may not exist or user may not have Describe access to topic
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
>   at 
> 
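The failure mode above, an uncaught KafkaException from commitSync() terminating the whole process, can be sketched with a defensive wrapper. This is a hypothetical illustration, not MirrorMaker's actual code; a plain RuntimeException stands in for org.apache.kafka.common.KafkaException so the sketch is self-contained:

```java
// Hypothetical sketch: treat a failed offset commit as a dropped commit
// (the old consumer's effective behavior) instead of letting the exception
// kill the process. RuntimeException stands in for KafkaException.
class SafeCommitSketch {
    static boolean tryCommit(Runnable commit) {
        try {
            commit.run();
            return true; // commit succeeded
        } catch (RuntimeException e) { // stand-in for KafkaException
            System.err.println("Offset commit failed, dropping: " + e.getMessage());
            return false; // commit dropped, process keeps running
        }
    }

    public static void main(String[] args) {
        // Simulate a commit against a deleted topic throwing, as in the report.
        boolean ok = tryCommit(() -> {
            throw new RuntimeException("Partition t-0 may not exist");
        });
        System.out.println(ok); // the process survives the failed commit
    }
}
```

Whether MirrorMaker should drop, retry, or rebuild its topic metadata on such a failure is a design question; the sketch only shows the difference between catching the exception and letting it propagate out of the commit loop.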

[jira] [Reopened] (KAFKA-4187) Adding a flag to prefix topics with mirror maker

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4187:


> Adding a flag to prefix topics with mirror maker
> 
>
> Key: KAFKA-4187
> URL: https://issues.apache.org/jira/browse/KAFKA-4187
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 0.8.2.1, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Vincent Rischmann
>Priority: Minor
>
> So I have a setup where I need to mirror our production cluster to our 
> preproduction cluster, but can't use the original topic names.
> I've patched mirror maker to allow me to define a prefix for each topic, and I 
> basically prefix everything with mirror_. I'm wondering if there's interest 
> in this feature upstream?
> I have a patch available for Kafka 0.9.0.1 (what I'm using) and from what 
> I've seen it should apply well to Kafka 0.10.0.X too.
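The proposed behavior amounts to a one-line mapping from source topic to target topic. A minimal sketch (a hypothetical illustration of the idea, not the reporter's actual patch):

```java
// Hypothetical sketch of the proposed mirror maker option: every mirrored
// record is produced to the source topic name with a configurable prefix
// applied. Not the actual patch.
class TopicPrefixSketch {
    static String targetTopic(String prefix, String sourceTopic) {
        return prefix + sourceTopic;
    }

    public static void main(String[] args) {
        // With the reporter's setup, production topic "events" would land in
        // the preproduction cluster as "mirror_events".
        System.out.println(targetTopic("mirror_", "events"));
    }
}
```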





[jira] [Resolved] (KAFKA-4187) Adding a flag to prefix topics with mirror maker

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4187.

Resolution: Fixed

> Adding a flag to prefix topics with mirror maker
> 
>
> Key: KAFKA-4187
> URL: https://issues.apache.org/jira/browse/KAFKA-4187
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 0.8.2.1, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Vincent Rischmann
>Priority: Minor
>
> So I have a setup where I need to mirror our production cluster to our 
> preproduction cluster, but can't use the original topic names.
> I've patched mirror maker to allow me to define a prefix for each topic, and I 
> basically prefix everything with mirror_. I'm wondering if there's interest 
> in this feature upstream?
> I have a patch available for Kafka 0.9.0.1 (what I'm using) and from what 
> I've seen it should apply well to Kafka 0.10.0.X too.





[jira] [Resolved] (KAFKA-3117) Fail test at: PlaintextConsumerTest. testAutoCommitOnRebalance

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-3117.

Resolution: Fixed

> Fail test at: PlaintextConsumerTest. testAutoCommitOnRebalance 
> ---
>
> Key: KAFKA-3117
> URL: https://issues.apache.org/jira/browse/KAFKA-3117
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Affects Versions: 0.9.0.0
> Environment: oracle java764bit
> ubuntu 13.10 
>Reporter: edwardt
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: newbie, test, transient-unit-test-failure
>
> java.lang.AssertionError: Expected partitions [topic-0, topic-1, topic2-0, 
> topic2-1] but actually got [topic-0, topic-1]
>   at org.junit.Assert.fail(Assert.java:88)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:730)
>   at 
> kafka.api.BaseConsumerTest.testAutoCommitOnRebalance(BaseConsumerTest.scala:125)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:22





[jira] [Reopened] (KAFKA-5452) Aggressive log compaction ratio appears to have no negative effect on log-compacted topics

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-5452:


> Aggressive log compaction ratio appears to have no negative effect on 
> log-compacted topics
> --
>
> Key: KAFKA-5452
> URL: https://issues.apache.org/jira/browse/KAFKA-5452
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, core, log
>Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu Trusty (14.04.5), Oracle JDK 8
>Reporter: Jeff Chao
>Priority: Major
>  Labels: performance
> Attachments: 200mbs-dirty0-dirty-1-dirty05.png, 
> flame-graph-200mbs-dirty0.png, flame-graph-200mbs-dirty0.svg
>
>
> Some of our users are seeing unintuitive/unexpected behavior with 
> log-compacted topics where they receive multiple records for the same key 
> when consuming. This is a result of low throughput on log-compacted topics 
> such that conditions ({{min.cleanable.dirty.ratio = 0.5}}, default) aren't 
> met for compaction to kick in.
> This prompted us to test and tune {{min.cleanable.dirty.ratio}} in our 
> clusters. It appears that having more aggressive log compaction ratios don't 
> have negative effects on CPU and memory utilization. If this is truly the 
> case, we should consider changing the default from {{0.5}} to something more 
> aggressive.
> Setup:
> # 8 brokers
> # 5 zk nodes
> # 32 partitions on a topic
> # replication factor 3
> # log roll 3 hours
> # log segment bytes 1 GB
> # log retention 24 hours
> # all messages to a single key
> # all messages to a unique key
> # all messages to a bounded key range [0, 999]
> # {{min.cleanable.dirty.ratio}} per topic = {{0}}, {{0.5}}, and {{1}}
> # 200 MB/s sustained, produce and consume traffic
> Observations:
> We were able to verify log cleaner threads were performing work by checking 
> the logs and verifying the {{cleaner-offset-checkpoint}} file for all topics. 
> We also observed the log cleaner's {{time-since-last-run-ms}} metric was 
> normal, never going above the default of 15 seconds.
> Under-replicated partitions stayed steady, same for replication lag.
> Here's an example test run where we try out {{min.cleanable.dirty.ratio = 
> 0}}, {{min.cleanable.dirty.ratio = 1}}, and {{min.cleanable.dirty.ratio = 
> 0.5}}. Troughs in between the peaks represent zero traffic and reconfiguring 
> of topics.
> (200mbs-dirty-0-dirty1-dirty05.png attached)
> !200mbs-dirty0-dirty-1-dirty05.png|thumbnail!
> Memory utilization is fine, but more interestingly, CPU doesn't appear to 
> have much difference.
> To get more detail, here is a flame graph (raw svg attached) of the run for 
> {{min.cleanable.dirty.ratio = 0}}. The conservative and default ratio flame 
> graphs are equivalent.
> (flame-graph-200mbs-dirty0.png attached)
> !flame-graph-200mbs-dirty0.png|thumbnail!
> Notice that the majority of CPU is coming from:
> # SSL operations (on reads/writes)
> # KafkaApis::handleFetchRequest (ReplicaManager::fetchMessages)
> # KafkaApis::handleOffsetFetchRequest
> We also have examples from small scale test runs which show similar behavior 
> but with scaled down CPU usage.
> It seems counterintuitive that there's no apparent difference in CPU whether 
> the compaction ratio is aggressive or conservative, so we'd like to get some 
> thoughts from the community.
> We're looking for feedback on whether anyone else has experienced this 
> behavior before or, if CPU isn't affected, whether anyone has seen something 
> related instead.
> If this is true, then we'd be happy to discuss further and provide a patch.
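For reference, the trigger being tuned in the report can be sketched as a simple ratio check (a hypothetical simplification of the broker's log-cleaner eligibility logic, not the actual LogCleaner code):

```java
// Simplified sketch of the dirty-ratio condition discussed above: a log is
// eligible for compaction once dirty (not-yet-compacted) bytes make up at
// least min.cleanable.dirty.ratio of the total log.
class DirtyRatioSketch {
    static boolean isCleanable(long cleanBytes, long dirtyBytes, double minCleanableDirtyRatio) {
        long totalBytes = cleanBytes + dirtyBytes;
        if (totalBytes == 0) {
            return false; // empty log: nothing to compact
        }
        double dirtyRatio = (double) dirtyBytes / totalBytes;
        return dirtyRatio >= minCleanableDirtyRatio;
    }

    public static void main(String[] args) {
        // Low-throughput topic: only 20% of the log is dirty, so the 0.5
        // default never fires and consumers keep seeing duplicate keys...
        System.out.println(isCleanable(80, 20, 0.5));
        // ...while a ratio of 0 makes the log cleanable as soon as anything is dirty.
        System.out.println(isCleanable(80, 20, 0.0));
    }
}
```

This also illustrates why the duplicates in the report appear on low-throughput topics specifically: the dirty fraction grows slowly, so the default threshold is rarely reached.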





[jira] [Resolved] (KAFKA-949) Integrate kafka into YARN

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-949.
---
Resolution: Fixed

> Integrate kafka into YARN
> -
>
> Key: KAFKA-949
> URL: https://issues.apache.org/jira/browse/KAFKA-949
> Project: Kafka
>  Issue Type: New Feature
>  Components: contrib
>Affects Versions: 0.8.0
> Environment: hadoop 2-0.X
>Reporter: Kam Kasravi
>Priority: Major
>
> Kafka is being added to Bigtop (BIGTOP-989). Having Kafka services available 
> under YARN will enable a number of cluster operations for Kafka that YARN 
> handles.





[jira] [Reopened] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-3410:


> Unclean leader election and "Halting because log truncation is not allowed"
> ---
>
> Key: KAFKA-3410
> URL: https://issues.apache.org/jira/browse/KAFKA-3410
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: James Cheng
>Priority: Major
>  Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shut down, 
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition 
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
> dropped out of ISR. Broker1 is still the leader. I can still write data to 
> the partition.
> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
> because broker1 is down. At this point, the partition is offline. Can't write 
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my 
> brokers.
> I'm trying to understand a couple things:
> * In step 10, why is broker1 allowed to resume leadership even though it has 
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one 
> partition that is in this state? Wouldn't it be possible for the broker to 
> continue to serve traffic for all the other topics, and just mark this one as 
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker 
> they want to become the new master? This would give me more control over how 
> much data loss I am willing to handle. In this case, I would want broker2 to 
> become the new master. Or, is that possible and I just don't know how to do 
> it?
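The halt in step 10 comes down to a comparison like the following (a hypothetical distillation of the follower-side check behind the fatal log line, not the actual ReplicaFetcherThread code):

```java
// Sketch of the follower's decision described above: if the leader's log ends
// before the follower's, the follower would have to truncate data it already
// has; with unclean leader election disabled it halts instead.
class TruncationCheckSketch {
    static String decide(long leaderEndOffset, long replicaEndOffset,
                         boolean uncleanLeaderElectionEnabled) {
        if (leaderEndOffset >= replicaEndOffset) {
            return "FETCH";    // leader is ahead or equal: keep fetching normally
        }
        return uncleanLeaderElectionEnabled
                ? "TRUNCATE"   // accept data loss, fall back to the leader's offset
                : "HALT";      // refuse to truncate: the fatal error in the report
    }

    public static void main(String[] args) {
        // Step 10 of the scenario: leader restarted with an empty log (offset 0),
        // replica still has 151 messages, unclean election disabled.
        System.out.println(decide(0, 151, false));
    }
}
```

Setting unclean.leader.election.enable=true, as the reporter did, flips the last branch from halting to truncating, which is exactly a trade of availability for acknowledged-data loss.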





[jira] [Resolved] (KAFKA-8622) Snappy Compression Not Working

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8622.

Resolution: Fixed

> Snappy Compression Not Working
> --
>
> Key: KAFKA-8622
> URL: https://issues.apache.org/jira/browse/KAFKA-8622
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Kunal Verma
>Assignee: kaushik srinivas
>Priority: Major
>
> I am trying to produce a message on the broker with compression enabled as 
> snappy.
> Environment :
> Brokers[Kafka-cluster] are hosted on Centos 7
> I have downloaded the latest version (2.3.0 & 2.2.1) tar, extracted it, and 
> moved it to /opt/kafka-
> I have started the broker with the standard configuration.
> In my producer service (written in Java), I have enabled snappy compression:
> props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
>  
> So while sending a record to the broker, I am getting the following error:
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request
>  
> While investigating further at the broker end, I got the following error in the log:
>  
> logs/kafkaServer.out:java.lang.UnsatisfiedLinkError: 
> /tmp/snappy-1.1.7-ecd381af-ffdd-4a5c-a3d8-b802d0fa4e85-libsnappyjava.so: 
> /tmp/snappy-1.1.7-ecd381af-ffdd-4a5c-a3d8-b802d0fa4e85-libsnappyjava.so: 
> failed to map segment from shared object: Operation not permitted
> --
>  
> [2019-07-02 15:29:43,399] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition test-bulk-1 (kafka.server.ReplicaManager)
> java.lang.NoClassDefFoundError: Could not initialize class 
> org.xerial.snappy.Snappy
> at 
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435)
> at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466)
> at java.io.DataInputStream.readByte(DataInputStream.java:265)
> at org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:168)
> at 
> org.apache.kafka.common.record.DefaultRecord.readFrom(DefaultRecord.java:293)
> at 
> org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(DefaultRecordBatch.java:264)
> at 
> org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:569)
> at 
> org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:538)
> at 
> org.apache.kafka.common.record.DefaultRecordBatch.iterator(DefaultRecordBatch.java:327)
> at 
> scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:55)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at 
> kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1(LogValidator.scala:269)
> at 
> kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1$adapted(LogValidator.scala:261)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at 
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:261)
> at 
> kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:73)
> at kafka.log.Log.liftedTree1$1(Log.scala:881)
> at kafka.log.Log.$anonfun$append$2(Log.scala:868)
> at kafka.log.Log.maybeHandleIOException(Log.scala:2065)
> at kafka.log.Log.append(Log.scala:850)
> at kafka.log.Log.appendAsLeader(Log.scala:819)
> at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:772)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259)
> at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:759)
> at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:763)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
> at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
> at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
> at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
> at scala.collection.TraversableLike.map(TraversableLike.scala:237)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
> at 
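The UnsatisfiedLinkError above ("failed to map segment from shared object: Operation not permitted") is characteristic of /tmp being mounted noexec, so snappy-java cannot execute the native library it extracts there. A common workaround (an assumption about this setup, not confirmed in the report) is to point snappy-java's extraction directory somewhere executable via the org.xerial.snappy.tempdir system property; the path below is a placeholder:

```java
// Hypothetical workaround sketch for a noexec /tmp: redirect snappy-java's
// native-library extraction directory before Snappy is first loaded.
// "/opt/kafka/tmp" is a placeholder for any directory on an executable mount.
class SnappyTempdirSketch {
    static String configureSnappyTempdir(String dir) {
        // snappy-java consults this property when extracting libsnappyjava.so.
        System.setProperty("org.xerial.snappy.tempdir", dir);
        return System.getProperty("org.xerial.snappy.tempdir");
    }

    public static void main(String[] args) {
        System.out.println(configureSnappyTempdir("/opt/kafka/tmp"));
    }
}
```

On the broker side the same effect is commonly achieved by adding -Dorg.xerial.snappy.tempdir=... to KAFKA_OPTS, or by remounting /tmp without the noexec flag.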

[jira] [Reopened] (KAFKA-6014) new consumer mirror maker halts after committing offsets to a deleted topic

2023-02-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-6014:


> new consumer mirror maker halts after committing offsets to a deleted topic
> ---
>
> Key: KAFKA-6014
> URL: https://issues.apache.org/jira/browse/KAFKA-6014
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Jason Gustafson
>Priority: Major
>
> New consumer throws an unexpected KafkaException when trying to commit to a 
> topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to 
> catch the KafkaException and just kills the process. We didn't see this in 
> the old consumer because the old consumer just silently drops failed offset 
> commits.
> I ran a quick experiment locally to prove the behavior. The experiment:
> 1. start up a single broker
> 2. create a single-partition topic t
> 3. create a new consumer that consumes topic t
> 4. make the consumer commit every few seconds
> 5. delete topic t
> 6. expect: KafkaException that kills the process.
> Here's my script:
> {code}
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.TopicPartition;
> import java.util.Collections;
> import java.util.List;
> import java.util.Properties;
> public class OffsetCommitTopicDeletionTest {
> public static void main(String[] args) throws InterruptedException {
> Properties props = new Properties();
> props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9090");
> props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
> props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> KafkaConsumer<byte[], byte[]> kafkaConsumer = new 
> KafkaConsumer<>(props);
> TopicPartition partition = new TopicPartition("t", 0);
> List<TopicPartition> partitions = 
> Collections.singletonList(partition);
> kafkaConsumer.assign(partitions);
> while (true) {
> kafkaConsumer.commitSync(Collections.singletonMap(partition, new 
> OffsetAndMetadata(0, "")));
> Thread.sleep(1000);
> }
> }
> }
> {code}
> Here are the other commands:
> {code}
> > rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> > ./gradlew clean jar
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh 
> > config/server0.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t 
> > --partitions 1 --replication-factor 1
> > ./bin/kafka-run-class.sh 
> > org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
> {code}
> Here is the output:
> {code}
> [2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] 
> Offset commit failed on partition t-0 at offset 0: This server does not host 
> this topic-partition. 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> Exception in thread "main" org.apache.kafka.common.KafkaException: Partition 
> t-0 may not exist or user may not have Describe access to topic
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
>   at 
> 
