[jira] [Commented] (KAFKA-13431) Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)
[ https://issues.apache.org/jira/browse/KAFKA-13431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17582373#comment-17582373 ] Diego Erdody commented on KAFKA-13431: -- Hi [~yash.mayya]. No, I'm not planning to work on this anymore; it's all yours. Thanks! > Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit > users) > --- > > Key: KAFKA-13431 > URL: https://issues.apache.org/jira/browse/KAFKA-13431 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Diego Erdody >Assignee: Diego Erdody >Priority: Major > Labels: needs-kip > > There's currently an incompatibility between Sink connectors overriding the > {{SinkTask.preCommit}} method (for asynchronous processing) and SMTs that > mutate the topic field. > The problem has been present since the {{preCommit}} method was introduced and is > rooted in a mismatch between the topic/partition that is passed to > {{open/preCommit}} (the original topic and partition before applying any > transformations) and the topic partition that is present in the SinkRecord > that the {{SinkTask.put}} method receives (after transformations are > applied). Since that's all the information the connector has to implement any > kind of internal offset tracking, the topic partitions it returns from > preCommit will correspond to the transformed topic, whereas the framework > expects them to be the original topic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
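To make the mismatch concrete: an async connector that wants preCommit to work under a topic-renaming SMT has to key its offset bookkeeping by the original partition, not the one it sees in put(). The plain-Java sketch below assumes the connector could see both coordinates for each record, which is exactly the information SinkRecord does not provide here; TP, TrackedRecord and OriginalPartitionOffsets are hypothetical names, not Connect API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a topic partition (not org.apache.kafka.common.TopicPartition).
final class TP {
    final String topic;
    final int partition;

    TP(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TP)) return false;
        TP other = (TP) o;
        return topic.equals(other.topic) && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical record carrying both the pre-SMT and the post-SMT coordinates.
final class TrackedRecord {
    final TP original;    // partition passed to open() and expected back from preCommit()
    final TP transformed; // partition seen in put() after a topic-mutating SMT
    final long offset;

    TrackedRecord(TP original, TP transformed, long offset) {
        this.original = original;
        this.transformed = transformed;
        this.offset = offset;
    }
}

// Offsets are keyed by the ORIGINAL partition, so preCommit() returns what the framework expects.
final class OriginalPartitionOffsets {
    private final Map<TP, Long> committable = new HashMap<>();

    // Assumes that when this is called, every earlier offset in the same partition is durable too.
    void markWritten(TrackedRecord r) {
        // A committed offset names the next record to consume, hence offset + 1.
        committable.merge(r.original, r.offset + 1, Long::max);
    }

    // Snapshot that a preCommit() override could return.
    Map<TP, Long> preCommit() {
        return new HashMap<>(committable);
    }
}
```

The transformed partition is kept only for the sink's own routing; it never reaches the committed map.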
[jira] [Commented] (KAFKA-14000) Kafka-connect standby server shows empty tasks list
[ https://issues.apache.org/jira/browse/KAFKA-14000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17582346#comment-17582346 ] Sagar Rao commented on KAFKA-14000: --- Hi [~xinzou], thanks for filing the bug. One thing I wanted to know: how many partitions does the status topic have in your case? Generally it's recommended to have a single partition for the status topic to maintain the ordering of events. > Kafka-connect standby server shows empty tasks list > --- > > Key: KAFKA-14000 > URL: https://issues.apache.org/jira/browse/KAFKA-14000 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.6.0 >Reporter: Xinyu Zou >Assignee: Sagar Rao >Priority: Major > Attachments: kafka-connect-trace.log > > > I'm using Kafka-connect in distributed mode. There are two servers: one active > and one standby. The standby server sometimes shows an empty tasks list in the > status REST API response. > curl host:8443/connectors/name1/status > {code:java} > { > "connector": { > "state": "RUNNING", > "worker_id": "1.2.3.4:10443" > }, > "name": "name1", > "tasks": [], > "type": "source" > } {code} > I enabled TRACE logging and checked. As required, the connect-status topic is set > to cleanup.policy=compact. However, messages in the topic aren't compacted > immediately; compaction runs periodically, so there is usually > more than one message with the same key. For example, when kafka-connect is launched > no connector is running. Then we start a new connector, and there > will be two messages in the connect-status topic: > status-task-name1 : state=RUNNING, workerId='10.251.170.166:10443', > generation=100 > status-task-name1 : __ > Please check the log file [^kafka-connect-trace.log]. We can see that the > task status was eventually removed, even though the empty status was not the > newest message in the connect-status topic. > > When reading statuses from the connect-status topic, the worker doesn't sort messages by > generation. 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java] > I think this could be improved: we could either sort the messages after polling > or compare generation values before choosing the correct status message.
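The second option proposed above, comparing generation values before accepting a status, could look roughly like the following. This is a hypothetical sketch, not KafkaStatusBackingStore code: TaskStatus and GenerationAwareStatusCache are made-up names, and tombstones (the empty `__` value in the report) are deliberately left out, since a tombstone carries no generation to compare against.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical decoded value of a record from the connect-status topic.
final class TaskStatus {
    final String state;
    final String workerId;
    final int generation;

    TaskStatus(String state, String workerId, int generation) {
        this.state = state;
        this.workerId = workerId;
        this.generation = generation;
    }
}

// Keeps, per status key, only the status written in the highest generation seen so far.
final class GenerationAwareStatusCache {
    private final Map<String, TaskStatus> byKey = new HashMap<>();

    // Returns true if the incoming status replaced the cached one.
    boolean apply(String key, TaskStatus incoming) {
        TaskStatus current = byKey.get(key);
        if (current != null && incoming.generation < current.generation) {
            return false; // written in an older generation: ignore it
        }
        byKey.put(key, incoming);
        return true;
    }

    TaskStatus get(String key) {
        return byKey.get(key);
    }
}
```

Ties go to the later record, so within a single generation the usual last-write-wins behaviour of the compacted topic is preserved.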
[jira] [Resolved] (KAFKA-13914) Implement kafka-metadata-quorum.sh
[ https://issues.apache.org/jira/browse/KAFKA-13914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13914. - Fix Version/s: 3.3.0 Resolution: Fixed > Implement kafka-metadata-quorum.sh > -- > > Key: KAFKA-13914 > URL: https://issues.apache.org/jira/browse/KAFKA-13914 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: dengziming >Priority: Major > Fix For: 3.3.0 > > > KIP-595 documents a tool for describing quorum status > `kafka-metadata-quorum.sh`: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-ToolingSupport.] > We need to implement this. > Note that this depends on the Admin API for `DescribeQuorum`, which is > proposed in KIP-836: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-836%3A+Addition+of+Information+in+DescribeQuorumResponse+about+Voter+Lag.]
[GitHub] [kafka] dengziming commented on pull request #12469: KAFKA-13914: Add command line tool kafka-metadata-quorum.sh
dengziming commented on PR #12469: URL: https://github.com/apache/kafka/pull/12469#issuecomment-1221354360 Thanks for your patience @hachikuji -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14091) Suddenly-killed tasks can leave hanging transactions open
[ https://issues.apache.org/jira/browse/KAFKA-14091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14091: -- Description: Right now, if a task running with exactly-once support is killed ungracefully, it may leave a hanging transaction open. If the transaction included writes to the global offsets topic, then startup for future workers becomes blocked on that transaction expiring. Ideally, we could identify these kinds of hanging transactions and proactively abort them. Unfortunately, there are a few facts that make this fairly complicated: # Workers read to the end of the offsets topic during startup, before joining the cluster # Workers do not know which tasks they are assigned until they join the cluster The result of these facts is that we cannot trust workers that are restarted shortly after being ungracefully shut down to fence out their own hanging transactions, since any hanging transactions would prevent them from being able to join the group and receive their task assignment in the first place. We could possibly accomplish this by having the leader proactively abort any open transactions for tasks on workers that appear to have left the cluster during a rebalance. This would not require us to wait for the scheduled rebalance delay to elapse, since the intent of the delay is to provide a buffer between when workers leave and when their connectors/tasks are reallocated across the cluster (and, if the worker is able to rejoin before that buffer is consumed, then give it back the same connectors/tasks it was running previously); aborting transactions for tasks on these workers would not interfere with that goal. 
It's also possible that we may have to handle the case where a [cancelled|https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L274-L287] task leaves a transaction open; I have yet to confirm whether this is possible, though. was: Right now, if a task running with exactly-once support is killed ungracefully, it may leave a hanging transaction open. If the transaction included writes to the offsets topic, then startup for future workers becomes blocked on that transaction expiring. Ideally, we could identify these kinds of hanging transactions and proactively abort them. Unfortunately, there are a few facts that make this fairly complicated: # Workers read to the end of the offsets topic during startup, before joining the cluster # Workers do not know which tasks they are assigned until they join the cluster The result of these facts is that we cannot trust workers that are restarted shortly after being ungracefully shut down to fence out their own hanging transactions, since any hanging transactions would prevent them from being able to join the group and receive their task assignment in the first place. We could possibly accomplish this by having the leader proactively abort any open transactions for tasks on workers that appear to have left the cluster during a rebalance. This would not require us to wait for the scheduled rebalance delay to elapse, since the intent of the delay is to provide a buffer between when workers leave and when their connectors/tasks are reallocated across the cluster (and, if the worker is able to rejoin before that buffer is consumed, then give it back the same connectors/tasks it was running previously); aborting transactions for tasks on these workers would not interfere with that goal. 
It's also possible that we may have to handle the case where a [cancelled|https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L274-L287] task leaves a transaction open; I have yet to confirm whether this is possible, though. > Suddenly-killed tasks can leave hanging transactions open > - > > Key: KAFKA-14091 > URL: https://issues.apache.org/jira/browse/KAFKA-14091 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Sagar Rao >Priority: Major
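The leader-side idea in this issue boils down to a set difference: tasks that were assigned in the previous round to workers that did not rejoin. A minimal sketch, assuming the leader still knows the previous assignment and assuming per-task transactional IDs of the form `<group.id>-<connector>-<task>`; TaskRef and HangingTransactionPlanner are hypothetical, and the step that actually aborts or fences the resulting IDs is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical task identifier: connector name plus task number.
final class TaskRef {
    final String connector;
    final int task;

    TaskRef(String connector, int task) {
        this.connector = connector;
        this.task = task;
    }
}

final class HangingTransactionPlanner {
    // ASSUMPTION: per-task transactional IDs look like <group.id>-<connector>-<task>.
    static String transactionalId(String groupId, TaskRef t) {
        return groupId + "-" + t.connector + "-" + t.task;
    }

    // previousAssignment: worker ID -> source tasks it ran before this rebalance.
    // currentMembers: workers that rejoined in this rebalance.
    static SortedSet<String> idsToAbort(String groupId,
                                        Map<String, List<TaskRef>> previousAssignment,
                                        Set<String> currentMembers) {
        SortedSet<String> ids = new TreeSet<>();
        for (Map.Entry<String, List<TaskRef>> e : previousAssignment.entrySet()) {
            if (currentMembers.contains(e.getKey())) {
                continue; // worker is still in the group; leave its transactions alone
            }
            for (TaskRef t : e.getValue()) {
                ids.add(transactionalId(groupId, t));
            }
        }
        return ids;
    }
}
```

Because this runs at rebalance time, it does not depend on the scheduled rebalance delay, which matches the reasoning in the description.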
[GitHub] [kafka] hachikuji merged pull request #12469: KAFKA-13914: Add command line tool kafka-metadata-quorum.sh
hachikuji merged PR #12469: URL: https://github.com/apache/kafka/pull/12469
[GitHub] [kafka] C0urante commented on a diff in pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers
C0urante commented on code in PR #12544: URL: https://github.com/apache/kafka/pull/12544#discussion_r950708756 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -983,7 +983,7 @@ private List sinkTaskReporters(ConnectorTaskId id, SinkConnectorC if (topic != null && !topic.isEmpty()) { Map producerProps = baseProducerConfigs(id.connector(), "connector-dlq-producer-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId); -Map adminProps = adminConfigs(id.connector(), "connector-dlq-adminclient-", config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK); +Map adminProps = adminConfigs(id.connector(), "connector-dlq-adminclient-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK); Review Comment: Small unrelated fix spotted while I was in the neighborhood. I checked the rest of this file for similar bugs; this appears to be the only one.
[GitHub] [kafka] C0urante commented on pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers
C0urante commented on PR #12544: URL: https://github.com/apache/kafka/pull/12544#issuecomment-1221326257 @mimaison if you have a moment, would you mind giving this a pass?
[GitHub] [kafka] C0urante opened a new pull request, #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers
C0urante opened a new pull request, #12544: URL: https://github.com/apache/kafka/pull/12544 [Jira](https://issues.apache.org/jira/browse/KAFKA-14098) Adds the client ID `connect-cluster-${groupId}-${topic}` to the producers and consumers used by Connect workers to interact with internal topics, where `${groupId}` is the `group.id` in the worker config file, and `${topic}` is one of `configs`, `statuses`, or `offsets`. Since a single shared admin client is used across all three internal topics, it is given a client ID of `connect-cluster-${groupId}`.
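The naming scheme described in this PR is simple enough to restate as code. The sketch below only mirrors that scheme; ConnectClientIds and its methods are hypothetical names, and rejecting unknown topic kinds is an illustrative addition that the PR does not describe.

```java
import java.util.Set;

// Hypothetical restatement of the client-ID scheme from the PR description.
final class ConnectClientIds {
    private static final Set<String> INTERNAL_TOPICS = Set.of("configs", "statuses", "offsets");

    // Producer or consumer for one internal topic: connect-cluster-${groupId}-${topic}
    static String forInternalTopic(String groupId, String topic) {
        if (!INTERNAL_TOPICS.contains(topic)) {
            throw new IllegalArgumentException("Unknown internal topic kind: " + topic);
        }
        return "connect-cluster-" + groupId + "-" + topic;
    }

    // The single admin client shared by all three internal topics: connect-cluster-${groupId}
    static String forSharedAdmin(String groupId) {
        return "connect-cluster-" + groupId;
    }
}
```

Embedding `group.id` means clients belonging to different Connect clusters that share one Kafka cluster can be told apart wherever client IDs appear, such as broker logs and client metrics.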
[GitHub] [kafka] C0urante commented on a diff in pull request #12536: KAFKA-14160: Streamline clusterId retrieval in Connect
C0urante commented on code in PR #12536: URL: https://github.com/apache/kafka/pull/12536#discussion_r950692858 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java: ## @@ -409,6 +412,13 @@ public String groupId() { return null; } +public String kafkaClusterId() { +if (kafkaClusterId == null) { +kafkaClusterId = ConnectUtils.lookupKafkaClusterId(this); +} +return kafkaClusterId; +} Review Comment: Should we move the `lookupKafkaClusterId` logic from `ConnectUtils` to this class, so that nobody accidentally starts using the variant that creates and discards an admin client once per call?
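The lazily cached getter in this diff is a form of memoization: the expensive lookup (which creates and discards an admin client) should run once and then be reused. A standalone sketch of the same idea, with the lookup replaced by a Supplier; CachedClusterId is hypothetical. Unlike the unsynchronized check shown in the diff, it uses double-checked locking on a volatile field, so concurrent first callers don't both perform the lookup.

```java
import java.util.function.Supplier;

// Hypothetical memoizing holder; the Supplier stands in for a lookup that creates,
// uses, and closes an admin client.
final class CachedClusterId {
    private final Supplier<String> lookup;
    private volatile String clusterId;

    CachedClusterId(Supplier<String> lookup) {
        this.lookup = lookup;
    }

    String get() {
        String id = clusterId;
        if (id == null) {
            synchronized (this) {
                id = clusterId;
                if (id == null) {
                    // Runs until the lookup yields a non-null ID, then never again.
                    id = lookup.get();
                    clusterId = id;
                }
            }
        }
        return id;
    }
}
```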
[GitHub] [kafka] songnon commented on a diff in pull request #12543: KAFKA-10149: Allow auto preferred leader election for partitions are not being reassigned
songnon commented on code in PR #12543: URL: https://github.com/apache/kafka/pull/12543#discussion_r950692749 ## core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala: ## @@ -490,6 +490,41 @@ class ControllerIntegrationTest extends QuorumTestHarness { "failed to get expected partition state upon broker startup") } + @Test + def testAutoPreferredReplicaLeaderElectionWithReassigningPartitions(): Unit = { +servers = makeServers(3, autoLeaderRebalanceEnable = true) +val controllerId = TestUtils.waitUntilControllerElected(zkClient) +val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head +val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head +val tp = new TopicPartition("t", 0) +val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId)) +TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) +val reassigningTp = new TopicPartition("reassigning", 0) +val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId)) + +TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers) +servers(leaderBrokerId).shutdown() +servers(leaderBrokerId).awaitShutdown() + +servers(otherBrokerId).shutdown() +servers(otherBrokerId).awaitShutdown() +waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1, + "failed to get expected partition state upon broker shutdown") + +val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List())) +zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas }) +waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1, + "failed to get expected partition state during partition reassignment with offline replica") + +servers(leaderBrokerId).startup() 
+waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2, + "failed to get expected partition state upon broker startup") + +servers(otherBrokerId).startup() +waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4, Review Comment: Yes, it is.
[jira] [Updated] (KAFKA-14012) passing a "method" into the `Utils.closeQuietly` method cause NPE
[ https://issues.apache.org/jira/browse/KAFKA-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14012: -- Fix Version/s: 3.4.0 > passing a "method" into the `Utils.closeQuietly` method cause NPE > - > > Key: KAFKA-14012 > URL: https://issues.apache.org/jira/browse/KAFKA-14012 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.2.0 >Reporter: Luke Chen >Assignee: Sagar Rao >Priority: Major > Fix For: 3.4.0 > > > The Utils.closeQuietly method accepts an `AutoCloseable` object and closes it. However, > in some places we pass a "method" into Utils.closeQuietly, which > means the object doesn't get closed as expected. > I found this in: > - WorkerConnector > - AbstractWorkerSourceTask > - KafkaConfigBackingStore
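One concrete way passing a method produces the NPE in the title: a bound method reference such as `resource::close` evaluates its receiver immediately, and the Java Language Specification requires a NullPointerException if that receiver is null. The null check inside a closeQuietly-style helper therefore never gets a chance to run. QuietClose and Resource are hypothetical; the helper only mimics null-tolerant, exception-swallowing closing.

```java
// Hypothetical null-tolerant helper in the spirit of Utils.closeQuietly (not Kafka's code).
final class QuietClose {
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return; // nothing to close
        }
        try {
            closeable.close();
        } catch (Throwable t) {
            System.err.println("Failed to close " + name + ": " + t);
        }
    }
}

final class Resource implements AutoCloseable {
    boolean closed;

    @Override
    public void close() {
        closed = true;
    }
}

final class ClosePitfallDemo {
    public static void main(String[] args) {
        Resource resource = null;

        // Safe: the helper sees null and returns.
        QuietClose.closeQuietly(resource, "resource");

        try {
            // Unsafe: evaluating resource::close with a null receiver throws
            // NullPointerException before closeQuietly is even entered.
            QuietClose.closeQuietly(resource::close, "resource");
        } catch (NullPointerException e) {
            System.out.println("NPE from method reference");
        }
    }
}
```

Passing the object itself keeps both the null check and the object's own close() semantics in play.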
[jira] [Updated] (KAFKA-14007) Connect header converters are never closed
[ https://issues.apache.org/jira/browse/KAFKA-14007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14007: -- Fix Version/s: 3.4.0 > Connect header converters are never closed > -- > > Key: KAFKA-14007 > URL: https://issues.apache.org/jira/browse/KAFKA-14007 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Sagar Rao >Priority: Major > Fix For: 3.4.0 > > > The [HeaderConverter > interface|https://github.com/apache/kafka/blob/1e21201ea24389bdaccb8a462f3a53e356b58a58/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L27] > extends {{Closeable}}, but {{HeaderConverter::close}} is never actually > invoked anywhere. We can and should start invoking it, probably wrapped in > [Utils::closeQuietly|https://github.com/apache/kafka/blob/1e21201ea24389bdaccb8a462f3a53e356b58a58/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L999-L1010] > so that any faulty logic in that method in custom header converters, which > has so far gone undetected, will not cause new task failures.
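Closing quietly matters here because a custom converter's close() may never have been exercised before. The sketch below shows the intended property: a failing header converter is reported (here, collected into a list instead of logged) rather than propagated, and the remaining resources still get closed. All class names are hypothetical, and treating the key and value converters as Closeable is purely for illustration.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical custom header converter whose close() bug has gone unnoticed so far.
final class BuggyHeaderConverter implements Closeable {
    @Override
    public void close() throws IOException {
        throw new IOException("close() was never exercised before");
    }
}

// Hypothetical well-behaved resource that records whether it was closed.
final class TrackingConverter implements Closeable {
    boolean closed;

    @Override
    public void close() {
        closed = true;
    }
}

// Sketch of task shutdown that closes every converter quietly.
final class ConverterShutdown {
    final List<String> warnings = new ArrayList<>();

    private void closeQuietly(AutoCloseable c, String name) {
        if (c == null) return;
        try {
            c.close();
        } catch (Throwable t) {
            // Record the failure instead of failing the task.
            warnings.add("Failed to close " + name + ": " + t.getMessage());
        }
    }

    void closeAll(Closeable headerConverter, Closeable keyConverter, Closeable valueConverter) {
        closeQuietly(headerConverter, "header converter");
        closeQuietly(keyConverter, "key converter");
        closeQuietly(valueConverter, "value converter");
    }
}
```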
[jira] [Assigned] (KAFKA-14000) Kafka-connect standby server shows empty tasks list
[ https://issues.apache.org/jira/browse/KAFKA-14000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-14000: - Assignee: Sagar Rao > Kafka-connect standby server shows empty tasks list > --- > > Key: KAFKA-14000 > URL: https://issues.apache.org/jira/browse/KAFKA-14000 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.6.0 >Reporter: Xinyu Zou >Assignee: Sagar Rao >Priority: Major > Attachments: kafka-connect-trace.log
[jira] [Assigned] (KAFKA-14015) ConfigProvider with ttl fails to restart tasks
[ https://issues.apache.org/jira/browse/KAFKA-14015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-14015: - Assignee: Sagar Rao > ConfigProvider with ttl fails to restart tasks > -- > > Key: KAFKA-14015 > URL: https://issues.apache.org/jira/browse/KAFKA-14015 > Project: Kafka > Issue Type: Bug >Reporter: Ross Lawley >Assignee: Sagar Rao >Priority: Major > > According to the > [KIP-297|https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations#KIP297:ExternalizingSecretsforConnectConfigurations-SecretRotation]: > {quote} * When the Herder receives the onChange() call, it will check a new > connector configuration property config.reload.action which can be one of the > following: > ** The value restart, which means to schedule a restart of the Connector and > all its Tasks. This will be the default. > ** The value none, which means to do nothing.{quote} > However, the > [restartConnector|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java#L287-L294] > method only restarts the connector and does not restart any tasks. I suggest > calling {{restartConnectorAndTasks}} instead. > As a result, changed configurations provided by the ConfigProvider are not > picked up, and existing tasks continue to use outdated configuration.
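The difference between the two restart paths can be modelled by tracking which config version the connector and each task last started with. This is a hypothetical sketch of the behaviour KIP-297 describes, not StandaloneHerder code; ReloadAction, SketchHerder and the version counters are made up for illustration.

```java
import java.util.Arrays;
import java.util.Locale;

// config.reload.action values from KIP-297; "restart" is the default.
enum ReloadAction {
    RESTART, NONE;

    // Unknown values throw IllegalArgumentException (from valueOf).
    static ReloadAction parse(String value) {
        return value == null ? RESTART : valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}

// Hypothetical herder that tracks the config version the connector and each task run with.
final class SketchHerder {
    int latestConfigVersion = 0;    // bumped when changed values are reported
    int connectorConfigVersion = 0; // version the connector last (re)started with
    final int[] taskConfigVersions; // version each task last (re)started with

    SketchHerder(int numTasks) {
        taskConfigVersions = new int[numTasks];
    }

    // Invoked when changed config values are reported (e.g. after a TTL expires).
    void onChange(String reloadActionProperty) {
        latestConfigVersion++;
        if (ReloadAction.parse(reloadActionProperty) == ReloadAction.RESTART) {
            restartConnectorAndTasks();
        }
    }

    // The behaviour described in the report: tasks keep running with the old config.
    void restartConnector() {
        connectorConfigVersion = latestConfigVersion;
    }

    // The suggested fix: restart the connector and every task.
    void restartConnectorAndTasks() {
        restartConnector();
        Arrays.fill(taskConfigVersions, latestConfigVersion);
    }

    boolean tasksUpToDate() {
        for (int v : taskConfigVersions) {
            if (v != latestConfigVersion) return false;
        }
        return true;
    }
}
```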
[jira] [Resolved] (KAFKA-14012) passing a "method" into the `Utils.closeQuietly` method cause NPE
[ https://issues.apache.org/jira/browse/KAFKA-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-14012. --- Resolution: Fixed > passing a "method" into the `Utils.closeQuietly` method cause NPE > - > > Key: KAFKA-14012 > URL: https://issues.apache.org/jira/browse/KAFKA-14012 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.2.0 >Reporter: Luke Chen >Assignee: Sagar Rao >Priority: Major
[jira] [Resolved] (KAFKA-14007) Connect header converters are never closed
[ https://issues.apache.org/jira/browse/KAFKA-14007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-14007. --- Resolution: Fixed > Connect header converters are never closed > -- > > Key: KAFKA-14007 > URL: https://issues.apache.org/jira/browse/KAFKA-14007 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Sagar Rao >Priority: Major
[GitHub] [kafka] vamossagar12 commented on pull request #12485: KAFKA-14131: Adding InterruptException when reading to end of Offseto…
vamossagar12 commented on PR #12485: URL: https://github.com/apache/kafka/pull/12485#issuecomment-1221262360 Thanks @C0urante. I think those are great points, and I now realise the mistake I made with the PR. Here's what I'm thinking, along with answers to the rest of your points: 1) I agree for the most part that `InterruptedException` may or may not be thrown, so it's pretty much unreliable. Having said that, if it does occur, i.e. the thread in `herderExecutor` does get interrupted, the Kafka consumers backing the offsets store would definitely get into an infinite loop. IMO, that's something that needs to be handled. That's where the original approach in the PR is wrong: it catches the `InterruptedException` in `consumer.poll`, which may never even see it. Instead, I'm thinking of handling it in the `DistributedHerder.stop` method. I see it catches `InterruptedException` but doesn't do anything with it. What I think can be done is that once the `herderExecutor` thread is interrupted, we force-close the Kafka consumers by invoking `stopServices`. At this point, some of the consumers might be closed and some might not, and here we can leverage the `stopRequested` flag, i.e. if this flag is set, then for all practical purposes the log should get closed. So, we can add that check in `KafkaConsumer.stop`. Let me know if this makes sense. - Does standalone mode need to be taken into account as well? Yes, I think it can have a similar issue, so that can probably be accounted for as well. - Will any logic that we add in DistributedHerder::stop conflict with the logic contained in halt, which will presumably be called once the herder finishes/aborts startup? That's a great point. `stopServices` could end up being invoked both from halt and from the `InterruptedException` handler, so we would need to ensure that calling the `stop` method multiple times is safe.
The `stopRequested` check I described might help here. WDYT? - Will anything we add to DistributedHerder::stop run the risk of blocking indefinitely? If so, what are the potential ramifications? With the approach I described, maybe not, but I'd love to hear your thoughts on this. - None of this should result in any error messages being logged unless an operation has actually failed (instead of just being terminated prematurely) +1 Regarding the last 3 points, I'm not sure at this moment; if this approach sounds OK, we can work out answers to them. - Can we ensure that the worker is able to gracefully leave the cluster if it's been able to join? - Can we ensure that the worker doesn't try to leave the cluster (or at the very least, cause any issues by trying to leave the cluster) if it hasn't been able to join? - If we're able to successfully abort one startup operation (such as reading to the end of the offsets topic), can we make sure that we don't even attempt any following startup operations (such as reading to the end of the status topic, which currently takes place after reading to the end of the offsets topic)
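For the requirement that stopping be safe when reached from both halt and the interrupt handler, a compare-and-set guard is one common way to make shutdown idempotent. BackingServices and its methods are hypothetical; the names echo `stopServices` and `stopRequested` from the comment, but this is not their actual implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder for a worker's backing-store services.
final class BackingServices {
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    final AtomicInteger shutdowns = new AtomicInteger(); // counts real shutdowns, for illustration

    // Safe to call from both halt() and an InterruptedException handler in stop().
    // Returns true only for the caller that actually performed the shutdown.
    boolean stopServices() {
        if (!stopRequested.compareAndSet(false, true)) {
            return false; // already stopped, or another thread is stopping it
        }
        shutdowns.incrementAndGet(); // stand-in for closing the offset/status/config logs
        return true;
    }

    // A read-to-end loop could poll this flag and bail out instead of spinning forever.
    boolean isStopRequested() {
        return stopRequested.get();
    }
}
```

A second caller returns immediately instead of waiting for the first to finish, so it cannot block; if callers must wait for shutdown to complete, a latch with a timeout could be added on top.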
[GitHub] [kafka] dajac commented on a diff in pull request #12501: KAFKA-14097: Separate configuration for producer ID expiry
dajac commented on code in PR #12501: URL: https://github.com/apache/kafka/pull/12501#discussion_r950665710 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1295,7 +1298,8 @@ object KafkaConfig { .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TransactionsAbortTimedOutTransactionsCleanupIntervalMS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc) .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TransactionsRemoveExpiredTransactionsCleanupIntervalMS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc) - .define(ProducerIdExpirationMsProp, INT, Defaults.ProducerIdExpirationMs, atLeast(1), HIGH, ProducerIdExpirationMsDoc) + .define(ProducerIdExpirationMsProp, INT, Defaults.ProducerIdExpirationMs, atLeast(1), LOW, ProducerIdExpirationMsDoc) + .defineInternal(ProducerIdExpirationCheckIntervalMsProp, INT, Defaults.ProducerIdExpirationCheckIntervalMs, atLeast(1), LOW, ProducerIdExpirationMsDoc) Review Comment: Right.
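The diff separates two concerns: how long an idle producer ID is retained (`ProducerIdExpirationMsProp`) and, presumably, how often the broker checks for expired IDs (the new internal `ProducerIdExpirationCheckIntervalMsProp`). A hypothetical sketch of how two such settings interact; ProducerIdExpiry is not the broker's producer-state code, and the `>=` boundary is an arbitrary choice.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical: the idle threshold and the scan cadence are two independent settings.
final class ProducerIdExpiry {
    private final long expirationMs;    // how long a producer ID may stay idle
    private final long checkIntervalMs; // how often the expiry scan is allowed to run
    private long lastCheckMs;
    final Map<Long, Long> lastSeenMs = new HashMap<>(); // producer ID -> last activity time

    ProducerIdExpiry(long expirationMs, long checkIntervalMs, long nowMs) {
        this.expirationMs = expirationMs;
        this.checkIntervalMs = checkIntervalMs;
        this.lastCheckMs = nowMs;
    }

    void record(long producerId, long nowMs) {
        lastSeenMs.put(producerId, nowMs);
    }

    // May be called often; only scans once checkIntervalMs has elapsed. Returns IDs removed.
    int maybeExpire(long nowMs) {
        if (nowMs - lastCheckMs < checkIntervalMs) {
            return 0;
        }
        lastCheckMs = nowMs;
        int removed = 0;
        Iterator<Map.Entry<Long, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() >= expirationMs) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

Keeping the scan cadence separate lets the retention period be tuned without also changing how often the (potentially large) producer map is scanned.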