[jira] [Updated] (KAFKA-16196) Cast transform doesn't handle invalid whole value casts gracefully

2024-01-26 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-16196:
---
Fix Version/s: 3.8.0

> Cast transform doesn't handle invalid whole value casts gracefully
> --
>
> Key: KAFKA-16196
> URL: https://issues.apache.org/jira/browse/KAFKA-16196
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.8.0
>
>
> The Cast transform currently doesn't handle invalid whole value casts 
> gracefully. A whole value cast is configured like \{"spec": "int8"} as 
> opposed to a field level cast like \{"spec": "field1:int8"}.
>  
> If an invalid field level cast is specified (for instance - \{"spec": 
> "field1:invalid"}), this results in a {{ConfigException}} being thrown here - 
> [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416]
>  which is handled gracefully as a validation error here - 
> [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609]
>  
> However, invalid whole value casts (for instance - \{"spec": "invalid"}) 
> aren't handled appropriately and result in an 
> {{IllegalArgumentException}} being thrown, which surfaces as an uncaught 
> exception and a {{500 Internal Server Error}} response from the connector 
> create / update / config validation REST API endpoint.
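
For context, a hedged sketch of the kind of guard that would close the gap, assuming the whole value path parses the type via {{Schema.Type.valueOf}} (the source of the {{IllegalArgumentException}}); this is an illustrative fix, not the merged patch:

{code:java}
import java.util.Locale;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;

public class WholeValueCastSpec {
    // Parse a whole value cast spec such as "int8". An unknown type name like
    // "invalid" makes Schema.Type.valueOf throw IllegalArgumentException; we
    // rethrow it as a ConfigException so that ConfigDef validation can report
    // it gracefully instead of letting it escape as a 500 from the REST API.
    public static Schema.Type parseWholeValueCastType(String spec) {
        try {
            return Schema.Type.valueOf(spec.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Invalid type found in casting spec: " + spec);
        }
    }
}
{code}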



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16196) Cast transform doesn't handle invalid whole value casts gracefully

2024-01-25 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-16196:
---
Description: 
The Cast transform currently doesn't handle invalid whole value casts 
gracefully. A whole value cast is configured like \{"spec": "int8"} as opposed 
to a field level cast like \{"spec": "field1:int8"}.

 

If an invalid field level cast is specified (for instance - \{"spec": 
"field1:invalid"}), this results in a {{ConfigException}} being thrown here - 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416]
 which is handled gracefully as a validation error here - 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609]

 

However, invalid whole value casts (for instance - \{"spec": "invalid"}) aren't 
handled appropriately and result in an 
{{IllegalArgumentException}} being thrown, which surfaces as an uncaught 
exception and a {{500 Internal Server Error}} response from the connector 
create / update / config validation REST API endpoint.

  was:
The Cast transform currently doesn't handle invalid whole value casts 
gracefully. A whole value cast is configured like \{"spec": "int8"} as opposed 
to a field level cast like \{"spec": "field1:int8"}.

 

If an invalid field level cast is specified (for instance - \{"spec": 
"field1:invalid"}), this results in a {{ConfigException}} being thrown here 
- 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416]
 which is handled gracefully as a validation error here - 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609]

 

However, invalid whole value casts (for instance - \{"spec": "invalid"}) 
aren't handled appropriately and result in an 
{{IllegalArgumentException}} being thrown, which surfaces as an uncaught 
exception and a {{500 Internal Server Error}} response from the connector 
create / update / config validation REST API endpoint.


> Cast transform doesn't handle invalid whole value casts gracefully
> --
>
> Key: KAFKA-16196
> URL: https://issues.apache.org/jira/browse/KAFKA-16196
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> The Cast transform currently doesn't handle invalid whole value casts 
> gracefully. A whole value cast is configured like \{"spec": "int8"} as 
> opposed to a field level cast like \{"spec": "field1:int8"}.
>  
> If an invalid field level cast is specified (for instance - \{"spec": 
> "field1:invalid"}), this results in a {{ConfigException}} being thrown here - 
> [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416]
>  which is handled gracefully as a validation error here - 
> [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609]
>  
> However, invalid whole value casts (for instance - \{"spec": "invalid"}) 
> aren't handled appropriately and result in an 
> {{IllegalArgumentException}} being thrown, which surfaces as an uncaught 
> exception and a {{500 Internal Server Error}} response from the connector 
> create / update / config validation REST API endpoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16196) Cast transform doesn't handle invalid whole value casts gracefully

2024-01-25 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-16196:
---
Description: 
The Cast transform currently doesn't handle invalid whole value casts 
gracefully. A whole value cast is configured like \{"spec": "int8"} as opposed 
to a field level cast like \{"spec": "field1:int8"}.

 

If an invalid field level cast is specified (for instance - \{"spec": 
"field1:invalid"}), this results in a {{ConfigException}} being thrown here 
- 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416]
 which is handled gracefully as a validation error here - 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609]

 

However, invalid whole value casts (for instance - \{"spec": "invalid"}) 
aren't handled appropriately and result in an 
{{IllegalArgumentException}} being thrown, which surfaces as an uncaught 
exception and a {{500 Internal Server Error}} response from the connector 
create / update / config validation REST API endpoint.

  was:
The Cast transform currently doesn't handle invalid whole value casts 
gracefully. A whole value cast is configured like \{"spec": "int8"} as 
opposed to a field level cast like \{"spec": "field1:int8"}.

 

If an invalid field level cast is specified (for instance - \{"spec": 
"field1:invalid"}), this results in a {{ConfigException}} being thrown here 
- 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416]
 which is handled gracefully as a validation error here - 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609]

 

However, invalid whole value casts (for instance - \{"spec": "invalid"}) 
aren't handled appropriately and result in an 
{{IllegalArgumentException}} being thrown, which surfaces as an uncaught 
exception and a {{500 Internal Server Error}} response from the connector 
create / update / config validation REST API endpoint.


> Cast transform doesn't handle invalid whole value casts gracefully
> --
>
> Key: KAFKA-16196
> URL: https://issues.apache.org/jira/browse/KAFKA-16196
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> The Cast transform currently doesn't handle invalid whole value casts 
> gracefully. A whole value cast is configured like \{"spec": "int8"} as 
> opposed to a field level cast like \{"spec": "field1:int8"}.
>  
> If an invalid field level cast is specified (for instance - \{"spec": 
> "field1:invalid"}), this results in a {{ConfigException}} being thrown 
> here - 
> [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416]
>  which is handled gracefully as a validation error here - 
> [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609]
>  
> However, invalid whole value casts (for instance - \{"spec": 
> "invalid"}) aren't handled appropriately and result in an 
> {{IllegalArgumentException}} being thrown, which surfaces as an uncaught 
> exception and a {{500 Internal Server Error}} response from the connector 
> create / update / config validation REST API endpoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16196) Cast transform doesn't handle invalid whole value casts gracefully

2024-01-25 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-16196:
---
Description: 
The Cast transform currently doesn't handle invalid whole value casts 
gracefully. A whole value cast is configured like \{"spec": "int8"} as 
opposed to a field level cast like \{"spec": "field1:int8"}.

 

If an invalid field level cast is specified (for instance - \{"spec": 
"field1:invalid"}), this results in a {{ConfigException}} being thrown here 
- 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416]
 which is handled gracefully as a validation error here - 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609]

 

However, invalid whole value casts (for instance - \{"spec": "invalid"}) 
aren't handled appropriately and result in an 
{{IllegalArgumentException}} being thrown, which surfaces as an uncaught 
exception and a {{500 Internal Server Error}} response from the connector 
create / update / config validation REST API endpoint.

  was:
The Cast transform currently doesn't handle invalid whole value casts 
gracefully. A whole value cast is configured like \{"spec": "int8"} as 
opposed to a field level cast like \{"spec": "field1:int8"}.

 

If an invalid field level cast is specified (for instance - \{"spec": 
"field1:invalid"}), this results in a {{ConfigException}} being thrown here 
- 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416]
 which is handled gracefully as a validation error here - 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609]

 

However, invalid whole value casts aren't handled appropriately and result in 
an 
{{IllegalArgumentException}} being thrown, which surfaces as an uncaught 
exception and a {{500 Internal Server Error}} response from the connector 
create / update / config validation REST API endpoint.


> Cast transform doesn't handle invalid whole value casts gracefully
> --
>
> Key: KAFKA-16196
> URL: https://issues.apache.org/jira/browse/KAFKA-16196
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> The Cast transform currently doesn't handle invalid whole value casts 
> gracefully. A whole value cast is configured like \{"spec": "int8"} as 
> opposed to a field level cast like \{"spec": "field1:int8"}.
>  
> If an invalid field level cast is specified (for instance - \{"spec": 
> "field1:invalid"}), this results in a {{ConfigException}} being thrown 
> here - 
> [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416]
>  which is handled gracefully as a validation error here - 
> [https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609]
>  
> However, invalid whole value casts (for instance - \{"spec": 
> "invalid"}) aren't handled appropriately and result in an 
> {{IllegalArgumentException}} being thrown, which surfaces as an uncaught 
> exception and a {{500 Internal Server Error}} response from the connector 
> create / update / config validation REST API endpoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16196) Cast transform doesn't handle invalid whole value casts gracefully

2024-01-25 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-16196:
--

 Summary: Cast transform doesn't handle invalid whole value casts 
gracefully
 Key: KAFKA-16196
 URL: https://issues.apache.org/jira/browse/KAFKA-16196
 Project: Kafka
  Issue Type: Bug
  Components: connect
Reporter: Yash Mayya
Assignee: Yash Mayya


The Cast transform currently doesn't handle invalid whole value casts 
gracefully. A whole value cast is configured like \{"spec": "int8"} as 
opposed to a field level cast like \{"spec": "field1:int8"}.

 

If an invalid field level cast is specified (for instance - \{"spec": 
"field1:invalid"}), this results in a {{ConfigException}} being thrown here 
- 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L416]
 which is handled gracefully as a validation error here - 
[https://github.com/apache/kafka/blob/5f410ceb04878ca44d2d007655155b5303a47907/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L605-L609]

 

However, invalid whole value casts aren't handled appropriately and result in 
an 
{{IllegalArgumentException}} being thrown, which surfaces as an uncaught 
exception and a {{500 Internal Server Error}} response from the connector 
create / update / config validation REST API endpoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15888) DistributedHerder log context should not use the same client ID for each Connect worker by default

2023-11-22 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15888:
--

 Summary: DistributedHerder log context should not use the same 
client ID for each Connect worker by default
 Key: KAFKA-15888
 URL: https://issues.apache.org/jira/browse/KAFKA-15888
 Project: Kafka
  Issue Type: Bug
  Components: connect, KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


By default, if there is no "{{{}client.id"{}}} configured on a Connect worker 
running in distributed mode, the same client ID ("connect-1") will be used in 
the log context for the DistributedHerder class in every single worker in the 
Connect cluster. This default is quite confusing and obviously not very useful. 
Further, based on how this default is configured 
([ref|https://github.com/apache/kafka/blob/150b0e8290cda57df668ba89f6b422719866de5a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L299]),
 it seems like this might have been an unintentional bug. We could simply use 
the workerId (the advertised host name and port of the worker) by default 
instead, which should be unique for each worker in a cluster.
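
A rough sketch of that defaulting (names are illustrative; this is not the actual DistributedHerder code):

{code:java}
import org.apache.kafka.common.utils.LogContext;

public class HerderLogContextExample {
    // Sketch: fall back to the unique workerId (advertised host:port) when no
    // client.id is configured, instead of the shared "connect-1" default that
    // every worker in the cluster currently picks up.
    static LogContext logContextFor(String configuredClientId, String workerId, String groupId) {
        String clientId = (configuredClientId == null || configuredClientId.isEmpty())
                ? workerId
                : configuredClientId;
        return new LogContext("[Worker clientId=" + clientId + ", groupId=" + groupId + "] ");
    }
}
{code}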



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15570) Add unit tests for MemoryConfigBackingStore

2023-10-10 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15570:
--

 Summary: Add unit tests for MemoryConfigBackingStore
 Key: KAFKA-15570
 URL: https://issues.apache.org/jira/browse/KAFKA-15570
 Project: Kafka
  Issue Type: Test
  Components: connect, KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


Currently, the 
[MemoryConfigBackingStore|https://github.com/apache/kafka/blob/6e164bb9ace3ea7a1a9542904d1a01c9fd3a1b48/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java#L37]
 class doesn't have any unit tests for its functionality. While most of its 
functionality is fairly lightweight today, changes will be introduced with 
[KIP-980|https://cwiki.apache.org/confluence/display/KAFKA/KIP-980%3A+Allow+creating+connectors+in+a+stopped+state]
 (potentially 
[KIP-976|https://cwiki.apache.org/confluence/display/KAFKA/KIP-976%3A+Cluster-wide+dynamic+log+adjustment+for+Kafka+Connect]
 as well) and it would be good to have a test setup in place before those 
changes are made.
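
A minimal sketch of the kind of test setup being proposed (method names assume the current {{ConfigBackingStore}} interface; not an exhaustive suite):

{code:java}
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
import org.junit.jupiter.api.Test;

public class MemoryConfigBackingStoreTest {

    // Sketch: storing a connector config should be reflected in the snapshot.
    @Test
    public void testPutConnectorConfig() {
        MemoryConfigBackingStore store = new MemoryConfigBackingStore();
        Map<String, String> config =
                Collections.singletonMap("connector.class", "FileStreamSource");
        store.putConnectorConfig("test-connector", config);
        assertEquals(config, store.snapshot().connectorConfig("test-connector"));
    }
}
{code}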



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15547) Thread leak in MirrorMakerConfigTest#testClientConfigProperties

2023-10-04 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15547:
---
Issue Type: Test  (was: Bug)

> Thread leak in MirrorMakerConfigTest#testClientConfigProperties
> ---
>
> Key: KAFKA-15547
> URL: https://issues.apache.org/jira/browse/KAFKA-15547
> Project: Kafka
>  Issue Type: Test
>Reporter: Kalpesh Patel
>Assignee: Kalpesh Patel
>Priority: Minor
> Fix For: 3.7.0
>
>
> The test MirrorMakerConfigTest#testClientConfigProperties opens a 
> ForwardingAdmin but fails to close it.
> We should enclose it in a try-with-resources statement to ensure the Admin 
> client is closed and there is no thread leak.
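
A sketch of the suggested fix (the admin config map is a placeholder):

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.ForwardingAdmin;

public class TryWithResourcesExample {
    public static void main(String[] args) {
        Map<String, Object> adminProps = new HashMap<>();
        adminProps.put("bootstrap.servers", "localhost:9092"); // placeholder config
        // try-with-resources guarantees close() runs even if an assertion
        // fails, so the admin client's background threads are not leaked.
        try (ForwardingAdmin admin = new ForwardingAdmin(adminProps)) {
            // ... exercise the ForwardingAdmin here ...
        }
    }
}
{code}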



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15547) Thread leak in MirrorMakerConfigTest#testClientConfigProperties

2023-10-04 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya resolved KAFKA-15547.

Fix Version/s: 3.7.0
   Resolution: Fixed

> Thread leak in MirrorMakerConfigTest#testClientConfigProperties
> ---
>
> Key: KAFKA-15547
> URL: https://issues.apache.org/jira/browse/KAFKA-15547
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kalpesh Patel
>Assignee: Kalpesh Patel
>Priority: Minor
> Fix For: 3.7.0
>
>
> The test MirrorMakerConfigTest#testClientConfigProperties opens a 
> ForwardingAdmin but fails to close it.
> We should enclose it in a try-with-resources statement to ensure the Admin 
> client is closed and there is no thread leak.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15177) MirrorMaker 2 should implement the alterOffsets KIP-875 API

2023-09-27 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya resolved KAFKA-15177.

Fix Version/s: 3.6.0
   Resolution: Fixed

> MirrorMaker 2 should implement the alterOffsets KIP-875 API
> ---
>
> Key: KAFKA-15177
> URL: https://issues.apache.org/jira/browse/KAFKA-15177
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect, mirrormaker
>Reporter: Yash Mayya
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 3.6.0
>
>
> The {{MirrorSourceConnector}} class should implement the new alterOffsets API 
> added in 
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect].
>  We could also implement the API in 
> {{MirrorCheckpointConnector}} and 
> {{MirrorHeartbeatConnector}} to prevent external modification of offsets 
> since the operation wouldn't really make sense in their case.
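
A hedged sketch of that "prevent external modification" variant (class name is made up; the {{alterOffsets}} signature is the KIP-875 {{SourceConnector}} hook):

{code:java}
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;

// Sketch: a source connector that rejects external offset modification, the
// behavior suggested above for MirrorCheckpointConnector and
// MirrorHeartbeatConnector, where altering offsets is not meaningful.
public abstract class NonAlterableOffsetsConnector extends SourceConnector {
    @Override
    public boolean alterOffsets(Map<String, String> connectorConfig,
                                Map<Map<String, ?>, Map<String, ?>> offsets) {
        throw new ConnectException("Offsets for this connector cannot be modified externally");
    }
}
{code}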



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15248) Add BooleanConverter to Kafka Connect

2023-09-26 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15248:
---
Fix Version/s: 3.7.0

> Add BooleanConverter to Kafka Connect
> -
>
> Key: KAFKA-15248
> URL: https://issues.apache.org/jira/browse/KAFKA-15248
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.7.0
>
>
> KIP-959: Add BooleanConverter to Kafka Connect -> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-959%3A+Add+BooleanConverter+to+Kafka+Connect



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15470) Allow creating connectors in a stopped state

2023-09-17 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17766107#comment-17766107
 ] 

Yash Mayya commented on KAFKA-15470:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-980%3A+Allow+creating+connectors+in+a+stopped+state

> Allow creating connectors in a stopped state
> 
>
> Key: KAFKA-15470
> URL: https://issues.apache.org/jira/browse/KAFKA-15470
> Project: Kafka
>  Issue Type: New Feature
>  Components: connect, KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>  Labels: connect, kafka-connect, kip-required
> Fix For: 3.7.0
>
>
> [KIP-875: First-class offsets support in Kafka 
> Connect|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  introduced a new {{STOPPED}} state for connectors along with some REST API 
> endpoints to retrieve and modify offsets for connectors. Currently, only 
> connectors that already exist can be stopped and any newly created connector 
> will always be in the {{RUNNING}} state initially. Allowing the creation of 
> connectors in a {{STOPPED}} state will facilitate multiple new use cases. One 
> interesting use case would be to migrate connectors from one Kafka Connect 
> cluster to another. Individual connector migration would be useful in a 
> number of scenarios such as breaking a large cluster into multiple smaller 
> clusters (or vice versa), moving a connector from a cluster running in one 
> data center to another, etc. A connector migration could be achieved by using 
> the following sequence of steps (sketched in code after this list):
>  # Stop the running connector on the original Kafka Connect cluster
>  # Retrieve the offsets for the connector via the {{GET 
> /connectors/\{connector}/offsets}}  endpoint
>  # Create the connector in a stopped state using the same configuration on 
> the new Kafka Connect cluster
>  # Alter the offsets for the connector on the new cluster via the {{PATCH 
> /connectors/\{connector}/offsets}}  endpoint (using the offsets obtained from 
> the original cluster)
>  # Resume the connector on the new cluster and delete it on the original 
> cluster
> Another use case for creating connectors in a stopped state could be 
> deploying connectors as a part of a larger data pipeline before the source / 
> sink data system has been created or is ready for data transfer.
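
A hedged sketch of that migration sequence as Connect REST calls (cluster URLs and the connector config are placeholders; the {{initial_state}} field is the KIP-980 proposal itself):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectorMigration {
    static final HttpClient CLIENT = HttpClient.newHttpClient();

    static String send(String method, String url, String body) throws Exception {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json");
        builder = (body == null)
                ? builder.method(method, HttpRequest.BodyPublishers.noBody())
                : builder.method(method, HttpRequest.BodyPublishers.ofString(body));
        return CLIENT.send(builder.build(), HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        String oldCluster = "http://old-connect:8083"; // placeholder hosts
        String newCluster = "http://new-connect:8083";
        // 1. Stop the running connector on the original cluster (KIP-875)
        send("PUT", oldCluster + "/connectors/my-connector/stop", null);
        // 2. Retrieve its committed offsets
        String offsets = send("GET", oldCluster + "/connectors/my-connector/offsets", null);
        // 3. Create the connector in a STOPPED state on the new cluster; the
        //    initial_state field is what this ticket (KIP-980) proposes
        send("POST", newCluster + "/connectors",
                "{\"name\": \"my-connector\", \"config\": {}, \"initial_state\": \"STOPPED\"}");
        // 4. Seed the new cluster with the offsets from the old one; the GET
        //    response body is accepted as-is by the PATCH endpoint
        send("PATCH", newCluster + "/connectors/my-connector/offsets", offsets);
        // 5. Resume on the new cluster, then delete on the original one
        send("PUT", newCluster + "/connectors/my-connector/resume", null);
        send("DELETE", oldCluster + "/connectors/my-connector", null);
    }
}
{code}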



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15470) Allow creating connectors in a stopped state

2023-09-15 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15470:
--

 Summary: Allow creating connectors in a stopped state
 Key: KAFKA-15470
 URL: https://issues.apache.org/jira/browse/KAFKA-15470
 Project: Kafka
  Issue Type: New Feature
  Components: connect, KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya
 Fix For: 3.7.0


[KIP-875: First-class offsets support in Kafka 
Connect|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 introduced a new {{STOPPED}} state for connectors along with some REST API 
endpoints to retrieve and modify offsets for connectors. Currently, only 
connectors that already exist can be stopped and any newly created connector 
will always be in the {{RUNNING}} state initially. Allowing the creation of 
connectors in a {{STOPPED}} state will facilitate multiple new use cases. One 
interesting use case would be to migrate connectors from one Kafka Connect 
cluster to another. Individual connector migration would be useful in a number 
of scenarios such as breaking a large cluster into multiple smaller clusters 
(or vice versa), moving a connector from a cluster running in one data center 
to another, etc. A connector migration could be achieved by using the following 
sequence of steps:
 # Stop the running connector on the original Kafka Connect cluster
 # Retrieve the offsets for the connector via the {{GET 
/connectors/\{connector}/offsets}}  endpoint
 # Create the connector in a stopped state using the same configuration on the 
new Kafka Connect cluster
 # Alter the offsets for the connector on the new cluster via the {{PATCH 
/connectors/\{connector}/offsets}}  endpoint (using the offsets obtained from 
the original cluster)
 # Resume the connector on the new cluster and delete it on the original cluster

Another use case for creating connectors in a stopped state could be deploying 
connectors as a part of a larger data pipeline before the source / sink data 
system has been created or is ready for data transfer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14067) Sink connector override.consumer.group.id can conflict with worker group.id

2023-09-12 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya resolved KAFKA-14067.

Resolution: Fixed

> Sink connector override.consumer.group.id can conflict with worker group.id
> ---
>
> Key: KAFKA-14067
> URL: https://issues.apache.org/jira/browse/KAFKA-14067
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Greg Harris
>Priority: Minor
> Fix For: 3.7.0
>
>
> Currently there is a validation step for connector names which prevents sink 
> connector consumer groups from colliding with the worker group.id.
> There is currently no such validation for consumer.override.group.id that 
> would prevent a conflicting connector from being configured, and so it is 
> possible to misconfigure a connector in a way that may be damaging to the 
> workers themselves.
> Reproduction steps:
> 1. Configure a connect distributed cluster with a certain group.id in the 
> worker config.
> 2. Configure a sink connector with consumer.override.group.id having the same 
> value as in the worker config
> Expected behavior:
> 1. An error is returned indicating that the consumer.override.group.id is 
> invalid
> 2. The connector is not created or started
> Actual behavior:
> 1. No error is returned, and the configuration is otherwise valid.
> 2. The connector is created and starts running.
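
A sketch of the missing validation (names mirror the config keys above; this is not the merged fix):

{code:java}
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;

public class SinkGroupIdValidation {
    // Sketch: reject a sink connector whose consumer.override.group.id collides
    // with the Connect worker's own group.id, mirroring the existing check that
    // stops connector-name-derived consumer groups from doing the same.
    static void validate(Map<String, String> connectorConfig, String workerGroupId) {
        String override = connectorConfig.get("consumer.override.group.id");
        if (override != null && override.equals(workerGroupId)) {
            throw new ConfigException("consumer.override.group.id", override,
                    "Consumer group conflicts with the worker's group.id");
        }
    }
}
{code}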



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14067) Sink connector override.consumer.group.id can conflict with worker group.id

2023-09-12 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14067:
---
Fix Version/s: 3.7.0

> Sink connector override.consumer.group.id can conflict with worker group.id
> ---
>
> Key: KAFKA-14067
> URL: https://issues.apache.org/jira/browse/KAFKA-14067
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Greg Harris
>Priority: Minor
> Fix For: 3.7.0
>
>
> Currently there is a validation step for connector names which prevents sink 
> connector consumer groups from colliding with the worker group.id.
> There is currently no such validation for consumer.override.group.id that 
> would prevent a conflicting connector from being configured, and so it is 
> possible to misconfigure a connector in a way that may be damaging to the 
> workers themselves.
> Reproduction steps:
> 1. Configure a connect distributed cluster with a certain group.id in the 
> worker config.
> 2. Configure a sink connector with consumer.override.group.id having the same 
> value as in the worker config
> Expected behavior:
> 1. An error is returned indicating that the consumer.override.group.id is 
> invalid
> 2. The connector is not created or started
> Actual behavior:
> 1. No error is returned, and the configuration is otherwise valid.
> 2. The connector is created and starts running.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14067) Sink connector override.consumer.group.id can conflict with worker group.id

2023-09-12 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764114#comment-17764114
 ] 

Yash Mayya commented on KAFKA-14067:


This has been fixed in [https://github.com/apache/kafka/pull/14303] 

> Sink connector override.consumer.group.id can conflict with worker group.id
> ---
>
> Key: KAFKA-14067
> URL: https://issues.apache.org/jira/browse/KAFKA-14067
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Greg Harris
>Priority: Minor
>
> Currently there is a validation step for connector names which prevents sink 
> connector consumer groups from colliding with the worker group.id.
> There is currently no such validation for consumer.override.group.id that 
> would prevent a conflicting connector from being configured, and so it is 
> possible to misconfigure a connector in a way that may be damaging to the 
> workers themselves.
> Reproduction steps:
> 1. Configure a connect distributed cluster with a certain group.id in the 
> worker config.
> 2. Configure a sink connector with consumer.override.group.id having the same 
> value as in the worker config
> Expected behavior:
> 1. An error is returned indicating that the consumer.override.group.id is 
> invalid
> 2. The connector is not created or started
> Actual behavior:
> 1. No error is returned, and the configuration is otherwise valid.
> 2. The connector is created and starts running.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14855) Harden integration testing logic for asserting that a connector is deleted

2023-09-11 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14855:
--

Assignee: Yash Mayya

> Harden integration testing logic for asserting that a connector is deleted
> --
>
> Key: KAFKA-14855
> URL: https://issues.apache.org/jira/browse/KAFKA-14855
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Yash Mayya
>Priority: Minor
>
> In the Connect embedded integration testing framework, the 
> [EmbeddedConnectClusterAssertions::assertConnectorAndTasksAreStopped 
> method|https://github.com/apache/kafka/blob/31440b00f3ed8de65f368d41d6cf2efb07ca4a5c/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java#L411-L428]
>  is used in several places to verify that a connector has been deleted. (This 
> method may be renamed in an upcoming PR to something like 
> {{assertConnectorAndTasksAreNotRunning}}, but apart from that, its usage 
> and semantics will remain unchanged.) However, the [underlying logic for that 
> assertion|https://github.com/apache/kafka/blob/31440b00f3ed8de65f368d41d6cf2efb07ca4a5c/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java#L430-L451]
>  doesn't strictly check for deletion (which can be done by verifying that the 
> connector and its tasks no longer appear in the REST API at all), since it 
> also allows for the Connector or tasks to appear in the REST API, but with a 
> state that is not {{RUNNING}}.
> This constraint is a bit too lax and may be silently masking issues with our 
> shutdown logic for to-be-deleted connectors. We should try to narrow the 
> criteria for that method so that it fails if the Connector or any of its 
> tasks still appear in the REST API, even with a non-{{RUNNING}} state.
> However, we should also be careful to ensure that current uses of that method 
> are not relying on its semantics. If, for some reason, a test case requires 
> the existing semantics, we should evaluate whether it's necessary to continue 
> to rely on those semantics, and if so, probably preserve the existing method 
> so that it can be used wherever applicable (but rewrite all other tests to 
> use the new, stricter method).
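
A rough sketch of the stricter criterion (plain HTTP for illustration rather than the embedded cluster's helper methods):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DeletionAssertionSketch {
    // Sketch: a connector is only truly deleted once it no longer appears in
    // the REST API at all, i.e. its status endpoint returns 404; any other
    // response, even one reporting a non-RUNNING state, should fail the check.
    static boolean connectorIsFullyDeleted(String restBase, String connector) throws Exception {
        HttpRequest request = HttpRequest
                .newBuilder(URI.create(restBase + "/connectors/" + connector + "/status"))
                .GET()
                .build();
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode() == 404;
    }
}
{code}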



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14725) Improve cancellation semantics for connector tasks

2023-08-31 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760929#comment-17760929
 ] 

Yash Mayya commented on KAFKA-14725:


[~ChrisEgerton] please feel free to take over this one, thanks. 

> Improve cancellation semantics for connector tasks
> --
>
> Key: KAFKA-14725
> URL: https://issues.apache.org/jira/browse/KAFKA-14725
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Yash Mayya
>Priority: Major
>
> This came about during discussion on 
> [https://github.com/apache/kafka/pull/13208/], which addressed KAFKA-5756.
>  
> Right now, we make some effort to disable and shut down tasks that have been 
> scheduled for shutdown but have taken longer than the [graceful shutdown timeout 
> period|https://kafka.apache.org/documentation.html#connectconfigs_task.shutdown.graceful.timeout.ms].
> The logic for performing this disablement is contained in the {{cancel}} 
> method for the 
> [WorkerTask|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L129-L136]
>  and its subclasses (at the time of writing, that would be the 
> [AbstractWorkerSourceTask|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java],
>  
> [WorkerSourceTask|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java],
>  
> [ExactlyOnceWorkerSourceTask|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java],
>  and 
> [WorkerSinkTask|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java]
>  classes). Right now we don't do much to interrupt in-progress operations, 
> which may lead to zombie tasks lying around on a worker that have not yet 
> relinquished resources like Kafka clients, file descriptors, or database 
> connections despite being scheduled for shutdown.
> We can and should make the cancellation logic for tasks more stringent, 
> including but not limited to (see the sketch after this list):
>  * Interrupting the work thread for the task
>  * Interrupting any in-progress offset commits
>  * Preemptively shutting down any Kafka clients created for use by the task
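
A sketch of what the first and third items might look like for a task that owns a consumer (illustrative class; {{KafkaConsumer.wakeup}} is the only consumer method that is safe to call from another thread):

{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CancellableTaskSketch {
    private volatile Thread workThread;                       // set when the task starts
    private volatile KafkaConsumer<byte[], byte[]> consumer;  // client owned by the task

    // Sketch of more stringent cancellation: interrupt the work thread to
    // unblock in-progress operations (e.g. offset commits), and wake up the
    // consumer to abort a blocking poll(). Closing the consumer itself must
    // happen on the work thread, since KafkaConsumer is not thread-safe.
    public void cancel() {
        Thread thread = workThread;
        if (thread != null) {
            thread.interrupt();
        }
        KafkaConsumer<byte[], byte[]> c = consumer;
        if (c != null) {
            c.wakeup();
        }
    }
}
{code}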



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15387) Deprecate and remove Connect's redundant task configurations retrieval endpoint

2023-08-21 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15387:
---
Summary: Deprecate and remove Connect's redundant task configurations 
retrieval endpoint  (was: Deprecate and remove Connect's duplicate task 
configurations retrieval endpoint)

> Deprecate and remove Connect's redundant task configurations retrieval 
> endpoint
> ---
>
> Key: KAFKA-15387
> URL: https://issues.apache.org/jira/browse/KAFKA-15387
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
>  Labels: Connect, kafka-connect, kip-required
> Fix For: 4.0.0
>
>
> A new endpoint ({{GET /connectors/\{connector}/tasks-config}}) was added 
> to Kafka Connect's REST API to expose task configurations in 
> [KIP-661|https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API].
>  However, the original patch for Kafka Connect's REST API had already added 
> an endpoint ({{GET /connectors/\{connector}/tasks}}) to retrieve the list 
> of a connector's tasks and their configurations (ref - 
> [https://github.com/apache/kafka/pull/378] , 
> https://issues.apache.org/jira/browse/KAFKA-2369) and this was missed in 
> KIP-661. We can deprecate the endpoint added by KIP-661 in 3.7 (the next 
> minor AK release) and remove it in 4.0 (the next major AK release) since it's 
> redundant to have two separate endpoints to expose task configurations. 
> Related discussions in 
> [https://github.com/apache/kafka/pull/13424#discussion_r1144727886] and 
> https://issues.apache.org/jira/browse/KAFKA-15377 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15387) Deprecate and remove Connect's duplicate task configurations retrieval endpoint

2023-08-21 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15387:
--

 Summary: Deprecate and remove Connect's duplicate task 
configurations retrieval endpoint
 Key: KAFKA-15387
 URL: https://issues.apache.org/jira/browse/KAFKA-15387
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya
 Fix For: 4.0.0


A new endpoint ({{GET /connectors/\{connector}/tasks-config}}) was added to 
Kafka Connect's REST API to expose task configurations in 
[KIP-661|https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API].
 However, the original patch for Kafka Connect's REST API had already added an 
endpoint ({{GET /connectors/\{connector}/tasks}}) to retrieve the list of a 
connector's tasks and their configurations (ref - 
[https://github.com/apache/kafka/pull/378] , 
https://issues.apache.org/jira/browse/KAFKA-2369) and this was missed in 
KIP-661. We can deprecate the endpoint added by KIP-661 in 3.7 (the next minor 
AK release) and remove it in 4.0 (the next major AK release) since it's 
redundant to have two separate endpoints to expose task configurations. Related 
discussions in 
[https://github.com/apache/kafka/pull/13424#discussion_r1144727886] and 
https://issues.apache.org/jira/browse/KAFKA-15377 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values

2023-08-21 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756798#comment-17756798
 ] 

Yash Mayya edited comment on KAFKA-15377 at 8/21/23 11:00 AM:
--

Although I'd like to note that since the endpoint won't be going away at least 
until 4.0 (assuming the KIP is accepted and implemented), we should probably 
still fix the security issue described in this ticket - 
[https://github.com/apache/kafka/pull/14244].


was (Author: yash.mayya):
Although I'd like to note that since the endpoint won't be going away at least 
until 4.0 (assuming the KIP is accepted and implemented), we should probably 
still fix the security issue described in this ticket - 
[https://github.com/apache/kafka/pull/14244].

> GET /connectors/{connector}/tasks-config endpoint exposes externalized secret 
> values
> 
>
> Key: KAFKA-15377
> URL: https://issues.apache.org/jira/browse/KAFKA-15377
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> The {{GET /connectors/\{connector}/tasks-config}} endpoint added in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]
>  exposes externalized secret values in task configurations (see 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations]).
>  A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 
> / [https://github.com/apache/kafka/pull/6129] for the {{GET 
> /connectors/\{connector}/tasks}} endpoint. The config provider placeholder 
> should be used instead of the resolved config value.
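
For illustration (file path and key are made up), the placeholder form that should be returned versus the resolved secret:

{code:java}
import java.util.Map;

public class TasksConfigRedactionExample {
    public static void main(String[] args) {
        // What the task config looks like as submitted, using a KIP-297
        // config provider placeholder:
        Map<String, String> placeholderForm =
                Map.of("connection.password", "${file:/etc/secrets/db.properties:password}");
        // What the worker resolves it to at runtime (the secret itself):
        Map<String, String> resolvedForm =
                Map.of("connection.password", "hunter2"); // illustrative secret
        // GET /connectors/{connector}/tasks-config currently returns the
        // resolved form; like GET /connectors/{connector}/tasks, it should
        // return the placeholder form instead.
        System.out.println(placeholderForm + " vs " + resolvedForm);
    }
}
{code}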



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values

2023-08-21 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756798#comment-17756798
 ] 

Yash Mayya edited comment on KAFKA-15377 at 8/21/23 11:00 AM:
--

Although I'd like to note that since the endpoint won't be going away at least 
until 4.0 (assuming the KIP is accepted and implemented), we should probably 
still fix the security issue described in this ticket - 
[https://github.com/apache/kafka/pull/14244|https://github.com/apache/kafka/pull/14244].


was (Author: yash.mayya):
Although I'd like to note that since the endpoint won't be going away at least 
until 4.0 (assuming the KIP is accepted and implemented), we should probably 
still fix the security issue described in this ticket - 
[https://github.com/apache/kafka/pull/14244].

> GET /connectors/{connector}/tasks-config endpoint exposes externalized secret 
> values
> 
>
> Key: KAFKA-15377
> URL: https://issues.apache.org/jira/browse/KAFKA-15377
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> The {{GET /connectors/\{connector}/tasks-config}} endpoint added in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]
>  exposes externalized secret values in task configurations (see 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations]).
>  A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 
> / [https://github.com/apache/kafka/pull/6129] for the {{GET 
> /connectors/\{connector}/tasks}} endpoint. The config provider placeholder 
> should be used instead of the resolved config value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values

2023-08-21 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756798#comment-17756798
 ] 

Yash Mayya commented on KAFKA-15377:


Although I'd like to note that since the endpoint won't be going away at least 
until 4.0 (assuming the KIP is accepted and implemented), we should probably 
still fix the security issue described in this ticket - 
[https://github.com/apache/kafka/pull/14244].

> GET /connectors/{connector}/tasks-config endpoint exposes externalized secret 
> values
> 
>
> Key: KAFKA-15377
> URL: https://issues.apache.org/jira/browse/KAFKA-15377
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> The {{GET /connectors/\{connector}/tasks-config}} endpoint added in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]
>  exposes externalized secret values in task configurations (see 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations]).
>  A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 
> / [https://github.com/apache/kafka/pull/6129] for the {{GET 
> /connectors/\{connector}/tasks}} endpoint. The config provider placeholder 
> should be used instead of the resolved config value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values

2023-08-21 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756797#comment-17756797
 ] 

Yash Mayya commented on KAFKA-15377:


Makes sense, thanks Mickael - I'll publish a small KIP soon.

> GET /connectors/{connector}/tasks-config endpoint exposes externalized secret 
> values
> 
>
> Key: KAFKA-15377
> URL: https://issues.apache.org/jira/browse/KAFKA-15377
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> The {{GET /connectors/\{connector}/tasks-config}} endpoint added in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]
>  exposes externalized secret values in task configurations (see 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations]).
>  A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 
> / [https://github.com/apache/kafka/pull/6129] for the {{GET 
> /connectors/\{connector}/tasks}} endpoint. The config provider placeholder 
> should be used instead of the resolved config value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values

2023-08-21 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756787#comment-17756787
 ] 

Yash Mayya commented on KAFKA-15377:


Yeah, we'd discussed this previously here - 
[https://github.com/apache/kafka/pull/13424#discussion_r1144727886]. I'd be in 
favor of removing it completely; would we be able to do so with a single KIP 
(i.e. skipping deprecation followed by removal) targeting the next major 
release (4.0)?

> GET /connectors/{connector}/tasks-config endpoint exposes externalized secret 
> values
> 
>
> Key: KAFKA-15377
> URL: https://issues.apache.org/jira/browse/KAFKA-15377
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> The {{GET /connectors/\{connector}/tasks-config}} endpoint added in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]
>  exposes externalized secret values in task configurations (see 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations]).
>  A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 
> / [https://github.com/apache/kafka/pull/6129] for the {{GET 
> /connectors/\{connector}/tasks}} endpoint. The config provider placeholder 
> should be used instead of the resolved config value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values

2023-08-18 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755826#comment-17755826
 ] 

Yash Mayya edited comment on KAFKA-15377 at 8/18/23 7:13 AM:
-

[~mimaison] [~ChrisEgerton] even though this will technically change the 
response for a public REST API, I'm not sure it requires a KIP since it should 
be classified as a bug. What do you folks think?


was (Author: yash.mayya):
[~mimaison] [~ChrisEgerton] even though this does technically change the 
response for a public REST API, I'm not sure it requires a KIP since it should 
be classified as a bug. What do you folks think?

> GET /connectors/{connector}/tasks-config endpoint exposes externalized secret 
> values
> 
>
> Key: KAFKA-15377
> URL: https://issues.apache.org/jira/browse/KAFKA-15377
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> The {{GET /connectors/\{connector}/tasks-config}} endpoint added in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]
>  exposes externalized secret values in task configurations (see 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations]).
>  A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 
> / [https://github.com/apache/kafka/pull/6129] for the {{GET 
> /connectors/\{connector}/tasks}} endpoint. The config provider placeholder 
> should be used instead of the resolved config value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values

2023-08-18 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755826#comment-17755826
 ] 

Yash Mayya commented on KAFKA-15377:


[~mimaison] [~ChrisEgerton] even though this does technically change the 
response for a public REST API, I'm not sure it requires a KIP since it should 
be classified as a bug. What do you folks think?

> GET /connectors/{connector}/tasks-config endpoint exposes externalized secret 
> values
> 
>
> Key: KAFKA-15377
> URL: https://issues.apache.org/jira/browse/KAFKA-15377
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> The {{GET /connectors/\{connector}/tasks-config}} endpoint added in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]
>  exposes externalized secret values in task configurations (see 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations]).
>  A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 
> / [https://github.com/apache/kafka/pull/6129] for the {{GET 
> /connectors/\{connector}/tasks}} endpoint. The config provider placeholder 
> should be used instead of the resolved config value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values

2023-08-18 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15377:
---
Description: The {{GET /connectors/\{connector}/tasks-config}} endpoint 
added in 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]
 exposes externalized secret values in task configurations (see 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations]).
 A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 / 
[https://github.com/apache/kafka/pull/6129] for the {{GET 
/connectors/\{connector}/tasks}} endpoint. The config provider placeholder 
should be used instead of the resolved config value.  (was: The \{{GET 
/connectors/{connector}/tasks-config}} endpoint added in 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]
 exposes externalized secret values in task configurations (see 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations]).
 A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 / 
[https://github.com/apache/kafka/pull/6129] for the \{{GET 
/connectors/{connector}/tasks}} endpoint. The config provider placeholder 
should be used instead of the resolved config value.)

> GET /connectors/{connector}/tasks-config endpoint exposes externalized secret 
> values
> 
>
> Key: KAFKA-15377
> URL: https://issues.apache.org/jira/browse/KAFKA-15377
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> The {{GET /connectors/\{connector}/tasks-config}} endpoint added in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]
>  exposes externalized secret values in task configurations (see 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations]).
>  A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 
> / [https://github.com/apache/kafka/pull/6129] for the {{GET 
> /connectors/\{connector}/tasks}} endpoint. The config provider placeholder 
> should be used instead of the resolved config value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15377) GET /connectors/{connector}/tasks-config endpoint exposes externalized secret values

2023-08-18 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15377:
--

 Summary: GET /connectors/{connector}/tasks-config endpoint exposes 
externalized secret values
 Key: KAFKA-15377
 URL: https://issues.apache.org/jira/browse/KAFKA-15377
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


The \{{GET /connectors/{connector}/tasks-config}} endpoint added in 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API]
 exposes externalized secret values in task configurations (see 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations]).
 A similar bug was fixed in https://issues.apache.org/jira/browse/KAFKA-5117 / 
[https://github.com/apache/kafka/pull/6129] for the \{{GET 
/connectors/{connector}/tasks}} endpoint. The config provider placeholder 
should be used instead of the resolved config value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2023-08-16 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14133:
---
Description: 
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets, the tests are in the streams module.
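
For reference, the shape of such a migration on a made-up {{Service}} interface (EasyMock stubbing shown in comments, the Mockito equivalent in code):

{code:java}
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class MigrationShapeExample {
    interface Service { String fetch(String key); }

    void mockitoStyle() {
        // EasyMock equivalent:
        //   Service service = EasyMock.createMock(Service.class);
        //   EasyMock.expect(service.fetch("k")).andReturn("v");
        //   EasyMock.replay(service);
        //   service.fetch("k");
        //   EasyMock.verify(service);
        Service service = mock(Service.class);
        when(service.fetch("k")).thenReturn("v");
        service.fetch("k");         // exercise the mock
        verify(service).fetch("k"); // no explicit replay/verify lifecycle
    }
}
{code}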

A list of tests which, as of 2nd August 2022, still need to be moved from 
EasyMock to Mockito, do not have a Jira issue, and have no open pull requests 
that I am aware of:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}
 # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
[~yash.mayya] )
 # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
 # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
 # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
 # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
 # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
 # {color:#00875a}TopologyTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
Christo)
 # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
 # {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
 # {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
 # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
 # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
 # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
 # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
 # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
 # {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
 # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: 
Christo)
 # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) (takeover: Christo)
 # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in 
https://issues.apache.org/jira/browse/KAFKA-12947)
 # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: 
[~shekharrajak])
 # {color:#00875a}TimeOrderedWindowStoreTest{color} (owner: [~shekharrajak]) 
[https://github.com/apache/kafka/pull/12777] 
 # AbstractStreamTest
 # {color:#ff8b00}KStreamTransformValuesTest{color} (owner: Christo)
 # {color:#ff8b00}KTableImplTest{color} (owner: Christo)
 # {color:#ff8b00}KTableTransformValuesTest{color} (owner: Christo)
 # {color:#ff8b00}SessionCacheFlushListenerTest{color} (owner: Christo)
 # {color:#ff8b00}TimestampedCacheFlushListenerTest{color} (owner: Christo)
 # {color:#ff8b00}TimestampedTupleForwarderTest{color} (owner: Christo)
 # {color:#ff8b00}ActiveTaskCreatorTest{color} (owner: Christo)
 # {color:#ff8b00}ChangelogTopicsTest{color} (owner: Christo)
 # {color:#ff8b00}GlobalProcessorContextImplTest{color} (owner: Christo)
 # RecordCollectorTest (owner: Christo)
 # StateRestoreCallbackAdapterTest (owner: Christo)
 # StoreToProcessorContextAdapterTest (owner: Christo)
 # StreamsProducerTest (owner: Nelson)
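For anyone picking up one of the remaining tests, a minimal sketch of the typical mechanical translation, using a hypothetical {{Greeter}} interface rather than a real Kafka class:

{code:java}
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class MigrationSketch {
    interface Greeter { String greet(); }

    public static void main(String[] args) {
        // EasyMock style (before):
        //   Greeter greeter = EasyMock.mock(Greeter.class);
        //   EasyMock.expect(greeter.greet()).andReturn("hi");
        //   EasyMock.replay(greeter);
        //   ... exercise the code under test ...
        //   EasyMock.verify(greeter);

        // Mockito style (after): no replay phase; stubbing and verification
        // are separate, explicit calls.
        Greeter greeter = mock(Greeter.class);
        when(greeter.greet()).thenReturn("hi");
        System.out.println(greeter.greet()); // "hi"
        verify(greeter).greet();
    }
}
{code}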
 

[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests

2023-08-11 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14132:
---
Description: 
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets, the tests are in the connect module.

A list of tests that still need to be moved from PowerMock to Mockito as of 
2nd of August 2022, and that have no Jira issue or open pull request that I 
am aware of (a sketch of the PowerMock-to-Mockito static-mocking translation 
follows below):

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#00875a}KafkaBasedLogTest{color} (owner: @bachmanity)
 # {color:#00875a}RetryUtilTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*
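
PowerMock's main use over plain EasyMock/Mockito was mocking static methods; since Mockito 3.4 (with the {{mockito-inline}} engine) that can be done natively. A minimal sketch, using a hypothetical {{TimeSource}} class rather than a real Kafka one:

{code:java}
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class StaticMockSketch {
    static final class TimeSource {
        static long now() { return System.currentTimeMillis(); }
    }

    public static void main(String[] args) {
        // PowerMock style (before):
        //   PowerMockito.mockStatic(TimeSource.class);
        //   PowerMockito.when(TimeSource.now()).thenReturn(42L);

        // Mockito style (after): static mocking scoped to a
        // try-with-resources block, no special test runner required.
        try (MockedStatic<TimeSource> mocked = Mockito.mockStatic(TimeSource.class)) {
            mocked.when(TimeSource::now).thenReturn(42L);
            System.out.println(TimeSource.now()); // 42
        }
        System.out.println(TimeSource.now()); // real clock again outside the block
    }
}
{code}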

  was:
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets, the tests are in the connect module.

A list of tests that still need to be moved from PowerMock to Mockito as of 
2nd of August 2022, and that have no Jira issue or open pull request that I 
am aware of:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#ff8b00}KafkaBasedLogTest{color} (owner: @bachmanity)
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*


> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets, the tests are in the connect module.
> A list of tests that still need to be moved from PowerMock to Mockito as of 
> 2nd of August 2022, and that have no Jira issue or open pull request that I 
> am aware of:
> {color:#ff8b00}InReview{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
>  # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
>  # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
>  # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
> 

[jira] [Commented] (KAFKA-10457) JsonConverter.toConnectData trims BigInteger to Long for schema-less case

2023-07-25 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746865#comment-17746865
 ] 

Yash Mayya commented on KAFKA-10457:


If we want to add a new Connect schema type to accommodate big numbers, it'll 
probably require a small KIP since the types are a part of the public API. This 
ticket probably hasn't received more attention because a workaround exists with 
the 
[Decimal|https://github.com/apache/kafka/blob/c7de30f38bfd6e2d62a0b5c09b5dc9707e58096b/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java#L40]
 class when schemas are enabled.
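
For reference, a small sketch of that workaround: representing the big value as a zero-scale Connect {{Decimal}} so it survives a schema-enabled JSON round trip (topic name and scale here are arbitrary choices for the example):

{code:java}
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collections;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class DecimalWorkaroundSketch {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        converter.configure(Collections.singletonMap("schemas.enable", "true"), false);

        BigInteger big = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE);
        Schema schema = Decimal.schema(0); // scale 0: an integral decimal
        byte[] serialized = converter.fromConnectData("topic", schema, new BigDecimal(big));

        SchemaAndValue roundTripped = converter.toConnectData("topic", serialized);
        System.out.println(roundTripped.value()); // 9223372036854775808, no truncation
    }
}
{code}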

> JsonConverter.toConnectData trims BigInteger to Long for schema-less case
> -
>
> Key: KAFKA-10457
> URL: https://issues.apache.org/jira/browse/KAFKA-10457
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
>
>  
> When _JsonConverter_ is configured with _schemas.enable=false_ and a value 
> exceeding _Long_ is passed, the result is incorrect since the converter 
> trims it to _Long_:
> {code:java}
> Map<String, Object> props = Collections.singletonMap("schemas.enable", 
> false);
> converter.configure(props, true);
> BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new 
> BigInteger("1"));
> String msg = value.toString();
> SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
> msg.getBytes());
> assertNull(schemaAndValue.schema());
> assertEquals(value, schemaAndValue.value());
> {code}
>  
>  Fails with:
>  
> {code:java}
> expected:<9223372036854775808> but was:<-9223372036854775808>
> Expected :9223372036854775808
> Actual :-9223372036854775808
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-15238) Connect workers can be disabled by DLQ-related blocking admin client calls

2023-07-24 Thread Yash Mayya (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15238 ]


Yash Mayya deleted comment on KAFKA-15238:


was (Author: yash.mayya):
https://github.com/apache/kafka/pull/14079

> Connect workers can be disabled by DLQ-related blocking admin client calls
> --
>
> Key: KAFKA-15238
> URL: https://issues.apache.org/jira/browse/KAFKA-15238
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> When Kafka Connect is run in distributed mode - if a sink connector's task is 
> restarted (via a worker's REST API), the following sequence of steps will 
> occur (on the DistributedHerder's thread):
>  
>  # The existing sink task will be stopped 
> ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1367])
>  # A new sink task will be started 
> ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1867C40-L1867C40])
>  # As a part of the above step, a new {{WorkerSinkTask}} will be instantiated 
> ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L656-L663])
>  # The DLQ reporter (see 
> [KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect])
>  for the sink task is also instantiated and configured as a part of this 
> ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1800])
>  # The DLQ reporter setup involves two synchronous admin client calls to list 
> topics and create the DLQ topic if it isn't already created 
> ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L84-L87])
>  
> All of these are occurring synchronously on the herder's tick thread - in 
> this portion 
> [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L457-L469]
>  where external requests are run. If the admin client call in the DLQ 
> reporter setup step blocks for some time (due to auth failures and retries or 
> network issues or whatever other reason), this can cause the Connect worker 
> to become non-functional (REST API requests will timeout) and even fall out 
> of the Connect cluster and become a zombie (since the tick thread also drives 
> group membership functions - see 
> [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L403],
>  
> [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L535]).
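A simplified sketch of the blocking pattern described above (not the actual {{DeadLetterQueueReporter}} code; the broker address and DLQ topic name are placeholders):

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

public class DlqSetupSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        String dlqTopic = "my-connector-dlq";              // placeholder topic name
        try (Admin admin = Admin.create(props)) {
            Set<String> topics = admin.listTopics().names().get(); // blocking call #1
            if (!topics.contains(dlqTopic)) {
                admin.createTopics(Collections.singleton(
                        new NewTopic(dlqTopic, 1, (short) 1))).all().get(); // blocking call #2
            }
        }
        // Run on the herder's tick thread, each get() can stall the worker for
        // the full retry/timeout window of the admin client.
    }
}
{code}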



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15238) Connect workers can be disabled by DLQ-related blocking admin client calls

2023-07-24 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15238:
---
Description: 
When Kafka Connect is run in distributed mode - if a sink connector's task is 
restarted (via a worker's REST API), the following sequence of steps will occur 
(on the DistributedHerder's thread):

 
 # The existing sink task will be stopped 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1367])
 # A new sink task will be started 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1867C40-L1867C40])
 # As a part of the above step, a new {{WorkerSinkTask}} will be instantiated 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L656-L663])
 # The DLQ reporter (see 
[KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect])
 for the sink task is also instantiated and configured as a part of this 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1800])
 # The DLQ reporter setup involves two synchronous admin client calls to list 
topics and create the DLQ topic if it isn't already created 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L84-L87])

 

All of these are occurring synchronously on the herder's tick thread - in this 
portion 
[here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L457-L469]
 where external requests are run. If the admin client call in the DLQ reporter 
setup step blocks for some time (due to auth failures and retries or network 
issues or whatever other reason), this can cause the Connect worker to become 
non-functional (REST API requests will timeout) and even fall out of the 
Connect cluster and become a zombie (since the tick thread also drives group 
membership functions - see 
[here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L403],
 
[here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L535]).

  was:
When Kafka Connect is run in distributed mode - if a sink connector's task is 
restarted (via a worker's REST API), the following sequence of steps will occur 
(on the DistributedHerder's thread):

 
 # The existing sink task will be stopped 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1367])
 # A new sink task will be started 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1867C40-L1867C40])
 # As a part of the above step, a new {{WorkerSinkTask}} will be instantiated 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L656-L663])
 # The DLQ reporter (see 
[KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect])
 for the sink task is also instantiated and configured as a part of this 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1800])
 # The DLQ reporter setup involves two synchronous admin client calls to list 
topics and create the DLQ topic if it isn't already created 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L84-L87])

 

All of these are occurring synchronously on the herder's tick thread - in this 
portion 
[here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L457-L469]
 where external requests are run. If the admin client call in the DLQ reporter 
setup step blocks for some time (due to auth failures and retries or network 
issues or 

[jira] [Updated] (KAFKA-15238) Connect workers can be disabled by DLQ-related blocking admin client calls

2023-07-24 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15238:
---
Summary: Connect workers can be disabled by DLQ-related blocking admin 
client calls  (was: Connect workers can be disabled by DLQ related stuck admin 
client calls)

> Connect workers can be disabled by DLQ-related blocking admin client calls
> --
>
> Key: KAFKA-15238
> URL: https://issues.apache.org/jira/browse/KAFKA-15238
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> When Kafka Connect is run in distributed mode - if a sink connector's task is 
> restarted (via a worker's REST API), the following sequence of steps will 
> occur (on the DistributedHerder's thread):
>  
>  # The existing sink task will be stopped 
> ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1367])
>  # A new sink task will be started 
> ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1867C40-L1867C40])
>  # As a part of the above step, a new {{WorkerSinkTask}} will be instantiated 
> ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L656-L663])
>  # The DLQ reporter (see 
> [KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect])
>  for the sink task is also instantiated and configured as a part of this 
> ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1800])
>  # The DLQ reporter setup involves two synchronous admin client calls to list 
> topics and create the DLQ topic if it isn't already created 
> ([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L84-L87])
>  
> All of these are occurring synchronously on the herder's tick thread - in 
> this portion 
> [here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L457-L469]
>  where external requests are run. If the admin client call in the DLQ 
> reporter setup step blocks for some time (due to auth failures and retries or 
> network issues or whatever other reason), this can cause the Connect worker 
> to become non-functional (REST API requests will timeout) and even fall out 
> of the Connect cluster and become a zombie (since the tick thread also drives 
> group membership functions).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15238) Connect workers can be disabled by DLQ related stuck admin client calls

2023-07-24 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15238:
--

 Summary: Connect workers can be disabled by DLQ related stuck 
admin client calls
 Key: KAFKA-15238
 URL: https://issues.apache.org/jira/browse/KAFKA-15238
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


When Kafka Connect is run in distributed mode - if a sink connector's task is 
restarted (via a worker's REST API), the following sequence of steps will occur 
(on the DistributedHerder's thread):

 
 # The existing sink task will be stopped 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1367])
 # A new sink task will be started 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1867C40-L1867C40])
 # As a part of the above step, a new {{WorkerSinkTask}} will be instantiated 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L656-L663])
 # The DLQ reporter (see 
[KIP-298|https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect])
 for the sink task is also instantiated and configured as a part of this 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1800])
 # The DLQ reporter setup involves two synchronous admin client calls to list 
topics and create the DLQ topic if it isn't already created 
([ref|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L84-L87])

 

All of these are occurring synchronously on the herder's tick thread - in this 
portion 
[here|https://github.com/apache/kafka/blob/4981fa939d588645401619bfc3e321dc523d10e7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L457-L469]
 where external requests are run. If the admin client call in the DLQ reporter 
setup step blocks for some time (due to auth failures and retries or network 
issues or whatever other reason), this can cause the Connect worker to become 
non-functional (REST API requests will timeout) and even fall out of the 
Connect cluster and become a zombie (since the tick thread also drives group 
membership functions).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15216) InternalSinkRecord::newRecord method ignores the headers argument

2023-07-19 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15216:
--

 Summary: InternalSinkRecord::newRecord method ignores the headers 
argument
 Key: KAFKA-15216
 URL: https://issues.apache.org/jira/browse/KAFKA-15216
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


[https://github.com/apache/kafka/blob/a1f6ab69387deb10988461152a0087f0cd2827c4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java#L50-L56]
 - the headers passed to the {{InternalSinkRecord}} constructor come from the 
instance's {{headers()}} accessor instead of from the {{newRecord}} method's 
{{headers}} argument.

 

Originally discovered 
[here.|https://github.com/apache/kafka/pull/14024#discussion_r1266917499]
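A simplified, self-contained sketch of the bug pattern (not the actual Kafka source; {{FakeSinkRecord}} is a stand-in type):

{code:java}
import java.util.ArrayList;
import java.util.List;

class FakeSinkRecord {
    private final List<String> headers;
    FakeSinkRecord(List<String> headers) { this.headers = headers; }
    List<String> headers() { return headers; }

    FakeSinkRecord newRecord(List<String> headers) {
        // Bug: uses the instance's headers() accessor, silently dropping
        // the caller-supplied 'headers' argument.
        return new FakeSinkRecord(headers());
    }
}

public class HeadersBugSketch {
    public static void main(String[] args) {
        FakeSinkRecord original = new FakeSinkRecord(List.of("h1"));
        FakeSinkRecord copy = original.newRecord(new ArrayList<>(List.of("h2")));
        System.out.println(copy.headers()); // prints [h1] instead of [h2]
    }
}
{code}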



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15182) Normalize offsets before invoking SourceConnector::alterOffsets

2023-07-14 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15182:
---
Description: 
See discussion 
[here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148]

 

TLDR: When users attempt to externally modify source connector offsets via the 
{{PATCH /offsets}} endpoint (introduced in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]),
 type mismatches can occur between offsets passed to 
{{SourceConnector::alterOffsets}} and the offsets that are retrieved by 
connectors / tasks via an instance of {{OffsetStorageReader}} after the offsets 
have been modified. In order to prevent this type mismatch that could lead to 
subtle bugs in connectors, we could serialize + deserialize the offsets using 
the worker's internal JSON converter before invoking 
{{{}SourceConnector::alterOffsets{}}}.

  was:
See discussion 
[here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148]

 

TLDR: When users attempt to externally modify source connector offsets via the 
{{PATCH /offsets}} endpoint (introduced in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]),
 type mismatches can occur between offsets passed to 
{{SourceConnector::alterOffsets}} and the offsets that are retrieved by 
connectors / tasks via an instance of {{OffsetStorageReader }}after the offsets 
have been modified. In order to prevent this type mismatch that could lead to 
subtle bugs in connectors, we could serialize + deserialize the offsets using 
the worker's internal JSON converter before invoking 
{{{}SourceConnector::alterOffsets{}}}.


> Normalize offsets before invoking SourceConnector::alterOffsets
> ---
>
> Key: KAFKA-15182
> URL: https://issues.apache.org/jira/browse/KAFKA-15182
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 3.6.0
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0
>
>
> See discussion 
> [here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148]
>  
> TLDR: When users attempt to externally modify source connector offsets via 
> the {{PATCH /offsets}} endpoint (introduced in 
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]),
>  type mismatches can occur between offsets passed to 
> {{SourceConnector::alterOffsets}} and the offsets that are retrieved by 
> connectors / tasks via an instance of {{OffsetStorageReader}} after the 
> offsets have been modified. In order to prevent this type mismatch that could 
> lead to subtle bugs in connectors, we could serialize + deserialize the 
> offsets using the worker's internal JSON converter before invoking 
> {{{}SourceConnector::alterOffsets{}}}.
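A small sketch of why the round trip normalizes types: a schemaless pass through {{JsonConverter}} turns an {{Integer}} offset value into a {{Long}}, matching what {{OffsetStorageReader}} would return after a real write/read cycle (the offset key and value below are arbitrary examples):

{code:java}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.Converter;

public class OffsetNormalizationSketch {
    public static void main(String[] args) {
        Converter converter = new JsonConverter();
        converter.configure(Collections.singletonMap("schemas.enable", "false"), false);

        Map<String, Object> offset = Collections.singletonMap("position", 5); // Integer
        byte[] serialized = converter.fromConnectData("", null, offset);
        Object normalized = converter.toConnectData("", serialized).value();

        // Schemaless JSON integers are deserialized as INT64, so the value
        // under "position" is now a Long rather than an Integer.
        System.out.println(normalized); // {position=5}
    }
}
{code}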



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15182) Normalize offsets before invoking SourceConnector::alterOffsets

2023-07-12 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15182:
--

 Summary: Normalize offsets before invoking 
SourceConnector::alterOffsets
 Key: KAFKA-15182
 URL: https://issues.apache.org/jira/browse/KAFKA-15182
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


See discussion 
[here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148]

 

TLDR: When users attempt to externally modify source connector offsets via the 
{{PATCH /offsets}} endpoint (introduced in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]),
 type mismatches can occur between offsets passed to 
{{SourceConnector::alterOffsets}} and the offsets that are retrieved by 
connectors / tasks via an instance of {{OffsetStorageReader }}after the offsets 
have been modified. In order to prevent this type mismatch that could lead to 
subtle bugs in connectors, we could serialize + deserialize the offsets using 
the worker's internal JSON converter before invoking 
{{{}SourceConnector::alterOffsets{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15179) Add integration tests for the FileStream Sink and Source connectors

2023-07-11 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15179:
--

 Summary: Add integration tests for the FileStream Sink and Source 
connectors
 Key: KAFKA-15179
 URL: https://issues.apache.org/jira/browse/KAFKA-15179
 Project: Kafka
  Issue Type: Improvement
Reporter: Yash Mayya
Assignee: Yash Mayya


Add integration tests for the FileStream Sink and Source connectors covering 
a variety of common scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15177) MirrorMaker 2 should implement the alterOffsets KIP-875 API

2023-07-11 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15177:
--

 Summary: MirrorMaker 2 should implement the alterOffsets KIP-875 
API
 Key: KAFKA-15177
 URL: https://issues.apache.org/jira/browse/KAFKA-15177
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect, mirrormaker
Reporter: Yash Mayya


The {{MirrorSourceConnector}} class should implement the new alterOffsets API 
added in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect].
 We could also implement the API in 
{{MirrorCheckpointConnector}} and 
{{MirrorHeartbeatConnector}} to prevent external modification of offsets since 
the operation wouldn't really make sense in their case.
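For illustration, a minimal sketch of a connector that rejects external offset modification ({{NoAlterOffsetsConnector}} is a hypothetical stand-in, not the MirrorMaker 2 implementation):

{code:java}
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

public class NoAlterOffsetsConnector extends SourceConnector {
    @Override public String version() { return "0.0.1"; }
    @Override public void start(Map<String, String> props) { }
    @Override public void stop() { }
    @Override public Class<? extends Task> taskClass() { throw new UnsupportedOperationException(); }
    @Override public List<Map<String, String>> taskConfigs(int maxTasks) { throw new UnsupportedOperationException(); }
    @Override public ConfigDef config() { return new ConfigDef(); }

    @Override
    public boolean alterOffsets(Map<String, String> connectorConfig,
                                Map<Map<String, ?>, Map<String, ?>> offsets) {
        // Throwing from alterOffsets causes the external offset-modification
        // request to fail rather than silently altering state.
        throw new UnsupportedOperationException(
                "Offsets for this connector cannot be modified externally");
    }
}
{code}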



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14353) Kafka Connect REST API configuration validation timeout improvements

2023-07-10 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14353:
---
Priority: Minor  (was: Major)

> Kafka Connect REST API configuration validation timeout improvements
> 
>
> Key: KAFKA-14353
> URL: https://issues.apache.org/jira/browse/KAFKA-14353
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
>  Labels: kip-required
>
> Kafka Connect currently defines a default REST API request timeout of [90 
> seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30].
>  If a REST API request takes longer than this timeout value, a {{500 Internal 
> Server Error}}  response is returned with the message "Request timed out".
> The {{POST /connectors}}  and the {{PUT /connectors/\{connector}/config}}  
> endpoints that are used to create or update connectors internally do a 
> connector configuration validation (the details of which vary depending on 
> the connector plugin) before proceeding to write a message to the Connect 
> cluster's config topic. If the configuration validation takes longer than 90 
> seconds, the connector is still eventually created after the config 
> validation completes (even though a {{500 Internal Server Error}}  response 
> is returned to the user) which leads to a fairly confusing user experience.
> Furthermore, this situation is exacerbated by the potential for config 
> validations occurring twice for a single request. If Kafka Connect is running 
> in distributed mode, requests to create or update a connector are forwarded 
> to the Connect worker which is currently the leader of the group, if the 
> initial request is made to a worker which is not the leader. In this case, 
> the config validation occurs both on the initial worker, as well as the 
> leader (assuming that the first config validation is successful) - this means 
> that if a config validation takes longer than 45 seconds to complete each 
> time, it will result in the original create / update connector request timing 
> out.
> Slow config validations can occur in certain exceptional scenarios - consider 
> a database connector which has elaborate validation logic involving querying 
> information schema to get a list of tables and views to validate the user's 
> connector configuration. If the database has a very high number of tables and 
> views and the database is under a heavy load in terms of query volume, such 
> information schema queries can end up being considerably slow to complete.
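For illustration, a sketch of the kind of connector-side {{validate()}} override that can outlast the REST request timeout (the class is hypothetical and the sleep stands in for slow information schema queries):

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

public class SlowValidationConnector extends SourceConnector {
    @Override
    public Config validate(Map<String, String> connectorConfigs) {
        try {
            Thread.sleep(100_000); // stand-in for slow information schema queries
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return super.validate(connectorConfigs); // falls back to ConfigDef-based validation
    }

    @Override public String version() { return "0.0.1"; }
    @Override public void start(Map<String, String> props) { }
    @Override public void stop() { }
    @Override public Class<? extends Task> taskClass() { throw new UnsupportedOperationException(); }
    @Override public List<Map<String, String>> taskConfigs(int maxTasks) { return Collections.emptyList(); }
    @Override public ConfigDef config() { return new ConfigDef(); }
}
{code}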



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14353) Kafka Connect REST API configuration validation timeout improvements

2023-07-10 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14353:
---
Description: 
Kafka Connect currently defines a default REST API request timeout of [90 
seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30].
 If a REST API request takes longer than this timeout value, a {{500 Internal 
Server Error}}  response is returned with the message "Request timed out".

The {{POST /connectors}}  and the {{PUT /connectors/\{connector}/config}}  
endpoints that are used to create or update connectors internally do a 
connector configuration validation (the details of which vary depending on the 
connector plugin) before proceeding to write a message to the Connect cluster's 
config topic. If the configuration validation takes longer than 90 seconds, the 
connector is still eventually created after the config validation completes 
(even though a {{500 Internal Server Error}}  response is returned to the user) 
which leads to a fairly confusing user experience.

Furthermore, this situation is exacerbated by the potential for config 
validations occurring twice for a single request. If Kafka Connect is running 
in distributed mode, requests to create or update a connector are forwarded to 
the Connect worker which is currently the leader of the group, if the initial 
request is made to a worker which is not the leader. In this case, the config 
validation occurs both on the initial worker, as well as the leader (assuming 
that the first config validation is successful) - this means that if a config 
validation takes longer than 45 seconds to complete each time, it will result 
in the original create / update connector request timing out.

Slow config validations can occur in certain exceptional scenarios - consider a 
database connector which has elaborate validation logic involving querying 
information schema to get a list of tables and views to validate the user's 
connector configuration. If the database has a very high number of tables and 
views and the database is under a heavy load in terms of query volume, such 
information schema queries can end up being considerably slow to complete.

  was:Kafka Connect currently defines a default REST API request timeout of [90 
seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30]
 which isn't configurable. If a REST API request takes longer than this, a 
{{500 Internal Server Error}}  response is returned with the message "Request 
timed out". In exceptional scenarios, a longer timeout may be required for 
operations such as connector config validation / connector creation (which 
internally does a config validation first). We should allow the request timeout 
to be configurable via a Kafka Connect worker property.


> Kafka Connect REST API configuration validation timeout improvements
> 
>
> Key: KAFKA-14353
> URL: https://issues.apache.org/jira/browse/KAFKA-14353
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>  Labels: kip-required
>
> Kafka Connect currently defines a default REST API request timeout of [90 
> seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30].
>  If a REST API request takes longer than this timeout value, a {{500 Internal 
> Server Error}}  response is returned with the message "Request timed out".
> The {{POST /connectors}}  and the {{PUT /connectors/\{connector}/config}}  
> endpoints that are used to create or update connectors internally do a 
> connector configuration validation (the details of which vary depending on 
> the connector plugin) before proceeding to write a message to the Connect 
> cluster's config topic. If the configuration validation takes longer than 90 
> seconds, the connector is still eventually created after the config 
> validation completes (even though a {{500 Internal Server Error}}  response 
> is returned to the user) which leads to a fairly confusing user experience.
> Furthermore, this situation is exacerbated by the potential for config 
> validations occurring twice for a single request. If Kafka Connect is running 
> in distributed mode, requests to create or update a connector are forwarded 
> to the Connect worker which is currently the leader of the group, if the 
> initial request is made to a worker which is not 

[jira] [Updated] (KAFKA-14353) Kafka Connect REST API configuration validation timeout improvements

2023-07-10 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14353:
---
Summary: Kafka Connect REST API configuration validation timeout 
improvements  (was: Make Kafka Connect REST API request timeouts configurable)

> Kafka Connect REST API configuration validation timeout improvements
> 
>
> Key: KAFKA-14353
> URL: https://issues.apache.org/jira/browse/KAFKA-14353
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>  Labels: kip-required
>
> Kafka Connect currently defines a default REST API request timeout of [90 
> seconds|https://github.com/apache/kafka/blob/5e399fe6f3aa65b42b9cdbf1c4c53f6989a570f0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java#L30]
>  which isn't configurable. If a REST API request takes longer than this, a 
> {{500 Internal Server Error}}  response is returned with the message "Request 
> timed out". In exceptional scenarios, a longer timeout may be required for 
> operations such as connector config validation / connector creation (which 
> internally does a config validation first). We should allow the request 
> timeout to be configurable via a Kafka Connect worker property.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15151) Missing connector-stopped-task-count metric

2023-07-06 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740554#comment-17740554
 ] 

Yash Mayya commented on KAFKA-15151:


When a connector is stopped, all of its tasks are shut down - i.e. the task 
count would become 0. So I don't think that such a metric would make sense here? 

> Missing connector-stopped-task-count metric
> ---
>
> Key: KAFKA-15151
> URL: https://issues.apache.org/jira/browse/KAFKA-15151
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Yash Mayya
>Priority: Major
>
> We have task-count metrics for all other states but when adding the STOPPED 
> state we did not add the respective metric.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15151) Missing connector-stopped-task-count metric

2023-07-06 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-15151:
--

Assignee: Yash Mayya

> Missing connector-stopped-task-count metric
> ---
>
> Key: KAFKA-15151
> URL: https://issues.apache.org/jira/browse/KAFKA-15151
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Yash Mayya
>Priority: Major
>
> We have task-count metrics for all other states but when adding the STOPPED 
> state we did not add the respective metric.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions

2023-07-04 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15145:
---
Description: If a RetriableException is thrown from an admin client or 
producer client operation in 
[AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
 the send operation is retried for the remaining records in the batch. There is 
a bug in the logic for computing the remaining records in a batch which causes 
records that are filtered out by the task's transformation chain to be 
re-processed. This will also result in the SourceTask::commitRecord method 
being called twice for the same record, which can cause certain types of source 
connectors to fail. This bug seems to exist since when SMTs were first 
introduced in 0.10.2  (was: If a RetriableException is thrown from an admin 
client or producer client operation in 
[AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
 the send operation is retried for the remaining records in the batch. There is 
a minor bug in the logic for computing the remaining records in a batch which 
causes records that are filtered out by the task's transformation chain to be 
re-processed. This will also result in the SourceTask::commitRecord method 
being called twice for the same record, which can cause certain types of source 
connectors to fail. This bug seems to exist since when SMTs were first 
introduced in 0.10.2)

> AbstractWorkerSourceTask re-processes records filtered out by SMTs on 
> retriable exceptions
> --
>
> Key: KAFKA-15145
> URL: https://issues.apache.org/jira/browse/KAFKA-15145
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, 
> 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 
> 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, 
> 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 
> 3.3.2, 3.5.0, 3.4.1
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
>
> If a RetriableException is thrown from an admin client or producer client 
> operation in 
> [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
>  the send operation is retried for the remaining records in the batch. There 
> is a bug in the logic for computing the remaining records in a batch which 
> causes records that are filtered out by the task's transformation chain to be 
> re-processed. This will also result in the SourceTask::commitRecord method 
> being called twice for the same record, which can cause certain types of 
> source connectors to fail. This bug seems to exist since when SMTs were first 
> introduced in 0.10.2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions

2023-07-04 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15145:
---
Description: If a RetriableException is thrown from an admin client or 
producer client operation in 
[AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
 the send operation is retried for the remaining records in the batch. There is 
a minor bug in the logic for computing the remaining records in a batch which 
causes records that are filtered out by the task's transformation chain to be 
re-processed. This will also result in the SourceTask::commitRecord method 
being called twice for the same record, which can cause certain types of source 
connectors to fail. This bug seems to exist since when SMTs were first 
introduced in 0.10.2  (was: If a RetriableException is thrown from an admin 
client or producer client operation in 
[AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
 the send operation is retried for the remaining records in the batch. There is 
a minor bug in the logic for computing the remaining records in a batch which 
causes records that are filtered out by the task's transformation chain to be 
re-processed. This will also result in the SourceTask::commitRecord method 
being called twice for the same record, which can cause certain types of source 
connectors to fail. This bug seems to exist ever since when SMTs were first 
introduced in 0.10.2)

> AbstractWorkerSourceTask re-processes records filtered out by SMTs on 
> retriable exceptions
> --
>
> Key: KAFKA-15145
> URL: https://issues.apache.org/jira/browse/KAFKA-15145
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, 
> 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 
> 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, 
> 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 
> 3.3.2, 3.5.0, 3.4.1
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
>
> If a RetriableException is thrown from an admin client or producer client 
> operation in 
> [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
>  the send operation is retried for the remaining records in the batch. There 
> is a minor bug in the logic for computing the remaining records in a batch 
> which causes records that are filtered out by the task's transformation chain 
> to be re-processed. This will also result in the SourceTask::commitRecord 
> method being called twice for the same record, which can cause certain types 
> of source connectors to fail. This bug seems to exist since when SMTs were 
> first introduced in 0.10.2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions

2023-07-04 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15145:
---
Description: If a RetriableException is thrown from an admin client or 
producer client operation in 
[AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
 the send operation is retried for the remaining records in the batch. There is 
a minor bug in the logic for computing the remaining records in a batch which 
causes records that are filtered out by the task's transformation chain to be 
re-processed. This will also result in the SourceTask::commitRecord method 
being called twice for the same record, which can cause certain types of source 
connectors to fail. This bug seems to exist since when SMTs were first 
introduced in 0.10.2  (was: If a RetriableException is thrown from an admin 
client or producer client operation in 
[AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
 the send operation is retried for the remaining records in the batch. There is 
a minor bug in the logic for computing the remaining records for a batch which 
causes records that are filtered out by the task's transformation chain to be 
re-processed. This will also result in the SourceTask::commitRecord method 
being called twice for the same record, which can cause certain types of source 
connectors to fail. This bug seems to exist since when SMTs were first 
introduced in 0.10.2)

> AbstractWorkerSourceTask re-processes records filtered out by SMTs on 
> retriable exceptions
> --
>
> Key: KAFKA-15145
> URL: https://issues.apache.org/jira/browse/KAFKA-15145
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, 
> 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 
> 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, 
> 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 
> 3.3.2, 3.5.0, 3.4.1
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
>
> If a RetriableException is thrown from an admin client or producer client 
> operation in 
> [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
>  the send operation is retried for the remaining records in the batch. There 
> is a minor bug in the logic for computing the remaining records in a batch 
> which causes records that are filtered out by the task's transformation chain 
> to be re-processed. This will also result in the SourceTask::commitRecord 
> method being called twice for the same record, which can cause certain types 
> of source connectors to fail. This bug seems to exist since when SMTs were 
> first introduced in 0.10.2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions

2023-07-04 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15145:
---
Description: If a RetriableException is thrown from an admin client or 
producer client operation in 
[AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
 the send operation is retried for the remaining records in the batch. There is 
a minor bug in the logic for computing the remaining records in a batch which 
causes records that are filtered out by the task's transformation chain to be 
re-processed. This will also result in the SourceTask::commitRecord method 
being called twice for the same record, which can cause certain types of source 
connectors to fail. This bug seems to exist ever since when SMTs were first 
introduced in 0.10.2  (was: If a RetriableException is thrown from an admin 
client or producer client operation in 
[AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
 the send operation is retried for the remaining records in the batch. There is 
a minor bug in the logic for computing the remaining records in a batch which 
causes records that are filtered out by the task's transformation chain to be 
re-processed. This will also result in the SourceTask::commitRecord method 
being called twice for the same record, which can cause certain types of source 
connectors to fail. This bug seems to exist since when SMTs were first 
introduced in 0.10.2)

> AbstractWorkerSourceTask re-processes records filtered out by SMTs on 
> retriable exceptions
> --
>
> Key: KAFKA-15145
> URL: https://issues.apache.org/jira/browse/KAFKA-15145
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, 
> 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 
> 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, 
> 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 
> 3.3.2, 3.5.0, 3.4.1
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
>
> If a RetriableException is thrown from an admin client or producer client 
> operation in 
> [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
>  the send operation is retried for the remaining records in the batch. There 
> is a minor bug in the logic for computing the remaining records in a batch 
> which causes records that are filtered out by the task's transformation chain 
> to be re-processed. This will also result in the SourceTask::commitRecord 
> method being called twice for the same record, which can cause certain types 
> of source connectors to fail. This bug seems to exist ever since when SMTs 
> were first introduced in 0.10.2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions

2023-07-04 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15145:
---
Affects Version/s: 3.4.1
   3.5.0
   3.3.2
   3.3.1
   3.2.3
   3.2.2
   3.4.0
   3.2.1
   3.1.2
   3.0.2
   3.3.0
   3.1.1
   3.2.0
   2.8.2
   3.0.1
   3.0.0
   2.8.1
   2.7.2
   2.6.3
   3.1.0
   2.6.2
   2.7.1
   2.8.0
   2.6.1
   2.7.0
   2.5.1
   2.6.0
   2.4.1
   2.5.0
   2.3.1
   2.4.0
   2.2.2
   2.2.1
   2.3.0
   2.1.1
   2.2.0
   2.1.0
   2.0.1
   2.0.0
   1.1.1
   1.1.0
   1.0.2
   1.0.1
   1.0.0
   0.11.0.3
   0.11.0.2
   0.11.0.1
   0.11.0.0
   0.10.2.2
   0.10.2.1
   0.10.2.0

> AbstractWorkerSourceTask re-processes records filtered out by SMTs on 
> retriable exceptions
> --
>
> Key: KAFKA-15145
> URL: https://issues.apache.org/jira/browse/KAFKA-15145
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, 
> 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 
> 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, 
> 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 
> 3.3.2, 3.5.0, 3.4.1
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
>
> If a RetriableException is thrown from an admin client or producer client 
> operation in 
> [AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
>  the send operation is retried for the remaining records in the batch. There 
> is a minor bug in the logic for computing the remaining records for a batch 
> which causes records that are filtered out by the task's transformation chain 
> to be re-processed. This will also result in the SourceTask::commitRecord 
> method being called twice for the same record, which can cause certain types 
> of source connectors to fail. This bug seems to exist since when SMTs were 
> first introduced in 0.10.2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15145) AbstractWorkerSourceTask re-processes records filtered out by SMTs on retriable exceptions

2023-07-04 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15145:
--

 Summary: AbstractWorkerSourceTask re-processes records filtered 
out by SMTs on retriable exceptions
 Key: KAFKA-15145
 URL: https://issues.apache.org/jira/browse/KAFKA-15145
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


If a RetriableException is thrown from an admin client or producer client 
operation in 
[AbstractWorkerSourceTask::sendRecords|https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L388],
 the send operation is retried for the remaining records in the batch. There is 
a minor bug in the logic for computing the remaining records for a batch which 
causes records that are filtered out by the task's transformation chain to be 
re-processed. This will also result in the SourceTask::commitRecord method 
being called twice for the same record, which can cause certain types of source 
connectors to fail. This bug seems to exist since when SMTs were first 
introduced in 0.10.2
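
The failure mode can be simulated in a few lines (a runnable sketch under 
illustrative names; this is not the actual AbstractWorkerSourceTask logic):

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilteredRecordRetrySketch {
    public static void main(String[] args) {
        List<String> batch = Arrays.asList("r1", "r2-filtered", "r3");

        // The transformation chain drops "filtered" records.
        List<String> toSend = new ArrayList<>();
        for (String record : batch) {
            if (!record.contains("filtered")) toSend.add(record);
        }

        // Suppose r1 was sent successfully and sending r3 threw a RetriableException.
        int successfullySent = 1;

        // Buggy retry computation: slicing the ORIGINAL batch by the number of
        // records sent replays the already-filtered r2 (and commitRecord would
        // be invoked for it a second time).
        System.out.println(batch.subList(successfullySent, batch.size()));   // [r2-filtered, r3]

        // Computing the remainder from the post-filter list avoids that.
        System.out.println(toSend.subList(successfullySent, toSend.size())); // [r3]
    }
}
{code}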



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect

2023-07-01 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-15091:
--

Assignee: Yash Mayya

> Javadocs for SourceTask::commit are incorrect
> -
>
> Key: KAFKA-15091
> URL: https://issues.apache.org/jira/browse/KAFKA-15091
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Yash Mayya
>Priority: Major
>
> The Javadocs for {{SourceTask::commit}} state that the method should:
> {quote}Commit the offsets, up to the offsets that have been returned by 
> [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()].
> {quote}
> However, this is obviously incorrect given how the Connect runtime (when not 
> configured with exactly-once support for source connectors) performs polling 
> and offset commits on separate threads. There's also some extensive 
> discussion on the semantics of that method in KAFKA-5716 where it's made 
> clear that altering the behavior of the runtime to align with the documented 
> semantics of that method is not a viable option.
> We should update the Javadocs for this method to state that it does not have 
> anything to do with the offsets returned from {{SourceTask::poll}} and is 
> instead just a general, periodically-invoked hook to let the task know that 
> an offset commit has taken place (but with no guarantees as to which offsets 
> have been committed and which ones correspond to still-in-flight records).
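
A sketch of what the reworded Javadoc could look like (illustrative wording 
only, not the final text):

{code:java}
/**
 * Hook invoked periodically to signal that the framework has committed offsets
 * for this task. No guarantee is made about which offsets were committed or
 * whether they correspond to still-in-flight records; in particular, this is
 * unrelated to the offsets of the records returned from {@link #poll()}.
 */
public void commit() throws InterruptedException {
    // No-op by default; tasks may override to react to offset commits.
}
{code}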



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15121) FileStreamSourceConnector and FileStreamSinkConnector should implement KIP-875 APIs (alterOffsets)

2023-06-26 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15121:
---
Summary: FileStreamSourceConnector and FileStreamSinkConnector should 
implement KIP-875 APIs (alterOffsets)  (was: FileStreamSourceConnector and 
FileStreamSinkConnector should implement KIP-875 APIs)

> FileStreamSourceConnector and FileStreamSinkConnector should implement 
> KIP-875 APIs (alterOffsets)
> --
>
> Key: KAFKA-15121
> URL: https://issues.apache.org/jira/browse/KAFKA-15121
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
>
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  introduced the new SourceConnector::alterOffsets and 
> SinkConnector::alterOffsets APIs. The FileStreamSourceConnector and 
> FileStreamSinkConnector should implement these new methods to improve the 
> user experience when modifying offsets for these connectors and also to serve 
> as an example for other connectors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15121) FileStreamSourceConnector and FileStreamSinkConnector should implement KIP-875 APIs

2023-06-26 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15121:
--

 Summary: FileStreamSourceConnector and FileStreamSinkConnector 
should implement KIP-875 APIs
 Key: KAFKA-15121
 URL: https://issues.apache.org/jira/browse/KAFKA-15121
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


[https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 introduced the new SourceConnector::alterOffsets and 
SinkConnector::alterOffsets APIs. The FileStreamSourceConnector and 
FileStreamSinkConnector should implement these new methods to improve the user 
experience when modifying offsets for these connectors and also to serve as an 
example for other connectors.
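
For illustration, a minimal shape such an implementation could take (a sketch 
that assumes the FileStream source offset value is a Long keyed by "position", 
mirroring what the existing task tracks; the validation details are not the 
merged code):

{code:java}
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;

public class AlterOffsetsSketch {
    public boolean alterOffsets(Map<String, String> connectorConfig,
                                Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map<String, ?> offset : offsets.values()) {
            if (offset == null)
                continue; // a null offset requests a reset for that partition
            Object position = offset.get("position");
            if (!(position instanceof Long) || (Long) position < 0)
                throw new ConnectException("Offset 'position' must be a non-negative long");
        }
        return true; // the connector accepts the proposed offsets
    }
}
{code}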



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15113) Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters

2023-06-23 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736483#comment-17736483
 ] 

Yash Mayya commented on KAFKA-15113:


Thanks for reviewing the ticket Chris! Yeah, I'm not sure whether it's a 
realistic use case either, but I guess it's possible today to set up a sink 
connector which consumes from a topic on one Kafka cluster but has a DLQ topic 
on another cluster for example? So any change we'd make in this area would 
require a KIP I presume. I do like the idea of making it easier to configure 
common Kafka client override configurations, although I'm not so sure about 
changing the request structure for the create / update connector REST APIs just 
for this use case? It'd also be a little tricky to do the same with the `PUT 
/connectors/\{connector}/config` endpoint while maintaining compatibility.

> Gracefully handle cases where a sink connector's admin and consumer client 
> config overrides target different Kafka clusters
> ---
>
> Key: KAFKA-15113
> URL: https://issues.apache.org/jira/browse/KAFKA-15113
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Priority: Minor
>
> Background reading -
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
>  
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  
>  
> From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] -
> {quote}Currently, admin clients are only instantiated for sink connectors to 
> create the DLQ topic if required. So it seems like it could be technically 
> possible for a sink connector's consumer client overrides to target a 
> different Kafka cluster from its producer and admin client overrides. Such a 
> setup won't work with this implementation of the get offsets API as it is 
> using an admin client to get a sink connector's consumer group offsets. 
> However, I'm not sure we want to use a consumer client to retrieve the 
> offsets either as we shouldn't be disrupting the existing sink tasks' 
> consumer group just to fetch offsets. Leveraging a sink task's consumer also 
> isn't an option because fetching offsets for a stopped sink connector (where 
> all the tasks will be stopped) should be allowed. I'm wondering if we should 
> document that a connector's various client config override policies shouldn't 
> target different Kafka clusters (side note - looks like we don't [currently 
> document|https://kafka.apache.org/documentation/#connect] client config 
> overrides for Connect beyond just the worker property 
> {{{}connector.client.config.override.policy{}}}).
> {quote}
>  
> {quote}I don't think we need to worry too much about this. I cannot imagine a 
> sane use case that involves overriding a connector's Kafka clients with 
> different Kafka clusters (not just bootstrap servers, but actually different 
> clusters) for producer/consumer/admin. I'd be fine with adding a note to our 
> docs that that kind of setup isn't supported but I really, really hope that 
> it's not necessary and nobody's trying to do that in the first place. I also 
> suspect that there are other places where this might cause issues, like with 
> exactly-once source support or automatic topic creation for source connectors.
> That said, there is a different case we may want to consider: someone may 
> have configured consumer overrides for a sink connector, but not admin 
> overrides. This may happen if they don't use a DLQ topic. I don't know if we 
> absolutely need to handle this now and we may consider filing a follow-up 
> ticket to look into this, but one quick-and-dirty thought I've had is to 
> configure the admin client used here with a combination of the configurations 
> for the connector's admin client and its consumer, giving precedent to the 
> latter.
> {quote}
>  
> Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] 
> -
> {quote}We will have undesirable behavior if the connector is targeting a 
> Kafka cluster different from the Connect cluster's backing Kafka cluster and 
> the user has configured the consumer overrides appropriately for their 
> connector, but not the admin overrides (something we also discussed 
> previously 
> [here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]).
> In the above case, if a user attempts to reset their sink connector's offsets 
> via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following 
> will occur:
>  # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}} 
> which returns an empty partition offsets map for the sink connector's consumer 
> group ID (it exists on a different Kafka cluster to the one that the admin 
> client is connecting to).

[jira] [Created] (KAFKA-15113) Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters

2023-06-22 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15113:
--

 Summary: Gracefully handle cases where a sink connector's admin 
and consumer client config overrides target different Kafka clusters
 Key: KAFKA-15113
 URL: https://issues.apache.org/jira/browse/KAFKA-15113
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Yash Mayya


Background reading -
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
 
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 

 

From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] -
{quote}Currently, admin clients are only instantiated for sink connectors to 
create the DLQ topic if required. So it seems like it could be technically 
possible for a sink connector's consumer client overrides to target a different 
Kafka cluster from its producer and admin client overrides. Such a setup won't 
work with this implementation of the get offsets API as it is using an admin 
client to get a sink connector's consumer group offsets. However, I'm not sure 
we want to use a consumer client to retrieve the offsets either as we shouldn't 
be disrupting the existing sink tasks' consumer group just to fetch offsets. 
Leveraging a sink task's consumer also isn't an option because fetching offsets 
for a stopped sink connector (where all the tasks will be stopped) should be 
allowed. I'm wondering if we should document that a connector's various client 
config override policies shouldn't target different Kafka clusters (side note - 
looks like we don't [currently 
document|https://kafka.apache.org/documentation/#connect] client config 
overrides for Connect beyond just the worker property 
{{{}connector.client.config.override.policy{}}}).
{quote}
 
{quote}I don't think we need to worry too much about this. I cannot imagine a 
sane use case that involves overriding a connector's Kafka clients with 
different Kafka clusters (not just bootstrap servers, but actually different 
clusters) for producer/consumer/admin. I'd be fine with adding a note to our 
docs that that kind of setup isn't supported but I really, really hope that 
it's not necessary and nobody's trying to do that in the first place. I also 
suspect that there are other places where this might cause issues, like with 
exactly-once source support or automatic topic creation for source connectors.

That said, there is a different case we may want to consider: someone may have 
configured consumer overrides for a sink connector, but not admin overrides. 
This may happen if they don't use a DLQ topic. I don't know if we absolutely 
need to handle this now and we may consider filing a follow-up ticket to look 
into this, but one quick-and-dirty thought I've had is to configure the admin 
client used here with a combination of the configurations for the connector's 
admin client and its consumer, giving precedent to the latter.
{quote}
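
The "combine the two override sets, consumer wins" idea quoted above could look 
roughly like this (a sketch; the variable names are illustrative):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class AdminConfigMergeSketch {
    static Map<String, Object> adminClientProps(Map<String, Object> adminOverrides,
                                                Map<String, Object> consumerOverrides) {
        Map<String, Object> merged = new HashMap<>(adminOverrides);
        merged.putAll(consumerOverrides); // consumer overrides take precedence
        return merged;
    }
}
{code}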
 

Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] -
{quote}We will have undesirable behavior if the connector is targeting a Kafka 
cluster different from the Connect cluster's backing Kafka cluster and the user 
has configured the consumer overrides appropriately for their connector, but 
not the admin overrides (something we also discussed previously 
[here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]).

In the above case, if a user attempts to reset their sink connector's offsets 
via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following 
will occur:
 # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}} 
which returns an empty partition offsets map for the sink connector's consumer 
group ID (it exists on a different Kafka cluster to the one that the admin 
client is connecting to).
 # We call {{SinkConnector::alterOffsets}} with an empty offsets map which 
could cause the sink connector to propagate the offsets reset related changes 
to the sink system.
 # We attempt to delete the consumer group via {{Admin::deleteConsumerGroups}} 
which returns {{GroupIdNotFoundException}} which we essentially swallow in 
order to keep offsets reset operations idempotent and return a success message 
to the user (even though the real consumer group for the sink connector on the 
other Kafka cluster hasn't been deleted).

This will occur if the connector's admin overrides are missing OR the admin 
overrides are deliberately configured to target a Kafka cluster different from 
the consumer overrides (although like you pointed out in the other linked 
thread, this doesn't seem like a valid use case that we'd even want to support).

I guess we'd want to pursue the approach you suggested where we'd configure the 
admin client with a combination of the connector's admin overrides and its 
consumer overrides, giving precedence to the latter.

[jira] [Commented] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect

2023-06-14 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17732666#comment-17732666
 ] 

Yash Mayya commented on KAFKA-15091:


{quote}it does not have anything to do with the offsets returned from 
{{SourceTask::poll}} and is instead just a general, periodically-invoked hook to 
let the task know that an offset commit has taken place (but with no guarantees 
as to which offsets have been committed and which ones correspond to 
still-in-flight records).
{quote}
 

The SourceTask::commit method doesn't seem like a particularly useful hook in 
its current shape; I wonder whether we should consider deprecating it...?

> Javadocs for SourceTask::commit are incorrect
> -
>
> Key: KAFKA-15091
> URL: https://issues.apache.org/jira/browse/KAFKA-15091
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>
> The Javadocs for {{SourceTask::commit}} state that the method should:
> {quote}Commit the offsets, up to the offsets that have been returned by 
> [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()].
> {quote}
> However, this is obviously incorrect given how the Connect runtime (when not 
> configured with exactly-once support for source connectors) performs polling 
> and offset commits on separate threads. There's also some extensive 
> discussion on the semantics of that method in KAFKA-5716 where it's made 
> clear that altering the behavior of the runtime to align with the documented 
> semantics of that method is not a viable option.
> We should update the Javadocs for this method to state that it does not have 
> anything to do with the offsets returned from {{SourceTask::poll}} and is 
> instead just a general, periodically-invoked hook to let the task know that 
> an offset commit has taken place (but with no guarantees as to which offsets 
> have been committed and which ones correspond to still-in-flight records).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field

2023-06-01 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-15012:
--

Assignee: Yash Mayya

> JsonConverter fails when there are leading Zeros in a field
> ---
>
> Key: KAFKA-15012
> URL: https://issues.apache.org/jira/browse/KAFKA-15012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Ranjan Rao
>Assignee: Yash Mayya
>Priority: Major
> Attachments: 
> enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch
>
>
> When there are leading zeros in a field in the Kafka record, a sink connector 
> using JsonConverter fails with the below exception
>  
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] 
> to Kafka Connect data failed due to serialization error: 
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
>   at 
> org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
>   ... 13 more
> Caused by: org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading 
> zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
> Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric 
> value: Leading zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
>   at 
> com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1372)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:855)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4247)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734)
>   at 
> org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
>   at 
> org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
>   at 
> 

[jira] [Comment Edited] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field

2023-05-30 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725223#comment-17725223
 ] 

Yash Mayya edited comment on KAFKA-15012 at 5/30/23 6:34 PM:
-

Thanks for filing this Jira [~ranjanrao]. Simply enabling the 
ALLOW_LEADING_ZEROS_FOR_NUMBERS feature would likely be a backward incompatible 
change since there could potentially be users relying on the existing behavior 
to send bad data (i.e. some numeric field has leading zeroes when it isn't 
expected to) to a DLQ topic. This might need a small KIP adding a new config 
(maybe {_}allow.leading.zeroes.for.numbers{_}) to the JsonConverter which 
defaults to false in order to be backward compatible. Alternatively, we could 
design a way to allow users to configure the various 
[JsonReadFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonReadFeature.html]s
 and 
[JsonWriteFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonWriteFeature.html]s
 for the JsonSerializer / JsonDeserializer used in the JsonConverter.


was (Author: yash.mayya):
Thanks for filing this Jira [~ranjanrao]. Simply enabling the 
ALLOW_LEADING_ZEROS_FOR_NUMBERS feature would likely be a backward incompatible 
change since there could potentially be users relying on the existing behavior 
to send bad data (i.e. some numeric field has leading zeroes when it isn't 
expected to) to a DLQ topic. This might need a small KIP adding a new config 
(maybe {_}allow.leading.zeroes.for.numbers{_}) to the JsonConverter which 
defaults to false in order to be backward compatible. Alternatively, we could 
design a way to allow users to configure the various 
[JsonReadFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonReadFeature.html]s
 and 
[JsonWriteFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/JsonParser.Feature.html]s
 for the JsonSerializer / JsonDeserializer used in the JsonConverter.
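
For reference, enabling the Jackson feature discussed above looks roughly like 
this (a sketch of the mechanism only; whether and how to expose it through the 
JsonConverter is the open design question):

{code:java}
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LeadingZerosSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Without this feature, parsing "00080153032837" fails with
        // "Invalid numeric value: Leading zeroes not allowed".
        mapper.enable(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS);
        JsonNode node = mapper.readTree("00080153032837");
        System.out.println(node.asLong()); // 80153032837
    }
}
{code}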

> JsonConverter fails when there are leading Zeros in a field
> ---
>
> Key: KAFKA-15012
> URL: https://issues.apache.org/jira/browse/KAFKA-15012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Ranjan Rao
>Priority: Major
> Attachments: 
> enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch
>
>
> When there are leading zeros in a field in the Kafka record, a sink connector 
> using JsonConverter fails with the below exception
>  
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] 
> to Kafka Connect data failed due to serialization error: 
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
>   at 
> org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
>   at 
> 

[jira] [Commented] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field

2023-05-30 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17727653#comment-17727653
 ] 

Yash Mayya commented on KAFKA-15012:


Thanks [~ChrisEgerton], that's a fair point. I was comparing this to 
https://issues.apache.org/jira/browse/KAFKA-8713 but your point regarding there 
being nothing different about the data given to the sink connectors post 
conversion here makes sense and causes this case to be different from 
[KIP-581|https://cwiki.apache.org/confluence/display/KAFKA/KIP-581%3A+Value+of+optional+null+field+which+has+default+value].
 I agree that this qualifies as a bug in the converter so I think we should go 
ahead with the proposed fix here.

> JsonConverter fails when there are leading Zeros in a field
> ---
>
> Key: KAFKA-15012
> URL: https://issues.apache.org/jira/browse/KAFKA-15012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Ranjan Rao
>Priority: Major
> Attachments: 
> enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch
>
>
> When there are leading zeros in a field in the Kafka record, a sink connector 
> using JsonConverter fails with the below exception
>  
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] 
> to Kafka Connect data failed due to serialization error: 
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
>   at 
> org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
>   ... 13 more
> Caused by: org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading 
> zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
> Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric 
> value: Leading zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
>   at 
> com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1372)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:855)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
>   at 
> 

[jira] [Commented] (KAFKA-15034) Improvement of ReplaceField performance for long list

2023-05-29 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17727119#comment-17727119
 ] 

Yash Mayya commented on KAFKA-15034:


Thanks for filing this ticket [~baz33]. I think what you're suggesting makes 
sense and I've raised this PR which uses a HashSet for the include / exclude 
fields in the ReplaceField SMT and adds a JMH benchmark to demonstrate the 
performance improvements - [https://github.com/apache/kafka/pull/13776] 

> Improvement of ReplaceField performance for long list
> -
>
> Key: KAFKA-15034
> URL: https://issues.apache.org/jira/browse/KAFKA-15034
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 3.4.0
>Reporter: BDeus
>Assignee: Yash Mayya
>Priority: Minor
>
> The ReplaceField SMT uses List<String> for its include and exclude filters, 
> which is an ArrayList internally.
> For a long filter list, the ArrayList's O(n) lookups result in poor 
> performance.
> Could we use a HashSet implementation in the ReplaceField class instead of the 
> traditional ArrayList?
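
The gist of the HashSet change referenced in the comment above, sketched (not 
the exact patch):

{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReplaceFieldLookupSketch {
    public static void main(String[] args) {
        List<String> exclude = Arrays.asList("a", "b", "c"); // imagine hundreds of entries

        // List.contains scans the whole list: O(n) per field, per record.
        boolean slow = exclude.contains("zzz");

        // Copying the configured filter into a HashSet once makes each
        // membership check O(1) on average.
        Set<String> excludeSet = new HashSet<>(exclude);
        boolean fast = excludeSet.contains("zzz");

        System.out.println(slow + " " + fast);
    }
}
{code}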



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15034) Improvement of ReplaceField performance for long list

2023-05-29 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-15034:
--

Assignee: Yash Mayya

> Improvement of ReplaceField performance for long list
> -
>
> Key: KAFKA-15034
> URL: https://issues.apache.org/jira/browse/KAFKA-15034
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 3.4.0
>Reporter: BDeus
>Assignee: Yash Mayya
>Priority: Minor
>
> The ReplaceField SMT uses List<String> for its include and exclude filters, 
> which is an ArrayList internally.
> For a long filter list, the ArrayList's O(n) lookups result in poor 
> performance.
> Could we use a HashSet implementation in the ReplaceField class instead of the 
> traditional ArrayList?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted

2023-05-29 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17727025#comment-17727025
 ] 

Yash Mayya commented on KAFKA-14956:


This test seems to be consistently passing after 
[https://github.com/apache/kafka/pull/13465] was merged (timeout value for the 
read offsets operation in *OffsetsApiIntegrationTest* was bumped up) so I'm 
gonna mark this ticket as resolved. In case a failure re-occurs, this ticket 
can be re-opened for investigation.
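
Schematically, the fix amounted to waiting longer for the offsets condition 
(the timeout value and helper below are illustrative, not the actual test code):

{code:java}
org.apache.kafka.test.TestUtils.waitForCondition(
    () -> sinkConnectorOffsetsCaughtUp(), // hypothetical helper
    30_000,                               // bumped-up timeout in ms (illustrative)
    "Sink connector consumer group offsets should catch up to the topic end offsets");
{code}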

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
> --
>
> Key: KAFKA-14956
> URL: https://issues.apache.org/jira/browse/KAFKA-14956
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Yash Mayya
>Priority: Major
>  Labels: flaky-test
>
> ```
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected: <true> but was: <false>
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected: <true> but was: <false>
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>  at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>  at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>  at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
>  at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> 

[jira] [Resolved] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted

2023-05-29 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya resolved KAFKA-14956.

Resolution: Fixed

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
> --
>
> Key: KAFKA-14956
> URL: https://issues.apache.org/jira/browse/KAFKA-14956
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Yash Mayya
>Priority: Major
>  Labels: flaky-test
>
> ```
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected: <true> but was: <false>
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected: <true> but was: <false>
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>  at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>  at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>  at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
>  at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at 
> 

[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-05-24 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725722#comment-17725722
 ] 

Yash Mayya commented on KAFKA-14995:


[~dajac] that GitHub team isn't publicly accessible though.

 
{quote}To generate the YAML lists, we need to map from Git log "Author" to 
Github username. There's presumably some way to do this in the Github REST API 
(the mapping is based on the email, IIUC), or we could also just update the 
Committers page to also document each committer's Github username.{quote}
This won't work because the list of collaborators is supposed to be dynamic, 
right?

 

> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Priority: Minor
>  Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: 
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who 
> are not Kafka committers. Unfortunately, it's not trivial to compute this 
> list.
> Here is the process I followed to generate the list the first time (note that 
> I generated this list on 2023-04-28, so the lookback is one year):
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. manually filter out the authors who are committers, based on 
> [https://kafka.apache.org/committers]
> 3. truncate the list to 20 authors
> 4. for each author
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date:   Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: 
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>  
> This is pretty time-consuming and very scriptable. Two complications:
>  * To do the filtering, we need to map from Git log "Author" to documented 
> Kafka "Committer" that we can use to perform the filter. Suggestion: just 
> update the structure of the "Committers" page to include their Git "Author" 
> name and email 
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html])
>  * To generate the YAML lists, we need to map from Git log "Author" to Github 
> username. There's presumably some way to do this in the Github REST API (the 
> mapping is based on the email, IIUC), or we could also just update the 
> Committers page to also document each committer's Github username.
>  
> Ideally, we would write this script (to be stored in the Apache Kafka repo) 
> and create a Github Action to run it every three months.
>  
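
For concreteness, a minimal sketch of what the scripted version of the steps above 
could look like, assuming Java 11's HttpClient and the GitHub commits API's 
"author" query parameter (which accepts an email). The class name and output 
handling are illustrative; a real script would still need JSON parsing, URL 
encoding of the email, and the committer filter:

{code:java}
// Illustrative sketch only: list authors by commit volume (step 1), then map each
// git author email to a GitHub login via the commits API (step 4). Filtering out
// committers and truncating to 20 (steps 2-3) are omitted.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CollaboratorListSketch {
    public static void main(String[] args) throws Exception {
        Process git = new ProcessBuilder(
                "git", "shortlog", "--email", "--numbered", "--summary", "--since=1.year", "HEAD")
                .redirectErrorStream(true).start();
        HttpClient http = HttpClient.newHttpClient();
        try (BufferedReader out = new BufferedReader(new InputStreamReader(git.getInputStream()))) {
            String line;
            while ((line = out.readLine()) != null) {
                // Each line looks like: "   42\tSome Author <author@example.com>"
                int lt = line.indexOf('<'), gt = line.indexOf('>');
                if (lt < 0 || gt < lt) continue;
                String email = line.substring(lt + 1, gt);
                HttpRequest req = HttpRequest.newBuilder(URI.create(
                        "https://api.github.com/repos/apache/kafka/commits?author=" + email + "&per_page=1"))
                        .build();
                HttpResponse<String> resp = http.send(req, HttpResponse.BodyHandlers.ofString());
                // A real script would JSON-parse resp.body(), read author.login,
                // and emit the .asf.yaml collaborator entries from it.
                System.out.println(email + " -> HTTP " + resp.statusCode());
            }
        }
    }
}
{code}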



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14952) Publish metrics when source connector fails to poll data

2023-05-24 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14952:
---
Labels: connect connect-api kip-required  (was: connect connect-api)

> Publish metrics when source connector fails to poll data
> 
>
> Key: KAFKA-14952
> URL: https://issues.apache.org/jira/browse/KAFKA-14952
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 3.3.2
>Reporter: Ravindranath Kakarla
>Priority: Minor
>  Labels: connect, connect-api, kip-required
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Currently, there is no metric in Kafka Connect to track when a source 
> connector fails to poll data from the source. This information would be 
> useful to operators and developers to visualize, monitor and alert when the 
> connector fails to poll records from the source.
> Existing metrics like *kafka_producer_producer_metrics_record_error_total* 
> and *kafka_connect_task_error_metrics_total_record_failures* only cover 
> failures when producing data to the Kafka cluster but not when the source 
> task fails with a retryable exception or ConnectException.
> Polling from source can fail due to unavailability of the source system or 
> errors with the connect configuration. Currently, this cannot be monitored 
> directly using metrics; instead, operators have to rely on log diving, which 
> is not consistent with how other metrics are monitored.
> I propose adding new metrics to Kafka Connect, 
> "{_}source-record-poll-error-total{_}" and 
> "{_}source-record-poll-error-rate{_}" that can be used to monitor failures 
> during polling.
> *source-record-poll-error-total* - The total number of times a source 
> connector failed to poll data from the source. This will include both 
> retryable and non-retryable exceptions.
> *source-record-poll-error-rate* - The rate of the above failures per unit of time.
> These metrics would be tracked at the connector level and could be exposed 
> through JMX along with the other metrics.
> I am willing to submit a PR if this looks good, sample implementation code 
> below,
> {code:java}
> // AbstractWorkerSourceTask.java
> protected List<SourceRecord> poll() throws InterruptedException {
>     try {
>         return task.poll();
>     } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
>         log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
>         sourceTaskMetricsGroup.recordPollError();
>         // Do nothing. Let the framework poll whenever it's ready.
>         return null;
>     } catch (Throwable e) {
>         sourceTaskMetricsGroup.recordPollError();
>         throw e;
>     }
> } {code}
> [Reference|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L460]
>  
>  
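
For context on what wiring up these metrics might involve, here is a hypothetical 
sketch using Kafka's common metrics library. The group name and descriptions are 
made up for the example; the real change would go through Connect's 
SourceTaskMetricsGroup / ConnectMetricsRegistry:

{code:java}
// Hypothetical sketch only: registers the proposed poll-error sensor with
// org.apache.kafka.common.metrics so that the total and rate are exposed together.
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Rate;

public class PollErrorMetricsSketch {
    private final Sensor pollErrorSensor;

    public PollErrorMetricsSketch(Metrics metrics) {
        pollErrorSensor = metrics.sensor("source-record-poll-error");
        // Total count of poll failures (retryable and non-retryable).
        pollErrorSensor.add(metrics.metricName("source-record-poll-error-total",
                "source-task-metrics", "Total number of failed poll calls"), new CumulativeSum());
        // Failures per unit of time.
        pollErrorSensor.add(metrics.metricName("source-record-poll-error-rate",
                "source-task-metrics", "Rate of failed poll calls"), new Rate());
    }

    public void recordPollError() {
        pollErrorSensor.record(); // called from the catch blocks shown above
    }
}
{code}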



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors

2023-05-24 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725713#comment-17725713
 ] 

Yash Mayya commented on KAFKA-15018:


Thanks for filing this bug ticket [~ChrisEgerton], it's an interesting one. 
This bug also affects regular source connectors that are configured to use a 
separate offsets topic, right (and not only source connectors with exactly-once 
support enabled, as indicated in the ticket title / description)? 

 

I like the idea of synchronously writing tombstones to the global offsets store 
before writing them to the connector-specific offsets store - it's clean and 
simple. You have a valid point about two synchronous writes to topics on 
potentially separate Kafka clusters being sub-optimal, however I also agree 
with your later point on tombstone offsets being pretty uncommon based on the 
connectors and use-cases I've encountered so far. I think it's a very 
reasonable trade-off to make (i.e. slightly worse performance for an edge case 
over potential correctness issues for the same edge case).

 

An alternate idea I had was to treat tombstone values differently from the 
absence of an offset - i.e. use a special value to represent tombstones in the 
offset store. This would allow us to distinguish between no offset being 
present in the connector-specific offset store for a particular partition 
versus it being explicitly wiped by a connector task via a null / tombstone 
offset. The "special" value wouldn't be persisted in the topic, it would only 
be present in the in-memory store which represents the materialized view of the 
offset topic. However, we'd need to do some additional work to ensure that this 
value isn't leaked to connectors / tasks - basically, it should only be 
surfaced to the ConnectorOffsetBackingStore in order to make a decision on 
whether or not to use the offset from the global offsets store. I personally 
think that the other approach is cleaner overall, though. WDYT?
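
To illustrate the first approach (not an actual patch), a rough sketch of 
tombstone-first ordering during an offset commit; the store interface below is a 
simplified stand-in for the offset store's async set operation, and the timeout is 
arbitrary:

{code:java}
// Illustrative only: write tombstones to the global (fallback) store synchronously
// and fail the commit if that write fails, so the global store can never retain a
// tombstoned partition's stale offset while the connector-specific store drops it.
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TombstoneFirstCommitSketch {

    // Simplified stand-in for the offset store's asynchronous set operation.
    interface OffsetStore {
        Future<Void> set(Map<ByteBuffer, ByteBuffer> values);
    }

    void commit(OffsetStore globalStore, OffsetStore connectorStore,
                Map<ByteBuffer, ByteBuffer> offsets) throws Exception {
        Map<ByteBuffer, ByteBuffer> tombstones = new HashMap<>();
        offsets.forEach((partition, offset) -> {
            if (offset == null) tombstones.put(partition, null);
        });
        if (!tombstones.isEmpty()) {
            // Synchronous, and ordered strictly before the connector-specific write.
            globalStore.set(tombstones).get(30, TimeUnit.SECONDS);
        }
        connectorStore.set(offsets).get(30, TimeUnit.SECONDS);
    }
}
{code}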

> Potential tombstone offsets corruption for exactly-once source connectors
> -
>
> Key: KAFKA-15018
> URL: https://issues.apache.org/jira/browse/KAFKA-15018
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1
>Reporter: Chris Egerton
>Priority: Major
>
> When exactly-once support is enabled for source connectors, source offsets 
> can potentially be written to two different offsets topics: a topic specific 
> to the connector, and the global offsets topic (which was used for all 
> connectors prior to KIP-618 / version 3.3.0).
> Precedence is given to offsets in the per-connector offsets topic, but if 
> none are found for a given partition, then the global offsets topic is used 
> as a fallback.
> When committing offsets, a transaction is used to ensure that source records 
> and source offsets are written to the Kafka cluster targeted by the source 
> connector. This transaction only includes the connector-specific offsets 
> topic. Writes to the global offsets topic take place after writes to the 
> connector-specific offsets topic have completed successfully, and if they 
> fail, a warning message is logged, but no other action is taken.
> Normally, this ensures that, for offsets committed by exactly-once-supported 
> source connectors, the per-connector offsets topic is at least as up-to-date 
> as the global offsets topic, and sometimes even ahead.
> However, for tombstone offsets, we lose that guarantee. If a tombstone offset 
> is successfully written to the per-connector offsets topic, but cannot be 
> written to the global offsets topic, then the global offsets topic will still 
> contain that source offset, but the per-connector topic will not. Due to the 
> fallback-on-global logic used by the worker, if a task requests offsets for 
> one of the tombstoned partitions, the worker will provide it with the offsets 
> present in the global offsets topic, instead of indicating to the task that 
> no offsets can be found.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field

2023-05-23 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725223#comment-17725223
 ] 

Yash Mayya commented on KAFKA-15012:


Thanks for filing this Jira [~ranjanrao]. Simply enabling the 
{{ALLOW_LEADING_ZEROS_FOR_NUMBERS}} feature would likely be a backward incompatible 
change since there could potentially be users relying on the existing behavior 
to send bad data (i.e. some numeric field has leading zeroes when it isn't 
expected to) to a DLQ topic. This might need a small KIP adding a new config 
(maybe {_}allow.leading.zeroes.for.numbers{_}) to the JsonConverter which 
defaults to false in order to be backward compatible. Alternatively, we could 
design a way to allow users to configure the various 
[JsonReadFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonReadFeature.html]s
 and 
[JsonWriteFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/JsonParser.Feature.html]s
 for the JsonSerializer / JsonDeserializer used in the JsonConverter.
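
As a standalone illustration of the Jackson feature in question (plain Jackson 
2.10+ usage, not JsonConverter code; the class name is made up for the example):

{code:java}
// A mapper built this way accepts the payload from the stack trace below, while a
// default ObjectMapper rejects it with "Invalid numeric value: Leading zeroes not allowed".
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;

public class LeadingZerosSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper lenient = JsonMapper.builder()
                .enable(JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS)
                .build();
        JsonNode node = lenient.readTree("00080153032837".getBytes());
        System.out.println(node.asLong()); // prints 80153032837
    }
}
{code}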

> JsonConverter fails when there are leading Zeros in a field
> ---
>
> Key: KAFKA-15012
> URL: https://issues.apache.org/jira/browse/KAFKA-15012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Ranjan Rao
>Priority: Major
> Attachments: 
> enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch
>
>
> When there are leading zeros in a field in the Kafka record, a sink connector 
> using JsonConverter fails with the below exception
>  
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] 
> to Kafka Connect data failed due to serialization error: 
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
>   at 
> org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
>   ... 13 more
> Caused by: org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading 
> zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
> Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric 
> value: Leading zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
>   at 
> com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520)
>   at 
> 

[jira] [Commented] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720566#comment-17720566
 ] 

Yash Mayya commented on KAFKA-14974:


Thanks [~rhauch], your understanding here is correct. We should backport [this 
fix|https://github.com/apache/kafka/pull/13688] to {{3.3}}, {{3.4}} and 
{{3.5}} as well (ideally before the {{3.5.0}} release, if possible).

> Restore backward compatibility in KafkaBasedLog
> ---
>
> Key: KAFKA-14974
> URL: https://issues.apache.org/jira/browse/KAFKA-14974
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.5.0, 3.4.1, 3.3.3
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> {{KafkaBasedLog}} is a widely used utility class that provides a generic 
> implementation of a shared, compacted log of records in a Kafka topic. It 
> isn't in Connect's public API, but has been used outside of Connect and we 
> try to preserve backward compatibility whenever possible. 
> https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
> void {{KafkaBasedLog::send}} methods to return a {{Future}}. While this 
> change is source compatible, it isn't binary compatible. We can restore 
> backward compatibility simply by reinstating the older send methods and 
> renaming the new Future-returning send methods.
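
A minimal sketch of the compatibility pattern being described; the renamed 
method's name here is hypothetical, and the generics and body are stand-ins:

{code:java}
// Re-instating the old void signature keeps existing compiled callers linking,
// since the JVM resolves methods by their exact descriptor (binary compatibility),
// while new callers can use the Future-returning variant under a new name.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class KafkaBasedLogCompatSketch<K, V> {
    // Pre-existing signature, restored: void return type.
    public void send(K key, V value) {
        sendWithReceipt(key, value);
    }

    // New behavior moved under a new (hypothetical) name.
    public Future<Void> sendWithReceipt(K key, V value) {
        // ... produce to the backing topic and complete the future on ack ...
        return CompletableFuture.completedFuture(null);
    }
}
{code}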



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14455) Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-05-08 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14455:
---
Fix Version/s: 3.5.0
   3.4.1
   3.3.3

> Kafka Connect create and update REST APIs should surface failures while 
> writing to the config topic
> ---
>
> Key: KAFKA-14455
> URL: https://issues.apache.org/jira/browse/KAFKA-14455
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> Kafka Connect's `POST /connectors` and `PUT /connectors/\{connector}/config` 
> REST APIs internally simply write a message to the Connect cluster's internal 
> config topic (which is then processed asynchronously by the herder). However, 
> no callback is passed to the producer's send method and there is no error 
> handling in place for producer send failures (see 
> [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716]
>  / 
> [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]).
> Consider one such case where the Connect worker's principal doesn't have a 
> WRITE ACL on the cluster's config topic. Now suppose the user submits a 
> connector's configs via one of the above two APIs. The producer send 
> [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716]
>  / 
> [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]
>  won't succeed (due to a TopicAuthorizationException) but the API responses 
> will be `201 Created` success responses anyway. This is a very poor UX 
> because the connector will actually never be created but the API response 
> indicated success. Furthermore, this failure would only be detectable if 
> TRACE logs are enabled (via [this 
> log|https://github.com/apache/kafka/blob/df29b17fc40f7c15460988d58bc652c3d66b60f8/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java]),
>  making it near impossible for users to debug. Producer callbacks should be 
> used to surface write failures back to the user via the API response.
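
To sketch the shape of the proposed fix (illustrative, not the actual patch): the 
produce to the config topic could attach a callback and propagate failures to the 
REST layer instead of returning 201 unconditionally. The class and method names 
below are invented for the example:

{code:java}
// Illustrative sketch: surface config-topic produce failures to the API caller.
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ConfigTopicWriteSketch {
    CompletableFuture<Void> writeConnectorConfig(Producer<String, byte[]> producer,
                                                 String configTopic, String key, byte[] value) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        producer.send(new ProducerRecord<>(configTopic, key, value), (metadata, exception) -> {
            if (exception != null) {
                // e.g. TopicAuthorizationException when the worker principal lacks
                // WRITE on the config topic; surface it so the REST request can
                // fail with an error response instead of a spurious 201 Created.
                result.completeExceptionally(exception);
            } else {
                result.complete(null);
            }
        });
        return result;
    }
}
{code}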



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14974:
---
Affects Version/s: 3.5.0
   3.4.1
   3.3.3

> Restore backward compatibility in KafkaBasedLog
> ---
>
> Key: KAFKA-14974
> URL: https://issues.apache.org/jira/browse/KAFKA-14974
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.5.0, 3.4.1, 3.3.3
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> {{KafkaBasedLog}} is a widely used utility class that provides a generic 
> implementation of a shared, compacted log of records in a Kafka topic. It 
> isn't in Connect's public API, but has been used outside of Connect and we 
> try to preserve backward compatibility whenever possible. 
> https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
> void {{KafkaBasedLog::send}} methods to return a {{Future}}. While this 
> change is source compatible, it isn't binary compatible. We can restore 
> backward compatibility simply by reinstating the older send methods and 
> renaming the new Future-returning send methods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog

2023-05-08 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14974:
--

 Summary: Restore backward compatibility in KafkaBasedLog
 Key: KAFKA-14974
 URL: https://issues.apache.org/jira/browse/KAFKA-14974
 Project: Kafka
  Issue Type: Task
Reporter: Yash Mayya
Assignee: Yash Mayya


{{KafkaBasedLog}} is a widely used utility class that provides a generic 
implementation of a shared, compacted log of records in a Kafka topic. It isn't 
in Connect's public API, but has been used outside of Connect and we try to 
preserve backward compatibility whenever possible. 
https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded 
void {{KafkaBasedLog::send}} methods to return a {{Future}}. While this 
change is source compatible, it isn't binary compatible. We can restore 
backward compatibility simply by reinstating the older send methods and 
renaming the new Future-returning send methods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14930) Public documentation for new Kafka Connect offset management REST APIs

2023-05-07 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14930:
---
Description: 
Add public documentation for the new Kafka Connect offset management REST APIs 
from 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 being introduced in 3.6:
 * *PATCH* /connectors/\{connector}/offsets
 * *DELETE* /connectors/\{connector}/offsets

  was:
Add public documentation for the 3 new Kafka Connect offset management REST 
APIs being introduced in 
[KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 * *PATCH* /connectors/\{connector}/offsets
 * *DELETE* /connectors/\{connector}/offsets


> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14930
> URL: https://issues.apache.org/jira/browse/KAFKA-14930
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0
>
>
> Add public documentation for the new Kafka Connect offset management REST 
> APIs from 
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  being introduced in 3.6:
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets
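
For illustration, hedged example calls against these endpoints using Java's 
built-in HTTP client; the worker URL, connector name, and PATCH body shape are 
assumptions for the example (consult the KIP / final docs for the exact request 
format):

{code:java}
// Illustrative client calls against the new offsets endpoints.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class OffsetsApiSketch {
    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        String base = "http://localhost:8083/connectors/my-connector";

        // Read the connector's committed offsets.
        System.out.println(http.send(
                HttpRequest.newBuilder(URI.create(base + "/offsets")).GET().build(),
                HttpResponse.BodyHandlers.ofString()).body());

        // Alter offsets (the connector must be stopped first); body shape assumed.
        String body = "{\"offsets\": [{\"partition\": {\"filename\": \"f1\"}, \"offset\": {\"position\": 0}}]}";
        System.out.println(http.send(
                HttpRequest.newBuilder(URI.create(base + "/offsets"))
                        .header("Content-Type", "application/json")
                        .method("PATCH", HttpRequest.BodyPublishers.ofString(body))
                        .build(),
                HttpResponse.BodyHandlers.ofString()).body());
    }
}
{code}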



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted

2023-05-02 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14956:
--

Assignee: Yash Mayya

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
> --
>
> Key: KAFKA-14956
> URL: https://issues.apache.org/jira/browse/KAFKA-14956
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Assignee: Yash Mayya
>Priority: Major
>
> ```
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected: <true> but was: <false>
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected: <true> but was: <false>
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>  at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>  at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>  at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
>  at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> 

[jira] [Commented] (KAFKA-14947) Duplicate records are getting created in the topic.

2023-05-02 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718513#comment-17718513
 ] 

Yash Mayya commented on KAFKA-14947:


[~krishnendudas] the offset commits for source connectors were asynchronous and 
periodic even in AK 2.6.2, see [this 
class|https://github.com/apache/kafka/blob/da65af02e5856e3429259e26eb49986122e34747/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L47].

 
{quote}With our existing connect API code and with the Kafka server (2.6.2), 
our ingestion mechanism was working fine in the live environment. We checked 
the Kafka server (2.6.2) WorkerSourceTask::execute() method, and it was 
following the below-mentioned execution path
 # Poll the task for the new data.
 # If we get any data, save the new data into the Kafka topic.
 # Commit the offset.{quote}
 

This isn't exactly accurate as the offset commit part isn't done synchronously 
with the poll and the produce (to the Kafka topic). Perhaps the 
*offset.flush.interval.ms* worker configuration (which determines the offset 
commit interval) is different between your two environments?

 
{quote}But that won't be persistent. At every start, the object will be 
reset. Any suggestion, on how we can make it persistent in the new Kafka server 
(3.1.1) 
{quote}
 

That will always be the case for restarts, which is why I mentioned in my 
previous comment that source offsets are typically queried from the offset 
storage only during connector / task startup. I'm not sure I follow why the 
local in-memory offset can't be used during regular running of the connector 
task?

> Duplicate records are getting created in the topic. 
> 
>
> Key: KAFKA-14947
> URL: https://issues.apache.org/jira/browse/KAFKA-14947
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.1.1
>Reporter: krishnendu Das
>Priority: Blocker
> Attachments: Kafka_server_3.1.1_data_duplication_issue_log
>
>
> We are using the Kafka Connect API (version 2.3.0) and Kafka (version 3.1.1) for 
> data ingestion purposes. Previously we were using Kafka (version 2.6.2) and 
> the same Kafka Connect API (version 2.3.0). The data ingestion was happening 
> properly. 
>  
> Recently we updated the Kafka version from 2.6.2 to 3.1.1.
> Post-update, we are seeing duplicate data from the source connector 
> in the Kafka topic. After debugging the 3.1.1 code, we saw that a new function, 
> {*}updateCommittableOffsets{*}(), was added and is called inside 
> {*}WorkerSourceTask::execute{*}() as part of the bug fix "KAFKA-12226: Commit 
> source task offsets without blocking on batch delivery (#11323)"
>  
> Now, because of this function, we are observing this scenario:
>  # Inside execute(), at the start of the flow, the call goes to 
> updateCommittableOffsets() to check whether there are any offsets to 
> commit. As the first poll has not yet happened, this 
> function didn't find anything to commit.
>  # Then the Kafka Connect API poll() method is called from the 
> WorkerSourceTask::execute(). *-> 1st poll*
>  # Kafka Connect API (using sleepy policy) reads one source file from the 
> Cloud source directory.
> Read the whole content of the file and send the result set to the Kafka server to 
> write to the Kafka topic.
>  # During the 2nd poll updateCommittableOffsets() found some offset to commit 
> and it updates a reference variable committableOffsets, which is used 
> later by the WorkerSourceTask::commitOffsets() function to perform the 
> actual offset commit.
>  # Then the Kafka Connect API poll() method is called from the 
> *WorkerSourceTask::execute().* *-> 2nd poll*
>  # Kafka Connect API (using sleepy policy) reads the same source file again 
> from the start, as the offsetStorageReader::offset() didn't give the latest 
> offset.
> Read the whole content of the file and send the result set to the Kafka server 
> to write to the Kafka topic. ---> This creates duplicate data in the topic.
> 
> 
>  # WorkerSourceTask::commitOffsets() commits the offset.
> 
> 
>  # Then the Kafka Connect API poll() method is called from the 
> {*}WorkerSourceTask::execute(){*}. -> 3rd poll
>  # This time offsetStorageReader::offset() will be able to give the latest 
> offset.
>  # Kafka Connect API (using sleepy policy) reads the same source file from 
> the last read position.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs in 3.5

2023-05-02 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14876:
---
Description: 
Add public documentation for the new Kafka Connect REST APIs from 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 being introduced in 3.5:
 * *GET* /connectors/\{connector}/offsets
 * *PUT* /connectors/\{connector}/stop

  was:
Add public documentation for the new Kafka Connect offset management REST API 
being introduced in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 in 3.5:
 * *GET* /connectors/\{connector}/offsets


> Public documentation for new Kafka Connect offset management REST APIs in 3.5
> -
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0
>
>
> Add public documentation for the new Kafka Connect REST APIs from 
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  being introduced in 3.5:
>  * *GET* /connectors/\{connector}/offsets
>  * *PUT* /connectors/\{connector}/stop



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs in 3.5

2023-05-02 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718464#comment-17718464
 ] 

Yash Mayya commented on KAFKA-14876:


[~gharris1727] [~ChrisEgerton] sounds good, I've raised this PR to document the 
stop API - [https://github.com/apache/kafka/pull/13657] 

> Public documentation for new Kafka Connect offset management REST APIs in 3.5
> -
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0
>
>
> Add public documentation for the new Kafka Connect offset management REST API 
> being introduced in 
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  in 3.5:
>  * *GET* /connectors/\{connector}/offsets



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-13187) Replace EasyMock and PowerMock with Mockito for DistributedHerderTest

2023-04-28 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-13187:
--

Assignee: Yash Mayya  (was: Matthew de Detrich)

> Replace EasyMock and PowerMock with Mockito for DistributedHerderTest
> -
>
> Key: KAFKA-13187
> URL: https://issues.apache.org/jira/browse/KAFKA-13187
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: YI-CHEN WANG
>Assignee: Yash Mayya
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14947) Duplicate records are getting created in the topic.

2023-04-27 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717095#comment-17717095
 ] 

Yash Mayya edited comment on KAFKA-14947 at 4/27/23 9:58 AM:
-

[~krishnendudas] this seems like a bug in the connector's implementation rather 
than a bug in the Connect framework in Apache Kafka. Since source connector 
offsets are committed by Connect workers periodically and asynchronously, there 
is no guarantee provided that offsets will be committed between successive poll 
calls. `OffsetStorageReader::offset` is typically used only during startup of 
connectors / tasks to resume progress after restarts, pause / resume etc.

 

In your provided scenario, why can't the connector simply read from its 
previous position in the second poll since it should be maintaining some 
internal state? Also note that Kafka Connect doesn't support exactly-once 
semantics for source connectors in 3.1.1, this functionality was added in 
3.3.0. Depending on your specific connector, it might also need additional 
changes to support the changes made in 
[KIP-618|https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors]
 (this is publicly documented 
[here|https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors]).
 Another more lightweight option might be for the connector to implement the 
[SourceTask::commit|https://javadoc.io/static/org.apache.kafka/connect-api/3.4.0/org/apache/kafka/connect/source/SourceTask.html#commit--]
 method and not advance its internal position / offset state until previous 
offsets are committed - however, it's important to note that this won't 
guarantee exactly-once semantics (if, for example, a Connect worker goes down 
after records have been produced to the Kafka topic but before the commit 
method is called for the task) and is also not ideal for high throughput 
scenarios since poll could be called multiple times between each offset commit 
(period is configurable via the worker property {*}offset.flush.interval.ms{*}).


was (Author: yash.mayya):
[~krishnendudas] this seems like a bug in the connector's implementation rather 
than a bug in the Connect framework in Apache Kafka. Since source connector 
offsets are committed by Connect workers periodically and asynchronously, there 
is no guarantee provided that offsets will be committed between successive poll 
calls. `OffsetStorageReader::offset` is typically used only during startup of 
connectors / tasks to resume progress after restarts, pause / resume etc.

 

In your provided scenario, why can't the connector simply read from its 
previous position in the second poll since it should be maintaining some 
internal state? Also note that Kafka Connect doesn't support exactly-once 
semantics for source connectors in 3.1.1, this functionality was added in 
3.3.0. Depending on your specific connector, it might also need additional 
changes to support the changes made in 
[KIP-618|https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors]
 (this is publicly documented 
[here|https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors]).
 Another more lightweight option might be for the connector to implement the 
[SourceTask::commit|https://javadoc.io/static/org.apache.kafka/connect-api/3.4.0/org/apache/kafka/connect/source/SourceTask.html#commit--]
 method and not advance its internal position / offset state until previous 
offsets are committed - however, it's important to note that this won't 
guarantee exactly-once semantics.
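
A rough sketch of that lightweight option, for illustration: everything except the 
SourceTask API surface (poll / commit) is invented for the example, and the 
caveats above about exactly-once semantics still apply:

{code:java}
// Illustrative sketch of a SourceTask that tracks its position in memory and only
// advances the durable restart point once the framework invokes commit().
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public abstract class CommitGatedTaskSketch extends SourceTask {
    private volatile long committedPosition; // safe restart point
    private long pendingPosition;            // position after the last poll()

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Always read from the in-memory position during normal running;
        // OffsetStorageReader would only be consulted in start() after a restart.
        List<SourceRecord> records = readFrom(pendingPosition);
        pendingPosition += records.size();
        return records;
    }

    @Override
    public void commit() {
        // The framework signals that previously polled offsets have been committed.
        committedPosition = pendingPosition;
    }

    // Hypothetical connector-specific read from the source system.
    protected abstract List<SourceRecord> readFrom(long position);
}
{code}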

> Duplicate records are getting created in the topic. 
> 
>
> Key: KAFKA-14947
> URL: https://issues.apache.org/jira/browse/KAFKA-14947
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.1.1
>Reporter: krishnendu Das
>Priority: Blocker
> Attachments: Kafka_server_3.1.1_data_duplication_issue_log
>
>
> We are using the Kafka Connect API (version 2.3.0) and Kafka (version 3.1.1) for 
> data ingestion purposes. Previously we were using Kafka (version 2.6.2) and 
> the same Kafka Connect API (version 2.3.0). The data ingestion was happening 
> properly. 
>  
> Recently we updated the Kafka version from 2.6.2 to 3.1.1.
> Post-update, we are seeing duplicate data from the source connector 
> in the Kafka topic. After debugging the 3.1.1 code, we saw that a new function, 
> {*}updateCommittableOffsets{*}(), was added and is called inside 
> {*}WorkerSourceTask::execute{*}() as part of the bug fix "KAFKA-12226: Commit 
> source task offsets without blocking on batch delivery (#11323)"
>  
> Now, because of this function, we are observing this scenario:
>  # Inside execute(), at the start of the flow, the call goes to 

[jira] [Commented] (KAFKA-14947) Duplicate records are getting created in the topic.

2023-04-27 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717095#comment-17717095
 ] 

Yash Mayya commented on KAFKA-14947:


[~krishnendudas] this seems like a bug in the connector's implementation rather 
than a bug in the Connect framework in Apache Kafka. Since source connector 
offsets are committed by Connect workers periodically and asynchronously, there 
is no guarantee provided that offsets will be committed between successive poll 
calls. `OffsetStorageReader::offset` is typically used only during startup of 
connectors / tasks to resume progress after restarts, pause / resume etc.

 

In your provided scenario, why can't the connector simply read from its 
previous position in the second poll since it should be maintaining some 
internal state? Also note that Kafka Connect doesn't support exactly-once 
semantics for source connectors in 3.1.1, this functionality was added in 
3.3.0. Depending on your specific connector, it might also need additional 
changes to support the changes made in 
[KIP-618|https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors]
 (this is publicly documented 
[here|https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors]).
 Another more lightweight option might be for the connector to implement the 
[SourceTask::commit|https://javadoc.io/static/org.apache.kafka/connect-api/3.4.0/org/apache/kafka/connect/source/SourceTask.html#commit--]
 method and not advance its internal position / offset state until previous 
offsets are committed - however, it's important to note that this won't 
guarantee exactly-once semantics.

> Duplicate records are getting created in the topic. 
> 
>
> Key: KAFKA-14947
> URL: https://issues.apache.org/jira/browse/KAFKA-14947
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.1.1
>Reporter: krishnendu Das
>Priority: Blocker
> Attachments: Kafka_server_3.1.1_data_duplication_issue_log
>
>
> We are using the Kafka Connect API (version 2.3.0) and Kafka (version 3.1.1) for 
> data ingestion purposes. Previously we were using Kafka (version 2.6.2) and 
> the same Kafka Connect API (version 2.3.0). The data ingestion was happening 
> properly. 
>  
> Recently we updated the Kafka version from 2.6.2 to 3.1.1.
> Post-update, we are seeing duplicate data from the source connector 
> in the Kafka topic. After debugging the 3.1.1 code, we saw that a new function, 
> {*}updateCommittableOffsets{*}(), was added and is called inside 
> {*}WorkerSourceTask::execute{*}() as part of the bug fix "KAFKA-12226: Commit 
> source task offsets without blocking on batch delivery (#11323)"
>  
> Now, because of this function, we are observing this scenario:
>  # Inside execute(), at the start of the flow, the call goes to 
> updateCommittableOffsets() to check whether there are any offsets to 
> commit. As the first poll has not yet happened, this 
> function didn't find anything to commit.
>  # Then the Kafka Connect API poll() method is called from the 
> WorkerSourceTask::execute(). *-> 1st poll*
>  # Kafka Connect API (using sleepy policy) reads one source file from the 
> Cloud source directory.
> Read the whole content of the file and send the result set to the Kafka server to 
> write to the Kafka topic.
>  # During the 2nd poll updateCommittableOffsets() found some offset to commit 
> and it updates a reference variable committableOffsets, which is used 
> later by the WorkerSourceTask::commitOffsets() function to perform the 
> actual offset commit.
>  # Then the Kafka Connect API poll() method is called from the 
> *WorkerSourceTask::execute().* *-> 2nd poll*
>  # Kafka Connect API (using sleepy policy) reads the same source file again 
> from the start, as the offsetStorageReader::offset() didn't give the latest 
> offset.
> Read the whole content of the file and send the result set to the Kafka server 
> to write to the Kafka topic. ---> This creates duplicate data in the topic.
> 
> 
>  # WorkerSourceTask::commitOffsets() commits the offset.
> 
> 
>  # Then the Kafka Connect API poll() method is called from the 
> {*}WorkerSourceTask::execute(){*}. -> 3rd poll
>  # This time offsetStorageReader::offset() will be able to give the latest 
> offset.
>  # Kafka Connect API (using sleepy policy) reads the same source file from 
> the last read position.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14933) Document Kafka Connect's log level REST APIs added in KIP-495

2023-04-25 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14933:
--

 Summary: Document Kafka Connect's log level REST APIs added in 
KIP-495
 Key: KAFKA-14933
 URL: https://issues.apache.org/jira/browse/KAFKA-14933
 Project: Kafka
  Issue Type: Task
  Components: documentation, KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


[KIP-495|https://cwiki.apache.org/confluence/display/KAFKA/KIP-495%3A+Dynamically+Adjust+Log+Levels+in+Connect]
 added 3 REST APIs to allow dynamically adjusting log levels on Kafka Connect 
workers. This was added a long time ago (released in AK 2.4.0) but was never 
publicly documented. These REST APIs should be documented in 
[https://kafka.apache.org/documentation/#connect_rest]. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs in 3.5

2023-04-24 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14876:
---
Description: 
Add public documentation for the new Kafka Connect offset management REST API 
being introduced in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 in 3.5:
 * *GET* /connectors/\{connector}/offsets

  was:
Add public documentation for the new Kafka Connect offset management REST APIs 
being introduced in 
[KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 in 3.5
 * *GET* /connectors/\{connector}/offsets


> Public documentation for new Kafka Connect offset management REST APIs in 3.5
> -
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0
>
>
> Add public documentation for the new Kafka Connect offset management REST API 
> being introduced in 
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  in 3.5:
>  * *GET* /connectors/\{connector}/offsets



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14930) Public documentation for new Kafka Connect offset management REST APIs

2023-04-24 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14930:
---
Description: 
Add public documentation for the 3 new Kafka Connect offset management REST 
APIs being introduced in 
[KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 * *PATCH* /connectors/\{connector}/offsets
 * *DELETE* /connectors/\{connector}/offsets

  was:
Add public documentation for the 3 new Kafka Connect offset management REST 
APIs being introduced in 
[KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]{*}{*}
 * *PATCH* /connectors/\{connector}/offsets
 * *DELETE* /connectors/\{connector}/offsets)


> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14930
> URL: https://issues.apache.org/jira/browse/KAFKA-14930
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0
>
>
> Add public documentation for the 3 new Kafka Connect offset management REST 
> APIs being introduced in 
> [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14930) Public documentation for new Kafka Connect offset management REST APIs

2023-04-24 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14930:
--

Assignee: Yash Mayya

> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14930
> URL: https://issues.apache.org/jira/browse/KAFKA-14930
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0
>
>
> Add public documentation for the 3 new Kafka Connect offset management REST 
> APIs being introduced in 
> [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]{*}{*}
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14930) Public documentation for new Kafka Connect offset management REST APIs

2023-04-24 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14930:
---
Fix Version/s: 3.6.0

> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14930
> URL: https://issues.apache.org/jira/browse/KAFKA-14930
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Priority: Major
> Fix For: 3.6.0
>
>
> Add public documentation for the 3 new Kafka Connect offset management REST 
> APIs being introduced in 
> [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14923) Upgrade io.netty_netty-codec for CVE-2022-41881

2023-04-20 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714541#comment-17714541
 ] 

Yash Mayya commented on KAFKA-14923:


[~vikashmishra0808] it looks like [https://github.com/apache/kafka/pull/13070] / 
https://issues.apache.org/jira/browse/KAFKA-14564 bumped the netty version up 
to 4.1.86.Final, and the commit is present on the 3.5 branch as well (see 
[here|https://github.com/apache/kafka/blob/23c013408f620425e9b6111e13be2e20b226a2e6/gradle/dependencies.gradle#L110])
 since it was merged to trunk before the 3.5 branch was cut. The upcoming 3.5.0 
release shouldn't be impacted by this CVE.

> Upgrade io.netty_netty-codec for CVE-2022-41881
> ---
>
> Key: KAFKA-14923
> URL: https://issues.apache.org/jira/browse/KAFKA-14923
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Vikash Mishra
>Priority: Critical
>
> The currently used io.netty_netty-codec version 4.1.78 has a high-severity 
> CVE: [NVD - CVE-2022-41881 
> (nist.gov)|https://nvd.nist.gov/vuln/detail/CVE-2022-41881]
> The fix was released in version 4.1.86.Final. Since the later stable version 
> 4.1.91.Final is available, we should upgrade to it to address this CVE.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs

2023-04-17 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713158#comment-17713158
 ] 

Yash Mayya edited comment on KAFKA-14876 at 4/17/23 4:11 PM:
-

Yep, makes sense, I can raise a PR for that soon.


was (Author: yash.mayya):
Yep, makes sense, I can raise a PR for that tomorrow.

> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0
>
>
> Add public documentation for the 3 new Kafka Connect offset management REST 
> APIs being introduced in 
> [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  * *GET* /connectors/\{connector}/offsets
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs

2023-04-17 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713183#comment-17713183
 ] 

Yash Mayya commented on KAFKA-14876:


[~ChrisEgerton] I think we should document the new stop API only together with 
the alter / reset offsets APIs, since it might not make much sense without 
them. What do you think?

> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.5.0
>
>
> Add public documentation for the 3 new Kafka Connect offset management REST 
> APIs being introduced in 
> [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  * *GET* /connectors/\{connector}/offsets
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs

2023-04-17 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713158#comment-17713158
 ] 

Yash Mayya commented on KAFKA-14876:


Yep, makes sense, I can raise a PR for that tomorrow.

> Public documentation for new Kafka Connect offset management REST APIs
> --
>
> Key: KAFKA-14876
> URL: https://issues.apache.org/jira/browse/KAFKA-14876
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
>
> Add public documentation for the 3 new Kafka Connect offset management REST 
> APIs being introduced in 
> [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  * *GET* /connectors/\{connector}/offsets
>  * *PATCH* /connectors/\{connector}/offsets
>  * *DELETE* /connectors/\{connector}/offsets



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14910) Consider cancelling ongoing alter connector offsets requests when the connector is resumed

2023-04-14 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14910:
--

 Summary: Consider cancelling ongoing alter connector offsets 
requests when the connector is resumed
 Key: KAFKA-14910
 URL: https://issues.apache.org/jira/browse/KAFKA-14910
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Yash Mayya


See discussion here for more details - 
[https://github.com/apache/kafka/pull/13465#discussion_r1164465874]

The implementation for the _*PATCH /connectors/\{connector}/offsets*_ and 
_*DELETE /connectors/\{connector}/offsets*_ APIs is completely asynchronous and 
the check for whether the connector is stopped will only be made at the 
beginning of the request. 

If the connector is resumed while the alter / reset offsets request is being 
processed, this can lead to certain issues (especially with non-EoS source 
connectors). For sink connectors, admin client requests to alter / reset 
offsets for a consumer group will be rejected if the consumer group is active 
(i.e. when the connector tasks come up). For source connectors when exactly 
once support is enabled on the worker, we do a round of zombie fencing before 
the tasks are brought up and this will basically disable the transactional 
producer used to alter offsets (the transactional producer uses the 
transactional ID for task 0 of the connector). However, for source connectors 
when exactly once support is not enabled on the worker (this is the default), 
there are no such safeguards. We could potentially add some interruption logic 
that cancels ongoing alter / reset offset requests when a connector is resumed.
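
As a rough illustration of the sink connector safeguard mentioned above, a
sketch using the Kafka AdminClient (the bootstrap server, group id, topic, and
connector name are assumptions for illustration only):

{code:java}
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class AlterOffsetsWhileActive {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // "connect-my-sink" stands in for the consumer group that Connect
            // uses for a hypothetical sink connector named "my-sink".
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(0L));
            admin.alterConsumerGroupOffsets("connect-my-sink", offsets).all().get();
        } catch (ExecutionException e) {
            // While the group has active members (i.e. the sink tasks are
            // running), the broker rejects the commit and the future fails.
            System.err.println("Group is active; stop the connector first: " + e.getCause());
        }
    }
}
{code}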



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14910) Consider cancelling ongoing alter / reset connector offsets requests when the connector is resumed

2023-04-14 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14910:
---
Summary: Consider cancelling ongoing alter / reset connector offsets 
requests when the connector is resumed  (was: Consider cancelling ongoing alter 
connector offsets requests when the connector is resumed)

> Consider cancelling ongoing alter / reset connector offsets requests when the 
> connector is resumed
> --
>
> Key: KAFKA-14910
> URL: https://issues.apache.org/jira/browse/KAFKA-14910
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Priority: Major
>
> See discussion here for more details - 
> [https://github.com/apache/kafka/pull/13465#discussion_r1164465874]
> The implementation for the _*PATCH /connectors/\{connector}/offsets*_ and 
> _*DELETE /connectors/\{connector}/offsets*_ APIs is completely asynchronous 
> and the check for whether the connector is stopped will only be made at the 
> beginning of the request. 
> If the connector is resumed while the alter / reset offsets request is being 
> processed, this can lead to certain issues (especially with non-EoS source 
> connectors). For sink connectors, admin client requests to alter / reset 
> offsets for a consumer group will be rejected if the consumer group is active 
> (i.e. when the connector tasks come up). For source connectors when exactly 
> once support is enabled on the worker, we do a round of zombie fencing before 
> the tasks are brought up and this will basically disable the transactional 
> producer used to alter offsets (the transactional producer uses the 
> transactional ID for task 0 of the connector). However, for source connectors 
> when exactly once support is not enabled on the worker (this is the default), 
> there are no such safeguards. We could potentially add some interruption 
> logic that cancels ongoing alter / reset offset requests when a connector is 
> resumed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14858) Standalone herder does not handle exceptions thrown from connector taskConfigs method

2023-04-09 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14858:
--

Assignee: Yash Mayya

> Standalone herder does not handle exceptions thrown from connector 
> taskConfigs method
> -
>
> Key: KAFKA-14858
> URL: https://issues.apache.org/jira/browse/KAFKA-14858
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Yash Mayya
>Priority: Major
>
> In distributed mode, if a connector throws an exception from its 
> {{taskConfigs}} method (invoked by the herder, through the {{Worker}} class, 
> [here|https://github.com/apache/kafka/blob/f3e4dd922933bf28b2c091e846cbc4e5255dd1d5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1960]),
>  we wait for an exponential backoff period (see KAFKA-14732) and then [retry 
> the 
> operation|https://github.com/apache/kafka/blob/f3e4dd922933bf28b2c091e846cbc4e5255dd1d5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1907-L1911].
> However, in standalone mode, not only do we not retry the operation, we do 
> not even log the exception. In addition, when REST calls are made that 
> require generating new task configs for a connector (which include creating 
> and reconfiguring a connector), if the connector's {{taskConfigs}} method 
> throws an exception, those requests will time out since the 
> [callback|https://github.com/apache/kafka/blob/f3e4dd922933bf28b2c091e846cbc4e5255dd1d5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L183]
>  we use to respond to those requests never gets invoked.
> At a bare minimum, we should:
>  * Log any exceptions thrown from the {{taskConfigs}} method at {{ERROR}} 
> level
>  * Invoke any callbacks passed in to the relevant {{StandaloneHerder}} 
> methods with any exceptions thrown by the {{taskConfigs}} method
> We might also consider introducing the same kind of exponential backoff retry 
> logic used by distributed mode, but this can be addressed separately since it 
> would be a much larger change in behavior and may break existing user's 
> deployments.
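
To make the failure mode concrete, a minimal sketch (the connector class and
error message are invented for illustration) of a connector whose
{{taskConfigs}} method throws:

{code:java}
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;

public class FailingConnector extends SourceConnector {
    @Override public String version() { return "0.0.1"; }
    @Override public void start(Map<String, String> props) { }
    @Override public void stop() { }
    @Override public Class<? extends Task> taskClass() { return null; }
    @Override public ConfigDef config() { return new ConfigDef(); }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // With the standalone herder, this exception is currently neither
        // logged nor passed to the REST request's callback, so the request
        // waiting on the new task configs simply times out.
        throw new ConnectException("cannot reach external system");
    }
}
{code}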



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14746) Throwing in Connector.taskConfigs in distributed mode generates a lot of logs

2023-04-09 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17709965#comment-17709965
 ] 

Yash Mayya commented on KAFKA-14746:


I agree that the intent of the retries seems to be mainly to handle failures 
while communicating with the leader - this infinite retry mechanism covering 
exceptions thrown from connectors' taskConfigs method doesn't seem to make 
sense intuitively and was probably an oversight. [~ChrisEgerton] are you 
suggesting a KIP to modify the existing retry mechanism to not cover exceptions 
thrown from connectors' taskConfigs method? Why would this require a KIP if 
this retry mechanism isn't part of the public API?

> Throwing in Connector.taskConfigs in distributed mode generates a lot of logs
> -
>
> Key: KAFKA-14746
> URL: https://issues.apache.org/jira/browse/KAFKA-14746
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Priority: Major
>
> If a Connector throws in its taskConfigs() method, the runtime ends up 
> retrying using DistributedHerder.RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS which 
> is a fixed value (250ms). For each retry, the runtime prints the connector 
> configuration and the enriched configuration so this can quickly generate a 
> lot of logs.
> There is some value in throwing in taskConfigs() as it allows the connector 
> to fail fast in case it is given bad credentials. For example, this is what 
> some of the Debezium connectors do: 
> https://github.com/debezium/debezium/blob/main/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java#L56-L69
> The way Connectors are expected to work today is to instead always create 
> tasks and let each task fail in case the configuration is wrong. We should 
> document that and make it clear in the javadoc that throwing in taskConfigs 
> is not recommended.
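
A sketch of the recommended pattern described above, for a hypothetical
source task (the credential check and configuration keys are placeholders):

{code:java}
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class CredentialCheckingTask extends SourceTask {
    @Override public String version() { return "0.0.1"; }

    @Override
    public void start(Map<String, String> props) {
        // Failing here marks only this task FAILED (visible via the status
        // REST API) instead of putting the herder into its fixed-backoff
        // taskConfigs() retry loop with its verbose per-retry config logging.
        if (!canConnect(props.get("connection.url"), props.get("connection.password"))) {
            throw new ConnectException("Unable to connect with the provided credentials");
        }
    }

    @Override public List<SourceRecord> poll() { return null; }
    @Override public void stop() { }

    private boolean canConnect(String url, String password) {
        return url != null && password != null; // placeholder check
    }
}
{code}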



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14876) Public documentation for new Kafka Connect offset management REST APIs

2023-04-02 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14876:
--

 Summary: Public documentation for new Kafka Connect offset 
management REST APIs
 Key: KAFKA-14876
 URL: https://issues.apache.org/jira/browse/KAFKA-14876
 Project: Kafka
  Issue Type: Sub-task
Reporter: Yash Mayya
Assignee: Yash Mayya


Add public documentation for the 3 new Kafka Connect offset management REST 
APIs being introduced in 
[KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 * *GET* /connectors/\{connector}/offsets
 * *PATCH* /connectors/\{connector}/offsets
 * *DELETE* /connectors/\{connector}/offsets



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14844) Kafka Connect's OffsetBackingStore interface should handle (de)serialization and connector namespacing

2023-03-24 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-14844:
--

 Summary: Kafka Connect's OffsetBackingStore interface should 
handle (de)serialization and connector namespacing
 Key: KAFKA-14844
 URL: https://issues.apache.org/jira/browse/KAFKA-14844
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


Relevant discussion here - 
[https://github.com/apache/kafka/pull/13434/files#r114972]

 

TLDR - we should move serialization / deserialization and key construction 
(connector namespacing) for source connector offsets from the 
OffsetStorageWriter / OffsetStorageReader interfaces into the 
OffsetBackingStore interface. 
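
One possible shape for such a change, as a purely hypothetical sketch (the
interface and method names below are invented and do not reflect the actual
proposal or the real OffsetBackingStore API):

{code:java}
import java.util.Map;
import java.util.concurrent.Future;

// Hypothetical sketch only: a backing store that owns (de)serialization and
// connector namespacing, so callers deal in deserialized partition/offset
// maps rather than raw bytes with pre-built keys.
public interface NamespacedOffsetBackingStore {
    // Read the stored offsets for the given source partitions of a connector.
    Map<Map<String, Object>, Map<String, Object>> offsets(
            String connectorName, Iterable<Map<String, Object>> partitions);

    // Write offsets for a connector; the namespaced key is built internally.
    Future<Void> set(String connectorName,
                     Map<Map<String, Object>, Map<String, Object>> offsets);
}
{code}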



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14814) Skip restart of connectors when redundant resume request is made

2023-03-21 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703107#comment-17703107
 ] 

Yash Mayya commented on KAFKA-14814:


Hi [~cmukka20], I've re-assigned the ticket to you! Feel free to ping me for 
review whenever it's ready.

> Skip restart of connectors when redundant resume request is made
> 
>
> Key: KAFKA-14814
> URL: https://issues.apache.org/jira/browse/KAFKA-14814
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chaitanya Mukka
>Priority: Minor
>
> Consecutive requests to the {{PUT /connectors/\{connector}/resume}} endpoint will 
> cause the Connector to be restarted. This is a little wasteful and conflicts 
> with the idempotent nature of that endpoint. We can tweak the 
> {{MemoryConfigBackingStore}} and {{KafkaConfigBackingStore}} classes to not 
> invoke the {{onConnectorTargetStateChange}} method of their 
> {{ConfigUpdateListener}} instance if they pick up a new target state that 
> matches the current target state of the connector.
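
A minimal sketch of the suggested guard (class, field, and method names are
simplified stand-ins for the actual backing store code):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the config backing stores: skip the listener
// callback when a newly read target state matches the connector's current
// one, so a redundant "resume" does not restart an already-running connector.
public class TargetStateTracker {
    public enum TargetState { STARTED, PAUSED, STOPPED }

    private final Map<String, TargetState> targetStates = new ConcurrentHashMap<>();

    public void onTargetStateRecord(String connector, TargetState newState,
                                    Runnable onConnectorTargetStateChange) {
        TargetState previous = targetStates.put(connector, newState);
        if (previous == newState) {
            return; // redundant request: nothing to do
        }
        onConnectorTargetStateChange.run();
    }
}
{code}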



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

