[jira] [Updated] (KAFKA-13619) zookeeper.sync.time.ms is no longer used

2022-01-26 Thread Tomonari Yamashita (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomonari Yamashita updated KAFKA-13619:
---
Description: 
- zookeeper.sync.time.ms is no longer used, but it is still present in the 
documentation (1)
 -- The implementation and documentation of zookeeper.sync.time.ms should be 
removed.
 -- As far as I can see, it was already out of use by v2.0.0.
 -- Although it was for a different purpose and is now old, there is a pull 
request that tried to remove it (2).
 

(1) 
[https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]

(2) [https://github.com/apache/kafka/pull/3167]

  was:
- zookeeper.sync.time.ms is no longer used, but it is still present in the 
documentation (1)
 -- The implementation and documentation of zookeeper.sync.time.ms should be 
removed.
 -- As far as I can see, it was already out of use by v2.0.0.
 -- Although it was for a different purpose, there is a pull request that 
tried to remove it (2).
 

(1) 
[https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]

(2) https://github.com/apache/kafka/pull/3167


> zookeeper.sync.time.ms is no longer used
> 
>
> Key: KAFKA-13619
> URL: https://issues.apache.org/jira/browse/KAFKA-13619
> Project: Kafka
>  Issue Type: Bug
>  Components: core, documentation
>Affects Versions: 2.0.0, 3.1.0
>Reporter: Tomonari Yamashita
>Priority: Major
>
> - zookeeper.sync.time.ms is no longer used, but it is still present in the 
> documentation (1)
>  -- The implementation and documentation of zookeeper.sync.time.ms should be 
> removed.
>  -- As far as I can see, it was already out of use by v2.0.0.
>  -- Although it was for a different purpose and is now old, there is a pull 
> request that tried to remove it (2).
>  
> (1) 
> [https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]
> (2) [https://github.com/apache/kafka/pull/3167]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13619) zookeeper.sync.time.ms is no longer used

2022-01-26 Thread Tomonari Yamashita (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomonari Yamashita updated KAFKA-13619:
---
Description: 
- zookeeper.sync.time.ms is no longer used, but it is still present in the 
documentation (1)
 -- The implementation and documentation of zookeeper.sync.time.ms should be 
removed.
 -- As far as I can see, it was already out of use by v2.0.0.
 -- Although it was for a different purpose and its base is old, there is a pull 
request that tried to remove it (2).
 

(1) 
[https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]

(2) [https://github.com/apache/kafka/pull/3167]

  was:
- zookeeper.sync.time.ms is no longer used, but it is still present in the 
documentation (1)
 -- The implementation and documentation of zookeeper.sync.time.ms should be 
removed.
 -- As far as I can see, it was already out of use by v2.0.0.
 -- Although it was for a different purpose and is now old, there is a pull 
request that tried to remove it (2).
 

(1) 
[https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]

(2) [https://github.com/apache/kafka/pull/3167]


> zookeeper.sync.time.ms is no longer used
> 
>
> Key: KAFKA-13619
> URL: https://issues.apache.org/jira/browse/KAFKA-13619
> Project: Kafka
>  Issue Type: Bug
>  Components: core, documentation
>Affects Versions: 2.0.0, 3.1.0
>Reporter: Tomonari Yamashita
>Priority: Major
>
> - zookeeper.sync.time.ms is no longer used, but it is still present in the 
> documentation (1)
>  -- The implementation and documentation of zookeeper.sync.time.ms should be 
> removed.
>  -- As far as I can see, it was already out of use by v2.0.0.
>  -- Although it was for a different purpose and its base is old, there is a pull 
> request that tried to remove it (2).
>  
> (1) 
> [https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]
> (2) [https://github.com/apache/kafka/pull/3167]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13614) Leader replication quota is applied to consumer fetches

2022-01-26 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13614.
-
Fix Version/s: 3.2.0
 Reviewer: David Jacot
   Resolution: Fixed

> Leader replication quota is applied to consumer fetches
> ---
>
> Key: KAFKA-13614
> URL: https://issues.apache.org/jira/browse/KAFKA-13614
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
> Fix For: 3.2.0
>
>
> in ReplicaManager.readFromLocalLog we check shouldLeaderThrottle regardless 
> of whether the read is coming from a consumer or follower broker. This 
> results in replication quota being applied to consumer fetches.
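A minimal sketch of the shape of the fix (illustrative Java, not the actual
Scala in ReplicaManager; `isFromFollower` and `quotaExceeded` are stand-ins for
the real fetch-origin and quota checks):

    class LeaderThrottleCheckSketch {
        // Consult the leader replication quota only for follower fetches;
        // consumer fetches must never be throttled by it.
        static boolean shouldLeaderThrottle(boolean isFromFollower, boolean quotaExceeded) {
            return isFromFollower && quotaExceeded;
        }
    }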



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac merged pull request #11714: KAFKA-13614: Don't apply leader replication quota to consumer fetches

2022-01-26 Thread GitBox


dajac merged pull request #11714:
URL: https://github.com/apache/kafka/pull/11714


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13523:
-
Affects Version/s: 3.2.0

> Implement IQv2 support in global stores
> ---
>
> Key: KAFKA-13523
> URL: https://issues.apache.org/jira/browse/KAFKA-13523
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>
> Global stores pose one significant problem for IQv2: when they start up, they 
> skip the regular ingest pipeline and instead use the "restoration" pipeline 
> to read up until the current end offset. Then, they switch over to the 
> regular ingest pipeline.
> IQv2 position tracking expects to track the position of each record from the 
> input topic through the ingest pipeline and then get the position headers 
> through the restoration pipeline via the changelog topic. The fact that 
> global stores "restore" the input topic instead of ingesting it violates our 
> expectations.
> It has also caused other problems, so we may want to consider switching the 
> global store processing to use the normal paradigm rather than adding 
> special-case logic to track positions in global stores.
>  
> Note: there are two primary reasons that global stores behave this way:
>  # We can write in batches during restoration, so the I/O may be more 
> efficient
>  # The global thread does not transition to RUNNING state until it reaches 
> the (current) end of the input topic, which blocks other threads from joining 
> against it, thereby improving the time synchronization of global KTable joins.
> If we want to propose changing the bootstrapping pipeline for global threads, 
> we should have some kind of answer to these concerns.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] vvcephei commented on a change in pull request #11676: KAFKA-13605: checkpoint position in state stores

2022-01-26 Thread GitBox


vvcephei commented on a change in pull request #11676:
URL: https://github.com/apache/kafka/pull/11676#discussion_r793249432



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -85,8 +85,23 @@
  * @throws IllegalStateException If store gets registered after 
initialized is already finished
  * @throws StreamsException if the store's change log does not contain the 
partition
  */
+default void register(final StateStore store,
+  final StateRestoreCallback stateRestoreCallback) {
+register(store, stateRestoreCallback, null);
+}
+
+/**
+ * Registers and possibly restores the specified storage engine.
+ *
+ * @param store the storage engine
+ * @param stateRestoreCallback the restoration callback logic for 
log-backed state stores upon restart
+ * @param checkpointCallback called to checkpoint position metadata of 
state stores
+ *
+ * @throws IllegalStateException If store gets registered after 
initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the 
partition
+ */
 void register(final StateStore store,
-  final StateRestoreCallback stateRestoreCallback);
+  final StateRestoreCallback stateRestoreCallback, final 
CheckpointCallback checkpointCallback);

Review comment:
   I've filed a 3.2.0 blocker to revisit this: 
https://issues.apache.org/jira/browse/KAFKA-13622




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13526:
-
Priority: Blocker  (was: Major)

> IQv2: Consider more generic logic for mapping between binary and typed queries
> --
>
> Key: KAFKA-13526
> URL: https://issues.apache.org/jira/browse/KAFKA-13526
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Blocker
>
> Right now, typed queries (like KeyQuery) need to be specially handled and 
> translated to their binary counterparts (like RawKeyQuery). This happens in 
> the Metered store layers, where the serdes are known. It is necessary because 
> lower store layers are only able to handle binary data (because they don't 
> know the serdes).
> This situation is not ideal, since the Metered store layers will grow to host 
> quite a bit of query handling and translation logic, because the relationship 
> between typed queries and binary counterparts is not obvious, and because we 
> can only automatically translate known query types. User-supplied queries and 
> stores will have to work things out using their a priori knowledge of the 
> serdes.
>  
> One suggestion (from [~mjsax] ) is to come up with some kind of generic 
> "query mapping" API, which the Metered stores would use to translate back and 
> forth between typed and raw keys and values. Users would be able to supply 
> their own mappings along with their custom queries.
> Another option would be to have the Metered stores attach the serdes to the 
> query on the way down and then to the result on the way up. Then, the serdes 
> would be available in the bytes store (as part of the request) and to the 
> users when they get their results (as part of the response).
> Other options may also surface once we start playing with ideas.
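A hedged sketch of the first option (all names below are invented for
illustration; this is not a shipped Kafka API):

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.streams.query.Query;
    import org.apache.kafka.streams.query.QueryResult;

    // Hypothetical "query mapping" hook: the Metered layer would use it to
    // translate a typed query into a raw, byte-oriented one for the inner
    // bytes store, and to translate the raw result back into the typed form.
    interface QueryMapping<K, V> {
        Query<byte[]> toRawQuery(Query<V> typedQuery, Serde<K> keySerde);
        QueryResult<V> fromRawResult(QueryResult<byte[]> rawResult, Serde<V> valueSerde);
    }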



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13548:
-
Priority: Blocker  (was: Major)

> IQv2: revisit WindowKeyQuery and WindowRangeQuery
> -
>
> Key: KAFKA-13548
> URL: https://issues.apache.org/jira/browse/KAFKA-13548
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Blocker
>
> During discussion of KIP-806, there was a suggestion to refactor the queries 
> following a builder pattern so that we can compactly and flexibly specify 
> lower and upper bounds on the keys, window start times, and window end times.
> We should circle back and try to generalize the queries' interfaces before 
> the first release of IQv2.
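As a rough illustration of the builder idea (method names are invented;
WindowRangeQuery.builder() is not part of the released API):

    import java.time.Instant;

    // Hypothetical fluent builder combining key bounds and window-time bounds
    // in one query, so callers only specify the bounds they care about.
    WindowRangeQuery<String, Long> query = WindowRangeQuery.<String, Long>builder()
            .keyFrom("a")
            .keyTo("z")
            .windowStartFrom(Instant.EPOCH)
            .windowStartTo(Instant.now())
            .build();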



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13622:
-
Affects Version/s: 3.2.0

> Revisit the complexity of position tracking in state stores
> ---
>
> Key: KAFKA-13622
> URL: https://issues.apache.org/jira/browse/KAFKA-13622
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>
> Currently, state store implementers have a significant burden to track 
> position correctly. They have to:
>  * update the position during all puts
>  * implement the RecordBatchingStateRestoreCallback and use the 
> ChangelogRecordDeserializationHelper to update the position 
> based on record headers
>  * implement some mechanism to restore the position after a 
> restart if the store is persistent (such as supplying a CommitCallback to write 
> the position to a local file and then read the file during init)
> [~guozhang] pointed out during review that this is probably 
> too much responsibility (and certainly too much opportunity for error). We 
> should see what we can do to simplify these responsibilities, if not 
> eliminate them entirely from the store implementer's scope of concern.
>  
> See 
> https://github.com/apache/kafka/pull/11676#discussion_r790358058
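For the first bullet, a hedged sketch of what "update the position during all
puts" looks like for an implementer (simplified; `inner`, `context`, and
`position` are assumed fields, and `context` is assumed to expose
Optional<RecordMetadata> recordMetadata()):

    // Inside a hypothetical custom StateStore; position starts as
    // Position.emptyPosition().
    public void put(final Bytes key, final byte[] value) {
        inner.put(key, value);
        context.recordMetadata().ifPresent(meta ->
            // Track the highest offset seen per input topic-partition.
            position = position.withComponent(meta.topic(), meta.partition(), meta.offset())
        );
    }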



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13541:
-
Affects Version/s: 3.2.0

> Make IQv2 query/store interface type safe
> -
>
> Key: KAFKA-13541
> URL: https://issues.apache.org/jira/browse/KAFKA-13541
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Blocker
>
> Currently the new IQv2 interface allows applications to query state stores 
> using subclasses of the Query type. Unfortunately there is currently no 
> way to check that the template type of the query matches the type of the 
> relevant store the query is executed on. As a consequence stores have to do a 
> set of unsafe casts.
> This ticket is to explore ways to make the query interface type safe, so 
> that type mismatches are detected at compile time.
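To make the problem concrete, a minimal sketch of the kind of unchecked cast a
bytes store is forced into today (simplified, not actual Kafka code):

    import org.apache.kafka.streams.query.Query;
    import org.apache.kafka.streams.query.QueryResult;

    class RawStoreSketch {
        // The store only holds byte[]; nothing ties R to byte[], so the cast
        // below is unchecked and a mismatch surfaces only at runtime.
        @SuppressWarnings("unchecked")
        static <R> QueryResult<R> handleKeyQuery(final Query<R> query, final byte[] rawValue) {
            return (QueryResult<R>) QueryResult.forResult(rawValue);
        }
    }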



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13541:
-
Priority: Blocker  (was: Major)

> Make IQv2 query/store interface type safe
> -
>
> Key: KAFKA-13541
> URL: https://issues.apache.org/jira/browse/KAFKA-13541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Blocker
>
> Currently the new IQv2 interface allows applications to query state stores 
> using subclasses of the Query type. Unfortunately there is currently no 
> way to check that the template type of the query matches the type of the 
> relevant store the query is executed on. As a consequence stores have to do a 
> set of unsafe casts.
> This ticket is to explore ways to make the query interface type safe, so 
> that type mismatches are detected at compile time.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13523:
-
Priority: Blocker  (was: Major)

> Implement IQv2 support in global stores
> ---
>
> Key: KAFKA-13523
> URL: https://issues.apache.org/jira/browse/KAFKA-13523
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Blocker
>
> Global stores pose one significant problem for IQv2: when they start up, they 
> skip the regular ingest pipeline and instead use the "restoration" pipeline 
> to read up until the current end offset. Then, they switch over to the 
> regular ingest pipeline.
> IQv2 position tracking expects to track the position of each record from the 
> input topic through the ingest pipeline and then get the position headers 
> through the restoration pipeline via the changelog topic. The fact that 
> global stores "restore" the input topic instead of ingesting it violates our 
> expectations.
> It has also caused other problems, so we may want to consider switching the 
> global store processing to use the normal paradigm rather than adding 
> special-case logic to track positions in global stores.
>  
> Note: there are two primary reasons that global stores behave this way:
>  # We can write in batches during restoration, so the I/O may be more 
> efficient
>  # The global thread does not transition to RUNNING state until it reaches 
> the (current) end of the input topic, which blocks other threads from joining 
> against it, thereby improving the time synchronization of global KTable joins.
> If we want to propose changing the bootstrapping pipeline for global threads, 
> we should have some kind of answer to these concerns.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13548:
-
Affects Version/s: 3.2.0

> IQv2: revisit WindowKeyQuery and WindowRangeQuery
> -
>
> Key: KAFKA-13548
> URL: https://issues.apache.org/jira/browse/KAFKA-13548
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>
> During discussion of KIP-806, there was a suggestion to refactor the queries 
> following a builder pattern so that we can compactly and flexibly specify 
> lower and upper bounds on the keys, window start times, and window end times.
> We should circle back and try to generalize the queries' interfaces before 
> the first release of IQv2.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13526:
-
Affects Version/s: 3.2.0

> IQv2: Consider more generic logic for mapping between binary and typed queries
> --
>
> Key: KAFKA-13526
> URL: https://issues.apache.org/jira/browse/KAFKA-13526
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>
> Right now, typed queries (like KeyQuery) need to be specially handled and 
> translated to their binary counterparts (like RawKeyQuery). This happens in 
> the Metered store layers, where the serdes are known. It is necessary because 
> lower store layers are only able to handle binary data (because they don't 
> know the serdes).
> This situation is not ideal, since the Metered store layers will grow to host 
> quite a bit of query handling and translation logic, because the relationship 
> between typed queries and binary counterparts is not obvious, and because we 
> can only automatically translate known query types. User-supplied queries and 
> stores will have to work things out using their a priori knowledge of the 
> serdes.
>  
> One suggestion (from [~mjsax] ) is to come up with some kind of generic 
> "query mapping" API, which the Metered stores would use to translate back and 
> forth between typed and raw keys and values. Users would be able to supply 
> their own mappings along with their custom queries.
> Another option would be to have the Metered stores attach the serdes to the 
> query on the way down and then to the result on the way up. Then, the serdes 
> would be available in the bytes store (as part of the request) and to the 
> users when they get their results (as part of the response).
> Other options may also surface once we start playing with ideas.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13554:
-
Affects Version/s: 3.2.0

> Rename RangeQuery to KeyRangeQuery
> --
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>
> Just to avoid confusion wrt WindowRangeQuery



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13554:
-
Priority: Blocker  (was: Major)

> Rename RangeQuery to KeyRangeQuery
> --
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Blocker
>
> Just to avoid confusion wrt WindowRangeQuery



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13622:
-
Priority: Blocker  (was: Major)

> Revisit the complexity of position tracking in state stores
> ---
>
> Key: KAFKA-13622
> URL: https://issues.apache.org/jira/browse/KAFKA-13622
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Priority: Blocker
>
> Currently, state store implementers have a significant burden to track 
> position correctly. They have to:
>  * update the position during all puts
>  * implement the RecordBatchingStateRestoreCallback and use the 
> ChangelogRecordDeserializationHelper to update the position 
> based on record headers
>  * implement some mechanism to restore the position after a 
> restart if the store is persistent (such as supplying a CommitCallback to write 
> the position to a local file and then read the file during init)
> [~guozhang] pointed out during review that this is probably 
> too much responsibility (and certainly too much opportunity for error). We 
> should see what we can do to simplify these responsibilities, if not 
> eliminate them entirely from the store implementer's scope of concern.
>  
> See 
> https://github.com/apache/kafka/pull/11676#discussion_r790358058



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13622) Revisit the complexity of position tracking in state stores

2022-01-26 Thread John Roesler (Jira)
John Roesler created KAFKA-13622:


 Summary: Revisit the complexity of position tracking in state 
stores
 Key: KAFKA-13622
 URL: https://issues.apache.org/jira/browse/KAFKA-13622
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


Currently, state store implementers have a significant burden to track position 
correctly. They have to:
 * update the position during all puts
 * implement the RecordBatchingStateRestoreCallback and use the 
ChangelogRecordDeserializationHelper to update the position 
based on record headers
 * implement some mechanism to restore the position after a 
restart if the store is persistent (such as supplying a CommitCallback to write 
the position to a local file and then read the file during init)

[~guozhang] pointed out during review that this is probably too 
much responsibility (and certainly too much opportunity for error). We should 
see what we can do to simplify these responsibilities, if not eliminate them 
entirely from the store implementer's scope of concern.

See 
https://github.com/apache/kafka/pull/11676#discussion_r790358058



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13608) Implement Position restoration for all in-memory state stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13608.
--
Resolution: Duplicate

> Implement Position restoration for all in-memory state stores
> -
>
> Key: KAFKA-13608
> URL: https://issues.apache.org/jira/browse/KAFKA-13608
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vicky Papavasileiou
>Priority: Major
>
> In-memory state stores restore their state from the changelog (as opposed to 
> RocksDB stores that restore from disk). In-memory stores currently don't 
> handle restoring the Position.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13608) Implement Position restoration for all in-memory state stores

2022-01-26 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482866#comment-17482866
 ] 

John Roesler commented on KAFKA-13608:
--

Ah, I didn't realize there was a ticket for this. I just happened to take care 
of it while making sure that the testing for 
[https://github.com/apache/kafka/pull/11676] was complete.

> Implement Position restoration for all in-memory state stores
> -
>
> Key: KAFKA-13608
> URL: https://issues.apache.org/jira/browse/KAFKA-13608
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vicky Papavasileiou
>Priority: Major
>
> In-memory state stores restore their state from the changelog (as opposed to 
> RocksDB stores that restore from disk). In-memory stores currently don't 
> handle restoring the Position.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13524) IQv2: Implement KeyQuery from the RecordCache

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13524.
--
Resolution: Fixed

> IQv2: Implement KeyQuery from the RecordCache
> -
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>
> The Record Cache in Kafka Streams is more properly termed a write buffer, 
> since it only caches writes, not reads, and its intent is to buffer the 
> writes before flushing them in bulk into lower store layers.
> Unlike scan-type queries, which require scanning both the record cache and 
> the underlying store and collating the results, the KeyQuery (and any other 
> point lookup) can straightforwardly be served from the record cache if it is 
> buffered, or fall through to the underlying store if not.
> In contrast to scan-type operations, benchmarks reveal that key-based cache 
> reads are also faster than always skipping the cache.
> Therefore, it makes sense to implement a handler in the CachingKeyValueStore 
> for the KeyQuery specifically in order to serve fresher key-based lookups. 
> Scan queries may also be useful, but their less flattering performance 
> profile makes it reasonable to leave them for follow-on work.
> We could add an option to disable cache reads on the KeyQuery, but since they 
> seem to be always better, I'm leaning toward just unilaterally serving cached 
> records if they exist.
>  
> I did a quick POC of this: 
> [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]
>  
> The internal code of the caching stores should be refactored to share logic 
> with the regular store methods. Scan queries will be more complicated, since 
> they require merging the cache with the wrapped result.
> There is a bug related to that non-timestamped-store-serde hack (see the 
> failing test when you run IQv2StoreIntegrationTest). Even though the inner 
> store is not timestamped, the cache returns a timestamped value. We'll have 
> to discuss options to fix it.
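A hedged sketch of the point-lookup path described above (simplified; not the
actual CachingKeyValueStore internals; `cache`, `cacheName`, and `wrapped` are
assumed fields of the caching layer):

    // Serve a key lookup from the write buffer when the key has a buffered,
    // not-yet-flushed write; otherwise fall through to the wrapped store.
    byte[] getForKeyQuery(final Bytes key) {
        final LRUCacheEntry cached = cache.get(cacheName, key);
        if (cached != null) {
            return cached.value();   // fresher, still-unflushed write (null = tombstone)
        }
        return wrapped.get(key);     // fall through to the underlying store
    }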



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13524) IQv2: Implement KeyQuery from the RecordCache

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-13524:


Assignee: Vicky Papavasileiou

> IQv2: Implement KeyQuery from the RecordCache
> -
>
> Key: KAFKA-13524
> URL: https://issues.apache.org/jira/browse/KAFKA-13524
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: Vicky Papavasileiou
>Priority: Major
>
> The Record Cache in Kafka Streams is more properly termed a write buffer, 
> since it only caches writes, not reads, and its intent is to buffer the 
> writes before flushing them in bulk into lower store layers.
> Unlike scan-type queries, which require scanning both the record cache and 
> the underlying store and collating the results, the KeyQuery (and any other 
> point lookup) can straightforwardly be served from the record cache if it is 
> buffered, or fall through to the underlying store if not.
> In contrast to scan-type operations, benchmarks reveal that key-based cache 
> reads are also faster than always skipping the cache.
> Therefore, it makes sense to implement a handler in the CachingKeyValueStore 
> for the KeyQuery specifically in order to serve fresher key-based lookups. 
> Scan queries may also be useful, but their less flattering performance 
> profile makes it reasonable to leave them for follow-on work.
> We could add an option to disable cache reads on the KeyQuery, but since they 
> seem to be always better, I'm leaning toward just unilaterally serving cached 
> records if they exist.
>  
> I did a quick POC of this: 
> [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries]
>  
> The internal code of the caching stores should be refactored to share logic 
> with the regular store methods. Scan queries will be more complicated, since 
> they require merging the cache with the wrapped result.
> There is a bug related to that non-timestamped-store-serde hack (see the 
> failing test when you run IQv2StoreIntegrationTest). Even though the inner 
> store is not timestamped, the cache returns a timestamped value. We'll have 
> to discuss options to fix it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13605) Checkpoint position in state stores

2022-01-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-13605:
-
Parent: KAFKA-13479
Issue Type: Sub-task  (was: Improvement)

> Checkpoint position in state stores
> ---
>
> Key: KAFKA-13605
> URL: https://issues.apache.org/jira/browse/KAFKA-13605
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Patrick Stuedi
>Assignee: Patrick Stuedi
>Priority: Critical
>
> There are cases in which a state store neither has an in-memory position 
> built up nor has it gone through the state restoration process. If a store is 
> persistent (i.e., RocksDB), and we stop and restart Streams, we will have 
> neither of those continuity mechanisms available. This ticket is to fill in 
> that gap.
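A hedged sketch of the gap-filler (the PR appears to write a
"<store-name>.position" file in the state directory; the file format below is
purely illustrative):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;

    class PositionCheckpointSketch {
        // On commit: persist one "topic partition offset" line per component.
        static void write(final Path file, final List<String> componentLines) throws IOException {
            Files.write(file, componentLines);
        }

        // On init: rebuild the position from the file if it exists; otherwise
        // the store starts from an empty position (the gap this ticket fills).
        static List<String> read(final Path file) throws IOException {
            return Files.exists(file) ? Files.readAllLines(file) : List.of();
        }
    }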



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] vvcephei commented on a change in pull request #11676: KAFKA-13605: checkpoint position in state stores

2022-01-26 Thread GitBox


vvcephei commented on a change in pull request #11676:
URL: https://github.com/apache/kafka/pull/11676#discussion_r793216634



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##
@@ -264,8 +267,12 @@ public void init(final ProcessorContext context,
 
 segments.openExisting(this.context, observedStreamTime);
 
+this.positionCheckpointFile = new File(context.stateDir(), this.name() 
+ ".position");

Review comment:
   Thanks! @patrickstuedi consolidated the logic of actually reading the 
position checkpoints into StoreQueryUtils, as you suggested. Also, after some 
cleanup, we only have two instances of this now, in 
`AbstractRocksDBSegmentedBytesStore` and in `RocksDBStore`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11676: KAFKA-13605: checkpoint position in state stores

2022-01-26 Thread GitBox


vvcephei commented on a change in pull request #11676:
URL: https://github.com/apache/kafka/pull/11676#discussion_r793214036



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##
@@ -152,4 +153,11 @@ default void init(final StateStoreContext context, final 
StateStore root) {
 // If a store doesn't implement a query handler, then all queries are 
unknown.
 return QueryResult.forUnknownQueryType(query, this);
 }
+
+/**
+ * This state store's position
+ */
+default Position getPosition() {
+throw new UnsupportedOperationException("Not implemented");

Review comment:
   Note: this was added in another PR (not sure why it was also in this 
one). By adding this to the StateStore interface @vpapavas was able to 
dramatically simplify the change-logging store layers. However, it may be 
possible to avoid this as part of the same follow-on revision we discussed 
above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11676: KAFKA-13605: checkpoint position in state stores

2022-01-26 Thread GitBox


vvcephei commented on a change in pull request #11676:
URL: https://github.com/apache/kafka/pull/11676#discussion_r793212425



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -85,8 +85,23 @@
  * @throws IllegalStateException If store gets registered after 
initialized is already finished
  * @throws StreamsException if the store's change log does not contain the 
partition
  */
+default void register(final StateStore store,
+  final StateRestoreCallback stateRestoreCallback) {
+register(store, stateRestoreCallback, null);
+}
+
+/**
+ * Registers and possibly restores the specified storage engine.
+ *
+ * @param store the storage engine
+ * @param stateRestoreCallback the restoration callback logic for 
log-backed state stores upon restart
+ * @param checkpointCallback called to checkpoint position metadata of 
state stores
+ *
+ * @throws IllegalStateException If store gets registered after 
initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the 
partition
+ */
 void register(final StateStore store,
-  final StateRestoreCallback stateRestoreCallback);
+  final StateRestoreCallback stateRestoreCallback, final 
CheckpointCallback checkpointCallback);

Review comment:
   Thanks for that naming suggestion, @guozhangwang ! I agree that "commit" 
is a much better term than "checkpoint".
   
   Regarding your larger point, I agree that there's a burden on store 
maintainers to track the position correctly and to integrate correctly with 
state restoration and checkpointing. I think that we can probably address the 
burden of this API without sacrificing the precise correctness of position 
tracking. As you mentioned, it'll be easier to consider an alternative as a 
follow-on PR instead of continuing to revise this one, though. I'll file a 
ticket shortly to circle back on this issue before the next release.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11676: KAFKA-13605: checkpoint position in state stores

2022-01-26 Thread GitBox


vvcephei commented on a change in pull request #11676:
URL: https://github.com/apache/kafka/pull/11676#discussion_r793198150



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
##
@@ -44,9 +43,12 @@
 public void init(final StateStoreContext context, final StateStore root) {
 super.init(context, root);
 this.stateStoreContext = context;
+
+this.position = super.getPosition();

Review comment:
   I see. While digging through the code, I noticed that it's actually 
`RocksDB{Session,Windowed}Store`, which wraps SegmentedBytesStore, which wraps 
the actual segments (RocksDBStores).
   
   It turns out that SegmentedBytesStore is where we're tracking the position, 
and that's also where we do restoration and checkpointing, so in reality, we 
just don't need this logic in `RocksDB{Session,Window}Store` at all!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11676: KAFKA-13605: checkpoint position in state stores

2022-01-26 Thread GitBox


vvcephei commented on a change in pull request #11676:
URL: https://github.com/apache/kafka/pull/11676#discussion_r793196902



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -85,8 +85,23 @@
  * @throws IllegalStateException If store gets registered after 
initialized is already finished
  * @throws StreamsException if the store's change log does not contain the 
partition
  */
+default void register(final StateStore store,

Review comment:
   I see what you're getting at, but the whole point of the deprecation is 
that users should be migrating to the new API asap. We're going to drop that 
deprecated init pretty soon anyway, so I'd rather be a bit more hard-line about 
not adding new things to it in the meantime.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Kvicii commented on pull request #11715: KAFKA-13618;`Exptected` rename to `Expected`

2022-01-26 Thread GitBox


Kvicii commented on pull request #11715:
URL: https://github.com/apache/kafka/pull/11715#issuecomment-1022676092


   @mimaison  OK


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

2022-01-26 Thread GitBox


C0urante commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793112244



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -364,9 +365,16 @@ private boolean sendRecords() {
 producerRecord,
 (recordMetadata, e) -> {
 if (e != null) {
-log.error("{} failed to send record to {}: ", 
WorkerSourceTask.this, topic, e);
-log.trace("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
-producerSendException.compareAndSet(null, e);
+if 
(retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
+// executeFailed here allows the use of 
existing logging infrastructure/configuration
+
retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, 
WorkerSourceTask.class,
+preTransformRecord, e);
+commitTaskRecord(preTransformRecord, null);

Review comment:
   We should not be logging at `ERROR` level for every single record if we 
aren't failing the task unless the user has explicitly enabled this by setting 
`errors.log.enable` to `true` in their connector config.
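For reference, this behavior is driven by the connector's standard
error-handling properties; a typical opt-in looks like:

    # Connector config: tolerate record-level failures instead of failing the
    # task, and log each failure (optionally including the record contents).
    errors.tolerance=all
    errors.log.enable=true
    errors.log.include.messages=true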




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] TheKnowles commented on pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

2022-01-26 Thread GitBox


TheKnowles commented on pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#issuecomment-1022663365


   > Thanks @TheKnowles for the PR. I've made a first pass and left a few 
comments.
   
   @mimaison Thank you for reviewing. I've replied to each comment above and 
pushed changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] TheKnowles commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

2022-01-26 Thread GitBox


TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793104975



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##
@@ -111,6 +111,23 @@ public RetryWithToleranceOperator(long errorRetryTimeout, 
long errorMaxDelayInMi
 return errantRecordFuture;
 }
 
+public synchronized Future<Void> executeFailed(Stage stage, Class<?> 
executingClass,
+   SourceRecord sourceRecord,
+   Throwable error) {
+
+markAsFailed();
+context.sourceRecord(sourceRecord);
+context.currentContext(stage, executingClass);
+context.error(error);
+errorHandlingMetrics.recordFailure();
+Future<Void> errantRecordFuture = context.report();
+if (!withinToleranceLimits()) {
+errorHandlingMetrics.recordError();
+throw new ConnectException("Tolerance exceeded in error handler", 
error);

Review comment:
   I added some context to the string error message denoting it was a 
Source Worker. I am open to suggestions on how verbose this message should be.

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##
@@ -222,6 +222,13 @@ private void createWorkerTask() {
 createWorkerTask(TargetState.STARTED);
 }
 
+private void createWorkerTaskWithErrorToleration() {

Review comment:
   +1 I have refactored the constructors to be cleaner with various 
parameter lists.

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##
@@ -815,6 +822,32 @@ public void testSendRecordsTaskCommitRecordFail() throws 
Exception {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testSourceTaskIgnoresProducerException() throws Exception {
+createWorkerTaskWithErrorToleration();
+expectTopicCreation(TOPIC);
+
+// send two records
+// record 1 will succeed
+// record 2 will invoke the producer's failure callback, but ignore 
the exception via retryOperator
+// and no ConnectException will be thrown
+SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+
+expectSendRecordOnce();
+expectSendRecordProducerCallbackFail();
+sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), 
EasyMock.anyObject(RecordMetadata.class));

Review comment:
   +1




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] TheKnowles commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

2022-01-26 Thread GitBox


TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793104806



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -364,9 +365,16 @@ private boolean sendRecords() {
 producerRecord,
 (recordMetadata, e) -> {
 if (e != null) {
-log.error("{} failed to send record to {}: ", 
WorkerSourceTask.this, topic, e);
-log.trace("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
-producerSendException.compareAndSet(null, e);
+if 
(retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
+// executeFailed here allows the use of 
existing logging infrastructure/configuration
+
retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, 
WorkerSourceTask.class,
+preTransformRecord, e);
+commitTaskRecord(preTransformRecord, null);

Review comment:
Previously it was suggested to have the tolerance operator handle this via 
the logging report. I would personally find it useful to have it in the connect 
log regardless of tolerance error logging configuration. I've moved the 
error/debug log lines to above the tolerance check to log in all instances.

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##
@@ -229,6 +246,13 @@ public synchronized boolean withinToleranceLimits() {
 }
 }
 
+// For source connectors that want to skip kafka producer errors.
+// They cannot use withinToleranceLimits() as no failure may have actually 
occurred prior to the producer failing
+// to write to kafka.
+public synchronized ToleranceType getErrorToleranceType() {

Review comment:
   It does not. Type is immutable and thread safe. I had dug through the 
ticket that retroactively made this class thread safe and it seemed like a good 
idea at the time to slap a synchronized on it to match the rest of the class, 
but is not necessary at all. Removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] TheKnowles commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

2022-01-26 Thread GitBox


TheKnowles commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r793104719



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -364,9 +365,16 @@ private boolean sendRecords() {
 producerRecord,
 (recordMetadata, e) -> {
 if (e != null) {
-log.error("{} failed to send record to {}: ", 
WorkerSourceTask.this, topic, e);
-log.trace("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
-producerSendException.compareAndSet(null, e);
+if 
(retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {

Review comment:
   +1




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #11345: KAFKA-13603: Allow the empty active segment to have missing offset index during recovery

2022-01-26 Thread GitBox


junrao commented on pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#issuecomment-1022654522


   @ccding : There is a conflict now. Could you rebase?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on pull request #11671: KAFKA-13388; Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2022-01-26 Thread GitBox


splett2 commented on pull request #11671:
URL: https://github.com/apache/kafka/pull/11671#issuecomment-1022640040


   thanks for fixing this!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13621) Resign leader on network partition

2022-01-26 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482745#comment-17482745
 ] 

Jason Gustafson commented on KAFKA-13621:
-

[~guozhang] I seem to recall that we had considered logic like this at some 
point. Do you recall if we had a strong reason to reject it?

> Resign leader on network partition
> --
>
> Key: KAFKA-13621
> URL: https://issues.apache.org/jira/browse/KAFKA-13621
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> h1. Motivation
> If the current leader A at epoch X gets partitioned from the rest of the 
> quorum, quorum voter A will stay leader at epoch X. This happens because 
> voter A will never receive a request from the rest of the voters increasing 
> the epoch. The requests that typically increase the epoch of past leaders 
> are BeginQuorumEpoch and Vote.
> In addition, if voter A (the leader at epoch X) doesn't get partitioned from 
> the rest of the brokers (observers in the KRaft protocol), the brokers will 
> never learn about the new quorum leader. This happens because 1. observers 
> learn about the leader from the Fetch response and 2. observers send a Fetch 
> request to a random voter if the Fetch request times out.
> Neither of these two scenarios will cause the broker to send a request to a 
> different voter, because the leader at epoch X will never send a different 
> leader in the response, and the broker will never send a Fetch request to a 
> different voter because the Fetch request will never time out.
> h1. Proposed Changes
> In this scenario, voter A, the leader at epoch X, will stop receiving Fetch 
> requests from the majority of the voters. Voter A should resign as leader if 
> the Fetch requests from the majority of the voters are old enough. A reasonable 
> value for "old enough" is the Fetch timeout value.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] hachikuji merged pull request #11693: MINOR: Convert LogLoader into a class

2022-01-26 Thread GitBox


hachikuji merged pull request #11693:
URL: https://github.com/apache/kafka/pull/11693


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13612) internal topics won't be created in metadataRequest when auto.create.topics.enable=false

2022-01-26 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482729#comment-17482729
 ] 

Jason Gustafson commented on KAFKA-13612:
-

[~dnwe] I agree with you that computing consumer lag is painful. However, keep 
in mind that even the `Fetch` API uses polling, and the requests must be sent 
to all partitions of __consumer_offsets (which for most clusters means fetching 
on all brokers). It is fair to say though that this approach gives you only the 
deltas, which is more efficient than always fetching all offsets. On the other 
hand, you still need to periodically send ListOffsets to all of the partitions 
that have committed offsets to get the latest end offset, so the difference 
might not be quite so dramatic. 

The problem is that reading from __consumer_offsets is not a supported API, so 
users have to deal with breakage. There's no guarantee that we'll even continue 
to use this topic for committed offsets in the future. Many users run into 
hotspot issues with the current hashing approach, so there is some interest in 
changing it. My take is that there is indeed a gap in the consumer 
administrative APIs and we should try to address it with an API that can 
actually offer compatibility guarantees. For example, perhaps we could 
introduce an OffsetQuery API or something like that to make it easier to get 
the latest offset commits without reference to a specific group. 
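For completeness, a sketch of computing lag with the supported Admin API
today, i.e., the polling approach described above (standard AdminClient calls;
error handling omitted):

    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class GroupLagSketch {
        // Poll the group's committed offsets, then the log-end offsets, and diff.
        static Map<TopicPartition, Long> lagForGroup(final Admin admin, final String groupId)
                throws Exception {
            final Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
            final Map<TopicPartition, OffsetSpec> latest = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
                    admin.listOffsets(latest).all().get();
            return committed.entrySet().stream().collect(Collectors.toMap(
                    Map.Entry::getKey,
                    e -> ends.get(e.getKey()).offset() - e.getValue().offset()));
        }
    }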

 

> internal topics won't be created in metadataRequest when 
> auto.create.topics.enable=false
> 
>
> Key: KAFKA-13612
> URL: https://issues.apache.org/jira/browse/KAFKA-13612
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> In KAFKA-9751, when creating internal topics through a FindCoordinator or 
> Metadata request, we route the topic creation request to the controller 
> instead of handling it locally. We changed the logic in 
> `KafkaApis#getTopicMetadata` so that internal topics won't get created 
> when `auto.create.topics.enable=false`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13621) Resign leader on network partition

2022-01-26 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-13621:
---
Summary: Resign leader on network partition  (was: Resign leader on 
partition)

> Resign leader on network partition
> --
>
> Key: KAFKA-13621
> URL: https://issues.apache.org/jira/browse/KAFKA-13621
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> h1. Motivation
> If the current leader A at epoch X gets partitioned from the rest of the 
> quorum, quorum voter A will stay leader at epoch X. This happens because 
> voter A will never receive a request from the rest of the voters increasing 
> the epoch. The requests that typically increase the epoch of past leaders 
> are BeginQuorumEpoch and Vote.
> In addition, if voter A (the leader at epoch X) doesn't get partitioned from 
> the rest of the brokers (observers in the KRaft protocol), the brokers will 
> never learn about the new quorum leader. This happens because (1) observers 
> learn about the leader from the Fetch response, and (2) observers send a 
> Fetch request to a random voter if the Fetch request times out.
> Neither of these mechanisms will cause the broker to send a request to a 
> different voter: the leader at epoch X will never report a different leader 
> in its responses, and the broker will never send a Fetch request to a 
> different voter because its Fetch requests never time out.
> h1. Proposed Changes
> In this scenario, voter A, the leader at epoch X, will stop receiving Fetch 
> requests from the majority of the voters. Voter A should resign as leader if 
> the most recent Fetch requests from a majority of the voters are old enough. 
> A reasonable value for "old enough" is the Fetch timeout value.
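
(For illustration, a rough sketch of the proposed check; the tracker class and 
names such as fetchTimeoutMs are assumptions, not the actual KRaft 
implementation.)

{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: track the last Fetch seen from each voter and resign
// when no majority of voters (counting the leader itself) has fetched
// within the fetch timeout.
final class LeaderFetchTracker {
    private final Map<Integer, Long> lastFetchTimeMs = new HashMap<>();
    private final long fetchTimeoutMs;

    LeaderFetchTracker(long fetchTimeoutMs) {
        this.fetchTimeoutMs = fetchTimeoutMs;
    }

    void onFetchFromVoter(int voterId, long nowMs) {
        lastFetchTimeMs.put(voterId, nowMs);
    }

    private boolean isRecent(int voterId, long nowMs) {
        Long last = lastFetchTimeMs.get(voterId);
        return last != null && nowMs - last <= fetchTimeoutMs;
    }

    // "Old enough" == the Fetch timeout: resign when fewer than a majority
    // of the voters have fetched within that window.
    boolean shouldResign(int leaderId, Collection<Integer> voters, long nowMs) {
        long recent = voters.stream()
            .filter(v -> v == leaderId || isRecent(v, nowMs))
            .count();
        return recent < voters.size() / 2 + 1;
    }
}
{code}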



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13621) Resign leader on partition

2022-01-26 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13621:
--

 Summary: Resign leader on partition
 Key: KAFKA-13621
 URL: https://issues.apache.org/jira/browse/KAFKA-13621
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


h1. Motivation

If the current leader A at epoch X gets partitioned from the rest of the 
quorum, quorum voter A will stay leader at epoch X. This happens because voter 
A will never receive a request from the rest of the voters increasing the 
epoch. The requests that typically increase the epoch of past leaders are 
BeginQuorumEpoch and Vote.

In addition, if voter A (the leader at epoch X) doesn't get partitioned from 
the rest of the brokers (observers in the KRaft protocol), the brokers will 
never learn about the new quorum leader. This happens because (1) observers 
learn about the leader from the Fetch response, and (2) observers send a Fetch 
request to a random voter if the Fetch request times out.

Neither of these mechanisms will cause the broker to send a request to a 
different voter: the leader at epoch X will never report a different leader in 
its responses, and the broker will never send a Fetch request to a different 
voter because its Fetch requests never time out.
h1. Proposed Changes

In this scenario, voter A, the leader at epoch X, will stop receiving Fetch 
requests from the majority of the voters. Voter A should resign as leader if 
the most recent Fetch requests from a majority of the voters are old enough. A 
reasonable value for "old enough" is the Fetch timeout value.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13620) The request handler metric name for ControllerApis should be different than KafkaApis

2022-01-26 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13620:


 Summary: The request handler metric name for ControllerApis should 
be different than KafkaApis
 Key: KAFKA-13620
 URL: https://issues.apache.org/jira/browse/KAFKA-13620
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison commented on pull request #11599: KAFKA-13527: Add top-level error code field to DescribeLogDirsResponse

2022-01-26 Thread GitBox


mimaison commented on pull request #11599:
URL: https://github.com/apache/kafka/pull/11599#issuecomment-1022438866


   @dajac Do you want to take another look? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13618) BatchAccumulator `Exptected` rename to `Expected`

2022-01-26 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-13618.

Fix Version/s: 3.2.0
   Resolution: Fixed

> BatchAccumulator `Exptected` rename to `Expected`
> -
>
> Key: KAFKA-13618
> URL: https://issues.apache.org/jira/browse/KAFKA-13618
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kvicii.Yu
>Assignee: Kvicii.Yu
>Priority: Major
> Fix For: 3.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison merged pull request #11715: KAFKA-13618;`Exptected` rename to `Expected`

2022-01-26 Thread GitBox


mimaison merged pull request #11715:
URL: https://github.com/apache/kafka/pull/11715


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11715: KAFKA-13618;`Exptected` rename to `Expected`

2022-01-26 Thread GitBox


mimaison commented on pull request #11715:
URL: https://github.com/apache/kafka/pull/11715#issuecomment-1022424951


   Thanks @Kvicii 
   For such small changes you don't need to create a JIRA. Just prefix your 
commit with `MINOR: `.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11716: MINOR: Add 3.0 to streams system tests

2022-01-26 Thread GitBox


dajac commented on pull request #11716:
URL: https://github.com/apache/kafka/pull/11716#issuecomment-1022337985


   Run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4762/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #11716: MINOR: Add 3.0 to streams system tests

2022-01-26 Thread GitBox


dajac opened a new pull request #11716:
URL: https://github.com/apache/kafka/pull/11716


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #11682: KAFKA-13524: Add IQv2 query handling to the caching layer

2022-01-26 Thread GitBox


vvcephei merged pull request #11682:
URL: https://github.com/apache/kafka/pull/11682


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #11682: KAFKA-13524: Add IQv2 query handling to the caching layer

2022-01-26 Thread GitBox


vvcephei commented on pull request #11682:
URL: https://github.com/apache/kafka/pull/11682#issuecomment-1022316812


   Unrelated test failures:
   ```
   Build / ARM / 
kafka.server.ReplicaManagerTest.testDeltaFromLeaderToFollower()
   Build / JDK 8 and Scala 2.12 / kafka.admin.LeaderElectionCommandTest.[1] 
Type=Raft, Name=testElectionResultOutput, Security=PLAINTEXT
   Build / JDK 8 and Scala 2.12 / 
kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords()
   Build / JDK 8 and Scala 2.12 / 
kafka.api.TransactionsTest.testAbortTransactionTimeout()
   Build / JDK 17 and Scala 2.13 / 
integration.kafka.server.FetchRequestBetweenDifferentIbpTest.testControllerOldToNewIBP()
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testTopicPartition, 
Security=PLAINTEXT
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13619) zookeeper.sync.time.ms is no longer used

2022-01-26 Thread Tomonari Yamashita (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomonari Yamashita updated KAFKA-13619:
---
Description: 
- zookeeper.sync.time.ms is no longer used. But it is present in the 
documentation (1)
 -- The implementation and documentation of zookeeper.sync.time.ms should be 
removed.
 -- As far as I can see, it was already out of use by v2.0.0.
 -- Although for a different purpose, there is a pull request that tried to 
remove it (2).
 

(1) 
[https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]

(2) https://github.com/apache/kafka/pull/3167

  was:
- zookeeper.sync.time.ms is no longer used. But it is present in the 
documentation(1)
 -- The implementation and documentation of zookeeper.sync.time.ms should be 
removed.
 -- FYI, as far as I can see, it was already out of use by v2.0.0.
 

(1) 
[https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]


> zookeeper.sync.time.ms is no longer used
> 
>
> Key: KAFKA-13619
> URL: https://issues.apache.org/jira/browse/KAFKA-13619
> Project: Kafka
>  Issue Type: Bug
>  Components: core, documentation
>Affects Versions: 2.0.0, 3.1.0
>Reporter: Tomonari Yamashita
>Priority: Major
>
> - zookeeper.sync.time.ms is no longer used. But it is present in the 
> documentation (1)
>  -- The implementation and documentation of zookeeper.sync.time.ms should be 
> removed.
>  -- As far as I can see, it was already out of use by v2.0.0.
>  -- Although for a different purpose, there is a pull request that tried to 
> remove it (2).
>  
> (1) 
> [https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]
> (2) https://github.com/apache/kafka/pull/3167



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-01-26 Thread GitBox


showuon commented on a change in pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#discussion_r792663973



##
File path: 
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
##
@@ -352,20 +359,28 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
 // Verify successful produce/consume/describe on another topic using the 
same producer, consumer and adminClient
 val topic2 = "topic2"
 val tp2 = new TopicPartition(topic2, 0)
+
 setReadAndWriteAcls(tp2)
-sendRecords(producer, numRecords, tp2)
+// with the idempotent producer, we need to create another producer because 
the previous one is in FATAL_ERROR state (due to an authorization error)

Review comment:
   I think that's how it is designed. I added more comments to make it clear:
   `// with the idempotent producer, we need to create another producer 
because the previous one is in FATAL_ERROR state (due to an authorization error)`
   `// If the transaction state is FATAL_ERROR, it'll never transition to 
another state; check TransactionManager#isTransitionValid for details`
   
   Thank you.
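
   (For illustration, a minimal sketch of the behavior under discussion; the 
class and helper names are placeholders, and serializers are assumed to be 
configured in `producerProps`.)

   ```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.AuthorizationException;

final class ProducerRecovery {
    // Once an idempotent producer hits a fatal error (e.g. an authorization
    // failure), it never leaves that state, so callers must close it and
    // create a fresh instance after the ACLs are fixed.
    static KafkaProducer<byte[], byte[]> sendOrRecreate(
            KafkaProducer<byte[], byte[]> producer,
            ProducerRecord<byte[], byte[]> record,
            Properties producerProps) throws Exception {
        try {
            producer.send(record).get();
            return producer; // still healthy
        } catch (ExecutionException e) {
            if (e.getCause() instanceof AuthorizationException) {
                producer.close(); // fatal state: no further sends will succeed
                return new KafkaProducer<>(producerProps);
            }
            throw e;
        }
    }
}
   ```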




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13619) zookeeper.sync.time.ms is no longer used

2022-01-26 Thread Tomonari Yamashita (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomonari Yamashita updated KAFKA-13619:
---
Component/s: core
 documentation

> zookeeper.sync.time.ms is no longer used
> 
>
> Key: KAFKA-13619
> URL: https://issues.apache.org/jira/browse/KAFKA-13619
> Project: Kafka
>  Issue Type: Bug
>  Components: core, documentation
>Affects Versions: 2.0.0, 3.1.0
>Reporter: Tomonari Yamashita
>Priority: Major
>
> - zookeeper.sync.time.ms is no longer used. But it is present in the 
> documentation(1)
>  -- The implementation and documentation of zookeeper.sync.time.ms should be 
> removed.
>  -- FYI, as far as I can see, it was already out of use by v2.0.0.
>  
> (1) 
> [https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13619) zookeeper.sync.time.ms is no longer used

2022-01-26 Thread Tomonari Yamashita (Jira)
Tomonari Yamashita created KAFKA-13619:
--

 Summary: zookeeper.sync.time.ms is no longer used
 Key: KAFKA-13619
 URL: https://issues.apache.org/jira/browse/KAFKA-13619
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.1.0, 2.0.0
Reporter: Tomonari Yamashita


- zookeeper.sync.time.ms is no longer used. But it is present in the 
documentation(1)
 -- The implementation and documentation of zookeeper.sync.time.ms should be 
removed.
 -- FYI, as far as I can see, it was already out of use by v2.0.0.
 

(1) 
[https://kafka.apache.org/documentation/#brokerconfigs_zookeeper.sync.time.ms]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-01-26 Thread GitBox


showuon commented on a change in pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#discussion_r792658808



##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -2383,6 +2384,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def buildTransactionalProducer(): KafkaProducer[Array[Byte], 
Array[Byte]] = {
 producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transactionalId)
+producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
"true")

Review comment:
   Yes, because at the beginning of the `AuthorizerIntegrationTest` test 
suite, idempotence is disabled.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-01-26 Thread GitBox


showuon commented on a change in pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#discussion_r792657463



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
##
@@ -170,6 +170,7 @@ public void configure(final WorkerConfig config) {
 producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
 producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
 producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle 
retries in this class
+producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // 
disable idempotence since retries is forced to 0

Review comment:
   Yes, it might take another PR (or KIP) to enable it in Connect.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-01-26 Thread GitBox


showuon commented on a change in pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#discussion_r792653054



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -24,14 +24,15 @@
 import org.slf4j.LoggerFactory;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Objects;
 import java.util.TreeMap;
+import java.util.ArrayList;

Review comment:
   Weird, not sure why IntelliJ "helped" me reorder it. Updated!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Kvicii commented on pull request #11715: `Exptected` rename to `Expected`

2022-01-26 Thread GitBox


Kvicii commented on pull request #11715:
URL: https://github.com/apache/kafka/pull/11715#issuecomment-1022211436


   @showuon hi, could you help me?
   PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-01-26 Thread GitBox


showuon commented on a change in pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#discussion_r792651448



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -226,6 +227,11 @@ public Password getPassword(String key) {
 return keys;
 }
 
+public boolean hasKeyInOriginals(String configKey) {
+Objects.requireNonNull(configKey, "config key cannot be null");

Review comment:
   Makes sense! We can rely on the underlying map.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Kvicii opened a new pull request #11715: `Exptected` rename to `Expected`

2022-01-26 Thread GitBox


Kvicii opened a new pull request #11715:
URL: https://github.com/apache/kafka/pull/11715


   BatchAccumulator `Exptected` rename to `Expected`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #11705: [WIP] KAFKA-9847: add config to set default store type

2022-01-26 Thread GitBox


showuon edited a comment on pull request #11705:
URL: https://github.com/apache/kafka/pull/11705#issuecomment-1022206850


   @mjsax, I know the `Stores` API in the KIP is still under discussion. But I 
think the PR is ready for pre-review right now (if you have time :)).
   
   For the `Stores` API change, I'll update it after we reach a conclusion.
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11705: [WIP] KAFKA-9847: add config to set default store type

2022-01-26 Thread GitBox


showuon commented on pull request #11705:
URL: https://github.com/apache/kafka/pull/11705#issuecomment-1022206850


   @mjsax, I know the `Stores` API in the KIP is still being discussed. But I 
think the PR is ready for pre-review right now (if you have time :)).
   
   For the `Stores` API change, I'll update it after we reach a conclusion.
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13618) BatchAccumulator `Exptected` rename to `Expected`

2022-01-26 Thread Kvicii.Yu (Jira)
Kvicii.Yu created KAFKA-13618:
-

 Summary: BatchAccumulator `Exptected` rename to `Expected`
 Key: KAFKA-13618
 URL: https://issues.apache.org/jira/browse/KAFKA-13618
 Project: Kafka
  Issue Type: Improvement
Reporter: Kvicii.Yu
Assignee: Kvicii.Yu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison commented on a change in pull request #11382: KAFKA-13348: Allow Source Tasks to Handle Producer Exceptions

2022-01-26 Thread GitBox


mimaison commented on a change in pull request #11382:
URL: https://github.com/apache/kafka/pull/11382#discussion_r792539420



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -364,9 +365,16 @@ private boolean sendRecords() {
 producerRecord,
 (recordMetadata, e) -> {
 if (e != null) {
-log.error("{} failed to send record to {}: ", 
WorkerSourceTask.this, topic, e);
-log.trace("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
-producerSendException.compareAndSet(null, e);
+if 
(retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {

Review comment:
   We can use `==` to compare enums.

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##
@@ -229,6 +246,13 @@ public synchronized boolean withinToleranceLimits() {
 }
 }
 
+// For source connectors that want to skip kafka producer errors.
+// They cannot use withinToleranceLimits() as no failure may have actually 
occurred prior to the producer failing
+// to write to kafka.
+public synchronized ToleranceType getErrorToleranceType() {

Review comment:
   Does this need to be `synchronized`?

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##
@@ -222,6 +222,13 @@ private void createWorkerTask() {
 createWorkerTask(TargetState.STARTED);
 }
 
+private void createWorkerTaskWithErrorToleration() {

Review comment:
   Can we reuse the `createWorkerTask()` method just below by passing a 
`RetryWithToleranceOperator` argument instead of creating the 
`WorkerSourceTask` object here?

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -364,9 +365,16 @@ private boolean sendRecords() {
 producerRecord,
 (recordMetadata, e) -> {
 if (e != null) {
-log.error("{} failed to send record to {}: ", 
WorkerSourceTask.this, topic, e);
-log.trace("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
-producerSendException.compareAndSet(null, e);
+if 
(retryWithToleranceOperator.getErrorToleranceType().equals(ToleranceType.ALL)) {
+// executeFailed here allows the use of 
existing logging infrastructure/configuration
+
retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, 
WorkerSourceTask.class,
+preTransformRecord, e);
+commitTaskRecord(preTransformRecord, null);

Review comment:
   Should we have a debug/trace log in this path?

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##
@@ -111,6 +111,23 @@ public RetryWithToleranceOperator(long errorRetryTimeout, 
long errorMaxDelayInMi
 return errantRecordFuture;
 }
 
+public synchronized Future executeFailed(Stage stage, Class 
executingClass,
+   SourceRecord sourceRecord,
+   Throwable error) {
+
+markAsFailed();
+context.sourceRecord(sourceRecord);
+context.currentContext(stage, executingClass);
+context.error(error);
+errorHandlingMetrics.recordFailure();
+Future errantRecordFuture = context.report();
+if (!withinToleranceLimits()) {
+errorHandlingMetrics.recordError();
+throw new ConnectException("Tolerance exceeded in error handler", 
error);

Review comment:
   Now that this message can come from 2 different paths, should we add 
some context to the message to disambiguate them?

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##
@@ -815,6 +822,32 @@ public void testSendRecordsTaskCommitRecordFail() throws 
Exception {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testSourceTaskIgnoresProducerException() throws Exception {
+createWorkerTaskWithErrorToleration();
+expectTopicCreation(TOPIC);
+
+// send two records
+// record 1 will succeed
+// record 2 will invoke the producer's failure callback, but ignore 
the exception via retryOperator
+// and no ConnectException will be thrown
+SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+

[jira] [Resolved] (KAFKA-9279) Silent data loss in Kafka producer

2022-01-26 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-9279.
---
Fix Version/s: 3.2.0
   Resolution: Fixed

> Silent data loss in Kafka producer
> --
>
> Key: KAFKA-9279
> URL: https://issues.apache.org/jira/browse/KAFKA-9279
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.3.0
>Reporter: Andrew Klopper
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.2.0
>
>
> It appears that it is possible for a producer.commitTransaction() call to 
> succeed even if an individual producer.send() call has failed. The following 
> code demonstrates the issue:
> {code:java}
> package org.example.dataloss;
> import java.nio.charset.StandardCharsets;
> import java.util.Properties;
> import java.util.Random;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public class Main {
> public static void main(final String[] args) {
> final Properties producerProps = new Properties();
> if (args.length != 2) {
> System.err.println("Invalid command-line arguments");
> System.exit(1);
> }
> final String bootstrapServer = args[0];
> final String topic = args[1];
> producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> bootstrapServer);
> producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "50");
> producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
> producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 
> "100");
> producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
> "true");
> producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 
> "dataloss_01");
> producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
> "dataloss_01");
> try (final KafkaProducer<byte[], byte[]> producer = new 
> KafkaProducer<>(producerProps, new ByteArraySerializer(), new 
> ByteArraySerializer())) {
> producer.initTransactions();
> producer.beginTransaction();
> final Random random = new Random();
> final byte[] largePayload = new byte[200];
> random.nextBytes(largePayload);
> producer.send(
> new ProducerRecord<>(
> topic,
> "large".getBytes(StandardCharsets.UTF_8),
> largePayload
> ),
> (metadata, e) -> {
> if (e == null) {
> System.out.println("INFO: Large payload succeeded");
> } else {
> System.err.printf("ERROR: Large payload failed: 
> %s\n", e.getMessage());
> }
> }
> );
> producer.commitTransaction();
> System.out.println("Commit succeeded");
> } catch (final Exception e) {
> System.err.printf("FATAL ERROR: %s", e.getMessage());
> }
> }
> }
> {code}
> The code prints the following output:
> {code:java}
> ERROR: Large payload failed: The message is 293 bytes when serialized 
> which is larger than the maximum request size you have configured with the 
> max.request.size configuration.
> Commit succeeded{code}
>  
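
Until the fix, one defensive pattern (a hedged sketch; the class name is 
illustrative) is to resolve the Future returned by send() before committing, 
so a failed send aborts the transaction instead of being silently dropped by 
a successful commitTransaction():

{code:java}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

final class SafeTransactionalSend {
    // Hold on to the Future from send() and check it before committing.
    static void sendThenCommit(KafkaProducer<byte[], byte[]> producer,
                               ProducerRecord<byte[], byte[]> record)
            throws InterruptedException {
        Future<RecordMetadata> future = producer.send(record);
        try {
            future.get(); // surfaces e.g. the oversized-record error
            producer.commitTransaction();
        } catch (ExecutionException e) {
            System.err.printf("Send failed, aborting transaction: %s%n",
                e.getCause().getMessage());
            producer.abortTransaction();
        }
    }
}
{code}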



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison merged pull request #11508: KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions

2022-01-26 Thread GitBox


mimaison merged pull request #11508:
URL: https://github.com/apache/kafka/pull/11508


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13617) Enhance delete-expired-group-metadata logging

2022-01-26 Thread Nicolas Guyomar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Guyomar updated KAFKA-13617:

Description: 
Hi team,

When you try to understand why a consumer group offset was expired, the default 
INFO logging on the group coordinator does not give much:

info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - 
currentTimestamp} milliseconds.")

[https://github.com/apache/kafka/blob/22d056c9b76c9bf8417d8701594d1fcee1c6a655/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L834]

Considering the rest of the available logs are at TRACE level, if I'm not 
mistaken, and my use case is mostly root cause analysis, this INFO log is 
mostly all a Kafka admin gets post offset deletion.

Would it be possible to enhance this log with the actual group/topic/partition 
that was removed, please?

Thank you

  was:
Hi team,

When you try to understand why a consumer group offset was expired, the default 
INFO logging on the group coordinator does not give much : 

info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - 
currentTimestamp} milliseconds.")

[https://github.com/apache/kafka/blob/22d056c9b76c9bf8417d8701594d1fcee1c6a655/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L834]

 

Would it be possible to enhance this log with the actual group/topic/partition 
that was removed please ? 

Thank you


> Enhance delete-expired-group-metadata logging
> -
>
> Key: KAFKA-13617
> URL: https://issues.apache.org/jira/browse/KAFKA-13617
> Project: Kafka
>  Issue Type: Improvement
>  Components: offset manager
>Reporter: Nicolas Guyomar
>Priority: Minor
>
> Hi team,
> When you try to understand why a consumer group offset was expired, the 
> default INFO logging on the group coordinator does not give much : 
> info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - 
> currentTimestamp} milliseconds.")
> [https://github.com/apache/kafka/blob/22d056c9b76c9bf8417d8701594d1fcee1c6a655/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L834]
> Considering the rest of the available logs are at TRACE level if I'm not 
> mistaking, and my use is mostly root cause analysis type of work, the INFO 
> log is mostly what Kafka Admin get "post offset deletion"
> Would it be possible to enhance this log with the actual 
> group/topic/partition that was removed please ? 
> Thank you



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13617) Enhance delete-expired-group-metadata logging

2022-01-26 Thread Nicolas Guyomar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482351#comment-17482351
 ] 

Nicolas Guyomar commented on KAFKA-13617:
-

Hi [~joecqupt], indeed, I'll edit my description. My use case is mostly root 
cause analysis post deletion, hence I only have the INFO logs available.

> Enhance delete-expired-group-metadata logging
> -
>
> Key: KAFKA-13617
> URL: https://issues.apache.org/jira/browse/KAFKA-13617
> Project: Kafka
>  Issue Type: Improvement
>  Components: offset manager
>Reporter: Nicolas Guyomar
>Priority: Minor
>
> Hi team,
> When you try to understand why a consumer group offset was expired, the 
> default INFO logging on the group coordinator does not give much : 
> info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - 
> currentTimestamp} milliseconds.")
> [https://github.com/apache/kafka/blob/22d056c9b76c9bf8417d8701594d1fcee1c6a655/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L834]
>  
> Would it be possible to enhance this log with the actual 
> group/topic/partition that was removed please ? 
> Thank you



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-7572) Producer should not send requests with negative partition id

2022-01-26 Thread Wenhao Ji (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482323#comment-17482323
 ] 

Wenhao Ji commented on KAFKA-7572:
--

[~guozhang] Would you mind reviewing the PR 
[#10525|https://github.com/apache/kafka/pull/10525] for me?

> Producer should not send requests with negative partition id
> 
>
> Key: KAFKA-7572
> URL: https://issues.apache.org/jira/browse/KAFKA-7572
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1, 1.1.1
>Reporter: Yaodong Yang
>Assignee: Wenhao Ji
>Priority: Major
>  Labels: patch-available
>
> h3. Issue:
> In one Kafka producer log from our users, we found the following weird one:
> timestamp="2018-10-09T17:37:41,237-0700",level="ERROR", Message="Write to 
> Kafka failed with: ",exception="java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> topicName--2: 30042 ms has passed since batch creation plus linger time
>  at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
>  at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
>  at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 
> record(s) for topicName--2: 30042 ms has passed since batch creation plus 
> linger time"
> After a few hours of debugging, we finally understood the root cause of this 
> issue:
>  # The producer used a buggy custom Partitioner, which sometimes generates 
> negative partition ids for new records.
>  # The corresponding produce requests were rejected by brokers, because it's 
> illegal to have a partition with a negative id.
>  # The client kept refreshing its local cluster metadata, but could not send 
> produce requests successfully.
>  # From the above log, we found a suspicious string "topicName--2":
>  # According to the source code, the format of this string in the log is 
> TopicName+"-"+PartitionId.
> # It's not easy to notice that there were two consecutive dashes in the above 
> log.
> # Eventually, we found that the second dash was a negative sign. Therefore, 
> the partition id is -2, rather than 2.
> # The bug was in the custom Partitioner.
> h3. Proposal:
>  # Producer code should check the partitionId before sending requests to 
> brokers.
> # If there is a negative partition id, just throw an IllegalStateException.
>  # Such a quick check can save lots of time for people debugging their 
> producer code.
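
(For illustration, a hedged sketch of the kind of guard proposed; the class 
and method names are placeholders, not the actual producer code.)

{code:java}
// Illustrative sketch: validate the id returned by a possibly buggy custom
// Partitioner before the record is handed to the accumulator.
final class PartitionGuard {
    static int checkedPartition(String topic, int partition) {
        if (partition < 0) {
            throw new IllegalStateException(
                "Invalid partition id " + partition + " for topic " + topic
                    + "; a custom Partitioner must return a non-negative partition");
        }
        return partition;
    }
}
{code}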



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] predatorray commented on pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id

2022-01-26 Thread GitBox


predatorray commented on pull request #10525:
URL: https://github.com/apache/kafka/pull/10525#issuecomment-1021977962


   @guozhangwang Would you mind reviewing this pr for me?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org