[jira] [Commented] (KAFKA-13431) Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)

2022-08-20 Thread Diego Erdody (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17582373#comment-17582373
 ] 

Diego Erdody commented on KAFKA-13431:
--

Hi [~yash.mayya]. No, I'm not planning to work on this anymore; all yours. 
Thanks!

> Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit 
> users)
> ---
>
> Key: KAFKA-13431
> URL: https://issues.apache.org/jira/browse/KAFKA-13431
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Diego Erdody
>Assignee: Diego Erdody
>Priority: Major
>  Labels: needs-kip
>
> There's currently an incompatibility between Sink connectors overriding the 
> {{SinkTask.preCommit}} method (for asynchronous processing) and SMTs that 
> mutate the topic field.
> The problem has been present since the {{preCommit}} method's inception and 
> is rooted in a mismatch between the topic/partition that is passed to 
> {{open/preCommit}} (the original topic and partition, before any 
> transformations are applied) and the topic/partition that is present in the 
> SinkRecord that the {{SinkTask.put}} method receives (after transformations 
> are applied). Since that is all the information the connector has to 
> implement any kind of internal offset tracking, the topic/partitions it can 
> return in preCommit will correspond to the transformed topic, while the 
> framework actually expects the original one.
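
To make the mismatch concrete, here is a minimal sketch (class and field names 
are illustrative, not from the framework) of a sink task that tracks offsets 
internally, as preCommit users do:

{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class AsyncSinkTask extends SinkTask {
    // Offsets tracked by the topic/partition seen in put(), i.e. AFTER SMTs run.
    private final Map<TopicPartition, OffsetAndMetadata> tracked = new HashMap<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord r : records) {
            // If an SMT such as RegexRouter renamed the topic, r.topic() is the
            // transformed name, not the one the framework passed to open().
            tracked.put(new TopicPartition(r.topic(), r.kafkaPartition()),
                    new OffsetAndMetadata(r.kafkaOffset() + 1));
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // These keys use transformed topic names, but the framework matches them
        // against the ORIGINAL topic partitions it consumes from.
        return tracked;
    }
}
{code}

If a RegexRouter-style SMT renames the topic, every key in the returned map 
refers to a topic the framework never opened, so the reported offsets cannot 
be matched up with the consumer's actual assignment.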



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14000) Kafka-connect standby server shows empty tasks list

2022-08-20 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17582346#comment-17582346
 ] 

Sagar Rao commented on KAFKA-14000:
---

Hi [~xinzou], thanks for filing the bug. One thing I wanted to know: how many 
partitions does the status topic have in your case? Generally it's recommended 
to have a single partition for the status topic, to maintain the ordering of 
events.
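
For reference, the relevant distributed-worker settings look like this 
(illustrative values, with a single status partition per the recommendation 
above):

{code}
# Internal status topic for Connect distributed mode
status.storage.topic=connect-status
status.storage.partitions=1
status.storage.replication.factor=3
{code}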

> Kafka-connect standby server shows empty tasks list
> ---
>
> Key: KAFKA-14000
> URL: https://issues.apache.org/jira/browse/KAFKA-14000
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Xinyu Zou
>Assignee: Sagar Rao
>Priority: Major
> Attachments: kafka-connect-trace.log
>
>
> I'm using Kafka Connect in distributed mode. There are two servers, one 
> active and one standby. The standby server sometimes shows an empty tasks 
> list in the status REST API response.
> curl host:8443/connectors/name1/status
> {code:java}
> {
>     "connector": {
>         "state": "RUNNING",
>         "worker_id": "1.2.3.4:10443"
>     },
>     "name": "name1",
>     "tasks": [],
>     "type": "source"
> } {code}
> I enabled TRACE logging and checked. As required, the connect-status topic is 
> set to cleanup.policy=compact, but messages in the topic are not compacted 
> immediately; compaction runs at an interval, so there is usually more than 
> one message with the same key. E.g., when Kafka Connect is launched there is 
> no connector running; we then start a new connector, after which there are 
> two messages in the connect-status topic:
> status-task-name1 : state=RUNNING, workerId='10.251.170.166:10443', 
> generation=100
> status-task-name1 : __
> Please check the log file [^kafka-connect-trace.log]. We can see that the 
> task status was ultimately removed, even though the empty status was not the 
> newest message in the connect-status topic.
>  
> When reading statuses from the connect-status topic, the worker doesn't sort 
> messages by generation.
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java]
> So I think this could be improved: we can either sort the messages after 
> polling or compare generation values before choosing the correct status 
> message.
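
As an illustration of the second option, a sketch (hypothetical types; the 
real status store works with serialized records) of keeping only the 
highest-generation status per key:

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Java 16+ record used for brevity; illustrative stand-in for a status entry.
record TaskStatus(String key, String state, String workerId, int generation) {}

class StatusReader {
    static Map<String, TaskStatus> latestByGeneration(List<TaskStatus> polled) {
        Map<String, TaskStatus> latest = new HashMap<>();
        for (TaskStatus s : polled) {
            // Replace the stored entry only if this record's generation is newer,
            // instead of blindly taking the last record polled.
            latest.merge(s.key(), s, (cur, inc) ->
                    inc.generation() >= cur.generation() ? inc : cur);
        }
        return latest;
    }
}
{code}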



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13914) Implement kafka-metadata-quorum.sh

2022-08-20 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13914.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

> Implement kafka-metadata-quorum.sh
> --
>
> Key: KAFKA-13914
> URL: https://issues.apache.org/jira/browse/KAFKA-13914
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: dengziming
>Priority: Major
> Fix For: 3.3.0
>
>
> KIP-595 documents a tool for describing quorum status 
> `kafka-metadata-quorum.sh`: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-ToolingSupport.]
>   We need to implement this.
> Note that this depends on the Admin API for `DescribeQuorum`, which is 
> proposed in KIP-836: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-836%3A+Addition+of+Information+in+DescribeQuorumResponse+about+Voter+Lag.]
>  
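
For reference, typical invocations of the tool look like this (subcommands per 
KIP-595's tooling section; exact flags may vary by release):

{code:bash}
# Describe the state of the metadata quorum
bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status

# Show voter/observer lag relative to the leader (uses the KIP-836 DescribeQuorum API)
bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --replication
{code}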



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dengziming commented on pull request #12469: KAFKA-13914: Add command line tool kafka-metadata-quorum.sh

2022-08-20 Thread GitBox


dengziming commented on PR #12469:
URL: https://github.com/apache/kafka/pull/12469#issuecomment-1221354360

   Thanks for your patience @hachikuji 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14091) Suddenly-killed tasks can leave hanging transactions open

2022-08-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14091:
--
Description: 
Right now, if a task running with exactly-once support is killed ungracefully, 
it may leave a hanging transaction open. If the transaction included writes to 
the global offsets topic, then startup for future workers becomes blocked on 
that transaction expiring.

Ideally, we could identify these kinds of hanging transactions and proactively 
abort them.

Unfortunately, there are a few facts that make this fairly complicated:
 # Workers read to the end of the offsets topic during startup, before joining 
the cluster
 # Workers do not know which tasks they are assigned until they join the cluster

The result of these facts is that we cannot trust workers that are restarted 
shortly after being ungracefully shut down to fence out their own hanging 
transactions, since any hanging transactions would prevent them from being able 
to join the group and receive their task assignment in the first place.

We could possibly accomplish this by having the leader proactively abort any 
open transactions for tasks on workers that appear to have left the cluster 
during a rebalance. This would not require us to wait for the scheduled 
rebalance delay to elapse, since the intent of the delay is to provide a buffer 
between when workers leave and when their connectors/tasks are reallocated 
across the cluster (and, if the worker is able to rejoin before that buffer is 
consumed, then give it back the same connectors/tasks it was running 
previously); aborting transactions for tasks on these workers would not 
interfere with that goal.

 

It's also possible that we may have to handle the case where a 
[cancelled|https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L274-L287]
 task leaves a transaction open; I have yet to confirm whether this is 
possible, though.

  was:
Right now, if a task running with exactly-once support is killed ungracefully, 
it may leave a hanging transaction open. If the transaction included writes to 
the offsets topic, then startup for future workers becomes blocked on that 
transaction expiring.

Ideally, we could identify these kinds of hanging transactions and proactively 
abort them.

Unfortunately, there are a few facts that make this fairly complicated:
 # Workers read to the end of the offsets topic during startup, before joining 
the cluster
 # Workers do not know which tasks they are assigned until they join the cluster

The result of these facts is that we cannot trust workers that are restarted 
shortly after being ungracefully shut down to fence out their own hanging 
transactions, since any hanging transactions would prevent them from being able 
to join the group and receive their task assignment in the first place.

We could possibly accomplish this by having the leader proactively abort any 
open transactions for tasks on workers that appear to have left the cluster 
during a rebalance. This would not require us to wait for the scheduled 
rebalance delay to elapse, since the intent of the delay is to provide a buffer 
between when workers leave and when their connectors/tasks are reallocated 
across the cluster (and, if the worker is able to rejoin before that buffer is 
consumed, then give it back the same connectors/tasks it was running 
previously); aborting transactions for tasks on these workers would not 
interfere with that goal.

 

It's also possible that we may have to handle the case where a 
[cancelled|https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L274-L287]
 task leaves a transaction open; I have yet to confirm whether this is 
possible, though.


> Suddenly-killed tasks can leave hanging transactions open
> -
>
> Key: KAFKA-14091
> URL: https://issues.apache.org/jira/browse/KAFKA-14091
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
>
> Right now, if a task running with exactly-once support is killed 
> ungracefully, it may leave a hanging transaction open. If the transaction 
> included writes to the global offsets topic, then startup for future workers 
> becomes blocked on that transaction expiring.
> Ideally, we could identify these kinds of hanging transactions and 
> proactively abort them.
> Unfortunately, there are a few facts that make this fairly complicated:
>  # Workers read to the end of the offsets topic during startup, before 
> joining the cluster
>  # Workers do not know which tasks they are assigned until they join the 
> cluster
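
To make the proposed leader-side remediation concrete, here is a rough sketch 
(hypothetical wrapper; assumes the leader can derive the transactional IDs of 
tasks on departed workers) built on the Admin#fenceProducers API added by 
KIP-618:

{code:java}
import java.util.Collection;
import org.apache.kafka.clients.admin.Admin;

class HangingTransactionReaper {
    private final Admin admin;

    HangingTransactionReaper(Admin admin) {
        this.admin = admin;
    }

    // transactionalIds: assumed to be derived from the task IDs assigned to
    // workers that dropped out of the group during a rebalance.
    void fenceDepartedWorkers(Collection<String> transactionalIds) throws Exception {
        // Fencing bumps the producer epoch, which aborts any in-flight
        // transaction and unblocks reads to the end of the offsets topic.
        admin.fenceProducers(transactionalIds).all().get();
    }
}
{code}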

[GitHub] [kafka] hachikuji merged pull request #12469: KAFKA-13914: Add command line tool kafka-metadata-quorum.sh

2022-08-20 Thread GitBox


hachikuji merged PR #12469:
URL: https://github.com/apache/kafka/pull/12469


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers

2022-08-20 Thread GitBox


C0urante commented on code in PR #12544:
URL: https://github.com/apache/kafka/pull/12544#discussion_r950708756


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -983,7 +983,7 @@ private List<ErrorReporter> sinkTaskReporters(ConnectorTaskId id, SinkConnectorC
         if (topic != null && !topic.isEmpty()) {
             Map<String, Object> producerProps = baseProducerConfigs(id.connector(), "connector-dlq-producer-" + id, config, connConfig, connectorClass,
                     connectorClientConfigOverridePolicy, kafkaClusterId);
-            Map<String, Object> adminProps = adminConfigs(id.connector(), "connector-dlq-adminclient-", config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK);
+            Map<String, Object> adminProps = adminConfigs(id.connector(), "connector-dlq-adminclient-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK);

Review Comment:
   Small unrelated fix spotted while I was in the neighborhood. I checked the 
rest of this file for similar bugs; this appears to be the only one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers

2022-08-20 Thread GitBox


C0urante commented on PR #12544:
URL: https://github.com/apache/kafka/pull/12544#issuecomment-1221326257

   @mimaison if you have a moment, would you mind giving this a pass?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante opened a new pull request, #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers

2022-08-20 Thread GitBox


C0urante opened a new pull request, #12544:
URL: https://github.com/apache/kafka/pull/12544

   [Jira](https://issues.apache.org/jira/browse/KAFKA-14098)
   
   Adds the client ID `connect-cluster-${groupId}-${topic}` to the producers 
and consumers used by Connect workers to interact with internal topics, where 
`${groupId}` is the `group.id` in the worker config file, and `${topic}` is one 
of `configs`, `statuses`, or `offsets`. Since a single shared admin client is 
used across all three internal topics, it is given a client ID of 
`connect-cluster-${groupId}`.
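
   To make the naming scheme concrete, with an assumed `group.id` of 
`my-connect-cluster` the resulting client IDs would be:

{code}
# Worker config (assumed example value)
group.id=my-connect-cluster

# Resulting client IDs per this PR:
#   config topic producer/consumer:  connect-cluster-my-connect-cluster-configs
#   status topic producer/consumer:  connect-cluster-my-connect-cluster-statuses
#   offsets topic producer/consumer: connect-cluster-my-connect-cluster-offsets
#   shared admin client:             connect-cluster-my-connect-cluster
{code}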
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12536: KAFKA-14160: Streamline clusterId retrieval in Connect

2022-08-20 Thread GitBox


C0urante commented on code in PR #12536:
URL: https://github.com/apache/kafka/pull/12536#discussion_r950692858


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##
@@ -409,6 +412,13 @@ public String groupId() {
         return null;
     }
 
+    public String kafkaClusterId() {
+        if (kafkaClusterId == null) {
+            kafkaClusterId = ConnectUtils.lookupKafkaClusterId(this);
+        }
+        return kafkaClusterId;
+    }

Review Comment:
   Should we move the `lookupKafkaClusterId` logic from `ConnectUtils` to this 
class, so that nobody accidentally starts using the variant that creates and 
discards an admin client once per call?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] songnon commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election for partitions are not being reassigned

2022-08-20 Thread GitBox


songnon commented on code in PR #12543:
URL: https://github.com/apache/kafka/pull/12543#discussion_r950692749


##
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##
@@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
+    waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+
+    servers(leaderBrokerId).startup()
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+
+    servers(otherBrokerId).startup()
+    waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,

Review Comment:
   Yes, it is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14012) passing a "method" into the `Utils.closeQuietly` method causes an NPE

2022-08-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14012:
--
Fix Version/s: 3.4.0

> passing a "method" into the `Utils.closeQuietly` method causes an NPE
> -
>
> Key: KAFKA-14012
> URL: https://issues.apache.org/jira/browse/KAFKA-14012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.4.0
>
>
> The Utils.closeQuietly method accepts an `AutoCloseable` object and closes 
> it, but in some places we pass a "method" into Utils.closeQuietly, which 
> means the object doesn't get closed as expected. 
> I found this in:
> - WorkerConnector
> - AbstractWorkerSourceTask
> - KafkaConfigBackingStore
>  
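
To illustrate the failure mode (self-contained sketch; `resource` is a 
stand-in for the fields involved): a bound method reference is created 
eagerly, so a null receiver throws an NPE before closeQuietly's own null check 
ever runs.

{code:java}
import org.apache.kafka.common.utils.Utils;

public class CloseQuietlyDemo {
    public static void main(String[] args) {
        AutoCloseable resource = null;

        // Safe: closeQuietly checks the reference for null before closing.
        Utils.closeQuietly(resource, "resource");

        // Throws NullPointerException at the call site: evaluating
        // `resource::close` dereferences the null receiver immediately.
        Utils.closeQuietly(resource::close, "resource");
    }
}
{code}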



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14007) Connect header converters are never closed

2022-08-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14007:
--
Fix Version/s: 3.4.0

> Connect header converters are never closed
> --
>
> Key: KAFKA-14007
> URL: https://issues.apache.org/jira/browse/KAFKA-14007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.4.0
>
>
> The [HeaderConverter 
> interface|https://github.com/apache/kafka/blob/1e21201ea24389bdaccb8a462f3a53e356b58a58/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L27]
>  extends {{Closeable}}, but {{HeaderConverter::close}} is never actually 
> invoked anywhere. We can and should start invoking it, probably wrapped in 
> [Utils::closeQuietly|https://github.com/apache/kafka/blob/1e21201ea24389bdaccb8a462f3a53e356b58a58/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L999-L1010]
>  so that any invalid logic in that method for custom header converters that 
> has to date gone undetected will not cause new task failures.
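
A sketch (assumed call site; the actual change would live in the worker's task 
shutdown path) of what that could look like:

{code:java}
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.storage.HeaderConverter;

class TaskCleanup {
    static void closeConverter(HeaderConverter headerConverter) {
        // Logs (rather than throws) if close() fails, and ignores null, so a
        // buggy custom converter cannot cause a new task failure on shutdown.
        Utils.closeQuietly(headerConverter, "header converter");
    }
}
{code}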



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14000) Kafka-connect standby server shows empty tasks list

2022-08-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14000:
-

Assignee: Sagar Rao

> Kafka-connect standby server shows empty tasks list
> ---
>
> Key: KAFKA-14000
> URL: https://issues.apache.org/jira/browse/KAFKA-14000
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Xinyu Zou
>Assignee: Sagar Rao
>Priority: Major
> Attachments: kafka-connect-trace.log
>
>
> I'm using Kafka Connect in distributed mode. There are two servers, one 
> active and one standby. The standby server sometimes shows an empty tasks 
> list in the status REST API response.
> curl host:8443/connectors/name1/status
> {code:java}
> {
>     "connector": {
>         "state": "RUNNING",
>         "worker_id": "1.2.3.4:10443"
>     },
>     "name": "name1",
>     "tasks": [],
>     "type": "source"
> } {code}
> I enabled TRACE logging and checked. As required, the connect-status topic is 
> set to cleanup.policy=compact, but messages in the topic are not compacted 
> immediately; compaction runs at an interval, so there is usually more than 
> one message with the same key. E.g., when Kafka Connect is launched there is 
> no connector running; we then start a new connector, after which there are 
> two messages in the connect-status topic:
> status-task-name1 : state=RUNNING, workerId='10.251.170.166:10443', 
> generation=100
> status-task-name1 : __
> Please check the log file [^kafka-connect-trace.log]. We can see that the 
> task status was ultimately removed, even though the empty status was not the 
> newest message in the connect-status topic.
>  
> When reading statuses from the connect-status topic, the worker doesn't sort 
> messages by generation.
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java]
> So I think this could be improved: we can either sort the messages after 
> polling or compare generation values before choosing the correct status 
> message.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14015) ConfigProvider with ttl fails to restart tasks

2022-08-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14015:
-

Assignee: Sagar Rao

> ConfigProvider with ttl fails to restart tasks
> --
>
> Key: KAFKA-14015
> URL: https://issues.apache.org/jira/browse/KAFKA-14015
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ross Lawley
>Assignee: Sagar Rao
>Priority: Major
>
> According to the 
> [KIP-297|https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations#KIP297:ExternalizingSecretsforConnectConfigurations-SecretRotation]:
> {quote} * When the Herder receives the onChange() call, it will check a new 
> connector configuration property config.reload.action which can be one of the 
> following:
>  ** The value restart, which means to schedule a restart of the Connector and 
> all its Tasks. This will be the default.
>  ** The value none, which means to do nothing.{quote}
> However, the 
> [restartConnector|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L287-L294]
>  method only restarts the connector and does not restart any tasks.  Suggest 
> calling {{restartConnectorAndTasks}} instead.
> The result is changed configurations provided by the ConfigProvider are not 
> picked up and existing tasks continue to use outdated configuration.
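
For context, a sketch of the configuration involved (illustrative values; a 
TTL-returning provider would be a custom implementation, since 
FileConfigProvider does not set TTLs):

{code}
# Worker config: register a config provider.
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# Connector config: externalized secret plus the reload action from KIP-297.
# With the bug above, "restart" restarts only the connector, not its tasks.
config.reload.action=restart
database.password=${file:/etc/secrets/db.properties:password}
{code}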



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14012) passing a "method" into the `Utils.closeQuietly` method causes an NPE

2022-08-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-14012.
---
Resolution: Fixed

> passing a "method" into the `Utils.closeQuietly` method causes an NPE
> -
>
> Key: KAFKA-14012
> URL: https://issues.apache.org/jira/browse/KAFKA-14012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Assignee: Sagar Rao
>Priority: Major
>
> The Utils.closeQuietly method accepts an `AutoCloseable` object and closes 
> it, but in some places we pass a "method" into Utils.closeQuietly, which 
> means the object doesn't get closed as expected. 
> I found this in:
> - WorkerConnector
> - AbstractWorkerSourceTask
> - KafkaConfigBackingStore
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14007) Connect header converters are never closed

2022-08-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-14007.
---
Resolution: Fixed

> Connect header converters are never closed
> --
>
> Key: KAFKA-14007
> URL: https://issues.apache.org/jira/browse/KAFKA-14007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
>
> The [HeaderConverter 
> interface|https://github.com/apache/kafka/blob/1e21201ea24389bdaccb8a462f3a53e356b58a58/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L27]
>  extends {{Closeable}}, but {{HeaderConverter::close}} is never actually 
> invoked anywhere. We can and should start invoking it, probably wrapped in 
> [Utils::closeQuietly|https://github.com/apache/kafka/blob/1e21201ea24389bdaccb8a462f3a53e356b58a58/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L999-L1010]
>  so that any invalid logic in that method for custom header converters that 
> has to date gone undetected will not cause new task failures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 commented on pull request #12485: KAFKA-14131: Adding InterruptException when reading to end of Offseto…

2022-08-20 Thread GitBox


vamossagar12 commented on PR #12485:
URL: https://github.com/apache/kafka/pull/12485#issuecomment-1221262360

   Thanks @C0urante. I think those are great points, and I now realise the 
mistake I made with the PR. Here's what I'm thinking, along with answers to 
the rest of your points:
   
   1) I agree for the most part that `InterruptedException` may or may not be 
thrown, and that it's pretty unreliable. Having said that, if it does occur, 
i.e. the thread in `herderExecutor` does get interrupted, the backing offset 
Kafka consumers would definitely get into an infinite loop. IMO that's 
something that needs to be handled. That's where the original approach in the 
PR is wrong: it catches the `InterruptedException` around `consumer.poll`, 
which may never even see it. Instead, I'm thinking of handling it in the 
`DistributedHerder.stop` method. I see it catches `InterruptedException` but 
doesn't do anything about it. Once the `herderExecutor` thread is interrupted, 
we can force-close the Kafka consumers by invoking `stopServices`. At that 
point some of the consumers might be closed and some might not be, and here we 
can leverage the `stopRequested` flag: if the flag is set, then for all 
practical purposes the log should get closed. So we can add that check in 
`KafkaConsumer.stop`. Let me know if this makes sense.
   
   - Does standalone mode need to be taken into account as well?
   
   Yeah, I think it can have a similar issue, so that can probably also be 
accounted for.
   
   - Will any logic that we add in DistributedHerder::stop conflict with the 
logic contained in halt, which will presumably be called once the herder 
finishes/aborts startup?
   
   That's a great point. It could happen that `stopServices` is invoked both 
from halt and from the `InterruptedException` handler. So what we need to 
ensure is that calling the `stop` method multiple times is safe. The check I 
described using `stopRequested` might be helpful. WDYT?
   
   - Will anything we add to DistributedHerder::stop run the risk of blocking 
indefinitely? If so, what are the potential ramifications?
   
   With the approach I described, maybe not. I'd love to hear your thoughts on 
this, though.
   
   - None of this should result in any error messages being logged unless an 
operation has actually failed (instead of just being terminated prematurely)
   
   +1
   
   Regarding the last 3 points, I'm not sure at the moment. If this approach 
sounds OK, maybe we can get those answered.
   - Can we ensure that the worker is able to gracefully leave the cluster if 
it's been able to join?
   - Can we ensure that the worker doesn't try to leave the cluster (or at the 
very least, cause any issues by trying to leave the cluster) if it hasn't been 
able to join?
   - If we're able to successfully abort one startup operation (such as reading 
to the end of the offsets topic), can we make sure that we don't even attempt 
any subsequent startup operations (such as reading to the end of the status 
topic, which currently takes place after reading to the end of the offsets 
topic)?
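   
   Since the `stopRequested` guard keeps coming up, here's a minimal sketch 
(hypothetical shape, not the actual herder code) of what making `stop` 
idempotent could look like:

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

class StoppableService {
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    void stop() {
        // compareAndSet ensures the shutdown work runs at most once, no matter
        // how many paths (halt, the InterruptedException handler, ...) call stop().
        if (!stopRequested.compareAndSet(false, true)) {
            return;
        }
        stopServices();
    }

    private void stopServices() {
        // force-close backing consumers / log readers here
    }
}
{code}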


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12501: KAFKA-14097: Separate configuration for producer ID expiry

2022-08-20 Thread GitBox


dajac commented on code in PR #12501:
URL: https://github.com/apache/kafka/pull/12501#discussion_r950665710


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1295,7 +1298,8 @@ object KafkaConfig {
       .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TransactionsAbortTimedOutTransactionsCleanupIntervalMS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc)
       .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TransactionsRemoveExpiredTransactionsCleanupIntervalMS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc)
 
-      .define(ProducerIdExpirationMsProp, INT, Defaults.ProducerIdExpirationMs, atLeast(1), HIGH, ProducerIdExpirationMsDoc)
+      .define(ProducerIdExpirationMsProp, INT, Defaults.ProducerIdExpirationMs, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+      .defineInternal(ProducerIdExpirationCheckIntervalMsProp, INT, Defaults.ProducerIdExpirationCheckIntervalMs, atLeast(1), LOW, ProducerIdExpirationMsDoc)

Review Comment:
   Right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org