[jira] [Commented] (KAFKA-16943) Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest

2024-06-20 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856531#comment-17856531
 ] 

Chris Egerton commented on KAFKA-16943:
---

[~ksolves.kafka] the idea here isn't to assert that no workers have started 
after a given timeout, it's to assert that one or more workers have attempted, 
failed, and aborted startup. We don't want to just wait for 30 seconds, see 
that no workers have started up, and then call that good enough, since startup 
may take longer than 30 seconds on our CI infrastructure (which can be pretty 
slow). And if startup does fail before the 30 seconds are up, that approach 
still forces us to wait the full timeout, adding bloat to test runtime.
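
For illustration, a minimal sketch of the kind of synchronous check this would enable (the 
workerStartupFailed supplier is a hypothetical hook, not an existing EmbeddedConnect API; the 
real test would consult whatever signal the embedded cluster ends up exposing):

{code:java}
import java.util.function.BooleanSupplier;

public class StartupFailureAssertions {
    // Poll for a terminal "startup failed" signal instead of sleeping for a fixed 30 seconds:
    // the assertion passes as soon as a worker aborts startup, and only times out otherwise.
    public static void awaitWorkerStartupFailure(BooleanSupplier workerStartupFailed,
                                                 long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (workerStartupFailed.getAsBoolean()) {
                return; // a worker attempted, failed, and aborted startup
            }
            Thread.sleep(100);
        }
        throw new AssertionError("No worker startup failure observed within " + timeoutMs + " ms");
    }
}
{code}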

> Synchronously verify Connect worker startup failure in 
> InternalTopicsIntegrationTest
> 
>
> Key: KAFKA-16943
> URL: https://issues.apache.org/jira/browse/KAFKA-16943
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Chris Egerton
>Priority: Minor
>  Labels: newbie
> Attachments: code-diff.png
>
>
> Created after PR discussion 
> [here|https://github.com/apache/kafka/pull/16288#discussion_r1636615220].
> In some of our integration tests, we want to verify that a Connect worker 
> cannot start under poor conditions (such as when its internal topics do not 
> yet exist and it is configured to create them with a higher replication 
> factor than the number of available brokers, or when its internal topics 
> already exist but they do not have the compaction cleanup policy).
> This is currently not possible, and presents a possible gap in testing 
> coverage, especially for the test cases 
> {{testFailToCreateInternalTopicsWithMoreReplicasThanBrokers}} and 
> {{{}testFailToStartWhenInternalTopicsAreNotCompacted{}}}. It'd be nice if we 
> could have some way of synchronously awaiting the completion or failure of 
> worker startup in our integration tests in order to guarantee that worker 
> startup fails under sufficiently adverse conditions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic

2024-06-17 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-14089:
-

Fix Version/s: 3.8.0
   (was: 3.3.0)
 Assignee: Chris Egerton
   Resolution: Fixed

> Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
> ---
>
> Key: KAFKA-14089
> URL: https://issues.apache.org/jira/browse/KAFKA-14089
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: failure.txt, 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic.test.stdout
>
>
> It looks like the sequence got broken around "65535, 65537, 65536, 65539, 
> 65538, 65541, 65540, 65543"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic

2024-06-17 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855668#comment-17855668
 ] 

Chris Egerton commented on KAFKA-14089:
---

Should be fixed by 
[https://github.com/apache/kafka/pull/16306].

> Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
> ---
>
> Key: KAFKA-14089
> URL: https://issues.apache.org/jira/browse/KAFKA-14089
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Priority: Major
> Fix For: 3.3.0
>
> Attachments: failure.txt, 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic.test.stdout
>
>
> It looks like the sequence got broken around "65535, 65537, 65536, 65539, 
> 65538, 65541, 65540, 65543"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10816) Connect REST API should have a resource that can be used as a readiness probe

2024-06-14 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855084#comment-17855084
 ] 

Chris Egerton commented on KAFKA-10816:
---

I've published 
[KIP-1017|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1017%3A+Health+check+endpoint+for+Kafka+Connect]
 to try to address this.

> Connect REST API should have a resource that can be used as a readiness probe
> -
>
> Key: KAFKA-10816
> URL: https://issues.apache.org/jira/browse/KAFKA-10816
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Randall Hauch
>Assignee: Chris Egerton
>Priority: Major
>
> There are a few ways to accurately detect whether a Connect worker is 
> *completely* ready to process all REST requests:
> # Wait for {{Herder started}} in the Connect worker logs
> # Use the REST API to issue a request that will be completed only after the 
> herder has started, such as {{GET /connectors/{name}/}} or {{GET 
> /connectors/{name}/status}}.
> Other techniques can be used to detect other startup states, though none of 
> these will guarantee that the worker has indeed completely started up and can 
> process all REST requests:
> * {{GET /}} can be used to know when the REST server has started, but this 
> may be before the worker has started completely and successfully.
> * {{GET /connectors}} can be used to know when the REST server has started, 
> but this may be before the worker has started completely and successfully. 
> And, for the distributed Connect worker, this may actually return an older 
> list of connectors if the worker hasn't yet completely read through the 
> internal config topic. It's also possible that this request returns even if 
> the worker is having trouble reading from the internal config topic.
> * {{GET /connector-plugins}} can be used to know when the REST server has 
> started, but this may be before the worker has started completely and 
> successfully.
> The Connect REST API should have an endpoint that more obviously and more 
> simply can be used as a readiness probe. This could be a new resource (e.g., 
> {{GET /status}}), though this would only work on newer Connect runtimes, and 
> existing tooling, installations, and examples would have to be modified to 
> take advantage of this feature (if it exists). 
> Alternatively, we could make sure that the existing resources (e.g., {{GET 
> /}} or {{GET /connectors}}) wait for the herder to start completely; this 
> wouldn't require a KIP and it would not require clients to use different 
> techniques for newer and older Connect runtimes. (Whether or not we backport 
> this is another question altogether, since it's debatable whether the 
> behavior of the existing REST resources is truly a bug.)
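
As a rough illustration of the distinction drawn above, a JDK-only sketch that treats a 200 from 
{{GET /}} as "REST server up" (which, per the description, does not by itself prove that the herder 
has finished starting; the worker URL is an assumption):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectReadinessCheck {
    // Returns true once the worker's REST server answers GET / with a 200.
    // This only signals that the REST server is up, not that the herder has started.
    public static boolean restServerUp(String workerUrl) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(workerUrl + "/")).GET().build();
        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode() == 200;
    }
}
{code}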



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests

2024-06-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16935.
---
Resolution: Fixed

> Automatically wait for cluster startup in embedded Connect integration tests
> 
>
> Key: KAFKA-16935
> URL: https://issues.apache.org/jira/browse/KAFKA-16935
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> It's a common idiom in our integration tests to [start an embedded Kafka and 
> Connect 
> cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135]
>  and then immediately afterwards [wait for each worker in the Connect cluster 
> to complete 
> startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92].
>  Separating these two actions into separate steps makes our tests lengthier 
> and can even lead to bugs and flakiness if the second step is accidentally 
> omitted (see [https://github.com/apache/kafka/pull/16286] for one example).
> Instead, we should default to automatically awaiting the complete startup of 
> every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} 
> is invoked, and require callers to opt out if they do not want to 
> automatically wait for startup to complete when invoking that method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests

2024-06-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-16935:
---

> Automatically wait for cluster startup in embedded Connect integration tests
> 
>
> Key: KAFKA-16935
> URL: https://issues.apache.org/jira/browse/KAFKA-16935
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> It's a common idiom in our integration tests to [start an embedded Kafka and 
> Connect 
> cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135]
>  and then immediately afterwards [wait for each worker in the Connect cluster 
> to complete 
> startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92].
>  Separating these two actions into separate steps makes our tests lengthier 
> and can even lead to bugs and flakiness if the second step is accidentally 
> omitted (see [https://github.com/apache/kafka/pull/16286] for one example).
> Instead, we should default to automatically awaiting the complete startup of 
> every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} 
> is invoked, and require callers to opt out if they do not want to 
> automatically wait for startup to complete when invoking that method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests

2024-06-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16935:
--
Fix Version/s: 3.8.0

> Automatically wait for cluster startup in embedded Connect integration tests
> 
>
> Key: KAFKA-16935
> URL: https://issues.apache.org/jira/browse/KAFKA-16935
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> It's a common idiom in our integration tests to [start an embedded Kafka and 
> Connect 
> cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135]
>  and then immediately afterwards [wait for each worker in the Connect cluster 
> to complete 
> startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92].
>  Separating these two actions into separate steps makes our tests lengthier 
> and can even lead to bugs and flakiness if the second step is accidentally 
> omitted (see [https://github.com/apache/kafka/pull/16286] for one example).
> Instead, we should default to automatically awaiting the complete startup of 
> every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} 
> is invoked, and require callers to opt out if they do not want to 
> automatically wait for startup to complete when invoking that method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16943) Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest

2024-06-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16943:
--
Labels: newbie  (was: )

> Synchronously verify Connect worker startup failure in 
> InternalTopicsIntegrationTest
> 
>
> Key: KAFKA-16943
> URL: https://issues.apache.org/jira/browse/KAFKA-16943
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Chris Egerton
>Priority: Minor
>  Labels: newbie
>
> Created after PR discussion 
> [here|https://github.com/apache/kafka/pull/16288#discussion_r1636615220].
> In some of our integration tests, we want to verify that a Connect worker 
> cannot start under poor conditions (such as when its internal topics do not 
> yet exist and it is configured to create them with a higher replication 
> factor than the number of available brokers, or when its internal topics 
> already exist but they do not have the compaction cleanup policy).
> This is currently not possible, and presents a possible gap in testing 
> coverage, especially for the test cases 
> {{testFailToCreateInternalTopicsWithMoreReplicasThanBrokers}} and 
> {{{}testFailToStartWhenInternalTopicsAreNotCompacted{}}}. It'd be nice if we 
> could have some way of synchronously awaiting the completion or failure of 
> worker startup in our integration tests in order to guarantee that worker 
> startup fails under sufficiently adverse conditions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16943) Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest

2024-06-12 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-16943:
-

 Summary: Synchronously verify Connect worker startup failure in 
InternalTopicsIntegrationTest
 Key: KAFKA-16943
 URL: https://issues.apache.org/jira/browse/KAFKA-16943
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Reporter: Chris Egerton


Created after PR discussion 
[here|https://github.com/apache/kafka/pull/16288#discussion_r1636615220].

In some of our integration tests, we want to verify that a Connect worker 
cannot start under poor conditions (such as when its internal topics do not yet 
exist and it is configured to create them with a higher replication factor than 
the number of available brokers, or when its internal topics already exist but 
they do not have the compaction cleanup policy).

This is currently not possible, and presents a possible gap in testing 
coverage, especially for the test cases 
{{testFailToCreateInternalTopicsWithMoreReplicasThanBrokers}} and 
{{{}testFailToStartWhenInternalTopicsAreNotCompacted{}}}. It'd be nice if we 
could have some way of synchronously awaiting the completion or failure of 
worker startup in our integration tests in order to guarantee that worker 
startup fails under sufficiently adverse conditions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16935) Automatically wait for cluster startup in embedded Connect integration tests

2024-06-11 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-16935:
-

 Summary: Automatically wait for cluster startup in embedded 
Connect integration tests
 Key: KAFKA-16935
 URL: https://issues.apache.org/jira/browse/KAFKA-16935
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 3.8.0
Reporter: Chris Egerton
Assignee: Chris Egerton


It's a common idiom in our integration tests to [start an embedded Kafka and 
Connect 
cluster|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L120-L135]
 and then immediately afterwards [wait for each worker in the Connect cluster 
to complete 
startup|https://github.com/apache/kafka/blob/aecaf4447561edd8da9f06e3abdf46f382dc9d89/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java#L62-L92].
 Separating these two actions into separate steps makes our tests lengthier and 
can even lead to bugs and flakiness if the second step is accidentally omitted 
(see [https://github.com/apache/kafka/pull/16286] for one example).

Instead, we should default to automatically awaiting the complete startup of 
every worker in an embedded Connect cluster when {{EmbeddedConnect::start}} is 
invoked, and require callers to opt out if they do not want to automatically 
wait for startup to complete when invoking that method.
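
A hypothetical sketch of the proposed shape (the class and method names below are assumptions for 
illustration, not the actual EmbeddedConnect API): waiting becomes the default, and callers opt out 
explicitly.

{code:java}
public class EmbeddedConnectSketch {
    // By default, block until every worker has completed startup.
    public void start() {
        start(true);
    }

    // Callers that want the old fire-and-forget behavior opt out explicitly.
    public void start(boolean awaitWorkerStartup) {
        startWorkers();
        if (awaitWorkerStartup) {
            awaitAllWorkersStarted();
        }
    }

    private void startWorkers() {
        // assumed helper: boot the embedded Kafka cluster and the Connect workers
    }

    private void awaitAllWorkersStarted() {
        // assumed helper: poll each worker until startup completes or a timeout elapses
    }
}
{code}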



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16931) Transient REST failures to forward fenceZombie requests leave Connect Tasks in FAILED state

2024-06-11 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854113#comment-17854113
 ] 

Chris Egerton commented on KAFKA-16931:
---

Spoke too soon, have one more thought! [~ecomar] it sounds like you've been 
using MM2 with exactly-once support enabled. In dedicated mode there's 
currently no way to restart failed tasks without restarting an entire process, 
which can be a PITA. Maybe we could add more permissive retry logic for 
dedicated MM2 clusters in this case, and keep the retry logic for vanilla Kafka 
Connect clusters either as-is, or at least more conservative?

> Transient REST failures to forward fenceZombie requests leave Connect Tasks 
> in FAILED state
> ---
>
> Key: KAFKA-16931
> URL: https://issues.apache.org/jira/browse/KAFKA-16931
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Edoardo Comar
>Priority: Major
>
> When Kafka Connect runs in exactly_once mode, a task restart will fence 
> possible zombie tasks.
> This is achieved by forwarding the request to the leader worker using the 
> REST protocol.
> At scale, in distributed mode, an HTTPS request may occasionally fail because 
> of a networking glitch, reconfiguration, etc.
> Currently there is no attempt to retry the REST request; the task is left in 
> a FAILED state and requires an external restart (via the REST API).
> Would this issue require a small KIP to introduce configuration entries to 
> limit the number of retries, backoff times, etc.?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16931) Transient REST failures to forward fenceZombie requests leave Connect Tasks in FAILED state

2024-06-11 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854091#comment-17854091
 ] 

Chris Egerton edited comment on KAFKA-16931 at 6/11/24 3:09 PM:


First, one small clarification: task restarts do not result in zombie fencings 
unless no successful zombie fencing has taken place yet for the current 
generation of task configs. They do require an unconditional REST request to 
the leader to check on whether that fencing has taken place yet, and to perform 
one if it hasn't.

With that out of the way, a KIP would definitely be required if we wanted to 
add new configurations related to retries. We could add some hard-coded retry 
logic for now, which IMO wouldn't require a KIP. The tricky part either way 
would be striking a balance between resiliency to transient failures (which the 
current design certainly lacks) and surfacing non-retriable errors to users in 
an easily-accessible manner (which, despite its shortcomings, the current 
design does fairly well).

If we do decide to add new configuration properties, perhaps they could apply 
for all inter-worker REST requests (including requests to the {{PUT 
/connectors/{connector}/fence}} endpoint, the {{PUT 
/connectors/{connector}/tasks}} endpoint, and user-initiated requests that are 
forwarded from one worker to another)? It's also a bit of a sharp edge that, 
right now, failures to forward task configs to the leader are retried 
infinitely with nothing but a ton of {{{}ERROR{}}}-level log messages to 
indicate any sign of unhealthiness, and it could be useful to allow the 
connector to fail at some point instead.

Finally, one other alternative could be to force a read to the end of the 
config topic, then only issue a request to the leader if it appears that a 
round of zombie fencing is necessary. This could significantly reduce the 
number of requests that get sent to the leader, though it wouldn't completely 
eliminate the need for them. IMO this wouldn't need a KIP either.


was (Author: chrisegerton):
First, one small clarification: task restarts do not result in zombie fencings 
unless no successful zombie fencing has taken place yet for the current 
generation of task configs. They do require an unconditional REST request to 
the leader to check on whether that fencing has taken place yet, and to perform 
one if it hasn't.

With that out of the way, a KIP would definitely be required if we wanted to 
add new configurations related to retries. We could add some hard-coded retry 
logic for now, which IMO wouldn't require a KIP. The tricky part either way 
would be striking a balance between resiliency to transient failures (which the 
current design certainly lacks) and surfacing non-retriable errors to users in 
an easily-accessible manner (which, despite its shortcomings, the current 
design does fairly well).

If we do decide to add new configuration properties, perhaps they could apply 
for all inter-worker REST requests (including requests to the {{PUT 
/connectors/{connector}/fence}} endpoint, the {{PUT 
/connectors/{connector}/tasks}} endpoint, and user-initiated requests that are 
forwarded from one worker to another)? It's also a bit of a sharp edge that, 
right now, failures to forward task configs to the leader are retried 
infinitely with nothing but a ton of {{{}ERROR{}}}-level log messages to 
indicate any sign of unhealthiness, and it could be useful to allow the 
connector to fail at some point instead.

> Transient REST failures to forward fenceZombie requests leave Connect Tasks 
> in FAILED state
> ---
>
> Key: KAFKA-16931
> URL: https://issues.apache.org/jira/browse/KAFKA-16931
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Edoardo Comar
>Priority: Major
>
> When Kafka Connect runs in exactly_once mode, a task restart will fence 
> possible zombie tasks.
> This is achieved by forwarding the request to the leader worker using the 
> REST protocol.
> At scale, in distributed mode, an HTTPS request may occasionally fail because 
> of a networking glitch, reconfiguration, etc.
> Currently there is no attempt to retry the REST request; the task is left in 
> a FAILED state and requires an external restart (via the REST API).
> Would this issue require a small KIP to introduce configuration entries to 
> limit the number of retries, backoff times, etc.?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16931) Transient REST failures to forward fenceZombie requests leave Connect Tasks in FAILED state

2024-06-11 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854091#comment-17854091
 ] 

Chris Egerton edited comment on KAFKA-16931 at 6/11/24 3:06 PM:


First, one small clarification: task restarts do not result in zombie fencings 
unless no successful zombie fencing has taken place yet for the current 
generation of task configs. They do require an unconditional REST request to 
the leader to check on whether that fencing has taken place yet, and to perform 
one if it hasn't.

With that out of the way, a KIP would definitely be required if we wanted to 
add new configurations related to retries. We could add some hard-coded retry 
logic for now, which IMO wouldn't require a KIP. The tricky part either way 
would be striking a balance between resiliency to transient failures (which the 
current design certainly lacks) and surfacing non-retriable errors to users in 
an easily-accessible manner (which, despite its shortcomings, the current 
design does fairly well).

If we do decide to add new configuration properties, perhaps they could apply 
for all inter-worker REST requests (including requests to the {{PUT 
/connectors/{connector}/fence}} endpoint, the {{PUT 
/connectors/{connector}/tasks}} endpoint, and user-initiated requests that are 
forwarded from one worker to another)? It's also a bit of a sharp edge that, 
right now, failures to forward task configs to the leader are retried 
infinitely with nothing but a ton of {{{}ERROR{}}}-level log messages to 
indicate any sign of unhealthiness, and it could be useful to allow the 
connector to fail at some point instead.


was (Author: chrisegerton):
First, one small clarification: task restarts do not result in zombie fencings 
unless no successful zombie fencing has taken place yet for the current 
generation of task configs. They do require an unconditional REST request to 
the leader to check on whether that fencing has taken place yet, and to perform 
one if it hasn't.

With that out of the way, a KIP would definitely be required if we wanted to 
add new configurations related to retries. We could add some hard-coded retry 
logic for now, which IMO wouldn't require a KIP. The tricky part either way 
would be striking a balance between resiliency to transient failures (which the 
current design certainly lacks) and surfacing non-retriable errors to users in 
an easily-accessible manner (which, despite its shortcomings, the current 
design does fairly well).

> Transient REST failures to forward fenceZombie requests leave Connect Tasks 
> in FAILED state
> ---
>
> Key: KAFKA-16931
> URL: https://issues.apache.org/jira/browse/KAFKA-16931
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Edoardo Comar
>Priority: Major
>
> When Kafka Connect runs in exactly_once mode, a task restart will fence 
> possible zombie tasks.
> This is achieved by forwarding the request to the leader worker using the 
> REST protocol.
> At scale, in distributed mode, an HTTPS request may occasionally fail because 
> of a networking glitch, reconfiguration, etc.
> Currently there is no attempt to retry the REST request; the task is left in 
> a FAILED state and requires an external restart (via the REST API).
> Would this issue require a small KIP to introduce configuration entries to 
> limit the number of retries, backoff times, etc.?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16931) Transient REST failures to forward fenceZombie requests leave Connect Tasks in FAILED state

2024-06-11 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854091#comment-17854091
 ] 

Chris Egerton commented on KAFKA-16931:
---

First, one small clarification: task restarts do not result in zombie fencings 
unless no successful zombie fencing has taken place yet for the current 
generation of task configs. They do require an unconditional REST request to 
the leader to check on whether that fencing has taken place yet, and to perform 
one if it hasn't.

With that out of the way, a KIP would definitely be required if we wanted to 
add new configurations related to retries. We could add some hard-coded retry 
logic for now, which IMO wouldn't require a KIP. The tricky part either way 
would be striking a balance between resiliency to transient failures (which the 
current design certainly lacks) and surfacing non-retriable errors to users in 
an easily-accessible manner (which, despite its shortcomings, the current 
design does fairly well).
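
To make the "hard-coded retry logic" idea concrete, a minimal sketch of bounded retries with 
backoff around a forwarded request (the attempt count, backoff values, and the forwardToLeader 
callable are assumptions for illustration, not actual Connect runtime behavior):

{code:java}
import java.util.concurrent.Callable;

public class ForwardWithRetries {
    public static <T> T call(Callable<T> forwardToLeader) throws Exception {
        int maxAttempts = 3;
        long backoffMs = 500;
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return forwardToLeader.call();
            } catch (Exception e) {
                lastFailure = e;       // treat the failure as possibly transient and retry
                Thread.sleep(backoffMs);
                backoffMs *= 2;
            }
        }
        throw lastFailure;             // surface the error so the task still fails visibly
    }
}
{code}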

> Transient REST failures to forward fenceZombie requests leave Connect Tasks 
> in FAILED state
> ---
>
> Key: KAFKA-16931
> URL: https://issues.apache.org/jira/browse/KAFKA-16931
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Edoardo Comar
>Priority: Major
>
> When Kafka Connect runs in exactly_once mode, a task restart will fence 
> possible zombie tasks.
> This is achieved by forwarding the request to the leader worker using the 
> REST protocol.
> At scale, in distributed mode, an HTTPS request may occasionally fail because 
> of a networking glitch, reconfiguration, etc.
> Currently there is no attempt to retry the REST request; the task is left in 
> a FAILED state and requires an external restart (via the REST API).
> Would this issue require a small KIP to introduce configuration entries to 
> limit the number of retries, backoff times, etc.?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-9228:
-
Fix Version/s: 3.7.1

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.
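
An illustrative sketch of the comparison described above (the names and the bare equality check 
are simplifications, not the actual {{DistributedHerder}} code); because only the 
connector-provided task configs are compared, connector-level converter and client overrides 
never trigger a new write:

{code:java}
import java.util.List;
import java.util.Map;

public class TaskConfigComparisonSketch {
    // New task configs are only published when they differ from what is already in the config
    // topic; converter and Kafka client overrides live outside these maps, so changing them
    // alone never forces a rewrite.
    static boolean shouldPublishTaskConfigs(List<Map<String, String>> existingTaskConfigs,
                                            List<Map<String, String>> newTaskConfigs) {
        return !newTaskConfigs.equals(existingTaskConfigs);
    }
}
{code}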



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-9228:
-
Fix Version/s: 3.8.0
   (was: 4.0.0)

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-9228:
-
Fix Version/s: (was: 3.9.0)

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 4.0.0
>
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16837:
--
Fix Version/s: (was: 3.9.0)

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a connector's config if 
> the *previous* task config contains a ConfigProvider value that is incorrect 
> and leads to a ConfigException.
> I can provide a simple test case to reproduce it with FileConfigProvider, but 
> any ConfigProvider that can raise an exception when something is wrong with 
> the config (e.g. the referenced resource doesn't exist) will do.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance create the file 
> "/opt/kafka/provider.properties" with content
> {code:java}
> topics=test
> {code}
> 3. Create simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Check that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Rename the file to "/opt/kafka/provider2.properties".
> 6. Update the connector with the new, correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> The update {*}succeeded{*} with a 200 response. 
> 7. Check that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created, and as a 
> result the connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> 

[jira] [Updated] (KAFKA-16838) Kafka Connect loads old tasks from removed connectors

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16838:
--
Fix Version/s: (was: 3.9.0)

> Kafka Connect loads old tasks from removed connectors
> -
>
> Key: KAFKA-16838
> URL: https://issues.apache.org/jira/browse/KAFKA-16838
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> Hello,
> When creating a connector we faced an error from one of our ConfigProviders 
> about a non-existent resource, even though we hadn't set that resource as a 
> config value:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:840)
>  {code}
> It looked like there was already a connector with the same name and the same 
> config, +but there wasn't.+
> After investigation we found out that a few months ago on that cloud there 
> was a connector with the same name and a different value for the config 
> provider. It was then removed, but for some reason when we tried to create a 
> connector with the same name months later, AbstractHerder tried to update 
> tasks from our previous connector.
> As an example I used FileConfigProvider, but any ConfigProvider that can 
> raise an exception when something is wrong with the config (e.g. the resource 
> doesn't exist) will do.
> We continued our investigation and found 
> https://issues.apache.org/jira/browse/KAFKA-7745, which says Connect doesn't 
> send tombstone messages for *commit* and *task* records in the Kafka Connect 
> config topic. As we recall, the config topic is `compact`, *which means commit 
> and task records are always retained* (months or years after the connector is 
> removed), while tombstones for connector messages are cleaned up according to 
> the {{delete.retention.ms}} property. That impacts later connector creations 
> with the same name.
> We didn't investigate the causes in ConfigClusterStore or how to avoid the 
> issue, because we would {+}like to ask{+}: wouldn't it be better to fix 
> KAFKA-7745 and send tombstones for commit and task messages, as Connect does 
> for connector and target messages?
> In general, the test case looks like:
>  # Create a connector with a config provider reference to resource1
>  # Remove the connector
>  # Remove resource1
>  # Wait 2-4 weeks :) (until the config topic is compacted and the tombstone 
> messages for the removed connector's config and target records are cleaned up)
>  # Try to create a connector with the same name and a config provider 
> reference to resource2
> 
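
For illustration, a hedged sketch of what "send tombstones for commit and task records" could look 
like with a plain producer (the topic name "connect-configs" and the "task-<connector>-<n>" / 
"commit-<connector>" key formats are assumptions, not a confirmed fix):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ConfigTopicTombstones {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // A null value is a tombstone: once delete.retention.ms has passed, compaction
            // drops the tombstone along with all earlier records that share the same key.
            producer.send(new ProducerRecord<>("connect-configs", "task-local-file-sink-0", null));
            producer.send(new ProducerRecord<>("connect-configs", "commit-local-file-sink", null));
        }
    }
}
{code}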

[jira] [Updated] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-9228:
-
Fix Version/s: 4.0.0

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 4.0.0, 3.9.0
>
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-9228.
--
Fix Version/s: 3.9.0
   Resolution: Fixed

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.9.0
>
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16838) Kafka Connect loads old tasks from removed connectors

2024-06-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16838:
--
Fix Version/s: 3.7.1

> Kafka Connect loads old tasks from removed connectors
> -
>
> Key: KAFKA-16838
> URL: https://issues.apache.org/jira/browse/KAFKA-16838
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.7.1, 3.9.0
>
>
> Hello,
> When creating a connector we faced an error from one of our ConfigProviders 
> about a non-existent resource, even though we hadn't set that resource as a 
> config value:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:840)
>  {code}
> It looked like there was already a connector with the same name and the same 
> config, +but there wasn't.+
> After investigation we found out that a few months ago on that cloud there 
> was a connector with the same name and a different value for the config 
> provider. It was then removed, but for some reason when we tried to create a 
> connector with the same name months later, AbstractHerder tried to update 
> tasks from our previous connector.
> As an example I used FileConfigProvider, but any ConfigProvider that can 
> raise an exception when something is wrong with the config (e.g. the resource 
> doesn't exist) will do.
> We continued our investigation and found 
> https://issues.apache.org/jira/browse/KAFKA-7745, which says Connect doesn't 
> send tombstone messages for *commit* and *task* records in the Kafka Connect 
> config topic. As we recall, the config topic is `compact`, *which means commit 
> and task records are always retained* (months or years after the connector is 
> removed), while tombstones for connector messages are cleaned up according to 
> the {{delete.retention.ms}} property. That impacts later connector creations 
> with the same name.
> We didn't investigate the causes in ConfigClusterStore or how to avoid the 
> issue, because we would {+}like to ask{+}: wouldn't it be better to fix 
> KAFKA-7745 and send tombstones for commit and task messages, as Connect does 
> for connector and target messages?
> In general, the test case looks like:
>  # Create a connector with a config provider reference to resource1
>  # Remove the connector
>  # Remove resource1
>  # Wait 2-4 weeks :) (until the config topic is compacted and the tombstone 
> messages for the removed connector's config and target records are cleaned up)
>  # Try to create a connector with the same name and a config provider 
> reference to resource2
> I 

[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-06-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16837:
--
Fix Version/s: 3.7.1

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.7.1, 3.9.0
>
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a connector's config if 
> the *previous* task config contains a ConfigProvider value that is incorrect 
> and leads to a ConfigException.
> I can provide a simple test case to reproduce it with FileConfigProvider, but 
> any ConfigProvider that can raise an exception when something is wrong with 
> the config (e.g. the referenced resource doesn't exist) will do.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance create the file 
> "/opt/kafka/provider.properties" with content
> {code:java}
> topics=test
> {code}
> 3. Create simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Check that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Rename the file to "/opt/kafka/provider2.properties".
> 6. Update the connector with the new, correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> The update {*}succeeded{*} with a 200 response. 
> 7. Check that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created, and as a 
> result the connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> 

[jira] [Updated] (KAFKA-16838) Kafka Connect loads old tasks from removed connectors

2024-06-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16838:
--
Fix Version/s: 3.8.0

> Kafka Connect loads old tasks from removed connectors
> -
>
> Key: KAFKA-16838
> URL: https://issues.apache.org/jira/browse/KAFKA-16838
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.9.0
>
>
> Hello,
> When creating a connector we faced an error from one of our ConfigProviders 
> about a non-existent resource, even though we hadn't set that resource as a config 
> value:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:840)
>  {code}
> It looked like there was already a connector with the same name and the same 
> config, +but there wasn't.+
> After investigation we found out that a few months ago on that cloud there was 
> a connector with the same name and another value for the config provider. Then 
> it was removed, but for some reason, when we tried to create a connector with the 
> same name, AbstractHerder tried to update tasks from our previous connector 
> from months ago.
> As an example I used FileConfigProvider, but actually any ConfigProvider is 
> acceptable which could raise an exception if something is wrong with the config 
> (like a resource that doesn't exist).
> We continued our investigation and found the issue 
> https://issues.apache.org/jira/browse/KAFKA-7745 that says Connect doesn't 
> send tombstone messages for *commit* and *task* records in the config topic of 
> Kafka Connect. As we understand it, the config topic is `compact`, *which means 
> commit and task records are always stored* (months, even years after the connector 
> is removed) while tombstones for connector messages are cleaned up by the 
> {{delete.retention.ms}} property. That impacts further connector creations 
> with the same name.
> We didn't investigate the reasons in ConfigClusterStore and how to avoid that 
> issue, because we would {+}like to ask{+}: wouldn't it be better to fix 
> KAFKA-7745 and send tombstones for commit and task messages as Connect does 
> for connector and target state messages?
> In general, the test case looks like:
>  # Create a connector with a config provider pointing to resource1
>  # Remove the connector
>  # Remove resource1
>  # Wait 2-4 weeks :) (until the config topic is compacted and the tombstone 
> messages for the connector config and target state are removed)
>  # Try to create a connector with the same name and a config provider pointing to resource2
> I can 

[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-06-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16837:
--
Fix Version/s: 3.8.0

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.9.0
>
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a Connector config if the 
> *previous* task contains a ConfigProvider reference with an incorrect value that 
> leads to a ConfigException.
> I can provide a simple test case to reproduce it with FileConfigProvider, but 
> actually any ConfigProvider that could raise an exception if something is 
> wrong with the config (like a resource that doesn't exist) is acceptable.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance create the file 
> "/opt/kafka/provider.properties" with content
> {code:java}
> topics=test
> {code}
> 3. Create simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Checks that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Renames the file to "/opt/kafka/provider2.properties".
> 6. Update connector with new correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> Update {*}succeeded{*}, got 200. 
> 7. Checks that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created. As a result, the 
> connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> 

[jira] [Resolved] (KAFKA-16838) Kafka Connect loads old tasks from removed connectors

2024-06-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16838.
---
Fix Version/s: 3.9.0
   Resolution: Fixed

> Kafka Connect loads old tasks from removed connectors
> -
>
> Key: KAFKA-16838
> URL: https://issues.apache.org/jira/browse/KAFKA-16838
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.9.0
>
>
> Hello,
> When creating a connector we faced an error from one of our ConfigProviders 
> about a non-existent resource, even though we hadn't set that resource as a config 
> value:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:840)
>  {code}
> It looked like there was already a connector with the same name and the same 
> config, +but there wasn't.+
> After investigation we found out that a few months ago on that cloud there was 
> a connector with the same name and another value for the config provider. Then 
> it was removed, but for some reason, when we tried to create a connector with the 
> same name, AbstractHerder tried to update tasks from our previous connector 
> from months ago.
> As an example I used FileConfigProvider, but actually any ConfigProvider is 
> acceptable which could raise an exception if something is wrong with the config 
> (like a resource that doesn't exist).
> We continued our investigation and found the issue 
> https://issues.apache.org/jira/browse/KAFKA-7745 that says Connect doesn't 
> send tombstone messages for *commit* and *task* records in the config topic of 
> Kafka Connect. As we understand it, the config topic is `compact`, *which means 
> commit and task records are always stored* (months, even years after the connector 
> is removed) while tombstones for connector messages are cleaned up by the 
> {{delete.retention.ms}} property. That impacts further connector creations 
> with the same name.
> We didn't investigate the reasons in ConfigClusterStore and how to avoid that 
> issue, because we would {+}like to ask{+}: wouldn't it be better to fix 
> KAFKA-7745 and send tombstones for commit and task messages as Connect does 
> for connector and target state messages?
> In general, the test case looks like:
>  # Create a connector with a config provider pointing to resource1
>  # Remove the connector
>  # Remove resource1
>  # Wait 2-4 weeks :) (until the config topic is compacted and the tombstone 
> messages for the connector config and target state are removed)
>  # Try to create connector with the same name and config provider to 

[jira] [Resolved] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-06-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16837.
---
Fix Version/s: 3.9.0
   Resolution: Fixed

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.9.0
>
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a Connector config if the 
> *previous* task contains a ConfigProvider reference with an incorrect value that 
> leads to a ConfigException.
> I can provide a simple test case to reproduce it with FileConfigProvider, but 
> actually any ConfigProvider that could raise an exception if something is 
> wrong with the config (like a resource that doesn't exist) is acceptable.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance create the file 
> "/opt/kafka/provider.properties" with content
> {code:java}
> topics=test
> {code}
> 3. Create simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Checks that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Renames the file to "/opt/kafka/provider2.properties".
> 6. Update connector with new correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> Update {*}succeeded{*}, got 200. 
> 7. Checks that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created. As a result, the 
> connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> 

[jira] [Assigned] (KAFKA-16881) InitialState type leaks into the Connect REST API OpenAPI spec

2024-06-03 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-16881:
-

Assignee: 黃竣陽

> InitialState type leaks into the Connect REST API OpenAPI spec
> --
>
> Key: KAFKA-16881
> URL: https://issues.apache.org/jira/browse/KAFKA-16881
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Assignee: 黃竣陽
>Priority: Major
>
> In our [OpenAPI spec 
> file|https://kafka.apache.org/37/generated/connect_rest.yaml] we have the 
> following:
> {noformat}
> CreateConnectorRequest:
>       type: object
>       properties:
>         config:
>           type: object
>           additionalProperties:
>             type: string
>         initialState:
>           type: string
>           enum:
>           - RUNNING
>           - PAUSED
>           - STOPPED
>         initial_state:
>           type: string
>           enum:
>           - RUNNING
>           - PAUSED
>           - STOPPED
>           writeOnly: true
>         name:
>           type: string{noformat}
> Only initial_state is a valid field, InitialState should not be present.
>  
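A hedged sketch of how such a duplicate can arise and one way to suppress it. The class, accessor, and annotation usage below are assumptions for illustration only (plain Jackson plus swagger-annotations), not the actual Connect runtime code or the eventual fix:

{code:java}
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.v3.oas.annotations.media.Schema;

// Illustrative DTO only; the real CreateConnectorRequest lives in the Connect
// runtime. The Jackson-mapped property is "initial_state", but spec generators
// that also scan bean-style accessors can emit a second "initialState" property.
// Hiding the accessor keeps only the snake_case field in the generated spec.
public class ExampleCreateConnectorRequest {

    @JsonProperty("initial_state")
    private String initialState;

    @Schema(hidden = true)
    public String getInitialState() {
        return initialState;
    }

    public void setInitialState(String initialState) {
        this.initialState = initialState;
    }
}
{code}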



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16881) InitialState type leaks into the Connect REST API OpenAPI spec

2024-06-03 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851669#comment-17851669
 ] 

Chris Egerton commented on KAFKA-16881:
---

[~m1a2st] I've assigned the issue to you. Thanks for volunteering!

> InitialState type leaks into the Connect REST API OpenAPI spec
> --
>
> Key: KAFKA-16881
> URL: https://issues.apache.org/jira/browse/KAFKA-16881
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Assignee: 黃竣陽
>Priority: Major
>
> In our [OpenAPI spec 
> file|https://kafka.apache.org/37/generated/connect_rest.yaml] we have the 
> following:
> {noformat}
> CreateConnectorRequest:
>       type: object
>       properties:
>         config:
>           type: object
>           additionalProperties:
>             type: string
>         initialState:
>           type: string
>           enum:
>           - RUNNING
>           - PAUSED
>           - STOPPED
>         initial_state:
>           type: string
>           enum:
>           - RUNNING
>           - PAUSED
>           - STOPPED
>           writeOnly: true
>         name:
>           type: string{noformat}
> Only initial_state is a valid field, InitialState should not be present.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16838) Kafka Connect loads old tasks from removed connectors

2024-06-03 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851667#comment-17851667
 ] 

Chris Egerton commented on KAFKA-16838:
---

It's probably possible but it might get a bit messy. We'd need to think through 
some edge cases, such as if a worker that's patched to emit (and presumably 
handle) tombstones for task configs dies partway through deleting a connector, 
and then a worker that hasn't been patched yet (either because the cluster is 
in the middle of a rolling upgrade, or the cluster has been intentionally 
downgraded) reads from the config topic.

I don't think it's impossible to handle this, but I opted for a simpler 
approach to fix this bug for now where we just ignore task configs in the 
config topic when no accompanying connector config is present.
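For illustration, a minimal sketch of that simpler approach, using assumed names rather than the actual Connect runtime code:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Minimal sketch, with illustrative names only, of the approach described above:
// when materializing state from the config topic, task configs whose connector
// has no accompanying connector config are treated as orphaned and ignored.
public class OrphanedTaskConfigFilter {

    public static Map<String, Map<String, String>> filterOrphans(
            Set<String> connectorsWithConfigs,
            Map<String, Map<String, String>> taskConfigsByConnector) {
        Map<String, Map<String, String>> retained = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> entry : taskConfigsByConnector.entrySet()) {
            if (connectorsWithConfigs.contains(entry.getKey())) {
                retained.put(entry.getKey(), entry.getValue());
            }
            // else: the connector config is absent (e.g. deleted and later compacted
            // away), so its stale task configs are dropped instead of being re-used.
        }
        return retained;
    }
}
{code}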

> Kafka Connect loads old tasks from removed connectors
> -
>
> Key: KAFKA-16838
> URL: https://issues.apache.org/jira/browse/KAFKA-16838
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
>
> Hello,
> When creating a connector we faced an error from one of our ConfigProviders 
> about a non-existent resource, even though we hadn't set that resource as a config 
> value:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:840)
>  {code}
> It looked like there was already a connector with the same name and the same 
> config, +but there wasn't.+
> After investigation we found out that a few months ago on that cloud there was 
> a connector with the same name and another value for the config provider. Then 
> it was removed, but for some reason, when we tried to create a connector with the 
> same name, AbstractHerder tried to update tasks from our previous connector 
> from months ago.
> As an example I used FileConfigProvider, but actually any ConfigProvider is 
> acceptable which could raise an exception if something is wrong with the config 
> (like a resource that doesn't exist).
> We continued our investigation and found the issue 
> https://issues.apache.org/jira/browse/KAFKA-7745 that says Connect doesn't 
> send tombstone messages for *commit* and *task* records in the config topic of 
> Kafka Connect. As we understand it, the config topic is `compact`, *which means 
> commit and task records are always stored* (months, even years after the connector 
> is removed) while tombstones for connector messages are cleaned up by the 
> {{delete.retention.ms}} property. That impacts further connector creations 
> with the same name.
> We 

[jira] [Commented] (KAFKA-16838) Kafka Connect loads old tasks from removed connectors

2024-05-31 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851129#comment-17851129
 ] 

Chris Egerton commented on KAFKA-16838:
---

I knew this one felt familiar–we actually fixed a more common variant of this 
bug in [https://github.com/apache/kafka/pull/8444]. However, we missed the case 
of topic compaction.
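For context, a quick way to inspect the cleanup settings on the config topic; the topic name and bootstrap address below are placeholders (the actual topic is whatever config.storage.topic is set to):

{code:java}
# Illustrative only: describe the Connect config topic's settings.
# With cleanup.policy=compact, the latest commit/task record for each key is
# kept indefinitely, while tombstones are removed once delete.retention.ms
# has elapsed.
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe \
  --entity-type topics --entity-name connect-configs
{code}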

> Kafka Connect loads old tasks from removed connectors
> -
>
> Key: KAFKA-16838
> URL: https://issues.apache.org/jira/browse/KAFKA-16838
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
>
> Hello,
> When creating a connector we faced an error from one of our ConfigProviders 
> about a non-existent resource, even though we hadn't set that resource as a config 
> value:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:840)
>  {code}
> It looked like there was already a connector with the same name and the same 
> config, +but there wasn't.+
> After investigation we found out that a few months ago on that cloud there was 
> a connector with the same name and another value for the config provider. Then 
> it was removed, but for some reason, when we tried to create a connector with the 
> same name, AbstractHerder tried to update tasks from our previous connector 
> from months ago.
> As an example I used FileConfigProvider, but actually any ConfigProvider is 
> acceptable which could raise an exception if something is wrong with the config 
> (like a resource that doesn't exist).
> We continued our investigation and found the issue 
> https://issues.apache.org/jira/browse/KAFKA-7745 that says Connect doesn't 
> send tombstone messages for *commit* and *task* records in the config topic of 
> Kafka Connect. As we understand it, the config topic is `compact`, *which means 
> commit and task records are always stored* (months, even years after the connector 
> is removed) while tombstones for connector messages are cleaned up by the 
> {{delete.retention.ms}} property. That impacts further connector creations 
> with the same name.
> We didn't investigate the reasons in ConfigClusterStore and how to avoid that 
> issue, because we would {+}like to ask{+}: wouldn't it be better to fix 
> KAFKA-7745 and send tombstones for commit and task messages as Connect does 
> for connector and target state messages?
> In general, the test case looks like:
>  # Create a connector with a config provider pointing to resource1
>  # Remove the connector
>  # Remove resource1
>  # Wait 2-4 weeks :) (until config topic being compacted and 

[jira] [Resolved] (KAFKA-16844) ByteArrayConverter can't convert ByteBuffer

2024-05-30 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16844.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> ByteArrayConverter can't convert ByteBuffer
> ---
>
> Key: KAFKA-16844
> URL: https://issues.apache.org/jira/browse/KAFKA-16844
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Fan Yang
>Assignee: Fan Yang
>Priority: Minor
> Fix For: 3.8.0
>
>
> In the current Schema design, the schema type Bytes corresponds to two kinds of 
> classes, byte[] and ByteBuffer, but the current ByteArrayConverter can only 
> convert byte[]. My suggestion is to add ByteBuffer support to the current 
> ByteArrayConverter.
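A minimal sketch of the suggested behavior, assuming the converter should accept both representations of the Bytes schema type; the class and method names are illustrative and this is not the actual ByteArrayConverter patch:

{code:java}
import java.nio.ByteBuffer;

// Hedged sketch: when the value is a ByteBuffer, copy its remaining bytes
// instead of rejecting it, so both byte[] and ByteBuffer map to Bytes.
public class BytesLikeSerializer {

    public static byte[] toBytes(Object value) {
        if (value == null) {
            return null;
        } else if (value instanceof byte[]) {
            return (byte[]) value;
        } else if (value instanceof ByteBuffer) {
            // slice() leaves the caller's buffer position untouched.
            ByteBuffer buffer = ((ByteBuffer) value).slice();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            return bytes;
        }
        throw new IllegalArgumentException(
                "Expected byte[] or ByteBuffer but got " + value.getClass());
    }
}
{code}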



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16838) Kafka Connect loads old tasks from removed connectors

2024-05-28 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-16838:
-

Assignee: Chris Egerton

> Kafka Connect loads old tasks from removed connectors
> -
>
> Key: KAFKA-16838
> URL: https://issues.apache.org/jira/browse/KAFKA-16838
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
>
> Hello,
> When creating a connector we faced an error from one of our ConfigProviders 
> about a non-existent resource, even though we hadn't set that resource as a config 
> value:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:840)
>  {code}
> It looked like there was already a connector with the same name and the same 
> config, +but there wasn't.+
> After investigation we found out that a few months ago on that cloud there was 
> a connector with the same name and another value for the config provider. Then 
> it was removed, but for some reason, when we tried to create a connector with the 
> same name, AbstractHerder tried to update tasks from our previous connector 
> from months ago.
> As an example I used FileConfigProvider, but actually any ConfigProvider is 
> acceptable which could raise an exception if something is wrong with the config 
> (like a resource that doesn't exist).
> We continued our investigation and found the issue 
> https://issues.apache.org/jira/browse/KAFKA-7745 that says Connect doesn't 
> send tombstone messages for *commit* and *task* records in the config topic of 
> Kafka Connect. As we understand it, the config topic is `compact`, *which means 
> commit and task records are always stored* (months, even years after the connector 
> is removed) while tombstones for connector messages are cleaned up by the 
> {{delete.retention.ms}} property. That impacts further connector creations 
> with the same name.
> We didn't investigate the reasons in ConfigClusterStore and how to avoid that 
> issue, because we would {+}like to ask{+}: wouldn't it be better to fix 
> KAFKA-7745 and send tombstones for commit and task messages as Connect does 
> for connector and target state messages?
> In general, the test case looks like:
>  # Create a connector with a config provider pointing to resource1
>  # Remove the connector
>  # Remove resource1
>  # Wait 2-4 weeks :) (until the config topic is compacted and the tombstone 
> messages for the connector config and target state are removed)
>  # Try to create a connector with the same name and a config provider pointing to resource2
> I can provide synthetic TC to reproduce 

[jira] [Assigned] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-05-28 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-16837:
-

Assignee: Chris Egerton

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a Connector config if the 
> *previous* task contains a ConfigProvider reference with an incorrect value that 
> leads to a ConfigException.
> I can provide a simple test case to reproduce it with FileConfigProvider, but 
> actually any ConfigProvider that could raise an exception if something is 
> wrong with the config (like a resource that doesn't exist) is acceptable.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance create the file 
> "/opt/kafka/provider.properties" with content
> {code:java}
> topics=test
> {code}
> 3. Create simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Checks that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Renames the file to "/opt/kafka/provider2.properties".
> 6. Update connector with new correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> Update {*}succeeded{*}, got 200. 
> 7. Checks that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created. As a result, the 
> connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> 

[jira] [Commented] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-05-28 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850059#comment-17850059
 ] 

Chris Egerton commented on KAFKA-16837:
---

Thanks for raising this [~mrMigles], great find!

I don't think we need to compare config provider-resolved values for task 
configs, since we end up performing that resolution at the same time for both 
sets of configs and that's likely to generate the same values. It should be 
fine to compare original configs instead.
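For illustration, a minimal sketch of that idea with assumed names (this is not the actual AbstractHerder code):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hedged sketch: decide whether task configs changed by comparing the raw,
// unresolved configs, so a broken ConfigProvider reference in the *old* configs
// never has to be resolved just to make the comparison.
public class RawTaskConfigComparator {

    public static boolean taskConfigsChanged(List<Map<String, String>> oldRawConfigs,
                                             List<Map<String, String>> newRawConfigs) {
        // Raw configs still contain placeholders like
        // ${file:/opt/kafka/provider.properties:topics}; comparing them avoids
        // calling ConfigProvider.get() on resources that may no longer exist.
        return !Objects.equals(oldRawConfigs, newRawConfigs);
    }
}
{code}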

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Priority: Major
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a Connector config if the 
> *previous* task contains a ConfigProvider reference with an incorrect value that 
> leads to a ConfigException.
> I can provide a simple test case to reproduce it with FileConfigProvider, but 
> actually any ConfigProvider that could raise an exception if something is 
> wrong with the config (like a resource that doesn't exist) is acceptable.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance create the file 
> "/opt/kafka/provider.properties" with content
> {code:java}
> topics=test
> {code}
> 3. Create simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Checks that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Renames the file to "/opt/kafka/provider2.properties".
> 6. Update connector with new correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> Update {*}succeeded{*}, got 200. 
> 7. Checks that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created. As a result, the 
> connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> 

[jira] [Comment Edited] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-05-28 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850059#comment-17850059
 ] 

Chris Egerton edited comment on KAFKA-16837 at 5/28/24 3:07 PM:


Thanks for raising this [~mrMigles], great find!

I don't think we need to compare config provider-resolved values for task 
configs, since we end up performing that resolution at the same time for both 
sets of configs and that's likely to generate the same values. It should be 
fine to compare original ("raw") configs instead.


was (Author: chrisegerton):
Thanks for raising this [~mrMigles], great find!

I don't think we need to compare config provider-resolved values for task 
configs, since we end up performing that resolution at the same time for both 
sets of configs and that's likely to generate the same values. It should be 
fine to compare original configs instead.

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Priority: Major
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a Connector config if the 
> *previous* task contains a ConfigProvider reference with an incorrect value that 
> leads to a ConfigException.
> I can provide a simple test case to reproduce it with FileConfigProvider, but 
> actually any ConfigProvider that could raise an exception if something is 
> wrong with the config (like a resource that doesn't exist) is acceptable.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance create the file 
> "/opt/kafka/provider.properties" with content
> {code:java}
> topics=test
> {code}
> 3. Create simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Checks that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Renames the file to "/opt/kafka/provider2.properties".
> 6. Update connector with new correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> Update {*}succeeded{*}, got 200. 
> 7. Checks that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created. As a result, the 
> connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying 

[jira] [Assigned] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-05-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-9228:


Assignee: Chris Egerton  (was: Greg Harris)

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.
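An illustrative sketch of the "only publish when changed" pattern described above, with assumed names (not the actual DistributedHerder code). Because converter and client overrides live in the connector config rather than in the generated task configs, changing only those settings leaves the new task configs equal to the stored ones, so nothing is rewritten and tasks never restart with the new values:

{code:java}
import java.util.List;
import java.util.Map;

// Hedged sketch of the gating logic, not the real implementation.
public class TaskConfigPublisher {

    public interface ConfigStore {
        List<Map<String, String>> storedTaskConfigs(String connector);
        void putTaskConfigs(String connector, List<Map<String, String>> configs);
    }

    public static void maybePublish(ConfigStore store, String connector,
                                    List<Map<String, String>> newTaskConfigs) {
        // Only write when the generated task configs differ from what is stored.
        if (!newTaskConfigs.equals(store.storedTaskConfigs(connector))) {
            store.putTaskConfigs(connector, newTaskConfigs);
        }
    }
}
{code}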



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-05-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16603.
---
Resolution: Not A Bug

> Data loss when kafka connect sending data to Kafka
> --
>
> Key: KAFKA-16603
> URL: https://issues.apache.org/jira/browse/KAFKA-16603
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.3.1
>Reporter: Anil Dasari
>Priority: Major
>
> We are experiencing data loss when the Kafka source connector fails to send 
> data to the Kafka topic and the offset topic. 
> Kafka cluster and Kafka connect details:
>  # Kafka connect version i.e client : Confluent community version 7.3.1 i.e 
> Kafka 3.3.1
>  # Kafka version: 0.11.0 (server)
>  # Cluster size : 3 brokers
>  # Number of partitions in all topics = 3
>  # Replication factor = 3
>  # Min ISR set 2
>  # Uses no transformations in Kafka connector
>  # Use default error tolerance i.e None.
> Our connector checkpoints the offset info received in 
> SourceTask#commitRecord and resumes data processing from the persisted 
> checkpoint.
> The data loss is noticed when the broker is unresponsive for a few minutes due to high 
> load and the Kafka connector is restarted. In addition, the Kafka connector's graceful 
> shutdown failed.
> Logs:
>  
> {code:java}
> [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
> last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
> Apr 22, 2024 @ 15:56:16.708 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
> disconnected.
> Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 
> disconnected.
> Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log 
> **)
> Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
> unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
> leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
> leaderUrl='http://10.75.100.46:8083/', offset=4, 
> connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
> taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
> revokedTaskIds=[], delay=0} to avoid running tasks while not being a member 
> the group
> Apr 22, 2024 @ 15:56:19.866 Stopping connector 
> d094a5d7bbb046b99d62398cb84d648c
> Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0
> Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for 
> WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c}
> Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' 
> failed to properly shut down, has become unresponsive, and may be consuming 
> external resources. Correct the configuration for this connector or remove 
> the connector. After fixing the connector, it may be necessary to restart 
> this worker to release any consumed resources.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing the 
> Kafka producer with timeoutMillis = 0 ms.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Proceeding to 
> force close the producer since pending requests could not be completed within 
> timeout 0 ms.
> Apr 22, 2024 @ 15:56:24.112 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Beginning 
> shutdown of Kafka producer I/O thread, sending remaining records.
> Apr 22, 2024 @ 15:56:24.112 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Aborting 
> incomplete batches due to forced shutdown
> Apr 22, 2024 @ 15:56:24.113 
> WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} 
> 

[jira] [Commented] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-05-20 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847880#comment-17847880
 ] 

Chris Egerton commented on KAFKA-16603:
---

Since this is not a bug (the Connect runtime does not cause data loss if its 
API is used correctly), I agree that we can close this ticket. I'll leave it up 
to you and other connector developers to decide whether a 
{{SourceConnector::onCommit}}-style hook for when new offsets are read from the 
offsets topic would be worthwhile; I don't want to file a Jira ticket unless I 
strongly believe the change is warranted.
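
A purely hypothetical sketch of what such a hook could look like is below. This 
interface does not exist in the Connect API today; the name and signature are 
illustrative only.

{code:java}
import java.util.Map;

/**
 * Hypothetical opt-in callback -- NOT part of the current Connect API.
 * A SourceConnector implementation could implement this interface, and the
 * runtime would invoke it after new offsets for the connector have been
 * read back from the offsets topic.
 */
public interface OffsetsCommittedListener {
    /**
     * @param committedOffsets mapping of source partition to the most recently
     *                         committed source offset for that partition
     */
    void onOffsetsCommitted(Map<Map<String, ?>, Map<String, ?>> committedOffsets);
}
{code}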

> Data loss when kafka connect sending data to Kafka
> --
>
> Key: KAFKA-16603
> URL: https://issues.apache.org/jira/browse/KAFKA-16603
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.3.1
>Reporter: Anil Dasari
>Priority: Major
>
> We are experiencing a data loss when Kafka Source connector is failed to send 
> data to Kafka topic and offset topic. 
> Kafka cluster and Kafka connect details:
>  # Kafka connect version i.e client : Confluent community version 7.3.1 i.e 
> Kafka 3.3.1
>  # Kafka version: 0.11.0 (server)
>  # Cluster size : 3 brokers
>  # Number of partitions in all topics = 3
>  # Replication factor = 3
>  # Min ISR set 2
>  # Uses no transformations in Kafka connector
>  # Use default error tolerance i.e None.
> Our connector checkpoints the offsets info received in 
> SourceTask#commitRecord and resume the data process from the persisted 
> checkpoint.
> The data loss is noticed when broker is unresponsive for few mins due to high 
> load and kafka connector was restarted. Also, Kafka connector graceful 
> shutdown failed.
> Logs:
>  
> {code:java}
> [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
> last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
> Apr 22, 2024 @ 15:56:16.708 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
> disconnected.
> Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 
> disconnected.
> Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log 
> **)
> Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
> unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
> leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
> leaderUrl='http://10.75.100.46:8083/', offset=4, 
> connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
> taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
> revokedTaskIds=[], delay=0} to avoid running tasks while not being a member 
> the group
> Apr 22, 2024 @ 15:56:19.866 Stopping connector 
> d094a5d7bbb046b99d62398cb84d648c
> Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0
> Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for 
> WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c}
> Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' 
> failed to properly shut down, has become unresponsive, and may be consuming 
> external resources. Correct the configuration for this connector or remove 
> the connector. After fixing the connector, it may be necessary to restart 
> this worker to release any consumed resources.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing the 
> Kafka producer with timeoutMillis = 0 ms.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Proceeding to 
> force close the producer since pending requests could not be completed within 
> timeout 0 ms.
> Apr 22, 2024 @ 15:56:24.112 

[jira] [Resolved] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy

2024-05-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16656.
---
Resolution: Not A Bug

> Using a custom replication.policy.separator with DefaultReplicationPolicy
> -
>
> Key: KAFKA-16656
> URL: https://issues.apache.org/jira/browse/KAFKA-16656
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.1
>Reporter: Lenin Joseph
>Priority: Major
>
> Hi,
> In the case of bidirectional replication using mm2, when we tried using a 
> custom replication.policy.separator( ex: "-") with DefaultReplicationPolicy , 
> we see cyclic replication of topics. Could you confirm whether it's mandatory 
> to use a CustomReplicationPolicy whenever we want to use a separator other 
> than a "." ?
> Regards, 
> Lenin



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-05-15 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846662#comment-17846662
 ] 

Chris Egerton commented on KAFKA-16603:
---

[~dasarianil] In most cases, a single worker-global consumer is used to read 
offsets from the offsets topic. If a connector uses a separate offsets topic 
(which can happen if it is explicitly configured to use one via the 
[offsets.storage.topic|https://kafka.apache.org/documentation.html#sourceconnectorconfigs_offsets.storage.topic]
 property in its configuration, or if exactly-once support is enabled and the 
connector is configured to write to a different Kafka cluster than the one used 
by the Kafka Connect worker for its internal topics), then a new consumer is 
spun up for that connector, but that consumer will also be constantly polling 
the topic for changes and reads to the end of that topic should not lead to a 
significant bump in load on the Kafka cluster.

 

As far as communicating committed offsets to the {{SourceConnector}} instance 
goes, we cannot reuse state stored in the JVM to do this in most cases because 
Kafka Connect is intended to run in distributed mode with multiple workers, and 
there is no guarantee that the {{SourceConnector}} instance is running on the 
same worker as any of its tasks.

Again, though, we could possibly add some hook to the {{SourceConnector}} class 
that lets implementations know when new offsets have been successfully 
committed by any of its tasks. Would that be sufficient?

> Data loss when kafka connect sending data to Kafka
> --
>
> Key: KAFKA-16603
> URL: https://issues.apache.org/jira/browse/KAFKA-16603
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.3.1
>Reporter: Anil Dasari
>Priority: Major
>
> We are experiencing a data loss when Kafka Source connector is failed to send 
> data to Kafka topic and offset topic. 
> Kafka cluster and Kafka connect details:
>  # Kafka connect version i.e client : Confluent community version 7.3.1 i.e 
> Kafka 3.3.1
>  # Kafka version: 0.11.0 (server)
>  # Cluster size : 3 brokers
>  # Number of partitions in all topics = 3
>  # Replication factor = 3
>  # Min ISR set 2
>  # Uses no transformations in Kafka connector
>  # Use default error tolerance i.e None.
> Our connector checkpoints the offsets info received in 
> SourceTask#commitRecord and resume the data process from the persisted 
> checkpoint.
> The data loss is noticed when broker is unresponsive for few mins due to high 
> load and kafka connector was restarted. Also, Kafka connector graceful 
> shutdown failed.
> Logs:
>  
> {code:java}
> [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
> last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
> Apr 22, 2024 @ 15:56:16.708 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
> disconnected.
> Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 
> disconnected.
> Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log 
> **)
> Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
> unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
> leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
> leaderUrl='http://10.75.100.46:8083/', offset=4, 
> connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
> taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
> revokedTaskIds=[], delay=0} to avoid running tasks while not being a member 
> the group
> Apr 22, 2024 @ 15:56:19.866 Stopping connector 
> d094a5d7bbb046b99d62398cb84d648c
> Apr 22, 2024 @ 15:56:19.874 Stopping 

[jira] [Comment Edited] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-05-13 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845927#comment-17845927
 ] 

Chris Egerton edited comment on KAFKA-16603 at 5/13/24 1:34 PM:


[~dasarianil] ah, nice! That takes advantage of existing logic we use in Kafka 
Connect to prevent connector source offsets from being prematurely committed, 
should save you work.

It shouldn't be too expensive to read source offsets at runtime–have you 
encountered a noticeable performance impact from this? Connect workers are 
constantly polling the offsets topic regardless of whether connectors have 
issued read requests; the only additional workload incurred by requesting 
offsets from the runtime should be a negligible bump in CPU from converting the 
offsets to/from byte arrays, and possibly a bit of load on your Kafka cluster 
caused by fetching the latest stable offsets for the offsets topic.

I think we could possibly add some kind of callback-based API for 
{{SourceConnector}} implementations to be notified whenever new offsets are 
read from the offsets topic, but I'd be hesitant to do so if there isn't 
convincing evidence that the existing API is insufficient. We almost certainly 
can't do something that would notify {{SourceConnector}} instances of when 
records from their tasks are successfully produced to Kafka.


was (Author: chrisegerton):
[~dasarianil] ah, nice! That takes advantage of existing logic we use in Kafka 
Connect to prevent connector source offsets from being prematurely committed, 
should save you work.

It shouldn't be too expensive to read source offsets at runtime–have you 
encountered a noticeable performance impact from this? Connect workers are 
constantly polling the offsets topic regardless of whether connectors have 
issued read requests; the only additional workload incurred by requesting 
offsets from the runtime should be a negligible bump in CPU from converting the 
offsets to/from byte arrays, and possibly a bit of load on your Kafka cluster 
caused by fetching the latest stable offsets for the offsets topic.

I think we could possibly add some kind of callback-based API for 
{{SourceConnector}} implementations to be notified whenever new offsets are 
read from the offsets topic, but I'd be hesitant to do so if there isn't 
convincing evidence that the existing API is insufficient.

> Data loss when kafka connect sending data to Kafka
> --
>
> Key: KAFKA-16603
> URL: https://issues.apache.org/jira/browse/KAFKA-16603
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.3.1
>Reporter: Anil Dasari
>Priority: Major
>
> We are experiencing a data loss when Kafka Source connector is failed to send 
> data to Kafka topic and offset topic. 
> Kafka cluster and Kafka connect details:
>  # Kafka connect version i.e client : Confluent community version 7.3.1 i.e 
> Kafka 3.3.1
>  # Kafka version: 0.11.0 (server)
>  # Cluster size : 3 brokers
>  # Number of partitions in all topics = 3
>  # Replication factor = 3
>  # Min ISR set 2
>  # Uses no transformations in Kafka connector
>  # Use default error tolerance i.e None.
> Our connector checkpoints the offsets info received in 
> SourceTask#commitRecord and resume the data process from the persisted 
> checkpoint.
> The data loss is noticed when broker is unresponsive for few mins due to high 
> load and kafka connector was restarted. Also, Kafka connector graceful 
> shutdown failed.
> Logs:
>  
> {code:java}
> [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
> last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
> Apr 22, 2024 @ 15:56:16.708 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
> disconnected.
> Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 
> disconnected.
> Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is 

[jira] [Comment Edited] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-05-13 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845927#comment-17845927
 ] 

Chris Egerton edited comment on KAFKA-16603 at 5/13/24 1:33 PM:


[~dasarianil] ah, nice! That takes advantage of existing logic we use in Kafka 
Connect to prevent connector source offsets from being prematurely committed, 
should save you work.

It shouldn't be too expensive to read source offsets at runtime–have you 
encountered a noticeable performance impact from this? Connect workers are 
constantly polling the offsets topic regardless of whether connectors have 
issued read requests; the only additional workload incurred by requesting 
offsets from the runtime should be a negligible bump in CPU from converting the 
offsets to/from byte arrays, and possibly a bit of load on your Kafka cluster 
caused by fetching the latest stable offsets for the offsets topic.

I think we could possibly add some kind of callback-based API for 
{{SourceConnector}} implementations to be notified whenever new offsets are 
read from the offsets topic, but I'd be hesitant to do so if there isn't 
convincing evidence that the existing API is insufficient.


was (Author: chrisegerton):
[~dasarianil] ah, nice! That takes advantage of existing logic we use in Kafka 
Connect to prevent connector source offsets from being prematurely committed, 
should save you work.

It shouldn't be too expensive to read source offsets at runtime–have you 
encountered a noticeable performance impact from this? Connect workers are 
constantly polling the offsets topic regardless of whether connectors have 
issued read requests; the only additional workload incurred by requesting 
offsets from the runtime should be a negligible bump in CPU from converting the 
offsets to/from byte arrays, and possibly a bit of load on your Kafka cluster 
caused by fetching the latest stable offsets for the offsets topic.

> Data loss when kafka connect sending data to Kafka
> --
>
> Key: KAFKA-16603
> URL: https://issues.apache.org/jira/browse/KAFKA-16603
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.3.1
>Reporter: Anil Dasari
>Priority: Major
>
> We are experiencing a data loss when Kafka Source connector is failed to send 
> data to Kafka topic and offset topic. 
> Kafka cluster and Kafka connect details:
>  # Kafka connect version i.e client : Confluent community version 7.3.1 i.e 
> Kafka 3.3.1
>  # Kafka version: 0.11.0 (server)
>  # Cluster size : 3 brokers
>  # Number of partitions in all topics = 3
>  # Replication factor = 3
>  # Min ISR set 2
>  # Uses no transformations in Kafka connector
>  # Use default error tolerance i.e None.
> Our connector checkpoints the offsets info received in 
> SourceTask#commitRecord and resume the data process from the persisted 
> checkpoint.
> The data loss is noticed when broker is unresponsive for few mins due to high 
> load and kafka connector was restarted. Also, Kafka connector graceful 
> shutdown failed.
> Logs:
>  
> {code:java}
> [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
> last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
> Apr 22, 2024 @ 15:56:16.708 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
> disconnected.
> Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 
> disconnected.
> Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log 
> **)
> Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
> unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
> 

[jira] [Commented] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-05-13 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845927#comment-17845927
 ] 

Chris Egerton commented on KAFKA-16603:
---

[~dasarianil] ah, nice! That takes advantage of existing logic we use in Kafka 
Connect to prevent connector source offsets from being prematurely committed, 
should save you work.

It shouldn't be too expensive to read source offsets at runtime–have you 
encountered a noticeable performance impact from this? Connect workers are 
constantly polling the offsets topic regardless of whether connectors have 
issued read requests; the only additional workload incurred by requesting 
offsets from the runtime should be a negligible bump in CPU from converting the 
offsets to/from byte arrays, and possibly a bit of load on your Kafka cluster 
caused by fetching the latest stable offsets for the offsets topic.
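
For illustration, a minimal sketch of a task consulting the runtime's offset 
reader on startup is below (the "table" partition key and the class name are 
made up for the example; a real connector would use its own partition scheme):

{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.OffsetStorageReader;

public abstract class CheckpointResumingTask extends SourceTask {

    private Map<String, Object> lastCommittedOffset;

    @Override
    public void start(Map<String, String> props) {
        // The worker keeps the offsets topic continuously consumed, so this call
        // is answered from state the runtime already has rather than a cold read.
        OffsetStorageReader reader = context.offsetStorageReader();
        Map<String, String> sourcePartition =
                Collections.singletonMap("table", props.get("table"));
        lastCommittedOffset = reader.offset(sourcePartition);
        // If non-null, resume from this checkpoint instead of an external store.
    }
}
{code}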

> Data loss when kafka connect sending data to Kafka
> --
>
> Key: KAFKA-16603
> URL: https://issues.apache.org/jira/browse/KAFKA-16603
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.3.1
>Reporter: Anil Dasari
>Priority: Major
>
> We are experiencing a data loss when Kafka Source connector is failed to send 
> data to Kafka topic and offset topic. 
> Kafka cluster and Kafka connect details:
>  # Kafka connect version i.e client : Confluent community version 7.3.1 i.e 
> Kafka 3.3.1
>  # Kafka version: 0.11.0 (server)
>  # Cluster size : 3 brokers
>  # Number of partitions in all topics = 3
>  # Replication factor = 3
>  # Min ISR set 2
>  # Uses no transformations in Kafka connector
>  # Use default error tolerance i.e None.
> Our connector checkpoints the offsets info received in 
> SourceTask#commitRecord and resume the data process from the persisted 
> checkpoint.
> The data loss is noticed when broker is unresponsive for few mins due to high 
> load and kafka connector was restarted. Also, Kafka connector graceful 
> shutdown failed.
> Logs:
>  
> {code:java}
> [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
> last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
> Apr 22, 2024 @ 15:56:16.708 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
> disconnected.
> Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 
> disconnected.
> Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log 
> **)
> Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
> unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
> leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
> leaderUrl='http://10.75.100.46:8083/', offset=4, 
> connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
> taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
> revokedTaskIds=[], delay=0} to avoid running tasks while not being a member 
> the group
> Apr 22, 2024 @ 15:56:19.866 Stopping connector 
> d094a5d7bbb046b99d62398cb84d648c
> Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0
> Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for 
> WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c}
> Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' 
> failed to properly shut down, has become unresponsive, and may be consuming 
> external resources. Correct the configuration for this connector or remove 
> the connector. After fixing the connector, it may be necessary to restart 
> this worker to release any consumed resources.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing 

[jira] [Updated] (KAFKA-16445) PATCH method for connector configuration

2024-05-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16445:
--
Fix Version/s: 3.8.0

> PATCH method for connector configuration
> 
>
> Key: KAFKA-16445
> URL: https://issues.apache.org/jira/browse/KAFKA-16445
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Ivan Yurchenko
>Assignee: Ivan Yurchenko
>Priority: Minor
> Fix For: 3.8.0
>
>
> As  [KIP-477: Add PATCH method for connector config in Connect REST 
> API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-477%3A+Add+PATCH+method+for+connector+config+in+Connect+REST+API]
>  suggests, we should introduce the PATCH method for connector configuration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy

2024-05-08 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844653#comment-17844653
 ] 

Chris Egerton commented on KAFKA-16656:
---

Hmm... I've expanded this to an integration test to test out that scenario and 
everything is still running smoothly. Are you sure you're using the 
{{DefaultReplicationPolicy}} class and not {{{}IdentityReplicationPolicy{}}}?

I should note that my tests are running on the latest trunk, so it's possible 
that there's a bug that's been fixed in between 3.5.1 and now that would 
explain the issues you're seeing.

> Using a custom replication.policy.separator with DefaultReplicationPolicy
> -
>
> Key: KAFKA-16656
> URL: https://issues.apache.org/jira/browse/KAFKA-16656
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.1
>Reporter: Lenin Joseph
>Priority: Major
>
> Hi,
> In the case of bidirectional replication using mm2, when we tried using a 
> custom replication.policy.separator( ex: "-") with DefaultReplicationPolicy , 
> we see cyclic replication of topics. Could you confirm whether it's mandatory 
> to use a CustomReplicationPolicy whenever we want to use a separator other 
> than a "." ?
> Regards, 
> Lenin



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy

2024-05-08 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844653#comment-17844653
 ] 

Chris Egerton edited comment on KAFKA-16656 at 5/8/24 1:15 PM:
---

Hmm... I've expanded my local setup to a full-on integration test to run 
through that scenario and everything is still running smoothly. Are you sure 
you're using the {{DefaultReplicationPolicy}} class and not 
{{{}IdentityReplicationPolicy{}}}?

I should note that my tests are running on the latest trunk, so it's possible 
that there's a bug that's been fixed in between 3.5.1 and now that would 
explain the issues you're seeing.


was (Author: chrisegerton):
Hmm... I've expanded this to an integration test to test out that scenario and 
everything is still running smoothly. Are you sure you're using the 
{{DefaultReplicationPolicy}} class and not {{{}IdentityReplicationPolicy{}}}?

I should note that my tests are running on the latest trunk, so it's possible 
that there's a bug that's been fixed in between 3.5.1 and now that would 
explain the issues you're seeing.

> Using a custom replication.policy.separator with DefaultReplicationPolicy
> -
>
> Key: KAFKA-16656
> URL: https://issues.apache.org/jira/browse/KAFKA-16656
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.1
>Reporter: Lenin Joseph
>Priority: Major
>
> Hi,
> In the case of bidirectional replication using mm2, when we tried using a 
> custom replication.policy.separator( ex: "-") with DefaultReplicationPolicy , 
> we see cyclic replication of topics. Could you confirm whether it's mandatory 
> to use a CustomReplicationPolicy whenever we want to use a separator other 
> than a "." ?
> Regards, 
> Lenin



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16108) Backport fix for KAFKA-16093 to 3.7

2024-05-08 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16108.
---
Resolution: Done

> Backport fix for KAFKA-16093 to 3.7
> ---
>
> Key: KAFKA-16108
> URL: https://issues.apache.org/jira/browse/KAFKA-16108
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.7.1
>
>
> A fix for KAFKA-16093 is present on the branches trunk (the version for which 
> is currently 3.8.0-SNAPSHOT) and 3.6. We are in code freeze for the 3.7.0 
> release, and this issue is not a blocker, so it cannot be backported right 
> now.
> We should backport the fix once 3.7.0 has been released and before 3.7.1 is 
> released.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16108) Backport fix for KAFKA-16093 to 3.7

2024-05-08 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844647#comment-17844647
 ] 

Chris Egerton commented on KAFKA-16108:
---

I set it to blocker as a reminder to backport before 3.7.1, which we should 
still do (I'll get on it later today). It was not a blocker for the 3.7.0 
release because that one was already in code freeze.

> Backport fix for KAFKA-16093 to 3.7
> ---
>
> Key: KAFKA-16108
> URL: https://issues.apache.org/jira/browse/KAFKA-16108
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.7.1
>
>
> A fix for KAFKA-16093 is present on the branches trunk (the version for which 
> is currently 3.8.0-SNAPSHOT) and 3.6. We are in code freeze for the 3.7.0 
> release, and this issue is not a blocker, so it cannot be backported right 
> now.
> We should backport the fix once 3.7.0 has been released and before 3.7.1 is 
> released.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors

2024-05-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15018.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Potential tombstone offsets corruption for exactly-once source connectors
> -
>
> Key: KAFKA-15018
> URL: https://issues.apache.org/jira/browse/KAFKA-15018
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.8.0
>
>
> When exactly-once support is enabled for source connectors, source offsets 
> can potentially be written to two different offsets topics: a topic specific 
> to the connector, and the global offsets topic (which was used for all 
> connectors prior to KIP-618 / version 3.3.0).
> Precedence is given to offsets in the per-connector offsets topic, but if 
> none are found for a given partition, then the global offsets topic is used 
> as a fallback.
> When committing offsets, a transaction is used to ensure that source records 
> and source offsets are written to the Kafka cluster targeted by the source 
> connector. This transaction only includes the connector-specific offsets 
> topic. Writes to the global offsets topic take place after writes to the 
> connector-specific offsets topic have completed successfully, and if they 
> fail, a warning message is logged, but no other action is taken.
> Normally, this ensures that, for offsets committed by exactly-once-supported 
> source connectors, the per-connector offsets topic is at least as up-to-date 
> as the global offsets topic, and sometimes even ahead.
> However, for tombstone offsets, we lose that guarantee. If a tombstone offset 
> is successfully written to the per-connector offsets topic, but cannot be 
> written to the global offsets topic, then the global offsets topic will still 
> contain that source offset, but the per-connector topic will not. Due to the 
> fallback-on-global logic used by the worker, if a task requests offsets for 
> one of the tombstoned partitions, the worker will provide it with the offsets 
> present in the global offsets topic, instead of indicating to the task that 
> no offsets can be found.
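
A rough sketch of the lookup order described above (illustrative only, not the 
actual worker code) shows why a tombstone that only reaches the per-connector 
topic can be masked by a stale entry in the global topic:

{code:java}
import java.util.Map;

public class OffsetFallbackSketch {

    /**
     * Illustrative only: prefer the connector-specific offsets topic and fall
     * back to the global offsets topic when no entry is found.
     */
    public static Map<String, Object> resolveOffset(
            Map<String, Object> fromConnectorTopic,
            Map<String, Object> fromGlobalTopic) {
        if (fromConnectorTopic != null) {
            return fromConnectorTopic;
        }
        // A tombstone written only to the connector topic leaves no entry there,
        // so the stale offset from the global topic is returned instead of "none".
        return fromGlobalTopic;
    }
}
{code}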



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy

2024-05-07 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844368#comment-17844368
 ] 

Chris Egerton commented on KAFKA-16656:
---

Hi [~leninjoseph] it should be possible to use a custom separator with the 
{{DefaultReplicationPolicy}} class. What are the names of the topics you're 
seeing cyclical replication for?

 

I've sketched out a unit test that can be added to the [ReplicationPolicyTest 
suite|https://github.com/apache/kafka/blob/05df10449eb9c95fe6d6055b302c84686be8058d/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java]
 that probes how the class handles custom separators and everything appears to 
be working, but this doesn't rule out the possibility of bugs in other places:

 

{code:java}
@Test
public void testCustomSeparator() {
    DefaultReplicationPolicy policyWithCustomSeparator = new DefaultReplicationPolicy();
    Map<String, Object> config = new HashMap<>();
    config.put(DefaultReplicationPolicy.SEPARATOR_CONFIG, "-");
    policyWithCustomSeparator.configure(config);

    assertEquals("source", policyWithCustomSeparator.topicSource("source-t"));
    assertEquals("t", policyWithCustomSeparator.upstreamTopic("source-t"));
}
{code}

> Using a custom replication.policy.separator with DefaultReplicationPolicy
> -
>
> Key: KAFKA-16656
> URL: https://issues.apache.org/jira/browse/KAFKA-16656
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.1
>Reporter: Lenin Joseph
>Priority: Major
>
> Hi,
> In the case of bidirectional replication using mm2, when we tried using a 
> custom replication.policy.separator( ex: "-") with DefaultReplicationPolicy , 
> we see cyclic replication of topics. Could you confirm whether it's mandatory 
> to use a CustomReplicationPolicy whenever we want to use a separator other 
> than a "." ?
> Regards, 
> Lenin



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13329) Connect does not perform preflight validation for per-connector key and value converters

2024-05-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13329.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Connect does not perform preflight validation for per-connector key and value 
> converters
> 
>
> Key: KAFKA-13329
> URL: https://issues.apache.org/jira/browse/KAFKA-13329
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> Users may specify a key and/or value converter class for their connector 
> directly in the configuration for that connector. If this occurs, no 
> preflight validation is performed to ensure that the specified converter is 
> valid.
> Unfortunately, the [Converter 
> interface|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java]
>  does not require converters to expose a {{ConfigDef}} (unlike the 
> [HeaderConverter 
> interface|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L48-L52],
>  which does have that requirement), so it's unlikely that the configuration 
> properties of the converter itself can be validated.
> However, we can and should still validate that the converter class exists, 
> can be instantiated (i.e., has a public, no-args constructor and is a 
> concrete, non-abstract class), and implements the {{Converter}} interface.
> *EDIT:* Since this ticket was originally filed, a {{Converter::config}} 
> method was added in 
> [KIP-769|https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions].
>  We can now utilize that config definition during preflight validation for 
> connectors.
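
A rough sketch of the class-level checks described above (illustrative only, 
not the actual worker implementation):

{code:java}
import org.apache.kafka.connect.storage.Converter;

public class ConverterPreflightSketch {

    /**
     * Illustrative only: verify that a connector's configured converter class
     * exists, can be instantiated, and actually implements Converter.
     */
    public static void validateConverterClass(String converterClassName) {
        Object instance;
        try {
            Class<?> klass = Class.forName(converterClassName);
            instance = klass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                    "Converter class " + converterClassName
                            + " could not be found or instantiated", e);
        }
        if (!(instance instanceof Converter)) {
            throw new IllegalArgumentException(
                    converterClassName + " does not implement the Converter interface");
        }
    }
}
{code}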



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13328) Connect does not perform preflight validation for per-connector header converters

2024-05-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13328.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Connect does not perform preflight validation for per-connector header 
> converters
> -
>
> Key: KAFKA-13328
> URL: https://issues.apache.org/jira/browse/KAFKA-13328
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> Users may specify a header converter class for their connector directly in 
> the configuration for that connector. If this occurs, no preflight validation 
> is performed to ensure that the specified converter is valid.
> {{HeaderConverter}} implementations are required to provide a valid 
> {{ConfigDef}} to the Connect framework via 
> [HeaderConverter::config|https://github.com/apache/kafka/blob/4eb386f6e060e12e1940c0d780987e3a7c438d74/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L48-L52],
>  but this object isn't actually leveraged anywhere by Connect.
> Connect should make use of this config object during preflight validation for 
> connectors to fail faster when their header converters are misconfigured.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs

2024-04-30 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842412#comment-17842412
 ] 

Chris Egerton commented on KAFKA-16604:
---

We don't necessarily have to deprecate anything. It's a little awkward to 
support all the {{ConfigDef::define}} variants and a builder pattern for 
{{{}ConfigKey{}}}, but this would produce no breaking changes and provide a 
path forward that doesn't run the risk of accidental compatibility violations 
like in KAFKA-16592. IMO that would be a reasonable compromise.

Regardless, I agree with the root sentiment of this ticket (that we should 
probably change direction in how we evolve these two classes). Would love to 
discuss further on a KIP thread!

> Deprecate ConfigDef.ConfigKey constructor from public APIs
> --
>
> Key: KAFKA-16604
> URL: https://issues.apache.org/jira/browse/KAFKA-16604
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> Currently, one can create ConfigKey by either invoking the public constructor 
> directly and passing it to a ConfigDef object or by invoking a bunch of 
> define methods. The 2 ways can get confusing at times. Moreover, it could 
> lead to errors as was noticed in KAFKA-16592
> We should ideally have only 1 way exposed to the users which IMO should be to 
> create the objects only through the exposed define methods. This ticket is 
> about marking the public constructor of ConfigKey as Deprecated first and 
> then making it private eventually.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-04-29 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842020#comment-17842020
 ] 

Chris Egerton commented on KAFKA-16603:
---

[~dasarianil] That's correct; {{Producer::send}} doesn't guarantee the order of 
acks across different partitions.

 

One possible workaround could be to enable exactly-once support for your Kafka 
Connect cluster (docs 
[here|https://kafka.apache.org/documentation.html#connect_exactlyoncesource]). 
This would not necessarily grant exactly-once semantics to your connector, but 
it would guarantee that record batches are produced atomically and prevent the 
scenario you're encountering here where records in a batch are ack'd for some 
partitions but not others.
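
To illustrate the ordering caveat, here is a minimal standalone producer sketch 
(the topic, bootstrap address, and class name are made up for the example); the 
send callbacks for different partitions can complete in any order relative to 
one another:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AckOrderingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                final int seq = i;
                // Records fan out across partitions; each partition's batch is
                // acknowledged independently, so these callbacks may fire out of
                // send order across partitions.
                producer.send(new ProducerRecord<>("example-topic", "key-" + seq, "value-" + seq),
                        (metadata, exception) -> {
                            if (exception == null) {
                                System.out.printf("acked seq=%d partition=%d offset=%d%n",
                                        seq, metadata.partition(), metadata.offset());
                            }
                        });
            }
        }
    }
}
{code}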

> Data loss when kafka connect sending data to Kafka
> --
>
> Key: KAFKA-16603
> URL: https://issues.apache.org/jira/browse/KAFKA-16603
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.3.1
>Reporter: Anil Dasari
>Priority: Major
>
> We are experiencing a data loss when Kafka Source connector is failed to send 
> data to Kafka topic and offset topic. 
> Kafka cluster and Kafka connect details:
>  # Kafka connect version i.e client : Confluent community version 7.3.1 i.e 
> Kafka 3.3.1
>  # Kafka version: 0.11.0 (server)
>  # Cluster size : 3 brokers
>  # Number of partitions in all topics = 3
>  # Replication factor = 3
>  # Min ISR set 2
>  # Uses no transformations in Kafka connector
>  # Use default error tolerance i.e None.
> Our connector checkpoints the offsets info received in 
> SourceTask#commitRecord and resume the data process from the persisted 
> checkpoint.
> The data loss is noticed when broker is unresponsive for few mins due to high 
> load and kafka connector was restarted. Also, Kafka connector graceful 
> shutdown failed.
> Logs:
>  
> {code:java}
> [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
> last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
> Apr 22, 2024 @ 15:56:16.708 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
> disconnected.
> Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 
> disconnected.
> Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log 
> **)
> Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
> unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
> leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
> leaderUrl='http://10.75.100.46:8083/', offset=4, 
> connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
> taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
> revokedTaskIds=[], delay=0} to avoid running tasks while not being a member 
> the group
> Apr 22, 2024 @ 15:56:19.866 Stopping connector 
> d094a5d7bbb046b99d62398cb84d648c
> Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0
> Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for 
> WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c}
> Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' 
> failed to properly shut down, has become unresponsive, and may be consuming 
> external resources. Correct the configuration for this connector or remove 
> the connector. After fixing the connector, it may be necessary to restart 
> this worker to release any consumed resources.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing the 
> Kafka producer with timeoutMillis = 0 ms.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> 

[jira] [Commented] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs

2024-04-29 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842016#comment-17842016
 ] 

Chris Egerton commented on KAFKA-16604:
---

I don't know if it's worth the breaking change to disallow manual instantiation 
of the {{ConfigKey}} class. If we want to prevent bugs like we saw in 
KAFKA-16592, one alternative is to add a builder class for the {{ConfigKey}} 
class. This would allow us to add new fields to the {{ConfigKey}} class without 
having to add new (public) constructors, or alter existing ones.

> Deprecate ConfigDef.ConfigKey constructor from public APIs
> --
>
> Key: KAFKA-16604
> URL: https://issues.apache.org/jira/browse/KAFKA-16604
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> Currently, one can create ConfigKey by either invoking the public constructor 
> directly and passing it to a ConfigDef object or by invoking a bunch of 
> define methods. The 2 ways can get confusing at times. Moreover, it could 
> lead to errors as was noticed in KAFKA-16592
> We should ideally have only 1 way exposed to the users which IMO should be to 
> create the objects only through the exposed define methods. This ticket is 
> about marking the public constructor of ConfigKey as Deprecated first and 
> then making it private eventually.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16423) Ignoring offset partition key with an unexpected format for the first element in the partition key list. Expected type: java.lang.String, actual type: null"

2024-03-26 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16423.
---
Fix Version/s: (was: 3.6.2)
   (was: 3.8.0)
   (was: 3.7.1)
   Resolution: Duplicate

> Ignoring offset partition key with an unexpected format for the first element 
> in the partition key list. Expected type: java.lang.String, actual type: null"
> 
>
> Key: KAFKA-16423
> URL: https://issues.apache.org/jira/browse/KAFKA-16423
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.0, 3.6.0, 3.5.1, 3.5.2, 3.7.0, 3.6.1, 3.8.0
>Reporter: johndoe
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16423) Ignoring offset partition key with an unexpected format for the first element in the partition key list. Expected type: java.lang.String, actual type: null"

2024-03-26 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16423.
---
Resolution: Duplicate

> Ignoring offset partition key with an unexpected format for the first element 
> in the partition key list. Expected type: java.lang.String, actual type: null"
> 
>
> Key: KAFKA-16423
> URL: https://issues.apache.org/jira/browse/KAFKA-16423
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.0, 3.6.0, 3.5.1, 3.5.2, 3.7.0, 3.6.1, 3.8.0
>Reporter: johndoe
>Assignee: johndoe
>Priority: Minor
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-16423) Ignoring offset partition key with an unexpected format for the first element in the partition key list. Expected type: java.lang.String, actual type: null"

2024-03-26 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-16423:
---
  Assignee: (was: johndoe)

> Ignoring offset partition key with an unexpected format for the first element 
> in the partition key list. Expected type: java.lang.String, actual type: null"
> 
>
> Key: KAFKA-16423
> URL: https://issues.apache.org/jira/browse/KAFKA-16423
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.0, 3.6.0, 3.5.1, 3.5.2, 3.7.0, 3.6.1, 3.8.0
>Reporter: johndoe
>Priority: Minor
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16392) Spurious log warnings: "Ignoring offset partition key with an unexpected format for the second element in the partition key list. Expected type: java.util.Map, actual ty

2024-03-20 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-16392:
-

 Summary: Spurious log warnings: "Ignoring offset partition key 
with an unexpected format for the second element in the partition key list. 
Expected type: java.util.Map, actual type: null"
 Key: KAFKA-16392
 URL: https://issues.apache.org/jira/browse/KAFKA-16392
 Project: Kafka
  Issue Type: Bug
  Components: connect
Affects Versions: 3.6.1, 3.7.0, 3.5.2, 3.5.1, 3.6.0, 3.5.0, 3.8.0
Reporter: Chris Egerton
Assignee: Chris Egerton


Some source connectors choose not to specify source offsets with the records 
they emit (or rather, to provide null partitions/offsets). When these 
partitions are parsed by a Kafka Connect worker, this currently leads to a 
spurious warning log message.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15575) Prevent Connectors from exceeding tasks.max configuration

2024-02-01 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15575.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Prevent Connectors from exceeding tasks.max configuration
> -
>
> Key: KAFKA-15575
> URL: https://issues.apache.org/jira/browse/KAFKA-15575
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Reporter: Greg Harris
>Assignee: Chris Egerton
>Priority: Minor
>  Labels: kip
> Fix For: 3.8.0
>
>
> The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
> enumerate tasks configurations. This takes an argument which comes from the 
> tasks.max connector config. This is the Javadoc for that method:
> {noformat}
> /**
>  * Returns a set of configurations for Tasks based on the current 
> configuration,
>  * producing at most {@code maxTasks} configurations.
>  *
>  * @param maxTasks maximum number of configurations to generate
>  * @return configurations for Tasks
>  */
> public abstract List<Map<String, String>> taskConfigs(int maxTasks);
> {noformat}
> This includes the constraint that the number of tasks is at most maxTasks, 
> but this constraint is not enforced by the framework.
>  
> To enforce this constraint, we could begin dropping configs that exceed the 
> limit, and log a warning. For sink connectors this should harmlessly 
> rebalance the consumer subscriptions onto the remaining tasks. For source 
> connectors that distribute their work via task configs, this may result in an 
> interruption in data transfer.
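
A rough sketch of the enforcement described above (illustrative only, not the 
framework's actual implementation):

{code:java}
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TasksMaxEnforcementSketch {

    private static final Logger log = LoggerFactory.getLogger(TasksMaxEnforcementSketch.class);

    /**
     * Illustrative only: drop task configs beyond tasks.max and warn about it.
     */
    public static List<Map<String, String>> enforce(
            String connectorName, List<Map<String, String>> taskConfigs, int maxTasks) {
        if (taskConfigs.size() <= maxTasks) {
            return taskConfigs;
        }
        log.warn("Connector {} generated {} task configs, but tasks.max is {}; "
                + "ignoring the excess configs", connectorName, taskConfigs.size(), maxTasks);
        return taskConfigs.subList(0, maxTasks);
    }
}
{code}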



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15524) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks

2024-01-31 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-15524:
-

Assignee: Chris Egerton

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks
> --
>
> Key: KAFKA-15524
> URL: https://issues.apache.org/jira/browse/KAFKA-15524
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.6.0, 3.5.1
>Reporter: Josep Prat
>Assignee: Chris Egerton
>Priority: Major
>  Labels: flaky, flaky-test
>
> Last seen: 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testResetSinkConnectorOffsetsZombieSinkTasks/]
>  
> h3. Error Message
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out.{code}
> h3. Stacktrace
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out. at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:401)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:392)
>  at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:763)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:568) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:568) at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> 

[jira] [Assigned] (KAFKA-15917) Flaky test - OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks

2024-01-31 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-15917:
-

Assignee: Chris Egerton

> Flaky test - 
> OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks
> ---
>
> Key: KAFKA-15917
> URL: https://issues.apache.org/jira/browse/KAFKA-15917
> Project: Kafka
>  Issue Type: Bug
>Reporter: Haruki Okada
>Assignee: Chris Egerton
>Priority: Major
>  Labels: flaky-test
> Attachments: stdout.log
>
>
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14242/14/tests/]
>  
>  
> {code:java}
> Error
> java.lang.AssertionError: 
> Expected: a string containing "zombie sink task"
>  but: was "Could not alter connector offsets. Error response: 
> {"error_code":500,"message":"Failed to alter consumer group offsets for 
> connector test-connector"}"
> Stacktrace
> java.lang.AssertionError: 
> Expected: a string containing "zombie sink task"
>  but: was "Could not alter connector offsets. Error response: 
> {"error_code":500,"message":"Failed to alter consumer group offsets for 
> connector test-connector"}"
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks(OffsetsApiIntegrationTest.java:431)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
> at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
> at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
> at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
> at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> at 
> 

[jira] [Commented] (KAFKA-10816) Connect REST API should have a resource that can be used as a readiness probe

2024-01-25 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811018#comment-17811018
 ] 

Chris Egerton commented on KAFKA-10816:
---

Hi [~tombentley], it's been a while since there's been activity on this one so 
I've reassigned it to myself. Let me know if you were still planning on working 
on it and I'll be happy to give it back :)

> Connect REST API should have a resource that can be used as a readiness probe
> -
>
> Key: KAFKA-10816
> URL: https://issues.apache.org/jira/browse/KAFKA-10816
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Randall Hauch
>Assignee: Chris Egerton
>Priority: Major
>
> There are a few ways to accurately detect whether a Connect worker is 
> *completely* ready to process all REST requests:
> # Wait for {{Herder started}} in the Connect worker logs
> # Use the REST API to issue a request that will be completed only after the 
> herder has started, such as {{GET /connectors/{name}/}} or {{GET 
> /connectors/{name}/status}}.
> Other techniques can be used to detect other startup states, though none of 
> these will guarantee that the worker has indeed completely started up and can 
> process all REST requests:
> * {{GET /}} can be used to know when the REST server has started, but this 
> may be before the worker has started completely and successfully.
> * {{GET /connectors}} can be used to know when the REST server has started, 
> but this may be before the worker has started completely and successfully. 
> And, for the distributed Connect worker, this may actually return an older 
> list of connectors if the worker hasn't yet completely read through the 
> internal config topic. It's also possible that this request returns even if 
> the worker is having trouble reading from the internal config topic.
> * {{GET /connector-plugins}} can be used to know when the REST server has 
> started, but this may be before the worker has started completely and 
> successfully.
> The Connect REST API should have an endpoint that more obviously and more 
> simply can be used as a readiness probe. This could be a new resource (e.g., 
> {{GET /status}}), though this would only work on newer Connect runtimes, and 
> existing tooling, installations, and examples would have to be modified to 
> take advantage of this feature (if it exists). 
> Alternatively, we could make sure that the existing resources (e.g., {{GET 
> /}} or {{GET /connectors}}) wait for the herder to start completely; this 
> wouldn't require a KIP and it would not require clients use different 
> technique for newer and older Connect runtimes. (Whether or not we back port 
> this is another question altogether, since it's debatable whether the 
> behavior of the existing REST resources is truly a bug.)
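
For illustration, a minimal readiness-probe sketch built on option 2 above (polling a per-connector status endpoint); the class name, worker URL, and connector name are placeholders, and this is not part of the proposed feature:
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Readiness check sketch: per the description above, a request like
// GET /connectors/{name}/status is only answered once the herder has started,
// so a 200 here implies the worker has completed startup.
public class ConnectReadinessCheck {

    public static boolean isReady(String workerUrl, String connectorName) {
        try {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(workerUrl + "/connectors/" + connectorName + "/status"))
                    .GET()
                    .build();
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            return response.statusCode() == 200;
        } catch (Exception e) {
            // Connection refused, timeouts, etc. all mean "not ready yet".
            return false;
        }
    }
}
{code}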



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-10816) Connect REST API should have a resource that can be used as a readiness probe

2024-01-25 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-10816:
-

Assignee: Chris Egerton  (was: Tom Bentley)

> Connect REST API should have a resource that can be used as a readiness probe
> -
>
> Key: KAFKA-10816
> URL: https://issues.apache.org/jira/browse/KAFKA-10816
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Randall Hauch
>Assignee: Chris Egerton
>Priority: Major
>
> There are a few ways to accurately detect whether a Connect worker is 
> *completely* ready to process all REST requests:
> # Wait for {{Herder started}} in the Connect worker logs
> # Use the REST API to issue a request that will be completed only after the 
> herder has started, such as {{GET /connectors/{name}/}} or {{GET 
> /connectors/{name}/status}}.
> Other techniques can be used to detect other startup states, though none of 
> these will guarantee that the worker has indeed completely started up and can 
> process all REST requests:
> * {{GET /}} can be used to know when the REST server has started, but this 
> may be before the worker has started completely and successfully.
> * {{GET /connectors}} can be used to know when the REST server has started, 
> but this may be before the worker has started completely and successfully. 
> And, for the distributed Connect worker, this may actually return an older 
> list of connectors if the worker hasn't yet completely read through the 
> internal config topic. It's also possible that this request returns even if 
> the worker is having trouble reading from the internal config topic.
> * {{GET /connector-plugins}} can be used to know when the REST server has 
> started, but this may be before the worker has started completely and 
> successfully.
> The Connect REST API should have an endpoint that more obviously and more 
> simply can be used as a readiness probe. This could be a new resource (e.g., 
> {{GET /status}}), though this would only work on newer Connect runtimes, and 
> existing tooling, installations, and examples would have to be modified to 
> take advantage of this feature (if it exists). 
> Alternatively, we could make sure that the existing resources (e.g., {{GET 
> /}} or {{GET /connectors}}) wait for the herder to start completely; this 
> wouldn't require a KIP and it would not require clients use different 
> technique for newer and older Connect runtimes. (Whether or not we back port 
> this is another question altogether, since it's debatable whether the 
> behavior of the existing REST resources is truly a bug.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15675) Fix flaky ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector() test

2024-01-23 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810057#comment-17810057
 ] 

Chris Egerton commented on KAFKA-15675:
---

I've done some analysis on this one and believe I've found the root cause. It's 
a confluence of a few different issues, but the TL;DR is: *the request to 
{{POST /connectors//restart?onlyFailed=false&includeTasks=false}} 
fails with a 409 error; this does not cause the test to (immediately) fail, but 
the connector is never restarted, which causes the test to time out while 
[waiting for the connector to be 
stopped|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L272-L275].*

 

This kind of scenario probably raises several questions. Here's my best attempt 
to anticipate and address them:

 

*Why does the 409 response not cause the test to immediately fail?*

The original rationale for this is unclear, but the code structure 
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L374-L383]
 is fairly clear: issue the request, and if the status code is less than 400, 
attempt to deserialize the body. Then, unconditionally, return either null or 
the deserialized response body.
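
In simplified form, the pattern looks like this (an illustrative sketch, not the actual EmbeddedConnect code; all names are made up):
{code:java}
// Simplified sketch: the error status never surfaces to the caller, so a 409
// yields null instead of an exception and the calling test keeps going.
public class RestartCallSketch {

    static class HttpResult {          // stand-in for the real REST response type
        final int status;
        final String body;
        HttpResult(int status, String body) { this.status = status; this.body = body; }
    }

    static String restartConnector(HttpResult response) {
        String parsedBody = null;
        if (response.status < 400) {
            parsedBody = response.body;  // "deserialize" only when the call succeeded
        }
        // Unconditionally return: on a 409 this is simply null, and no error is raised.
        return parsedBody;
    }

    public static void main(String[] args) {
        System.out.println(restartConnector(new HttpResult(409, "rebalance in progress")));
        // prints "null" -- the failure is silently swallowed
    }
}
{code}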

 

*Why is the 409 response occurring?*

The cluster (or, to be more specific, either the worker that received the 
initial REST request or, if the request was forwarded, the leader) detected 
that a rebalance due to an added/removed connector or new task configs was 
about to take place, and rejected the request. See the {{DistributedHerder}} 
class's 
[restartConnectorAndTasks|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1467]
 and 
[checkRebalanceNeeded|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2302-L2307]
 methods for the logic to check for pending rebalances, and its logic for 
detecting pending rebalances 
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2385],
 
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2400],
 and 
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2419].

 

*Why is a rebalance pending by the time we try to restart the connector? 
Shouldn't the cluster and the set of connectors and tasks on it be stable by 
this point?*

Yes, the cluster and set of connectors and tasks on it should be stable by the 
time we issue our restart request. We check to make sure that [every worker in 
the cluster is up and 
running|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L116-L117]
 before proceeding with the rest of the test, and that the [connector and 
expected number of tasks are 
running|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L252-L253]
 before issuing the restart request. Unfortunately, the former check–for worker 
liveness across the cluster–does not guarantee that every worker has joined the 
cluster. This check is [performed by issuing a request to the root 
resource|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L956-L975]
 ({{{}GET /{}}}) for each worker: if the response is valid (i.e., its body 
matches the expected format), then the worker is considered up and running. 
However, this does not guarantee that the worker has actually completed 
startup: it may not have finished reading to the end of internal topics, or had 
a chance to contact the group coordinator and join the cluster yet.

 

After examining the logs of one test case, it appeared that the following 
sequence of events took place:
 # A single worker completes startup (creates and reads to the end of internal 
topics, then joins the cluster)
 # The connector is created (by chance, the REST request to create the 
connector happens to be sent to the only worker that has completed startup so 
far)
 # The 

[jira] [Assigned] (KAFKA-15675) Fix flaky ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector() test

2024-01-23 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-15675:
-

Assignee: Chris Egerton

> Fix flaky 
> ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector() test
> ---
>
> Key: KAFKA-15675
> URL: https://issues.apache.org/jira/browse/KAFKA-15675
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kirk True
>Assignee: Chris Egerton
>Priority: Major
>  Labels: flaky-test
> Attachments: error.stacktrace.txt, error.stdout.txt
>
>
> This integration test is flaky in around 9% of test runs. Source: [Gradle 
> Enterprise test 
> trends|https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=KAFKA=org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest=testMultiWorkerRestartOnlyConnector].
> One failure had this message:
> {code:java}
> java.lang.AssertionError: Failed to stop connector and tasks within 12ms 
> {code}
> Please see the attachments for the stack trace and stdout log.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16108) Backport fix for KAFKA-16093 to 3.7

2024-01-10 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-16108:
-

 Summary: Backport fix for KAFKA-16093 to 3.7
 Key: KAFKA-16108
 URL: https://issues.apache.org/jira/browse/KAFKA-16108
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Reporter: Chris Egerton
Assignee: Chris Egerton
 Fix For: 3.7.1


A fix for KAFKA-16093 is present on the branches trunk (the version for which 
is currently 3.8.0-SNAPSHOT) and 3.6. We are in code freeze for the 3.7.0 
release, and this issue is not a blocker, so it cannot be backported right now.

We should backport the fix once 3.7.0 has been released and before 3.7.1 is 
released.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16093) Spurious warnings logged to stderr about empty path annotations and providers not implementing provider interfaces

2024-01-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16093:
--
Fix Version/s: 3.6.2

> Spurious warnings logged to stderr about empty path annotations and providers 
> not implementing provider interfaces
> --
>
> Key: KAFKA-16093
> URL: https://issues.apache.org/jira/browse/KAFKA-16093
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2, 3.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 3.6.2, 3.8.0
>
>
> Some warnings get logged to stderr on Connect startup. For example:
> {quote}Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource will 
> be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Jan 08, 2024 1:48:19 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> {quote}
> These are benign, but can confuse and even frighten new users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16093) Spurious warnings logged to stderr about empty path annotations and providers not implementing provider interfaces

2024-01-10 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805158#comment-17805158
 ] 

Chris Egerton commented on KAFKA-16093:
---

Note that the "affects version(s)" field is not completely filled out, as this 
issue goes back several years.

> Spurious warnings logged to stderr about empty path annotations and providers 
> not implementing provider interfaces
> --
>
> Key: KAFKA-16093
> URL: https://issues.apache.org/jira/browse/KAFKA-16093
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Affects Versions: 3.7.0, 3.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 3.8.0
>
>
> Some warnings get logged to stderr on Connect startup. For example:
> {quote}Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource will 
> be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Jan 08, 2024 1:48:19 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> {quote}
> These are benign, but can confuse and even frighten new users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16093) Spurious warnings logged to stderr about empty path annotations and providers not implementing provider interfaces

2024-01-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16093:
--
Affects Version/s: 3.6.1
   3.6.0
   3.6.2

> Spurious warnings logged to stderr about empty path annotations and providers 
> not implementing provider interfaces
> --
>
> Key: KAFKA-16093
> URL: https://issues.apache.org/jira/browse/KAFKA-16093
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2, 3.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 3.8.0
>
>
> Some warnings get logged to stderr on Connect startup. For example:
> {quote}Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource will 
> be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Jan 08, 2024 1:48:19 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> {quote}
> These are benign, but can confuse and even frighten new users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16084) Simplify and deduplicate StandaloneHerderTest mocking

2024-01-09 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804792#comment-17804792
 ] 

Chris Egerton commented on KAFKA-16084:
---

Thanks for the clarification [~gharris1727], agree that those redundant 
instantiations should be cleaned up 

> Simplify and deduplicate StandaloneHerderTest mocking
> -
>
> Key: KAFKA-16084
> URL: https://issues.apache.org/jira/browse/KAFKA-16084
> Project: Kafka
>  Issue Type: Test
>  Components: connect
>Reporter: Greg Harris
>Priority: Minor
>  Labels: newbie++
>
> The StandaloneHerderTest has some cruft that can be cleaned up. What I've 
> found:
> * The `connector` field is written in nearly every test, but only read by one 
> test, and looks to be nearly irrelevant.
> * `expectConfigValidation` has two ways of specifying consecutive 
> validations. 1. The boolean shouldCreateConnector which is true in the first 
> invocation and false in subsequent invocations. 2. by passing multiple 
> configurations via varargs.
> * The class uses a mix of Mock annotations and mock(Class) invocations
> * The test doesn't stop the thread pool created inside the herder and might 
> leak threads
> * Mocking for Worker#startConnector is 6 lines which are duplicated 8 times 
> throughout the test
> * Some waits are 1000 ms and others are 1000 s, and could be pulled out to 
> constants or a util method



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16093) Spurious warnings logged to stderr about empty path annotations and providers not implementing provider interfaces

2024-01-08 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-16093:
-

 Summary: Spurious warnings logged to stderr about empty path 
annotations and providers not implementing provider interfaces
 Key: KAFKA-16093
 URL: https://issues.apache.org/jira/browse/KAFKA-16093
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Affects Versions: 3.7.0, 3.8.0
Reporter: Chris Egerton
Assignee: Chris Egerton


Some warnings get logged to stderr on Connect startup. For example:
{quote}Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
checkProviderRuntime

WARNING: A provider 
org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
SERVER runtime does not implement any provider interfaces applicable in the 
SERVER runtime. Due to constraint configuration problems the provider 
org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 

Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
checkProviderRuntime

WARNING: A provider 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
in SERVER runtime does not implement any provider interfaces applicable in the 
SERVER runtime. Due to constraint configuration problems the provider 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
ignored. 

Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
checkProviderRuntime

WARNING: A provider 
org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource 
registered in SERVER runtime does not implement any provider interfaces 
applicable in the SERVER runtime. Due to constraint configuration problems the 
provider 
org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource will be 
ignored. 

Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
checkProviderRuntime

WARNING: A provider 
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
registered in SERVER runtime does not implement any provider interfaces 
applicable in the SERVER runtime. Due to constraint configuration problems the 
provider 
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
be ignored. 

Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
checkProviderRuntime

WARNING: A provider 
org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
SERVER runtime does not implement any provider interfaces applicable in the 
SERVER runtime. Due to constraint configuration problems the provider 
org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
ignored. 

Jan 08, 2024 1:48:19 PM org.glassfish.jersey.internal.Errors logErrors

WARNING: The following warnings have been detected: WARNING: The (sub)resource 
method listLoggers in 
org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty 
path annotation.

WARNING: The (sub)resource method listConnectors in 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
empty path annotation.

WARNING: The (sub)resource method createConnector in 
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
empty path annotation.

WARNING: The (sub)resource method listConnectorPlugins in 
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
contains empty path annotation.

WARNING: The (sub)resource method serverInfo in 
org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
path annotation.
{quote}
These are benign, but can confuse and even frighten new users.
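
These messages are emitted by Jersey through java.util.logging rather than through Connect's own logging. As a stopgap sketch (an assumed workaround, not the fix that was ultimately applied), the offending JUL loggers could be quieted before the REST server starts:
{code:java}
import java.util.logging.Level;
import java.util.logging.Logger;

// Workaround sketch: raise the level of the two Jersey loggers that emit the
// warnings quoted above. Strong references are kept so the JUL loggers (and
// their configured levels) are not garbage-collected.
public class JerseyWarningSilencer {

    private static final Logger PROVIDERS =
            Logger.getLogger("org.glassfish.jersey.internal.inject.Providers");
    private static final Logger ERRORS =
            Logger.getLogger("org.glassfish.jersey.internal.Errors");

    public static void silence() {
        PROVIDERS.setLevel(Level.SEVERE);
        ERRORS.setLevel(Level.SEVERE);
    }
}
{code}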



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16084) Simplify and deduplicate StandaloneHerderTest mocking

2024-01-06 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803835#comment-17803835
 ] 

Chris Egerton commented on KAFKA-16084:
---

I don't think a mix of annotation- and method-based mocks is an inherently bad 
thing. If a mock is used frequently or is cumbersome to create via the {{mock}} 
method, then it's acceptable to create it with an annotation. On the other 
hand, if a mock is used in a small subset of tests, it's better to create it 
using the {{mock}} method to avoid adding clutter to the entire test suite.
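
For illustration, the two styles side by side in a generic Mockito test (not taken from StandaloneHerderTest):
{code:java}
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class MockStyleExampleTest {

    // Annotation-based mock: created for every test; suits dependencies the
    // whole suite interacts with.
    @Mock
    private Runnable sharedDependency;

    @Test
    public void usesTheSharedAnnotationMock() {
        sharedDependency.run();
        verify(sharedDependency).run();
    }

    @Test
    public void usesALocalMethodMock() {
        // Method-based mock: created only in the tests that need it, keeping it
        // out of the shared test fixture.
        Runnable localDependency = mock(Runnable.class);
        localDependency.run();
        verify(localDependency).run();
    }
}
{code}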

> Simplify and deduplicate StandaloneHerderTest mocking
> -
>
> Key: KAFKA-16084
> URL: https://issues.apache.org/jira/browse/KAFKA-16084
> Project: Kafka
>  Issue Type: Test
>  Components: connect
>Reporter: Greg Harris
>Priority: Minor
>  Labels: newbie++
>
> The StandaloneHerderTest has some cruft that can be cleaned up. What I've 
> found:
> * The `connector` field is written in nearly every test, but only read by one 
> test, and looks to be nearly irrelevant.
> * `expectConfigValidation` has two ways of specifying consecutive 
> validations. 1. The boolean shouldCreateConnector which is true in the first 
> invocation and false in subsequent invocations. 2. by passing multiple 
> configurations via varargs.
> * The class uses a mix of Mock annotations and mock(Class) invocations
> * The test doesn't stop the thread pool created inside the herder and might 
> leak threads
> * Mocking for Worker#startConnector is 6 lines which are duplicated 8 times 
> throughout the test
> * Some waits are 1000 ms and others are 1000 s, and could be pulled out to 
> constants or a util method



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15090) Source tasks are no longer stopped on a separate thread

2023-12-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15090:
--
Description: 
Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the 
{{SourceTask::stop}} method would be invoked on the herder tick thread, which 
is a separate thread from the dedicated thread which was responsible for 
polling data from the task and producing it to Kafka.

This aligned with the Javadocs for {{{}SourceTask:poll{}}}, which state:
{quote}The task will be stopped on a separate thread, and when that happens 
this method is expected to unblock, quickly finish up any remaining processing, 
and return.
{quote}
However, it came with the downside that the herder's tick thread would be 
blocked until the invocation of {{SourceTask::stop}} completed, which could 
result in major parts of the worker's REST API becoming unavailable and even 
the worker falling out of the cluster.

As a result, in 
[https://github.com/apache/kafka/pull/9669|https://github.com/apache/kafka/pull/9669],
 we changed the logic for task shutdown to cause {{SourceTask::stop}} to be 
invoked on the dedicated thread for the task (i.e., the one responsible for 
polling data from it and producing that data to Kafka).

This altered the semantics for {{SourceTask:poll}} and {{SourceTask::stop}} and 
may have broken connectors that block during {{poll}} with the expectation that 
{{stop}} can and will be invoked concurrently as a signal that any ongoing 
polls should be interrupted immediately.

Although reverting the fix is likely not a viable option (blocking the herder 
thread on interactions with user-written plugins is high-risk and we have tried 
to eliminate all instances of this where feasible), we may try to restore the 
expected contract by spinning up a separate thread exclusively for invoking 
{{SourceTask::stop}} separately from the dedicated thread for the task and the 
herder's thread.
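
For reference, a minimal sketch of a task written against the original contract, where a blocking {{poll}} relies on {{stop}} being invoked from another thread to unblock it (the class and details are illustrative):
{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Illustrative task that blocks in poll() until stop() is called. If stop() is
// only ever invoked on the same thread that runs poll(), this task can hang
// forever -- which is the semantic change described above.
public class BlockingSourceTask extends SourceTask {

    private final CountDownLatch stopped = new CountDownLatch(1);

    @Override public String version() { return "0.0.1"; }
    @Override public void start(Map<String, String> props) { }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Blocks until stop() is invoked (from a separate thread, per the Javadoc).
        stopped.await();
        return Collections.emptyList();
    }

    @Override
    public void stop() {
        stopped.countDown();
    }
}
{code}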

  was:
Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the 
{{SourceTask::stop}} method would be invoked on the herder tick thread, which 
is a separate thread from the dedicated thread which was responsible for 
polling data from the task and producing it to Kafka.

This aligned with the Javadocs for {{{}SourceTask:poll{}}}, which state:
{quote}The task will be stopped on a separate thread, and when that happens 
this method is expected to unblock, quickly finish up any remaining processing, 
and return.
{quote}
However, it came with the downside that the herder's tick thread would be 
blocked until the invocation of {{SourceTask::stop}} completed, which could 
result in major parts of the worker's REST API becoming unavailable and even 
the worker falling out of the cluster.

As a result, in 
[https://github.com/apache/kafka/pull/9669|https://github.com/apache/kafka/pull/9669,],
 we changed the logic for task shutdown to cause {{SourceTask::stop}} to be 
invoked on the dedicated thread for the task (i.e., the one responsible for 
polling data from it and producing that data to Kafka).

This altered the semantics for {{SourceTask:poll}} and {{SourceTask::stop}} and 
may have broken connectors that block during {{poll}} with the expectation that 
{{stop}} can and will be invoked concurrently as a signal that any ongoing 
polls should be interrupted immediately.

Although reverting the fix is likely not a viable option (blocking the herder 
thread on interactions with user-written plugins is high-risk and we have tried 
to eliminate all instances of this where feasible), we may try to restore the 
expected contract by spinning up a separate thread exclusively for invoking 
{{SourceTask::stop}} separately from the dedicated thread for the task and the 
herder's thread.


> Source tasks are no longer stopped on a separate thread
> ---
>
> Key: KAFKA-15090
> URL: https://issues.apache.org/jira/browse/KAFKA-15090
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 
> 3.3.3, 3.6.0, 3.5.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the 
> {{SourceTask::stop}} method would be invoked on the herder tick thread, which 
> is a separate thread from the dedicated thread which was responsible for 
> polling data from the task and producing it to Kafka.
> This aligned with the Javadocs for {{{}SourceTask:poll{}}}, which state:
> {quote}The task will be stopped on a separate thread, and when that happens 
> this method is expected to unblock, quickly finish up any remaining 
> processing, 

[jira] [Updated] (KAFKA-15090) Source tasks are no longer stopped on a separate thread

2023-12-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15090:
--
Description: 
Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the 
{{SourceTask::stop}} method would be invoked on the herder tick thread, which 
is a separate thread from the dedicated thread which was responsible for 
polling data from the task and producing it to Kafka.

This aligned with the Javadocs for {{{}SourceTask:poll{}}}, which state:
{quote}The task will be stopped on a separate thread, and when that happens 
this method is expected to unblock, quickly finish up any remaining processing, 
and return.
{quote}
However, it came with the downside that the herder's tick thread would be 
blocked until the invocation of {{SourceTask::stop}} completed, which could 
result in major parts of the worker's REST API becoming unavailable and even 
the worker falling out of the cluster.

As a result, in 
[https://github.com/apache/kafka/pull/9669|https://github.com/apache/kafka/pull/9669,],
 we changed the logic for task shutdown to cause {{SourceTask::stop}} to be 
invoked on the dedicated thread for the task (i.e., the one responsible for 
polling data from it and producing that data to Kafka).

This altered the semantics for {{SourceTask:poll}} and {{SourceTask::stop}} and 
may have broken connectors that block during {{poll}} with the expectation that 
{{stop}} can and will be invoked concurrently as a signal that any ongoing 
polls should be interrupted immediately.

Although reverting the fix is likely not a viable option (blocking the herder 
thread on interactions with user-written plugins is high-risk and we have tried 
to eliminate all instances of this where feasible), we may try to restore the 
expected contract by spinning up a separate thread exclusively for invoking 
{{SourceTask::stop}} separately from the dedicated thread for the task and the 
herder's thread.

  was:
Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the 
{{SourceTask::stop}} method would be invoked on the herder tick thread, which 
is a separate thread from the dedicated thread which was responsible for 
polling data from the task and producing it to Kafka.

This aligned with the Javadocs for {{{}SourceTask:poll{}}}, which state:
{quote}The task will be stopped on a separate thread, and when that happens 
this method is expected to unblock, quickly finish up any remaining processing, 
and return.
{quote}
However, it came with the downside that the herder's tick thread would be 
blocked until the invocation of {{SourceTask::stop}} completed, which could 
result in major parts of the worker's REST API becoming unavailable and even 
the worker falling out of the cluster.

As a result, in [https://github.com/apache/kafka/pull/9669,] we changed the 
logic for task shutdown to cause {{SourceTask::stop}} to be invoked on the 
dedicated thread for the task (i.e., the one responsible for polling data from 
it and producing that data to Kafka).

This altered the semantics for {{SourceTask:poll}} and {{SourceTask::stop}} and 
may have broken connectors that block during {{poll}} with the expectation that 
{{stop}} can and will be invoked concurrently as a signal that any ongoing 
polls should be interrupted immediately.

Although reverting the fix is likely not a viable option (blocking the herder 
thread on interactions with user-written plugins is high-risk and we have tried 
to eliminate all instances of this where feasible), we may try to restore the 
expected contract by spinning up a separate thread exclusively for invoking 
{{SourceTask::stop}} separately from the dedicated thread for the task and the 
herder's thread.


> Source tasks are no longer stopped on a separate thread
> ---
>
> Key: KAFKA-15090
> URL: https://issues.apache.org/jira/browse/KAFKA-15090
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 
> 3.3.3, 3.6.0, 3.5.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the 
> {{SourceTask::stop}} method would be invoked on the herder tick thread, which 
> is a separate thread from the dedicated thread which was responsible for 
> polling data from the task and producing it to Kafka.
> This aligned with the Javadocs for {{{}SourceTask:poll{}}}, which state:
> {quote}The task will be stopped on a separate thread, and when that happens 
> this method is expected to unblock, quickly finish up any remaining 
> processing, and return.
> {quote}
> However, it came with 

[jira] [Commented] (KAFKA-15992) Make MM2 heartbeats topic name configurable

2023-12-12 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795777#comment-17795777
 ] 

Chris Egerton commented on KAFKA-15992:
---

The heartbeats topic name is already configurable in replication policies as of 
[KIP-690|https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention].
 While it's still an option to augment the {{DefaultReplicationPolicy}} class 
with configuration properties to control its heartbeat topic, users can also 
subclass the {{DefaultReplicationPolicy}} to add custom logic right now if 
they'd like to.

 

I don't raise these points as reasons to not add this configurability to the 
{{DefaultReplicationPolicy}} class, but just to note the current workarounds 
that are available in case users need this right now and can't wait for a KIP 
to be approved and released.
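
A rough sketch of the subclassing workaround, assuming the {{heartbeatsTopic()}} hook added by KIP-690; the class name and the {{custom.heartbeats.topic}} property are hypothetical:
{code:java}
import java.util.Map;

import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;

// Illustrative subclass that lets the heartbeats topic name be overridden via a
// custom (hypothetical) property. Assumes the KIP-690 heartbeatsTopic() hook.
public class ConfigurableHeartbeatsReplicationPolicy extends DefaultReplicationPolicy {

    private String heartbeatsTopic = "heartbeats";

    @Override
    public void configure(Map<String, ?> props) {
        super.configure(props);
        Object configured = props.get("custom.heartbeats.topic"); // hypothetical key
        if (configured != null) {
            heartbeatsTopic = configured.toString();
        }
    }

    @Override
    public String heartbeatsTopic() {
        return heartbeatsTopic;
    }
}
{code}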

> Make MM2 heartbeats topic name configurable
> ---
>
> Key: KAFKA-15992
> URL: https://issues.apache.org/jira/browse/KAFKA-15992
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.7.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Bertalan Kondrat
>Priority: Major
>
> With DefaultReplicationPolicy, the heartbeats topic name is hard-coded. 
> Instead, this should be configurable, so users can avoid collisions with the 
> "heartbeats" topics of other systems.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15563) Provide informative error messages when Connect REST requests time out

2023-12-11 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15563.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> Provide informative error messages when Connect REST requests time out
> --
>
> Key: KAFKA-15563
> URL: https://issues.apache.org/jira/browse/KAFKA-15563
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.7.0
>
>
> The Kafka Connect REST API has a hardcoded timeout of 90 seconds. If any 
> operations take longer than that, a 500 error response is returned with the 
> message "Request timed out" (see 
> [here|https://github.com/apache/kafka/blob/7e1c453af9533aba8c19da2d08ce6595c1441fc0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java#L70]).
> This can be a source of frustration for users, who want to understand what is 
> causing the request to time out. This can be specific to the request (for 
> example, a connector's [custom multi-property validation 
> logic|https://kafka.apache.org/35/javadoc/org/apache/kafka/connect/connector/Connector.html#validate(java.util.Map)]
>  is taking too long), or applicable to any request that goes through the 
> herder's tick thread (for which there are a variety of possible causes).
> We can give users better, immediate insight into what is causing requests to 
> time out by including information about the last possibly-blocking operation 
> the worker performed while servicing the request (or attempting to enter a 
> state where all preconditions necessary to service the request have been 
> satisfied), and when the worker began that operation.
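
For illustration, a minimal sketch of the bookkeeping described above (record the current possibly-blocking operation and when it began, then include it in timeout messages); the class and method names are made up, not the actual implementation:
{code:java}
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

// Records the operation the worker is currently blocked on, so a "request timed
// out" error can also report what was in progress and since when.
public class BlockingStageTracker {

    public static final class Stage {
        public final String description;
        public final Instant startedAt;
        Stage(String description, Instant startedAt) {
            this.description = description;
            this.startedAt = startedAt;
        }
    }

    private final AtomicReference<Stage> current = new AtomicReference<>();

    public void begin(String description) {
        current.set(new Stage(description, Instant.now()));
    }

    public String describeTimeout() {
        Stage stage = current.get();
        if (stage == null) {
            return "Request timed out";
        }
        return "Request timed out. The worker is currently performing: "
                + stage.description + " (since " + stage.startedAt + ")";
    }
}
{code}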



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15988) Kafka Connect OffsetsApiIntegrationTest takes too long

2023-12-07 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15988:
-

 Summary: Kafka Connect OffsetsApiIntegrationTest takes too long
 Key: KAFKA-15988
 URL: https://issues.apache.org/jira/browse/KAFKA-15988
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton


The [OffsetsApiIntegrationTest 
suite|https://github.com/apache/kafka/blob/c515bf51f820f26ff6be6b0fde03b47b69a10b00/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java]
 currently contains 27 test cases. Each test case begins by creating embedded 
Kafka and Kafka Connect clusters, which is fairly resource-intensive and 
time-consuming.

If possible, we should reuse those embedded clusters across test cases.
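
One way to do this (a JUnit 4 sketch; the {{EmbeddedConnectCluster}} builder calls shown are assumptions about the test-util API, not verbatim usage) is to promote the clusters to class-level fixtures:
{code:java}
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

// Sketch: build the embedded Kafka/Connect clusters once per class instead of
// once per test. Builder and lifecycle method names are assumptions.
public class SharedClusterOffsetsApiTest {

    private static EmbeddedConnectCluster connect;

    @BeforeClass
    public static void setupClusters() {
        connect = new EmbeddedConnectCluster.Builder()
                .name("offsets-api-connect-cluster")
                .numWorkers(3)
                .build();
        connect.start();
    }

    @AfterClass
    public static void teardownClusters() {
        connect.stop();
    }

    @Test
    public void eachTestReusesTheSameClusters() {
        // Per-test setup would then only create and delete connectors and topics,
        // not whole clusters.
    }
}
{code}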



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests

2023-12-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14132:
--
Description: 
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#00875a}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven])
 # KafkaConfigBackingStoreTest (owner: [~bachmanity1])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#00875a}KafkaBasedLogTest{color} (owner: @bachmanity ])
 # {color:#00875a}RetryUtilTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*

  was:
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~bachmanity1])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#00875a}KafkaBasedLogTest{color} (owner: @bachmanity ])
 # {color:#00875a}RetryUtilTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*


> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
> Fix For: 3.7.0
>
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}InReview{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
>  # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
>  # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
>  # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
>  # 
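
For anyone picking up one of the remaining suites, the general shape of such a migration looks roughly like the following (the interface and test here are hypothetical, not one of the suites listed above): expectation-style PowerMock/EasyMock setup is replaced by Mockito's when/verify, with no replay() or verifyAll() phases.

{code:java}
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.junit.Test;

// Hypothetical example: mock a collaborator with Mockito instead of
// PowerMock/EasyMock. Stubbing is done with when(...) and interactions are
// checked with verify(...).
public class ExampleMigrationTest {

    interface OffsetStore {
        void start();
        boolean started();
    }

    @Test
    public void startsTheStore() {
        OffsetStore store = mock(OffsetStore.class);
        when(store.started()).thenReturn(true);

        store.start();

        verify(store).start();
        assertTrue(store.started());
    }
}
{code}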

[jira] [Commented] (KAFKA-15985) Mirrormaker 2 offset sync is incomplete

2023-12-07 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794266#comment-17794266
 ] 

Chris Egerton commented on KAFKA-15985:
---

[~Reamer] Can you share your MM2 configuration (feel free to scrub any 
sensitive properties)? I'm especially interested in the value of the 
{{offset.lag.max}} property, though other properties may also be relevant here.
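
For reference, {{offset.lag.max}} defaults to 100 and bounds how far the translated downstream offsets may trail the source before a new offset sync is emitted, which would be consistent with the roughly 31-record lag shown in the quoted report below. A purely illustrative snippet (example values, not a recommendation):

{code}
# Illustrative only: emit offset syncs more aggressively so translated
# consumer group offsets track the source more closely.
# Can also be scoped to a single flow, e.g. A->B.offset.lag.max
offset.lag.max = 0
{code}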

> Mirrormaker 2 offset sync is incomplete
> ---
>
> Key: KAFKA-15985
> URL: https://issues.apache.org/jira/browse/KAFKA-15985
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.1
>Reporter: Philipp Dallig
>Priority: Major
>
> We are currently trying to migrate between two Kafka clusters using 
> Mirrormaker2
> new kafka cluster version: 7.5.2-ccs
> old kafka cluster version: kafka_2.13-2.8.0
> The Mirrormaker 2 process runs on the new cluster (target cluster)
> My main problem: The lag in the target cluster is not the same as in the 
> source cluster.
> target cluster
> {code}
> GROUP        TOPIC                     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
> test-sync-5  kafka-replication-test-5  0          3637            3668            31   -            -     -
> {code}
> source cluster
> {code}
> GROUP        TOPIC                     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
> test-sync-5  kafka-replication-test-5  0          3668            3668            0    -            -     -
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13988) Mirrormaker 2 auto.offset.reset=latest not working

2023-12-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-13988:
--
Fix Version/s: 3.6.2

> Mirrormaker 2 auto.offset.reset=latest not working
> --
>
> Key: KAFKA-13988
> URL: https://issues.apache.org/jira/browse/KAFKA-13988
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.2.0
> Environment: Source Kafka cluster running on Ubuntu 20
> Source Kafka cluster Kafka v0.10
> Target Kafka cluster running in AWS MSK
> Target Kafka cluster Kafka v2.6.2
> Mirrormaker version 3.2.0 running on Ubuntu 20.
>Reporter: Daniel Florek
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.7.0, 3.6.2
>
>
> Hi. 
> I have a problem setting up mirroring with MM2 from the latest offset between 2 
> clusters. In the logs I can see that the consumer consuming the topics has the 
> auto.offset.reset property set to latest, but topics are still read from 
> offset 0. I am using the following configuration:
>  
> {code:java}
> clusters = A, B
> A.bootstrap.servers = broker-01A:9092
> B.bootstrap.servers = broker-01B:9092,broker-02B:9092,broker-03B:9092
> replication.policy.class = 
> org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> #Enable replication between clusters and define topics which should be 
> replicated
> A->B.enabled = true
> A->B.topics = .*
> A->B.replication.factor=3
> A->B.emit.heartbeats.enabled = true
> A->B.emit.checkpoints.enabled = true
> auto.offset.reset=latest
> consumer.auto.offset.reset=latest
> A.consumer.auto.offset.reset=latest
> B.consumer.auto.offset.reset=latest
> refresh.topics.enabled=true
> heartbeats.topic.replication.factor=1
> checkpoints.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> config.storage.replication.factor = 1
> offset.storage.replication.factor = 1
> status.storage.replication.factor = 1 {code}
> I am using Kafka 3.2.0 for Mirrormaker 2. The source Kafka cluster is 1 broker 
> running on an EC2 instance in AWS (quite an old version, I think 0.10). The target 
> Kafka cluster contains 3 brokers running in AWS MSK (version 2.6.2). 
> Could you point out what I am doing wrong? Or is this possibly a bug?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15912) Parallelize conversion and transformation steps in Connect

2023-12-04 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792933#comment-17792933
 ] 

Chris Egerton commented on KAFKA-15912:
---

+1 for the concerns about lack of thread safety in existing SMTs, and for not 
breaking stateful SMTs that rely on in-order record delivery.

I suppose we could still give each SMT/converter plugin (or a subset of them) a 
dedicated thread to work on. For example, in a source connector pipeline with 
two SMTs called "ValueToKey" and "ExtractField", and three converters for 
record keys, values, and headers, we could have something like this:

 

Thread 1: ValueToKey, ExtractField (in that order)

Thread 2: Header converter, key converter (in any order)

Thread 3: Value converter

 

Records would be delivered initially to the first thread, then passed to the 
second thread, then passed to the third, then back to the task thread (or, if 
we really want to get fancy, possibly dispatched directly to the producer).

This would allow up to three records to be processed at a time, though it would 
still be susceptible to hotspots (e.g., if there are no headers involved, the 
header converter step is basically a no-op, and traversing the entire record 
value for value conversion is likely to be the most CPU-intensive step). It's 
also unclear if this kind of limited parallelism would lead to much performance 
improvement on workers running multiple tasks; my suspicion is that the CPU 
would be pretty well-saturated on many of these already.
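
A very rough sketch of that pipelined model (class and method names here are illustrative, not Connect classes): each stage runs on its own single-threaded executor, so per-stage ordering is preserved while up to three records can be in flight at once.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

// Illustrative sketch only: three pipeline stages, each on its own
// single-threaded executor. Records submitted in order stay in order within
// every stage, but different stages can work on different records concurrently.
public class PipelineSketch<R> {

    private final ExecutorService smtStage = Executors.newSingleThreadExecutor();
    private final ExecutorService keyHeaderStage = Executors.newSingleThreadExecutor();
    private final ExecutorService valueStage = Executors.newSingleThreadExecutor();

    public CompletableFuture<R> process(R record,
                                        UnaryOperator<R> applySmts,
                                        UnaryOperator<R> convertKeyAndHeaders,
                                        UnaryOperator<R> convertValue) {
        return CompletableFuture.completedFuture(record)
                .thenApplyAsync(applySmts, smtStage)
                .thenApplyAsync(convertKeyAndHeaders, keyHeaderStage)
                .thenApplyAsync(convertValue, valueStage);
    }

    public void shutdown() {
        smtStage.shutdown();
        keyHeaderStage.shutdown();
        valueStage.shutdown();
    }
}
{code}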

> Parallelize conversion and transformation steps in Connect
> --
>
> Key: KAFKA-15912
> URL: https://issues.apache.org/jira/browse/KAFKA-15912
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Mickael Maison
>Priority: Major
>
> In busy Connect pipelines, the conversion and transformation steps can 
> sometimes have a very significant impact on performance. This is especially 
> true with large records with complex schemas, for example with CDC connectors 
> like Debezium.
> Today, in order to always preserve ordering, converters and transformations 
> are called on one record at a time in a single thread in the Connect worker. 
> As Connect usually handles records in batches (up to max.poll.records in sink 
> pipelines; for source pipelines it depends on the connector, but most 
> connectors I've seen still tend to return multiple records each loop), it 
> could be highly beneficial to attempt running the converter and 
> transformation chain in parallel with a pool of processing threads.
> It should be possible to do some of these steps in parallel and still keep 
> exact ordering. I'm even considering whether an option to lose ordering but 
> allow even faster processing would make sense.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10840) Need way to catch auth issues in poll method of Java Kafka client

2023-12-04 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17792922#comment-17792922
 ] 

Chris Egerton commented on KAFKA-10840:
---

[~pnee] It's been a while but this does ring a bell. I think the case I recall 
is that source connectors were hanging because their admin client was 
infinitely retrying on some operation, but at this point it was years ago so 
I'm fuzzy on the details.

It'd be great if we could fail clients on expired certs, without also failing 
on transient (or possibly-transient) errors.

> Need way to catch auth issues in poll method of Java Kafka client
> -
>
> Key: KAFKA-10840
> URL: https://issues.apache.org/jira/browse/KAFKA-10840
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Devin G. Bost
>Priority: Blocker
>  Labels: authentication, client
>
> We recently implemented SSL authentication at our company, and when certs 
> expire, the Kafka client poll method silently fails without throwing any kind 
> of exception. This is a problem because the data flow could stop at any time 
> (due to certificate expiration) without us being able to handle it. The auth 
> issue shows up in Kafka broker logs, but we don't see any indication on the 
> client-side that there was an auth issue. As a consequence, the auth failure 
> happens 10 times a second forever. 
> We need a way to know on the client-side if an auth issue is blocking the 
> connection to Kafka so we can handle the exception and refresh the certs 
> (keystore/truststore) when the certs expire. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15821) Active topics for deleted connectors are not reset in standalone mode

2023-11-13 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15821:
-

 Summary: Active topics for deleted connectors are not reset in 
standalone mode
 Key: KAFKA-15821
 URL: https://issues.apache.org/jira/browse/KAFKA-15821
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.5.1, 3.6.0, 3.4.1, 3.5.0, 3.3.2, 3.3.1, 3.2.3, 3.2.2, 
3.4.0, 3.2.1, 3.1.2, 3.0.2, 3.3.0, 3.1.1, 3.2.0, 2.8.2, 3.0.1, 3.0.0, 2.8.1, 
2.7.2, 2.6.3, 3.1.0, 2.6.2, 2.7.1, 2.8.0, 2.6.1, 2.7.0, 2.5.1, 2.6.0, 2.5.0, 
3.7.0
Reporter: Chris Egerton


In 
[KIP-558|https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect],
 a new REST endpoint was added to report the set of active topics for a 
connector. The KIP specified that "Deleting a connector will reset this 
connector's set of active topics", and this logic was successfully implemented 
in distributed mode. However, in standalone mode, active topics for deleted 
connectors are not deleted, and if a connector is re-created, it will inherit 
the active topics of its predecessor(s) unless they were manually reset.
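
For reference, the REST endpoints involved (per KIP-558) are roughly:

{noformat}
GET /connectors/{name}/topics        - report the connector's set of active topics
PUT /connectors/{name}/topics/reset  - manually reset the connector's set of active topics
{noformat}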



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15575) Prevent Connectors from exceeding tasks.max configuration

2023-11-10 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17785147#comment-17785147
 ] 

Chris Egerton commented on KAFKA-15575:
---

I've published 
[KIP-1004|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect]
 to try to address this gap.

> Prevent Connectors from exceeding tasks.max configuration
> -
>
> Key: KAFKA-15575
> URL: https://issues.apache.org/jira/browse/KAFKA-15575
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Chris Egerton
>Priority: Minor
>  Labels: kip
>
> The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
> enumerate tasks configurations. This takes an argument which comes from the 
> tasks.max connector config. This is the Javadoc for that method:
> {noformat}
> /**
>  * Returns a set of configurations for Tasks based on the current 
> configuration,
>  * producing at most {@code maxTasks} configurations.
>  *
>  * @param maxTasks maximum number of configurations to generate
>  * @return configurations for Tasks
>  */
> public abstract List<Map<String, String>> taskConfigs(int maxTasks);
> {noformat}
> This includes the constraint that the number of tasks is at most maxTasks, 
> but this constraint is not enforced by the framework.
>  
> To enforce this constraint, we could begin dropping configs that exceed the 
> limit, and log a warning. For sink connectors this should harmlessly 
> rebalance the consumer subscriptions onto the remaining tasks. For source 
> connectors that distribute their work via task configs, this may result in an 
> interruption in data transfer.
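
As a purely illustrative sketch of the dropping behavior described above (class and method names here are hypothetical, not the framework's code): truncate the list returned by taskConfigs() when it exceeds tasks.max and log a warning.

{code:java}
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical enforcement sketch: drop task configs beyond tasks.max and
// warn, rather than silently starting more tasks than configured.
public class TaskConfigEnforcement {

    private static final Logger log = LoggerFactory.getLogger(TaskConfigEnforcement.class);

    public static List<Map<String, String>> enforce(String connector,
                                                    int maxTasks,
                                                    List<Map<String, String>> taskConfigs) {
        if (taskConfigs.size() <= maxTasks) {
            return taskConfigs;
        }
        log.warn("Connector {} generated {} task configs but tasks.max is {}; dropping the excess",
                connector, taskConfigs.size(), maxTasks);
        return taskConfigs.subList(0, maxTasks);
    }
}
{code}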



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15575) Prevent Connectors from exceeding tasks.max configuration

2023-11-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15575:
--
Description: 
The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
enumerate tasks configurations. This takes an argument which comes from the 
tasks.max connector config. This is the Javadoc for that method:
{noformat}
/**
 * Returns a set of configurations for Tasks based on the current configuration,
 * producing at most {@code maxTasks} configurations.
 *
 * @param maxTasks maximum number of configurations to generate
 * @return configurations for Tasks
 */
public abstract List<Map<String, String>> taskConfigs(int maxTasks);
{noformat}
This includes the constraint that the number of tasks is at most maxTasks, but 
this constraint is not enforced by the framework.

 

To enforce this constraint, we could begin dropping configs that exceed the 
limit, and log a warning. For sink connectors this should harmlessly rebalance 
the consumer subscriptions onto the remaining tasks. For source connectors that 
distribute their work via task configs, this may result in an interruption in 
data transfer.

  was:
The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
enumerate tasks configurations. This takes an argument which comes from the 
tasks.max connector config. This is the Javadoc for that method:
{noformat}
/**
 * Returns a set of configurations for Tasks based on the current configuration,
 * producing at most {@code maxTasks} configurations.
 *
 * @param maxTasks maximum number of configurations to generate
 * @return configurations for Tasks
 */
public abstract List<Map<String, String>> taskConfigs(int maxTasks);
{noformat}
This includes the constraint that the number of tasks is at most maxTasks, but 
this constraint is not enforced by the framework.

 

We should begin enforcing this constraint by dropping configs that exceed the 
limit, and logging a warning. For sink connectors this should harmlessly 
rebalance the consumer subscriptions onto the remaining tasks. For source 
connectors that distribute their work via task configs, this may result in an 
interruption in data transfer.


> Prevent Connectors from exceeding tasks.max configuration
> -
>
> Key: KAFKA-15575
> URL: https://issues.apache.org/jira/browse/KAFKA-15575
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Chris Egerton
>Priority: Minor
>  Labels: kip
>
> The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
> enumerate tasks configurations. This takes an argument which comes from the 
> tasks.max connector config. This is the Javadoc for that method:
> {noformat}
> /**
>  * Returns a set of configurations for Tasks based on the current 
> configuration,
>  * producing at most {@code maxTasks} configurations.
>  *
>  * @param maxTasks maximum number of configurations to generate
>  * @return configurations for Tasks
>  */
> public abstract List<Map<String, String>> taskConfigs(int maxTasks);
> {noformat}
> This includes the constraint that the number of tasks is at most maxTasks, 
> but this constraint is not enforced by the framework.
>  
> To enforce this constraint, we could begin dropping configs that exceed the 
> limit, and log a warning. For sink connectors this should harmlessly 
> rebalance the consumer subscriptions onto the remaining tasks. For source 
> connectors that distribute their work via task configs, this may result in an 
> interruption in data transfer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15693) Disabling scheduled rebalance delay in Connect can lead to indefinitely unassigned connectors and tasks

2023-11-09 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15693:
--
Fix Version/s: 3.5.2
   3.7.0
   3.6.1

> Disabling scheduled rebalance delay in Connect can lead to indefinitely 
> unassigned connectors and tasks
> ---
>
> Key: KAFKA-15693
> URL: https://issues.apache.org/jira/browse/KAFKA-15693
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 
> 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, 2.8.2, 
> 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 
> 3.5.0, 3.4.1, 3.6.0, 3.5.1, 3.7.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.5.2, 3.7.0, 3.6.1
>
>
> Kafka Connect supports deferred resolution of imbalances when using the 
> incremental rebalancing algorithm introduced in 
> [KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
>  When enabled, this feature introduces a configurable delay period between 
> when "lost" assignments (i.e., connectors and tasks that were assigned to a 
> worker in the previous round of rebalance but are not assigned to a worker 
> during the current round of rebalance) are detected and when they are 
> reassigned to a worker. The delay can be configured with the 
> {{scheduled.rebalance.max.delay.ms}} property.
> If this property is set to 0, then there should be no delay between when lost 
> assignments are detected and when they are reassigned. Instead, however, this 
> configuration can cause lost assignments to be withheld during a rebalance, 
> remaining unassigned until the next rebalance, which, because scheduled 
> delays are disabled, will not happen on its own and will only take place when 
> unrelated conditions warrant it (such as the creation or deletion of a 
> connector, a worker joining or leaving the cluster, new task configs being 
> generated for a connector, etc.).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15575) Prevent Connectors from exceeding tasks.max configuration

2023-11-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15575:
--
Labels: kip  (was: )

> Prevent Connectors from exceeding tasks.max configuration
> -
>
> Key: KAFKA-15575
> URL: https://issues.apache.org/jira/browse/KAFKA-15575
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Chris Egerton
>Priority: Minor
>  Labels: kip
>
> The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
> enumerate tasks configurations. This takes an argument which comes from the 
> tasks.max connector config. This is the Javadoc for that method:
> {noformat}
> /**
>  * Returns a set of configurations for Tasks based on the current 
> configuration,
>  * producing at most {@code maxTasks} configurations.
>  *
>  * @param maxTasks maximum number of configurations to generate
>  * @return configurations for Tasks
>  */
> public abstract List<Map<String, String>> taskConfigs(int maxTasks);
> {noformat}
> This includes the constraint that the number of tasks is at most maxTasks, 
> but this constraint is not enforced by the framework.
>  
> We should begin enforcing this constraint by dropping configs that exceed the 
> limit, and logging a warning. For sink connectors this should harmlessly 
> rebalance the consumer subscriptions onto the remaining tasks. For source 
> connectors that distribute their work via task configs, this may result in an 
> interruption in data transfer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15575) Prevent Connectors from exceeding tasks.max configuration

2023-11-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-15575:
-

Assignee: Chris Egerton

> Prevent Connectors from exceeding tasks.max configuration
> -
>
> Key: KAFKA-15575
> URL: https://issues.apache.org/jira/browse/KAFKA-15575
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Chris Egerton
>Priority: Minor
>
> The Connector::taskConfigs(int maxTasks) function is used by Connectors to 
> enumerate tasks configurations. This takes an argument which comes from the 
> tasks.max connector config. This is the Javadoc for that method:
> {noformat}
> /**
>  * Returns a set of configurations for Tasks based on the current 
> configuration,
>  * producing at most {@code maxTasks} configurations.
>  *
>  * @param maxTasks maximum number of configurations to generate
>  * @return configurations for Tasks
>  */
> public abstract List<Map<String, String>> taskConfigs(int maxTasks);
> {noformat}
> This includes the constraint that the number of tasks is at most maxTasks, 
> but this constraint is not enforced by the framework.
>  
> We should begin enforcing this constraint by dropping configs that exceed the 
> limit, and logging a warning. For sink connectors this should harmlessly 
> rebalance the consumer subscriptions onto the remaining tasks. For source 
> connectors that distribute their work via task configs, this may result in an 
> interruption in data transfer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15680) Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing

2023-11-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15680:
--
Fix Version/s: 3.6.1

> Partition-Count is not getting updated Correctly in the Incremental 
> Co-operative Rebalancing(ICR) Mode of Rebalancing
> -
>
> Key: KAFKA-15680
> URL: https://issues.apache.org/jira/browse/KAFKA-15680
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Assignee: Pritam Kumar
>Priority: Minor
> Fix For: 3.7.0, 3.6.1
>
>
> * In ICR (Incremental Cooperative Rebalancing) mode, whenever a new worker, 
> say Worker3, joins, a new global assignment is computed by the leader, say 
> Worker1, that results in the revocation of some tasks from each existing 
> worker, i.e. Worker1 and Worker2.
>  * Once the new member's join is completed, the 
> *ConsumerCoordinator.onJoinComplete()* method is called, which primarily 
> computes all the newly assigned partitions and the partitions which are 
> revoked, and updates the subscription object.
>  * If it is a case of revocation, which we check via the 
> “partitionsRevoked” list, we call the method *invokePartitionRevoked()*, 
> which internally calls *updatePartitionCount()*, which fetches partitions from 
> the *assignment* object, which has not yet been updated with the new assignment.
>  * It is only just before calling the *invokePartitionsAssigned()* 
> method that we update the *assignment* by invoking the following → 
> *subscriptions.assignFromSubscribed(assignedPartitions);*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15680) Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing

2023-11-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15680.
---
Fix Version/s: 3.7.0
   (was: 3.6.1)
   Resolution: Fixed

> Partition-Count is not getting updated Correctly in the Incremental 
> Co-operative Rebalancing(ICR) Mode of Rebalancing
> -
>
> Key: KAFKA-15680
> URL: https://issues.apache.org/jira/browse/KAFKA-15680
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Assignee: Pritam Kumar
>Priority: Minor
> Fix For: 3.7.0
>
>
> * In ICR (Incremental Cooperative Rebalancing) mode, whenever a new worker, 
> say Worker3, joins, a new global assignment is computed by the leader, say 
> Worker1, that results in the revocation of some tasks from each existing 
> worker, i.e. Worker1 and Worker2.
>  * Once the new member's join is completed, the 
> *ConsumerCoordinator.onJoinComplete()* method is called, which primarily 
> computes all the newly assigned partitions and the partitions which are 
> revoked, and updates the subscription object.
>  * If it is a case of revocation, which we check via the 
> “partitionsRevoked” list, we call the method *invokePartitionRevoked()*, 
> which internally calls *updatePartitionCount()*, which fetches partitions from 
> the *assignment* object, which has not yet been updated with the new assignment.
>  * It is only just before calling the *invokePartitionsAssigned()* 
> method that we update the *assignment* by invoking the following → 
> *subscriptions.assignFromSubscribed(assignedPartitions);*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15787) Investigate new test case failure - testReplicateSourceDefault - MirrorConnectorsIntegrationBaseTest

2023-11-06 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17783269#comment-17783269
 ] 

Chris Egerton commented on KAFKA-15787:
---

[~apoorvmittal10] please check for duplicates before filing any more tickets 
like this. Thanks!

> Investigate new test case failure - testReplicateSourceDefault - 
> MirrorConnectorsIntegrationBaseTest
> 
>
> Key: KAFKA-15787
> URL: https://issues.apache.org/jira/browse/KAFKA-15787
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15787) Investigate new test case failure - testReplicateSourceDefault - MirrorConnectorsIntegrationBaseTest

2023-11-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15787.
---
Resolution: Duplicate

> Investigate new test case failure - testReplicateSourceDefault - 
> MirrorConnectorsIntegrationBaseTest
> 
>
> Key: KAFKA-15787
> URL: https://issues.apache.org/jira/browse/KAFKA-15787
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Priority: Major
>
> PR - [https://github.com/apache/kafka/pull/14621] has 7 new test case failures 
> which are not related to the PR. This Jira tracks these failures for 
> investigation into whether the current changes somehow impact the tests.
> CI: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14621/12/tests/]
> Failed tests
> Build / JDK 17 and Scala 2.13 / 
> testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft
>  – kafka.api.ProducerIdExpirationTest
> 8s
> Build / JDK 8 and Scala 2.12 / 
> testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
> 1m 20s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
> 2m 15s
> Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> 1m 51s
> Build / JDK 11 and Scala 2.13 / 
> testDeleteCmdNonExistingGroup(String).quorum=kraft – 
> kafka.admin.DeleteConsumerGroupsTest
> 11s
> Build / JDK 11 and Scala 2.13 / testTimeouts() – 
> org.apache.kafka.controller.QuorumControllerTest
> <1s
> Build / JDK 11 and Scala 2.13 / 
> testHighWaterMarkAfterPartitionReassignment(String).quorum=kraft – 
> org.apache.kafka.tools.reassign.ReassignPartitionsIntegrationTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15761) ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector is flaky

2023-10-31 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15761.
---
Resolution: Duplicate

> ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector is 
> flaky
> ---
>
> Key: KAFKA-15761
> URL: https://issues.apache.org/jira/browse/KAFKA-15761
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Calvin Liu
>Priority: Major
>
> Build / JDK 21 and Scala 2.13 / testMultiWorkerRestartOnlyConnector – 
> org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest
> {code:java}
> java.lang.AssertionError: Failed to stop connector and tasks within 12ms
>     at org.junit.Assert.fail(Assert.java:89)
>     at org.junit.Assert.assertTrue(Assert.java:42)
>     at org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.runningConnectorAndTasksRestart(ConnectorRestartApiIntegrationTest.java:273)
>     at org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector(ConnectorRestartApiIntegrationTest.java:231)
>     at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15761) ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector is flaky

2023-10-31 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781455#comment-17781455
 ] 

Chris Egerton commented on KAFKA-15761:
---

[~calvinliu] thanks for reporting this, but there's already an open ticket for 
that test. Can you please check for existing tickets before filing new ones in 
the future? Thanks!

> ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector is 
> flaky
> ---
>
> Key: KAFKA-15761
> URL: https://issues.apache.org/jira/browse/KAFKA-15761
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Calvin Liu
>Priority: Major
>
> Build / JDK 21 and Scala 2.13 / testMultiWorkerRestartOnlyConnector – 
> org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest
> {code:java}
> java.lang.AssertionError: Failed to stop connector and tasks within 12ms
>     at org.junit.Assert.fail(Assert.java:89)
>     at org.junit.Assert.assertTrue(Assert.java:42)
>     at org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.runningConnectorAndTasksRestart(ConnectorRestartApiIntegrationTest.java:273)
>     at org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector(ConnectorRestartApiIntegrationTest.java:231)
>     at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:580)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15676) Scheduled rebalance delay for Connect is unnecessarily triggered when group coordinator bounces

2023-10-26 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-15676:
-

Assignee: (was: Chris Egerton)

> Scheduled rebalance delay for Connect is unnecessarily triggered when group 
> coordinator bounces
> ---
>
> Key: KAFKA-15676
> URL: https://issues.apache.org/jira/browse/KAFKA-15676
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>
> When a Connect worker loses contact with the group coordinator, it 
> voluntarily gives up (i.e., stops) its assignment of connectors and tasks 
> (for more context, see KAFKA-9184). However, this change in state is not 
> relayed to the worker's instance of the [IncrementalCooperativeAssignor 
> class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].
> If the group coordinator for a Connect cluster is unavailable for long 
> enough, all of the workers in the cluster will revoke their assigned 
> connectors and tasks and, upon rejoining the group, report that they have 
> been assigned no connectors and tasks.
> If a worker's member ID is reset before rejoining the group (which can happen 
> if, for example, the [maximum poll 
> interval|https://kafka.apache.org/documentation.html#consumerconfigs_max.poll.interval.ms]
>  for the worker is exceeded), the leader of the cluster will not act as if 
> the worker had rejoined the group; instead, it will act as if the worker had 
> left the group and a new, unrelated worker had joined during the same 
> rebalance. This will cause the scheduled rebalance delay to be triggered, and 
> for the connectors and tasks previously-assigned to that worker to remain 
> unassigned until the delay expires.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15693) Disabling scheduled rebalance delay in Connect can lead to indefinitely unassigned connectors and tasks

2023-10-26 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779977#comment-17779977
 ] 

Chris Egerton commented on KAFKA-15693:
---

For all affected versions, the most graceful workaround is to set the 
{{scheduled.rebalance.max.delay.ms}} property to an extremely low value (such 
as 1) instead of 0.
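
In worker-configuration terms, that workaround is simply (illustrative distributed worker config):

{code}
# Near-zero delay without the behavior triggered by a value of exactly 0
scheduled.rebalance.max.delay.ms=1
{code}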

> Disabling scheduled rebalance delay in Connect can lead to indefinitely 
> unassigned connectors and tasks
> ---
>
> Key: KAFKA-15693
> URL: https://issues.apache.org/jira/browse/KAFKA-15693
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 
> 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, 2.8.2, 
> 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 
> 3.5.0, 3.4.1, 3.6.0, 3.5.1, 3.7.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka Connect supports deferred resolution of imbalances when using the 
> incremental rebalancing algorithm introduced in 
> [KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
>  When enabled, this feature introduces a configurable delay period between 
> when "lost" assignments (i.e., connectors and tasks that were assigned to a 
> worker in the previous round of rebalance but are not assigned to a worker 
> during the current round of rebalance) are detected and when they are 
> reassigned to a worker. The delay can be configured with the 
> {{scheduled.rebalance.max.delay.ms}} property.
> If this property is set to 0, then there should be no delay between when lost 
> assignments are detected and when they are reassigned. Instead, however, this 
> configuration can cause lost assignments to be withheld during a rebalance, 
> remaining unassigned until the next rebalance, which, because scheduled 
> delays are disabled, will not happen on its own and will only take place when 
> unrelated conditions warrant it (such as the creation or deletion of a 
> connector, a worker joining or leaving the cluster, new task configs being 
> generated for a connector, etc.).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15693) Disabling scheduled rebalance delay in Connect can lead to indefinitely unassigned connectors and tasks

2023-10-26 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15693:
-

 Summary: Disabling scheduled rebalance delay in Connect can lead 
to indefinitely unassigned connectors and tasks
 Key: KAFKA-15693
 URL: https://issues.apache.org/jira/browse/KAFKA-15693
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.5.1, 3.6.0, 3.4.1, 3.5.0, 3.3.2, 3.3.1, 3.2.3, 3.2.2, 
3.4.0, 3.2.1, 3.1.2, 3.0.2, 3.3.0, 3.1.1, 3.2.0, 2.8.2, 3.0.1, 3.0.0, 2.8.1, 
2.7.2, 2.6.3, 3.1.0, 2.6.2, 2.7.1, 2.8.0, 2.6.1, 2.7.0, 2.5.1, 2.6.0, 2.4.1, 
2.5.0, 2.3.1, 2.4.0, 2.3.0, 3.7.0
Reporter: Chris Egerton
Assignee: Chris Egerton


Kafka Connect supports deferred resolution of imbalances when using the 
incremental rebalancing algorithm introduced in 
[KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
 When enabled, this feature introduces a configurable delay period between when 
"lost" assignments (i.e., connectors and tasks that were assigned to a worker 
in the previous round of rebalance but are not assigned to a worker during the 
current round of rebalance) are detected and when they are reassigned to a 
worker. The delay can be configured with the 
{{scheduled.rebalance.max.delay.ms}} property.

If this property is set to 0, then there should be no delay between when lost 
assignments are detected and when they are reassigned. Instead, however, this 
configuration can cause lost assignments to be withheld during a rebalance, 
remaining unassigned until the next rebalance, which, because scheduled delays 
are disabled, will not happen on its own and will only take place when 
unrelated conditions warrant it (such as the creation or deletion of a 
connector, a worker joining or leaving the cluster, new task configs being 
generated for a connector, etc.).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15676) Scheduled rebalance delay for Connect is unnecessarily triggered when group coordinator bounces

2023-10-24 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15676:
--
Description: 
When a Connect worker loses contact with the group coordinator, it voluntarily 
gives up (i.e., stops) its assignment of connectors and tasks (for more 
context, see KAFKA-9184). However, this change in state is not relayed to the 
worker's instance of the [IncrementalCooperativeAssignor 
class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].

If the group coordinator for a Connect cluster is unavailable for long enough, 
all of the workers in the cluster will revoke their assigned connectors and 
tasks and, upon rejoining the group, report that they have been assigned no 
connectors and tasks.

If a worker's member ID is reset before rejoining the group (which can happen 
if, for example, the [maximum poll 
interval|https://kafka.apache.org/documentation.html#consumerconfigs_max.poll.interval.ms]
 for the worker is exceeded), the leader of the cluster will not act as if the 
worker had rejoined the group; instead, it will act as if the worker had left 
the group and a new, unrelated worker had joined during the same rebalance. 
This will cause the scheduled rebalance delay to be triggered, and for the 
connectors and tasks previously-assigned to that worker to remain unassigned 
until the delay expires.

  was:
When a Connect worker loses contact with the group coordinator, it voluntarily 
gives up (i.e., stops) its assignment of connectors and tasks (for more 
context, see KAFKA-9184). However, this change in state is not relayed to the 
worker's instance of the [IncrementalCooperativeAssignor 
class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].

If the group coordinator for a Connect cluster is unavailable for long enough, 
all of the workers in cluster will revoke their assigned connectors and tasks 
and, upon rejoining the group, report that they have been assigned no 
connectors and tasks.

If a worker's member ID is reset before rejoining the group (which can happen 
if, for example, the [maximum poll 
interval|https://kafka.apache.org/documentation.html#consumerconfigs_max.poll.interval.ms]
 for the worker is exceeded), the leader of the cluster will not act as if the 
worker had rejoined the group; instead, it will act as if the worker had left 
the group and a new, unrelated worker had joined during the same rebalance. 
This will cause the scheduled rebalance delay to be triggered, and for the 
connectors and tasks previously-assigned to that worker to remain unassigned 
until the delay expires.


> Scheduled rebalance delay for Connect is unnecessarily triggered when group 
> coordinator bounces
> ---
>
> Key: KAFKA-15676
> URL: https://issues.apache.org/jira/browse/KAFKA-15676
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> When a Connect worker loses contact with the group coordinator, it 
> voluntarily gives up (i.e., stops) its assignment of connectors and tasks 
> (for more context, see KAFKA-9184). However, this change in state is not 
> relayed to the worker's instance of the [IncrementalCooperativeAssignor 
> class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].
> If the group coordinator for a Connect cluster is unavailable for long 
> enough, all of the workers in the cluster will revoke their assigned 
> connectors and tasks and, upon rejoining the group, report that they have 
> been assigned no connectors and tasks.
> If a worker's member ID is reset before rejoining the group (which can happen 
> if, for example, the [maximum poll 
> interval|https://kafka.apache.org/documentation.html#consumerconfigs_max.poll.interval.ms]
>  for the worker is exceeded), the leader of the cluster will not act as if 
> the worker had rejoined the group; instead, it will act as if the worker had 
> left the group and a new, unrelated worker had joined during the same 
> rebalance. This will cause the scheduled rebalance delay to be triggered, and 
> for the connectors and tasks previously-assigned to that worker to remain 
> unassigned until the delay expires.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

