[kafka] branch 3.5 updated (9ad90954654 -> 37cab476a50)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch 3.5 in repository https://gitbox.apache.org/repos/asf/kafka.git from 9ad90954654 HOTFIX: fix broken Streams upgrade system test (#13654) add 37cab476a50 KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688) No new revisions were added by this update. Summary of changes: .../connect/storage/KafkaConfigBackingStore.java | 4 +-- .../apache/kafka/connect/util/KafkaBasedLog.java | 37 +++--- .../storage/KafkaConfigBackingStoreTest.java | 10 +++--- 3 files changed, 40 insertions(+), 11 deletions(-)
[kafka] branch 3.3 updated: KAFKA-14974: Restore backward compatibility in KafkaBasedLog - fix unit tests
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.3 by this push: new 811fcac9f10 KAFKA-14974: Restore backward compatibility in KafkaBasedLog - fix unit tests 811fcac9f10 is described below commit 811fcac9f1044540fe7dfeb07b762fc1638723b7 Author: Randall Hauch AuthorDate: Tue May 9 12:44:12 2023 -0500 KAFKA-14974: Restore backward compatibility in KafkaBasedLog - fix unit tests Fix the unit tests that broke when cherry-picking the change to the 3.3 branch. --- .../connect/storage/KafkaOffsetBackingStoreTest.java | 20 +--- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java index 701351cb676..cf11230f3d2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -193,9 +193,11 @@ public class KafkaOffsetBackingStoreTest { // Set offsets Capture callback0 = EasyMock.newCapture(); - EasyMock.expect(storeLog.sendWithReceipt(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null); +storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0)); +PowerMock.expectLastCall(); Capture callback1 = EasyMock.newCapture(); - EasyMock.expect(storeLog.sendWithReceipt(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1))).andReturn(null); +storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1)); +PowerMock.expectLastCall(); // Second get() should get the produced data and return the new values final Capture> secondGetReadToEndCallback = EasyMock.newCapture(); @@ -274,9 +276,10 @@ public class KafkaOffsetBackingStoreTest { // Set offsets Capture callback0 = EasyMock.newCapture(); - EasyMock.expect(storeLog.sendWithReceipt(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null); +storeLog.send(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0)); +PowerMock.expectLastCall(); Capture callback1 = EasyMock.newCapture(); - EasyMock.expect(storeLog.sendWithReceipt(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1))).andReturn(null); +storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1)); PowerMock.expectLastCall(); // Second get() should get the produced data and return the new values @@ -334,11 +337,14 @@ public class KafkaOffsetBackingStoreTest { // Set offsets Capture callback0 = EasyMock.newCapture(); - EasyMock.expect(storeLog.sendWithReceipt(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0))).andReturn(null); +storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0)); +PowerMock.expectLastCall(); Capture callback1 = EasyMock.newCapture(); - EasyMock.expect(storeLog.sendWithReceipt(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1))).andReturn(null); +storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1)); +PowerMock.expectLastCall(); Capture callback2 = EasyMock.newCapture(); - EasyMock.expect(storeLog.sendWithReceipt(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2))).andReturn(null); +storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2)); +PowerMock.expectLastCall(); expectClusterId();
[kafka] branch 3.3 updated: KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.3 by this push: new 9c3ca5afe2d KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688) 9c3ca5afe2d is described below commit 9c3ca5afe2dbcef16d1d30caa436369d9fd9db6a Author: Yash Mayya AuthorDate: Tue May 9 17:58:45 2023 +0530 KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688) `KafkaBasedLog` is a widely used utility class that provides a generic implementation of a shared, compacted log of records in a Kafka topic. It isn't in Connect's public API, but has been used outside of Connect and we try to preserve backward compatibility whenever possible. KAFKA-14455 modified the two overloaded void `KafkaBasedLog::send` methods to return a `Future`. While this change is source compatible, it isn't binary compatible. We can restore backward compatibility simply b [...] This refactoring changes no functionality other than restoring the older methods. Reviewers: Randall Hauch --- .../connect/storage/KafkaConfigBackingStore.java | 4 +-- .../apache/kafka/connect/util/KafkaBasedLog.java | 37 +++--- .../storage/KafkaConfigBackingStoreTest.java | 10 +++--- .../storage/KafkaOffsetBackingStoreTest.java | 14 4 files changed, 47 insertions(+), 18 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index ac1f4ad7829..8f3b38c2d26 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -631,7 +631,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState); log.debug("Writing target state {} for connector {}", state, connector); try { -configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS); +configLog.sendWithReceipt(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.error("Failed to write target state to Kafka", e); throw new ConnectException("Error writing target state to Kafka", e); @@ -780,7 +780,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { if (!usesFencableWriter) { List> producerFutures = new ArrayList<>(); keyValues.forEach( -keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value)) +keyValue -> producerFutures.add(configLog.sendWithReceipt(keyValue.key, keyValue.value)) ); timer.update(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 431ae871ce9..a05a05dffe7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -77,6 +77,10 @@ import java.util.function.Supplier; * calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked * and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required. * + * + * This is a useful utility that has been used outside of Connect. This isn't in Connect's public API, + * but we've tried to maintain the method signatures and backward compatibility since early Kafka versions. + * */ public class KafkaBasedLog { private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class); @@ -351,6 +355,31 @@ public class KafkaBasedLog { return future; } +/** + * Send a record asynchronously to the configured {@link #topic} without using a producer callback. + * + * This method exists for backward compatibility reasons and delegates to the newer + * {@link #sendWithReceipt(Object, Object)} method that returns a future. + * @param key the key for the {@link ProducerRecord} + * @param value the value for the {@link ProducerRecord} + */ +public void send(K key, V value) { +sendWithReceipt(key, value); +} + +/** + * Send a record asynchronously to the configured {@link
[kafka] branch 3.4 updated: KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.4 by this push: new 721a917b444 KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688) 721a917b444 is described below commit 721a917b444fb7d06790fe40c4a21c70d7790c97 Author: Yash Mayya AuthorDate: Tue May 9 17:58:45 2023 +0530 KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688) `KafkaBasedLog` is a widely used utility class that provides a generic implementation of a shared, compacted log of records in a Kafka topic. It isn't in Connect's public API, but has been used outside of Connect and we try to preserve backward compatibility whenever possible. KAFKA-14455 modified the two overloaded void `KafkaBasedLog::send` methods to return a `Future`. While this change is source compatible, it isn't binary compatible. We can restore backward compatibility simply b [...] This refactoring changes no functionality other than restoring the older methods. Reviewers: Randall Hauch --- .../connect/storage/KafkaConfigBackingStore.java | 4 +-- .../apache/kafka/connect/util/KafkaBasedLog.java | 37 +++--- .../storage/KafkaConfigBackingStoreTest.java | 10 +++--- 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index e9f87697308..b5804697600 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -637,7 +637,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState); log.debug("Writing target state {} for connector {}", state, connector); try { -configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS); +configLog.sendWithReceipt(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { log.error("Failed to write target state to Kafka", e); throw new ConnectException("Error writing target state to Kafka", e); @@ -789,7 +789,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { if (!usesFencableWriter) { List> producerFutures = new ArrayList<>(); keyValues.forEach( -keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value)) +keyValue -> producerFutures.add(configLog.sendWithReceipt(keyValue.key, keyValue.value)) ); timer.update(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 431ae871ce9..a05a05dffe7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -77,6 +77,10 @@ import java.util.function.Supplier; * calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked * and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required. * + * + * This is a useful utility that has been used outside of Connect. This isn't in Connect's public API, + * but we've tried to maintain the method signatures and backward compatibility since early Kafka versions. + * */ public class KafkaBasedLog { private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class); @@ -351,6 +355,31 @@ public class KafkaBasedLog { return future; } +/** + * Send a record asynchronously to the configured {@link #topic} without using a producer callback. + * + * This method exists for backward compatibility reasons and delegates to the newer + * {@link #sendWithReceipt(Object, Object)} method that returns a future. + * @param key the key for the {@link ProducerRecord} + * @param value the value for the {@link ProducerRecord} + */ +public void send(K key, V value) { +sendWithReceipt(key, value); +} + +/** + * Send a record asynchronously to the configured {@link #topic}. + * + * This method exists for backward comp
[kafka] branch trunk updated (b40a7fc037b -> 59ba9dbbc92)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from b40a7fc037b HOTFIX: fix broken Streams upgrade system test (#13654) add 59ba9dbbc92 KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688) No new revisions were added by this update. Summary of changes: .../connect/storage/KafkaConfigBackingStore.java | 4 +-- .../apache/kafka/connect/util/KafkaBasedLog.java | 37 +++--- .../storage/KafkaConfigBackingStoreTest.java | 10 +++--- 3 files changed, 40 insertions(+), 11 deletions(-)
[kafka] branch 3.3 updated: MINOR: Migrate connect system tests to KRaft (#12621)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.3 by this push: new e3f89542471 MINOR: Migrate connect system tests to KRaft (#12621) e3f89542471 is described below commit e3f89542471919fbe80b1f4109eea0059f766fda Author: srishti-saraswat <98515593+srishti-saras...@users.noreply.github.com> AuthorDate: Thu Oct 27 21:49:14 2022 +0530 MINOR: Migrate connect system tests to KRaft (#12621) Adds the `metadata_quorum` parameter to the `@matrix(...)` annotation to many existing tests, so that they are run with both zookeeper and remote_kraft nodes. Reviewers: Randall Hauch , Greg Harris --- .../tests/connect/connect_distributed_test.py | 51 +++--- tests/kafkatest/tests/connect/connect_rest_test.py | 5 ++- tests/kafkatest/tests/connect/connect_test.py | 11 +++-- 3 files changed, 36 insertions(+), 31 deletions(-) diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 8347afc8d66..3854573fc5b 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -20,7 +20,7 @@ from ducktape.mark import matrix, parametrize from ducktape.cluster.remoteaccount import RemoteCommandError from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService, config_property +from kafkatest.services.kafka import KafkaService, config_property, quorum from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.security.security_config import SecurityConfig @@ -75,7 +75,7 @@ class ConnectDistributedTest(Test): self.TOPIC: {'partitions': 1, 'replication-factor': 1} } -self.zk = ZookeeperService(test_context, self.num_zk) +self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None self.key_converter = "org.apache.kafka.connect.json.JsonConverter" self.value_converter = "org.apache.kafka.connect.json.JsonConverter" @@ -98,7 +98,8 @@ class ConnectDistributedTest(Test): include_filestream_connectors=include_filestream_connectors) self.cc.log_level = "DEBUG" -self.zk.start() +if self.zk: +self.zk.start() self.kafka.start() def _start_connector(self, config_file): @@ -164,8 +165,8 @@ class ConnectDistributedTest(Test): return self._task_has_state(task_id, status, 'RUNNING') @cluster(num_nodes=5) -@matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager']) -def test_restart_failed_connector(self, exactly_once_source, connect_protocol): +@matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade) +def test_restart_failed_connector(self, exactly_once_source, connect_protocol, metadata_quorum): self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled' self.CONNECT_PROTOCOL = connect_protocol self.setup_services() @@ -187,8 +188,8 @@ class ConnectDistributedTest(Test): err_msg="Failed to see connector transition to the RUNNING state") @cluster(num_nodes=5) -@matrix(connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager']) -def test_restart_failed_task(self, connector_type, connect_protocol): +@matrix(connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade) +def test_restart_failed_task(self, connector_type, connect_protocol, metadata_quorum): self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if connector_type == 'exactly-once source' else 'disabled' self.CONNECT_PROTOCOL = connect_protocol self.setup_services() @@ -213,8 +214,8 @@ class ConnectDistributedTest(Test): err_msg="Failed to see task transition to the RUNNING state") @cluster(num_nodes=5) -@matrix(connect_protocol=['sessioned', 'compatible', 'eager']) -def test_restart_connector_and_tasks_failed_connector(self, connect_protocol): +@matrix(connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade) +def test_restart_connector_and_tasks_failed_connector(self, connect_protocol, metadata_quorum): self.CONNECT_
[kafka] branch trunk updated (47adb866369 -> 57aefa9c82d)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 47adb866369 KAFKA-10149: Allow auto preferred leader election when there are ongoing partition reassignments (#12543) add 57aefa9c82d MINOR: Migrate connect system tests to KRaft (#12621) No new revisions were added by this update. Summary of changes: .../tests/connect/connect_distributed_test.py | 51 +++--- tests/kafkatest/tests/connect/connect_rest_test.py | 5 ++- tests/kafkatest/tests/connect/connect_test.py | 11 +++-- 3 files changed, 36 insertions(+), 31 deletions(-)
[kafka] branch 3.3 updated: KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12415)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.3 by this push: new 64ebc76df9 KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12415) 64ebc76df9 is described below commit 64ebc76df96cdc9cea419c0ab09cfa82d0ca743c Author: Christopher L. Shannon AuthorDate: Mon Jul 18 18:07:20 2022 -0400 KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12415) Make sure to ack all records where produce failed, when a connector's `errors.tolerance` config property is set to `all`. Acking is essential so that the task will continue to commit future record offsets properly and remove the records from internal tracking, preventing a memory leak. (cherry picked and slightly modified from commit 63e06aafd0cf37f8488c3830946051b3a30db2a0) Reviewers: Chris Egerton , Randall Hauch --- .../connect/runtime/AbstractWorkerSourceTask.java | 5 + .../kafka/connect/runtime/WorkerSourceTaskTest.java | 21 ++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index d89f577688..407f5fd828 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -38,6 +38,7 @@ import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; +import org.apache.kafka.connect.runtime.errors.ToleranceType; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTaskContext; @@ -406,6 +407,10 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask { } log.trace("{} Failed record: {}", AbstractWorkerSourceTask.this, preTransformRecord); producerSendFailed(false, producerRecord, preTransformRecord, e); +if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) { +counter.skipRecord(); + submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::ack); +} } else { counter.completeRecord(); log.trace("{} Wrote record successfully: topic {} partition {} offset {}", diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 5ce0e44f3e..2d2cd00cf5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -696,24 +696,39 @@ public class WorkerSourceTaskTest extends ThreadedTest { createWorkerTaskWithErrorToleration(); expectTopicCreation(TOPIC); +//Use different offsets for each record so we can verify all were committed +final Map offset2 = Collections.singletonMap("key", 13); + // send two records // record 1 will succeed // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator // and no ConnectException will be thrown SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); -SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); - +SourceRecord record2 = new SourceRecord(PARTITION, offset2, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); +expectOffsetFlush(true); expectSendRecordOnce(); expectSendRecordProducerCallbackFail(); sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull()); -EasyMock.expectLastCall(); + +//As of KAFKA-14079 all offsets should be committed, even for failed records (if ignored) +//Only the last offset will be passed to the method as everything up to that point is committed +//Before KAFKA-14079 offset 12 would have been passed and not 13 as it would have been unacked +offsetWriter.offset(PARTITION, offset2); +PowerMock.expectLastCall();
[kafka] branch trunk updated: KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12415)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 8142822633 KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12415) 8142822633 is described below commit 81428226332005c27870aacfccc813950c84386c Author: Christopher L. Shannon AuthorDate: Mon Jul 18 18:07:20 2022 -0400 KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12415) Make sure to ack all records where produce failed, when a connector's `errors.tolerance` config property is set to `all`. Acking is essential so that the task will continue to commit future record offsets properly and remove the records from internal tracking, preventing a memory leak. (cherry picked and slightly modified from commit 63e06aafd0cf37f8488c3830946051b3a30db2a0) Reviewers: Chris Egerton , Randall Hauch --- .../connect/runtime/AbstractWorkerSourceTask.java | 5 + .../kafka/connect/runtime/WorkerSourceTaskTest.java | 21 ++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index d89f577688..407f5fd828 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -38,6 +38,7 @@ import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; +import org.apache.kafka.connect.runtime.errors.ToleranceType; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTaskContext; @@ -406,6 +407,10 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask { } log.trace("{} Failed record: {}", AbstractWorkerSourceTask.this, preTransformRecord); producerSendFailed(false, producerRecord, preTransformRecord, e); +if (retryWithToleranceOperator.getErrorToleranceType() == ToleranceType.ALL) { +counter.skipRecord(); + submittedRecord.ifPresent(SubmittedRecords.SubmittedRecord::ack); +} } else { counter.completeRecord(); log.trace("{} Wrote record successfully: topic {} partition {} offset {}", diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 5ce0e44f3e..2d2cd00cf5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -696,24 +696,39 @@ public class WorkerSourceTaskTest extends ThreadedTest { createWorkerTaskWithErrorToleration(); expectTopicCreation(TOPIC); +//Use different offsets for each record so we can verify all were committed +final Map offset2 = Collections.singletonMap("key", 13); + // send two records // record 1 will succeed // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator // and no ConnectException will be thrown SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); -SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); - +SourceRecord record2 = new SourceRecord(PARTITION, offset2, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); +expectOffsetFlush(true); expectSendRecordOnce(); expectSendRecordProducerCallbackFail(); sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull()); -EasyMock.expectLastCall(); + +//As of KAFKA-14079 all offsets should be committed, even for failed records (if ignored) +//Only the last offset will be passed to the method as everything up to that point is committed +//Before KAFKA-14079 offset 12 would have been passed and not 13 as it would have been unacked +offsetWriter.offset(PARTITION, offset2); +PowerMock.expectLastCall();
[kafka] branch 3.2 updated: KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12412)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.2 by this push: new ca674d9e17 KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12412) ca674d9e17 is described below commit ca674d9e17e249ddc422b54449f847332dc03e97 Author: Christopher L. Shannon AuthorDate: Mon Jul 18 18:06:45 2022 -0400 KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL (#12412) Make sure to ack all records where produce failed, when a connector's `errors.tolerance` config property is set to `all`. Acking is essential so that the task will continue to commit future record offsets properly and remove the records from internal tracking, preventing a memory leak. Reviewers: Chris Egerton , Randall Hauch --- .../kafka/connect/runtime/WorkerSourceTask.java | 4 .../kafka/connect/runtime/WorkerSourceTaskTest.java | 21 ++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 9ce2b8dbb8..a3d9b036c2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -375,6 +375,10 @@ class WorkerSourceTask extends WorkerTask { // executeFailed here allows the use of existing logging infrastructure/configuration retryWithToleranceOperator.executeFailed(Stage.KAFKA_PRODUCE, WorkerSourceTask.class, preTransformRecord, e); + +//Ack the record so it will be skipped and offsets are committed +submittedRecord.ack(); +counter.skipRecord(); commitTaskRecord(preTransformRecord, null); } else { log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 78db83c7ee..41df088c2a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -835,25 +835,40 @@ public class WorkerSourceTaskTest extends ThreadedTest { createWorkerTaskWithErrorToleration(); expectTopicCreation(TOPIC); +//Use different offsets for each record so we can verify all were committed +final Map offset2 = Collections.singletonMap("key", 13); + // send two records // record 1 will succeed // record 2 will invoke the producer's failure callback, but ignore the exception via retryOperator // and no ConnectException will be thrown SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); -SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); - +SourceRecord record2 = new SourceRecord(PARTITION, offset2, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); +expectOffsetFlush(true); expectSendRecordOnce(); expectSendRecordProducerCallbackFail(); sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.isNull()); -EasyMock.expectLastCall(); + +//As of KAFKA-14079 all offsets should be committed, even for failed records (if ignored) +//Only the last offset will be passed to the method as everything up to that point is committed +//Before KAFKA-14079 offset 12 would have been passed and not 13 as it would have been unacked +offsetWriter.offset(PARTITION, offset2); +PowerMock.expectLastCall(); PowerMock.replayAll(); +//Send records and then commit offsets and verify both were committed and no exception Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); Whitebox.invokeMethod(workerTask, "sendRecords"); +Whitebox.invokeMethod(workerTask, "updateCommittableOffsets"); +workerTask.commitOffsets(); PowerMock.verifyAll(); + +//Double check to make sure all submitted records were cleared +assertEquals(0, ((SubmittedRecords) Whitebox.getInternalState(workerTask,
[kafka] branch 3.2 updated: KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.2 by this push: new 3f59718 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) 3f59718 is described below commit 3f59718fb9c7246dadfbf70bf042b386d7e9ee4a Author: Randall Hauch AuthorDate: Thu Mar 24 21:40:10 2022 -0500 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) The `retryEndOffsets(…)` method in `TopicAdmin` recently added (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduce a regression when talking to older brokers (0.10.x or earlier). The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup. The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method. Added a new unit test that first replicated the problem before the fix, and verified the fix corrects the problem. --- .../org/apache/kafka/connect/util/TopicAdmin.java | 4 .../apache/kafka/connect/util/TopicAdminTest.java | 25 +- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index faf7b37..d97533a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -714,6 +714,7 @@ public class TopicAdmin implements AutoCloseable { * must be 0 or more * @return the map of offset for each topic partition, or an empty map if the supplied partitions * are null or empty + * @throws UnsupportedVersionException if the broker is too old to support the admin client API to read end offsets * @throws ConnectException if {@code timeoutDuration} is exhausted * @see TopicAdmin#endOffsets(Set) */ @@ -725,6 +726,9 @@ public class TopicAdmin implements AutoCloseable { () -> "list offsets for topic partitions", timeoutDuration, retryBackoffMs); +} catch (UnsupportedVersionException e) { +// Older brokers don't support this admin method, so rethrow it without wrapping it +throw e; } catch (Exception e) { throw new ConnectException("Failed to list offsets for topic partitions.", e); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index deea050..cf611db 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -467,8 +467,31 @@ public class TopicAdminTest { } } +/** + * TopicAdmin can be used to read the end offsets, but the admin client API used to do this was + * added to the broker in 0.11.0.0. This means that if Connect talks to older brokers, + * the admin client cannot be used to read end offsets, and will throw an UnsupportedVersionException. + */ +@Test +public void retryEndOffsetsShouldRethrowUnknownVersionException() { +String topicName = "myTopic"; +TopicPartition tp1 = new TopicPartition(topicName, 0); +Set tps = Collections.singleton(tp1); +Long offset = null; // response should use error +Cluster cluster = createCluster(1, topicName, 1); +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); +// Expect the admin client list offsets will throw unsupported version, simulating older brokers + env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset)); +TopicAdmin admin = new TopicAdmin(null, env.adminClient()); +// The retryEndOffsets should catch and rethrow an unsupported version exception +assertThrows(UnsupportedVersionException.class, () -> admin.retryEndOffsets(tps, Duration.ofMillis(100),
[kafka] branch 2.6 updated: KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.6 by this push: new 0257f26 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) 0257f26 is described below commit 0257f266063c3bffaa690fe3e98a3c876be99002 Author: Randall Hauch AuthorDate: Thu Mar 24 21:40:10 2022 -0500 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) The `retryEndOffsets(…)` method in `TopicAdmin` recently added (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduce a regression when talking to older brokers (0.10.x or earlier). The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup. The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method. Added a new unit test that first replicated the problem before the fix, and verified the fix corrects the problem. --- .../org/apache/kafka/connect/util/TopicAdmin.java | 4 .../apache/kafka/connect/util/TopicAdminTest.java | 28 +- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index dfbca5b..fca066f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -714,6 +714,7 @@ public class TopicAdmin implements AutoCloseable { * must be 0 or more * @return the map of offset for each topic partition, or an empty map if the supplied partitions * are null or empty + * @throws UnsupportedVersionException if the broker is too old to support the admin client API to read end offsets * @throws ConnectException if {@code timeoutDuration} is exhausted * @see TopicAdmin#endOffsets(Set) */ @@ -725,6 +726,9 @@ public class TopicAdmin implements AutoCloseable { () -> "list offsets for topic partitions", timeoutDuration, retryBackoffMs); +} catch (UnsupportedVersionException e) { +// Older brokers don't support this admin method, so rethrow it without wrapping it +throw e; } catch (Exception e) { throw new ConnectException("Failed to list offsets for topic partitions.", e); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index dd855e1..8bbad23 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -503,11 +503,33 @@ public class TopicAdminTest { KafkaFuture future = mockFuture(); when(future.get()).thenReturn(resultsInfo); when(results.partitionResult(eq(tp))).thenReturn(future); +} +/** + * TopicAdmin can be used to read the end offsets, but the admin client API used to do this was + * added to the broker in 0.11.0.0. This means that if Connect talks to older brokers, + * the admin client cannot be used to read end offsets, and will throw an UnsupportedVersionException. + */ +@Test +public void retryEndOffsetsShouldRethrowUnknownVersionException() { +String topicName = "myTopic"; +TopicPartition tp1 = new TopicPartition(topicName, 0); +Set tps = Collections.singleton(tp1); +Long offset = 1000L; +Cluster cluster = createCluster(1, topicName, 1); +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); +// Expect the admin client list offsets will throw unsupported version, simulating older brokers + env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset)); +TopicAdmin admin = new TopicAdmin(null, env.adminClient()); +// The retryEndOffsets should catch and rethrow an unsupported version ex
[kafka] branch 2.7 updated: KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new 0189aa6 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) 0189aa6 is described below commit 0189aa62882a2bd60c2beb1507176864ad253abb Author: Randall Hauch AuthorDate: Thu Mar 24 21:40:10 2022 -0500 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) The `retryEndOffsets(…)` method in `TopicAdmin` recently added (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduce a regression when talking to older brokers (0.10.x or earlier). The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup. The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method. Added a new unit test that first replicated the problem before the fix, and verified the fix corrects the problem. --- .../org/apache/kafka/connect/util/TopicAdmin.java | 4 +++ .../apache/kafka/connect/util/TopicAdminTest.java | 32 +- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index 4e50eab..df39250 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -714,6 +714,7 @@ public class TopicAdmin implements AutoCloseable { * must be 0 or more * @return the map of offset for each topic partition, or an empty map if the supplied partitions * are null or empty + * @throws UnsupportedVersionException if the broker is too old to support the admin client API to read end offsets * @throws ConnectException if {@code timeoutDuration} is exhausted * @see TopicAdmin#endOffsets(Set) */ @@ -725,6 +726,9 @@ public class TopicAdmin implements AutoCloseable { () -> "list offsets for topic partitions", timeoutDuration, retryBackoffMs); +} catch (UnsupportedVersionException e) { +// Older brokers don't support this admin method, so rethrow it without wrapping it +throw e; } catch (Exception e) { throw new ConnectException("Failed to list offsets for topic partitions.", e); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index 9f4f384..0ca8ba2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -505,11 +505,33 @@ public class TopicAdminTest { KafkaFuture future = mockFuture(); when(future.get()).thenReturn(resultsInfo); when(results.partitionResult(eq(tp))).thenReturn(future); +} +/** + * TopicAdmin can be used to read the end offsets, but the admin client API used to do this was + * added to the broker in 0.11.0.0. This means that if Connect talks to older brokers, + * the admin client cannot be used to read end offsets, and will throw an UnsupportedVersionException. + */ +@Test +public void retryEndOffsetsShouldRethrowUnknownVersionException() { +String topicName = "myTopic"; +TopicPartition tp1 = new TopicPartition(topicName, 0); +Set tps = Collections.singleton(tp1); +Long offset = 1000L; +Cluster cluster = createCluster(1, topicName, 1); +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); +// Expect the admin client list offsets will throw unsupported version, simulating older brokers + env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset)); +TopicAdmin admin = new TopicAdmin(null, env.adminClient()); +// The retryEndOffsets should catch and rethrow an unsupported version ex
[kafka] branch 2.8 updated: KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.8 by this push: new 4eb0aee KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) 4eb0aee is described below commit 4eb0aeec903b8043c47668cf99da774203d3eb96 Author: Randall Hauch AuthorDate: Thu Mar 24 21:40:10 2022 -0500 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) The `retryEndOffsets(…)` method in `TopicAdmin` recently added (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduce a regression when talking to older brokers (0.10.x or earlier). The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup. The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method. Added a new unit test that first replicated the problem before the fix, and verified the fix corrects the problem. --- .../org/apache/kafka/connect/util/TopicAdmin.java | 4 .../apache/kafka/connect/util/TopicAdminTest.java | 25 +- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index 86a3ea2..de43742 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -714,6 +714,7 @@ public class TopicAdmin implements AutoCloseable { * must be 0 or more * @return the map of offset for each topic partition, or an empty map if the supplied partitions * are null or empty + * @throws UnsupportedVersionException if the broker is too old to support the admin client API to read end offsets * @throws ConnectException if {@code timeoutDuration} is exhausted * @see TopicAdmin#endOffsets(Set) */ @@ -725,6 +726,9 @@ public class TopicAdmin implements AutoCloseable { () -> "list offsets for topic partitions", timeoutDuration, retryBackoffMs); +} catch (UnsupportedVersionException e) { +// Older brokers don't support this admin method, so rethrow it without wrapping it +throw e; } catch (Exception e) { throw new ConnectException("Failed to list offsets for topic partitions.", e); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index 72a808e..06da0f0b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -467,8 +467,31 @@ public class TopicAdminTest { } } +/** + * TopicAdmin can be used to read the end offsets, but the admin client API used to do this was + * added to the broker in 0.11.0.0. This means that if Connect talks to older brokers, + * the admin client cannot be used to read end offsets, and will throw an UnsupportedVersionException. + */ +@Test +public void retryEndOffsetsShouldRethrowUnknownVersionException() { +String topicName = "myTopic"; +TopicPartition tp1 = new TopicPartition(topicName, 0); +Set tps = Collections.singleton(tp1); +Long offset = null; // response should use error +Cluster cluster = createCluster(1, topicName, 1); +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); +// Expect the admin client list offsets will throw unsupported version, simulating older brokers + env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset)); +TopicAdmin admin = new TopicAdmin(null, env.adminClient()); +// The retryEndOffsets should catch and rethrow an unsupported version exception +assertThrows(UnsupportedVersionException.class, () -> admin.retryEndOffsets(tps, Duration.ofMillis(100),
[kafka] branch 3.0 updated: KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.0 by this push: new d03a08d KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) d03a08d is described below commit d03a08d05bb95daca0a96b5308aa91a6130d4156 Author: Randall Hauch AuthorDate: Thu Mar 24 21:40:10 2022 -0500 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) The `retryEndOffsets(…)` method in `TopicAdmin` recently added (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduce a regression when talking to older brokers (0.10.x or earlier). The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup. The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method. Added a new unit test that first replicated the problem before the fix, and verified the fix corrects the problem. --- .../org/apache/kafka/connect/util/TopicAdmin.java | 4 .../apache/kafka/connect/util/TopicAdminTest.java | 25 +- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index 86a3ea2..de43742 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -714,6 +714,7 @@ public class TopicAdmin implements AutoCloseable { * must be 0 or more * @return the map of offset for each topic partition, or an empty map if the supplied partitions * are null or empty + * @throws UnsupportedVersionException if the broker is too old to support the admin client API to read end offsets * @throws ConnectException if {@code timeoutDuration} is exhausted * @see TopicAdmin#endOffsets(Set) */ @@ -725,6 +726,9 @@ public class TopicAdmin implements AutoCloseable { () -> "list offsets for topic partitions", timeoutDuration, retryBackoffMs); +} catch (UnsupportedVersionException e) { +// Older brokers don't support this admin method, so rethrow it without wrapping it +throw e; } catch (Exception e) { throw new ConnectException("Failed to list offsets for topic partitions.", e); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index 72a808e..06da0f0b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -467,8 +467,31 @@ public class TopicAdminTest { } } +/** + * TopicAdmin can be used to read the end offsets, but the admin client API used to do this was + * added to the broker in 0.11.0.0. This means that if Connect talks to older brokers, + * the admin client cannot be used to read end offsets, and will throw an UnsupportedVersionException. + */ +@Test +public void retryEndOffsetsShouldRethrowUnknownVersionException() { +String topicName = "myTopic"; +TopicPartition tp1 = new TopicPartition(topicName, 0); +Set tps = Collections.singleton(tp1); +Long offset = null; // response should use error +Cluster cluster = createCluster(1, topicName, 1); +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); +// Expect the admin client list offsets will throw unsupported version, simulating older brokers + env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset)); +TopicAdmin admin = new TopicAdmin(null, env.adminClient()); +// The retryEndOffsets should catch and rethrow an unsupported version exception +assertThrows(UnsupportedVersionException.class, () -> admin.retryEndOffsets(tps, Duration.ofMillis(100),
[kafka] branch 3.1 updated: KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.1 by this push: new e176f4b KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) e176f4b is described below commit e176f4b2443f221d467c7985ecb9387a626751bf Author: Randall Hauch AuthorDate: Thu Mar 24 21:40:10 2022 -0500 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) The `retryEndOffsets(…)` method in `TopicAdmin` recently added (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduce a regression when talking to older brokers (0.10.x or earlier). The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup. The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method. Added a new unit test that first replicated the problem before the fix, and verified the fix corrects the problem. --- .../org/apache/kafka/connect/util/TopicAdmin.java | 4 .../apache/kafka/connect/util/TopicAdminTest.java | 25 +- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index c1afd2c..8b2859a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -714,6 +714,7 @@ public class TopicAdmin implements AutoCloseable { * must be 0 or more * @return the map of offset for each topic partition, or an empty map if the supplied partitions * are null or empty + * @throws UnsupportedVersionException if the broker is too old to support the admin client API to read end offsets * @throws ConnectException if {@code timeoutDuration} is exhausted * @see TopicAdmin#endOffsets(Set) */ @@ -725,6 +726,9 @@ public class TopicAdmin implements AutoCloseable { () -> "list offsets for topic partitions", timeoutDuration, retryBackoffMs); +} catch (UnsupportedVersionException e) { +// Older brokers don't support this admin method, so rethrow it without wrapping it +throw e; } catch (Exception e) { throw new ConnectException("Failed to list offsets for topic partitions.", e); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index 32cec61..3c2f3e4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -467,8 +467,31 @@ public class TopicAdminTest { } } +/** + * TopicAdmin can be used to read the end offsets, but the admin client API used to do this was + * added to the broker in 0.11.0.0. This means that if Connect talks to older brokers, + * the admin client cannot be used to read end offsets, and will throw an UnsupportedVersionException. + */ +@Test +public void retryEndOffsetsShouldRethrowUnknownVersionException() { +String topicName = "myTopic"; +TopicPartition tp1 = new TopicPartition(topicName, 0); +Set tps = Collections.singleton(tp1); +Long offset = null; // response should use error +Cluster cluster = createCluster(1, topicName, 1); +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); +// Expect the admin client list offsets will throw unsupported version, simulating older brokers + env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset)); +TopicAdmin admin = new TopicAdmin(null, env.adminClient()); +// The retryEndOffsets should catch and rethrow an unsupported version exception +assertThrows(UnsupportedVersionException.class, () -> admin.retryEndOffsets(tps, Duration.ofMillis(100),
[kafka] branch trunk updated: KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ce88389 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) ce88389 is described below commit ce883892270a02a72e91afbdb1fabdd50d7da474 Author: Randall Hauch AuthorDate: Thu Mar 24 21:40:10 2022 -0500 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) The `retryEndOffsets(…)` method in `TopicAdmin` recently added (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduce a regression when talking to older brokers (0.10.x or earlier). The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup. The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method. Added a new unit test that first replicated the problem before the fix, and verified the fix corrects the problem. --- .../org/apache/kafka/connect/util/TopicAdmin.java | 4 .../apache/kafka/connect/util/TopicAdminTest.java | 25 +- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index faf7b37..d97533a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -714,6 +714,7 @@ public class TopicAdmin implements AutoCloseable { * must be 0 or more * @return the map of offset for each topic partition, or an empty map if the supplied partitions * are null or empty + * @throws UnsupportedVersionException if the broker is too old to support the admin client API to read end offsets * @throws ConnectException if {@code timeoutDuration} is exhausted * @see TopicAdmin#endOffsets(Set) */ @@ -725,6 +726,9 @@ public class TopicAdmin implements AutoCloseable { () -> "list offsets for topic partitions", timeoutDuration, retryBackoffMs); +} catch (UnsupportedVersionException e) { +// Older brokers don't support this admin method, so rethrow it without wrapping it +throw e; } catch (Exception e) { throw new ConnectException("Failed to list offsets for topic partitions.", e); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index deea050..cf611db 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -467,8 +467,31 @@ public class TopicAdminTest { } } +/** + * TopicAdmin can be used to read the end offsets, but the admin client API used to do this was + * added to the broker in 0.11.0.0. This means that if Connect talks to older brokers, + * the admin client cannot be used to read end offsets, and will throw an UnsupportedVersionException. + */ +@Test +public void retryEndOffsetsShouldRethrowUnknownVersionException() { +String topicName = "myTopic"; +TopicPartition tp1 = new TopicPartition(topicName, 0); +Set tps = Collections.singleton(tp1); +Long offset = null; // response should use error +Cluster cluster = createCluster(1, topicName, 1); +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); +// Expect the admin client list offsets will throw unsupported version, simulating older brokers + env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset)); +TopicAdmin admin = new TopicAdmin(null, env.adminClient()); +// The retryEndOffsets should catch and rethrow an unsupported version exception +assertThrows(UnsupportedVersionException.class, () -> admin.retryEndOf
[kafka] branch 2.5 updated: KAFKA-12879: Revert "KAFKA-12339: Add retry to admin client's listOffsets (#10152)"
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.5 by this push: new d5b53ad KAFKA-12879: Revert "KAFKA-12339: Add retry to admin client's listOffsets (#10152)" d5b53ad is described below commit d5b53ad132d1c1bfcd563ce5015884b6da831777 Author: Randall Hauch AuthorDate: Thu Mar 10 15:16:24 2022 -0600 KAFKA-12879: Revert "KAFKA-12339: Add retry to admin client's listOffsets (#10152)" This reverts commit fe132ee29329116a60e77b1d3e56aef42ae6347c. --- .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 41 ++ 2 files changed, 2 insertions(+), 40 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java index e7f2c07..c05e5cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java @@ -82,7 +82,6 @@ public final class MetadataOperationContext> { public static void handleMetadataErrors(MetadataResponse response) { for (TopicMetadata tm : response.topicMetadata()) { -if (shouldRefreshMetadata(tm.error())) throw tm.error().exception(); for (PartitionMetadata pm : tm.partitionMetadata()) { if (shouldRefreshMetadata(pm.error)) { throw pm.error.exception(); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 7f30846..28b4640 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -342,15 +342,11 @@ public class KafkaAdminClientTest { } private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) { -return prepareMetadataResponse(cluster, error, error); -} - -private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) { List metadata = new ArrayList<>(); for (String topic : cluster.topics()) { List pms = new ArrayList<>(); for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { -PartitionMetadata pm = new PartitionMetadata(partitionError, +PartitionMetadata pm = new PartitionMetadata(error, new TopicPartition(topic, pInfo.partition()), Optional.of(pInfo.leader().id()), Optional.of(234), @@ -359,7 +355,7 @@ public class KafkaAdminClientTest { Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList())); pms.add(pm); } -TopicMetadata tm = new TopicMetadata(topicError, topic, false, pms); +TopicMetadata tm = new TopicMetadata(error, topic, false, pms); metadata.add(tm); } return MetadataResponse.prepareResponse(0, @@ -2347,39 +2343,6 @@ public class KafkaAdminClientTest { } @Test -public void testListOffsetsRetriableErrorOnMetadata() throws Exception { -Node node = new Node(0, "localhost", 8120); -List nodes = Collections.singletonList(node); -final Cluster cluster = new Cluster( -"mockClusterId", -nodes, -Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), -Collections.emptySet(), -Collections.emptySet(), -node); -final TopicPartition tp0 = new TopicPartition("foo", 0); - -try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { -env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); -env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE)); -// metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION -env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); -// listoffsets response from broker 0 -Map responseData = new HashMap<>(); -responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 123L, Optional.of(321))); -env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData)); - -ListOffsetsResult result = env.adminClient().listOffsets
[kafka] 03/03: KAFKA-12879: Remove extra sleep (#11872)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 80e589989b744d9e1e5f4b3cc787d2d59053f9cd Author: Randall Hauch AuthorDate: Wed Mar 9 15:11:46 2022 -0600 KAFKA-12879: Remove extra sleep (#11872) --- .../runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java index 64fd97c..84b9af1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java @@ -96,7 +96,6 @@ public class RetryUtil { } Utils.sleep(retryBackoffMs); } -Utils.sleep(retryBackoffMs); } while (System.currentTimeMillis() < end); throw new ConnectException("Fail to " + descriptionStr + " after " + attempt + " attempts. Reason: " + lastError.getMessage(), lastError);
[kafka] 02/03: KAFKA-12879: Addendum to reduce flakiness of tests (#11871)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git commit ea032a1b9d5549b234b2f0a1bfb7bd1062c50db1 Author: Philip Nee AuthorDate: Wed Mar 9 12:37:48 2022 -0800 KAFKA-12879: Addendum to reduce flakiness of tests (#11871) This is an addendum to the KAFKA-12879 (#11797) to fix some tests that are somewhat flaky when a build machine is heavily loaded (when the timeouts are too small). - Add an if check to void sleep(0) - Increase timeout in the tests --- .../main/java/org/apache/kafka/connect/util/RetryUtil.java| 11 +++ .../java/org/apache/kafka/connect/util/RetryUtilTest.java | 10 ++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java index 9463f6a..64fd97c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java @@ -88,10 +88,13 @@ public class RetryUtil { lastError = e; } -long millisRemaining = Math.max(0, end - System.currentTimeMillis()); -if (millisRemaining < retryBackoffMs) { -// exit when the time remaining is less than retryBackoffMs -break; +if (retryBackoffMs > 0) { +long millisRemaining = Math.max(0, end - System.currentTimeMillis()); +if (millisRemaining < retryBackoffMs) { +// exit when the time remaining is less than retryBackoffMs +break; +} +Utils.sleep(retryBackoffMs); } Utils.sleep(retryBackoffMs); } while (System.currentTimeMillis() < end); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java index d6bc23b..c5445d3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java @@ -17,8 +17,8 @@ package org.apache.kafka.connect.util; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThrows; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.WakeupException; @@ -36,6 +36,8 @@ import java.util.function.Supplier; @RunWith(PowerMockRunner.class) public class RetryUtilTest { +private static final Duration TIMEOUT = Duration.ofSeconds(10); + private Callable mockCallable; private final Supplier testMsg = () -> "Test"; @@ -69,7 +71,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException()) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 1)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 1)); Mockito.verify(mockCallable, Mockito.times(4)).call(); } @@ -83,7 +85,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException("timeout")) .thenThrow(new NullPointerException("Non retriable")); NullPointerException e = assertThrows(NullPointerException.class, -() -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 0)); +() -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0)); assertEquals("Non retriable", e.getMessage()); Mockito.verify(mockCallable, Mockito.times(6)).call(); } @@ -114,7 +116,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException()) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 0)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0)); Mockito.verify(mockCallable, Mockito.times(4)).call(); }
[kafka] 01/03: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 572d4fda383ac48e064e294cbb6d03537e44d071 Author: Philip Nee AuthorDate: Wed Mar 9 10:39:28 2022 -0800 KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797) Fixes the compatibility issue regarding KAFKA-12879 by reverting the changes to the admin client from KAFKA-12339 (#10152) that retry admin client operations, and instead perform the retries within Connect's `KafkaBasedLog` during startup via a new `TopicAdmin.retryEndOffsets(..)` method. This method delegates to the existing `TopicAdmin.endOffsets(...)` method, but will retry on `RetriableException` until the retry timeout elapses. This change should be backward compatible to the KAFKA-12339 so that when Connect's `KafkaBasedLog` starts up it will retry attempts to read the end offsets for the log's topic. The `KafkaBasedLog` existing thread already has its own retry logic, and this is not changed. Added more unit tests, and thoroughly tested the new `RetryUtil` used to encapsulate the parameterized retry logic around any supplied function. --- checkstyle/suppressions.xml| 2 +- .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 41 + .../apache/kafka/connect/util/KafkaBasedLog.java | 27 ++- .../org/apache/kafka/connect/util/RetryUtil.java | 101 +++ .../org/apache/kafka/connect/util/TopicAdmin.java | 29 +++- .../kafka/connect/util/KafkaBasedLogTest.java | 14 +- .../apache/kafka/connect/util/RetryUtilTest.java | 184 + .../apache/kafka/connect/util/TopicAdminTest.java | 127 +- 9 files changed, 468 insertions(+), 58 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 2ecb6cb..fe777ec 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -114,7 +114,7 @@ files="ConfigKeyInfo.java"/> + files="(RestServer|AbstractHerder|DistributedHerder|TopicAdminTest).java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java index e7f2c07..c05e5cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java @@ -82,7 +82,6 @@ public final class MetadataOperationContext> { public static void handleMetadataErrors(MetadataResponse response) { for (TopicMetadata tm : response.topicMetadata()) { -if (shouldRefreshMetadata(tm.error())) throw tm.error().exception(); for (PartitionMetadata pm : tm.partitionMetadata()) { if (shouldRefreshMetadata(pm.error)) { throw pm.error.exception(); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index e83f7be..aa55866 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -356,16 +356,12 @@ public class KafkaAdminClientTest { } private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) { -return prepareMetadataResponse(cluster, error, error); -} - -private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) { List metadata = new ArrayList<>(); for (String topic : cluster.topics()) { List pms = new ArrayList<>(); for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { MetadataResponsePartition pm = new MetadataResponsePartition() -.setErrorCode(partitionError.code()) +.setErrorCode(error.code()) .setPartitionIndex(pInfo.partition()) .setLeaderId(pInfo.leader().id()) .setLeaderEpoch(234) @@ -375,7 +371,7 @@ public class KafkaAdminClientTest { pms.add(pm); } MetadataResponseTopic tm = new MetadataResponseTopic() -.setErrorCode(topicError.code()) +.setErrorCode(error.code()) .setName(topic) .setIsInternal(false) .setPartitions(pms); @@ -2985,39 +2981,6 @@ public class KafkaAdminClientTest { } @Test -public void testListOffsetsRe
[kafka] branch 2.6 updated (3371d7f -> 80e5899)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git. from 3371d7f KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (#10311) new 572d4fd KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797) new ea032a1 KAFKA-12879: Addendum to reduce flakiness of tests (#11871) new 80e5899 KAFKA-12879: Remove extra sleep (#11872) The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: checkstyle/suppressions.xml| 2 +- .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 41 + .../apache/kafka/connect/util/KafkaBasedLog.java | 27 ++- .../org/apache/kafka/connect/util/RetryUtil.java | 103 .../org/apache/kafka/connect/util/TopicAdmin.java | 29 +++- .../kafka/connect/util/KafkaBasedLogTest.java | 14 +- .../apache/kafka/connect/util/RetryUtilTest.java | 186 + .../apache/kafka/connect/util/TopicAdminTest.java | 127 +- 9 files changed, 472 insertions(+), 58 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
[kafka] 01/03: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git commit e2ed013c5a4592131ad8a09628cb5a339bced134 Author: Philip Nee AuthorDate: Wed Mar 9 10:39:28 2022 -0800 KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797) Fixes the compatibility issue regarding KAFKA-12879 by reverting the changes to the admin client from KAFKA-12339 (#10152) that retry admin client operations, and instead perform the retries within Connect's `KafkaBasedLog` during startup via a new `TopicAdmin.retryEndOffsets(..)` method. This method delegates to the existing `TopicAdmin.endOffsets(...)` method, but will retry on `RetriableException` until the retry timeout elapses. This change should be backward compatible to the KAFKA-12339 so that when Connect's `KafkaBasedLog` starts up it will retry attempts to read the end offsets for the log's topic. The `KafkaBasedLog` existing thread already has its own retry logic, and this is not changed. Added more unit tests, and thoroughly tested the new `RetryUtil` used to encapsulate the parameterized retry logic around any supplied function. --- checkstyle/suppressions.xml| 2 +- .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 43 + .../apache/kafka/connect/util/KafkaBasedLog.java | 27 ++- .../org/apache/kafka/connect/util/RetryUtil.java | 101 +++ .../org/apache/kafka/connect/util/TopicAdmin.java | 29 +++- .../kafka/connect/util/KafkaBasedLogTest.java | 14 +- .../apache/kafka/connect/util/RetryUtilTest.java | 184 + .../apache/kafka/connect/util/TopicAdminTest.java | 140 +++- 9 files changed, 481 insertions(+), 60 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 8b353b6..fd59064 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -114,7 +114,7 @@ files="ConfigKeyInfo.java"/> + files="(RestServer|AbstractHerder|DistributedHerder|TopicAdminTest).java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java index e7f2c07..c05e5cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java @@ -82,7 +82,6 @@ public final class MetadataOperationContext> { public static void handleMetadataErrors(MetadataResponse response) { for (TopicMetadata tm : response.topicMetadata()) { -if (shouldRefreshMetadata(tm.error())) throw tm.error().exception(); for (PartitionMetadata pm : tm.partitionMetadata()) { if (shouldRefreshMetadata(pm.error)) { throw pm.error.exception(); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index eec6a05..0774ba3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -435,16 +435,12 @@ public class KafkaAdminClientTest { } private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) { -return prepareMetadataResponse(cluster, error, error); -} - -private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) { List metadata = new ArrayList<>(); for (String topic : cluster.topics()) { List pms = new ArrayList<>(); for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { MetadataResponsePartition pm = new MetadataResponsePartition() -.setErrorCode(partitionError.code()) +.setErrorCode(error.code()) .setPartitionIndex(pInfo.partition()) .setLeaderId(pInfo.leader().id()) .setLeaderEpoch(234) @@ -454,7 +450,7 @@ public class KafkaAdminClientTest { pms.add(pm); } MetadataResponseTopic tm = new MetadataResponseTopic() -.setErrorCode(topicError.code()) +.setErrorCode(error.code()) .setName(topic) .setIsInternal(false) .setPartitions(pms); @@ -3839,41 +3835,6 @@ public class KafkaAdminClientTest { } @Test -public void testListOffsetsRe
[kafka] 03/03: KAFKA-12879: Remove extra sleep (#11872)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 1d97f7421dbb06a31a4cbb13bad8759eda6f5e29 Author: Randall Hauch AuthorDate: Wed Mar 9 15:11:46 2022 -0600 KAFKA-12879: Remove extra sleep (#11872) --- .../runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java index 64fd97c..84b9af1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java @@ -96,7 +96,6 @@ public class RetryUtil { } Utils.sleep(retryBackoffMs); } -Utils.sleep(retryBackoffMs); } while (System.currentTimeMillis() < end); throw new ConnectException("Fail to " + descriptionStr + " after " + attempt + " attempts. Reason: " + lastError.getMessage(), lastError);
[kafka] 02/03: KAFKA-12879: Addendum to reduce flakiness of tests (#11871)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git commit c0d3bb915dd5fbf26f8d85c6166a92f8efb046c7 Author: Philip Nee AuthorDate: Wed Mar 9 12:37:48 2022 -0800 KAFKA-12879: Addendum to reduce flakiness of tests (#11871) This is an addendum to the KAFKA-12879 (#11797) to fix some tests that are somewhat flaky when a build machine is heavily loaded (when the timeouts are too small). - Add an if check to void sleep(0) - Increase timeout in the tests --- .../main/java/org/apache/kafka/connect/util/RetryUtil.java| 11 +++ .../java/org/apache/kafka/connect/util/RetryUtilTest.java | 10 ++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java index 9463f6a..64fd97c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java @@ -88,10 +88,13 @@ public class RetryUtil { lastError = e; } -long millisRemaining = Math.max(0, end - System.currentTimeMillis()); -if (millisRemaining < retryBackoffMs) { -// exit when the time remaining is less than retryBackoffMs -break; +if (retryBackoffMs > 0) { +long millisRemaining = Math.max(0, end - System.currentTimeMillis()); +if (millisRemaining < retryBackoffMs) { +// exit when the time remaining is less than retryBackoffMs +break; +} +Utils.sleep(retryBackoffMs); } Utils.sleep(retryBackoffMs); } while (System.currentTimeMillis() < end); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java index 05f0212..58c101b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java @@ -36,6 +36,8 @@ import java.util.function.Supplier; @RunWith(PowerMockRunner.class) public class RetryUtilTest { +private static final Duration TIMEOUT = Duration.ofSeconds(10); + private Callable mockCallable; private final Supplier testMsg = () -> "Test"; @@ -69,7 +71,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException()) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 1)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 1)); Mockito.verify(mockCallable, Mockito.times(4)).call(); } @@ -83,7 +85,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException("timeout")) .thenThrow(new NullPointerException("Non retriable")); NullPointerException e = assertThrows(NullPointerException.class, -() -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 0)); +() -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0)); assertEquals("Non retriable", e.getMessage()); Mockito.verify(mockCallable, Mockito.times(6)).call(); } @@ -114,7 +116,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException()) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 0)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0)); Mockito.verify(mockCallable, Mockito.times(4)).call(); } @@ -151,7 +153,7 @@ public class RetryUtilTest { Mockito.when(mockCallable.call()) .thenThrow(new TimeoutException("timeout")) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), -1)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, -1)); Mockito.verify(mockCallable, Mockito.times(2)).call(); }
[kafka] branch 2.7 updated (6eed774 -> 1d97f74)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git. from 6eed774 KAFKA-13406: skip assignment validation for built-in cooperativeStickyAssignor (#11439) new e2ed013 KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797) new c0d3bb9 KAFKA-12879: Addendum to reduce flakiness of tests (#11871) new 1d97f74 KAFKA-12879: Remove extra sleep (#11872) The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: checkstyle/suppressions.xml| 2 +- .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 43 + .../apache/kafka/connect/util/KafkaBasedLog.java | 27 ++- .../org/apache/kafka/connect/util/RetryUtil.java | 103 .../org/apache/kafka/connect/util/TopicAdmin.java | 29 +++- .../kafka/connect/util/KafkaBasedLogTest.java | 14 +- .../apache/kafka/connect/util/RetryUtilTest.java | 186 + .../apache/kafka/connect/util/TopicAdminTest.java | 140 +++- 9 files changed, 485 insertions(+), 60 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
[kafka] 03/03: KAFKA-12879: Remove extra sleep (#11872)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit d6525839dbe1bf5e6dba37ca2705ab1f3bac516b Author: Randall Hauch AuthorDate: Wed Mar 9 15:11:46 2022 -0600 KAFKA-12879: Remove extra sleep (#11872) --- .../runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java index 64fd97c..84b9af1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java @@ -96,7 +96,6 @@ public class RetryUtil { } Utils.sleep(retryBackoffMs); } -Utils.sleep(retryBackoffMs); } while (System.currentTimeMillis() < end); throw new ConnectException("Fail to " + descriptionStr + " after " + attempt + " attempts. Reason: " + lastError.getMessage(), lastError);
[kafka] 01/03: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 346cc48de9a3152e3ffa64f8432fc878e9be0418 Author: Philip Nee AuthorDate: Wed Mar 9 10:39:28 2022 -0800 KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797) Fixes the compatibility issue regarding KAFKA-12879 by reverting the changes to the admin client from KAFKA-12339 (#10152) that retry admin client operations, and instead perform the retries within Connect's `KafkaBasedLog` during startup via a new `TopicAdmin.retryEndOffsets(..)` method. This method delegates to the existing `TopicAdmin.endOffsets(...)` method, but will retry on `RetriableException` until the retry timeout elapses. This change should be backward compatible to the KAFKA-12339 so that when Connect's `KafkaBasedLog` starts up it will retry attempts to read the end offsets for the log's topic. The `KafkaBasedLog` existing thread already has its own retry logic, and this is not changed. Added more unit tests, and thoroughly tested the new `RetryUtil` used to encapsulate the parameterized retry logic around any supplied function. --- .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 42 + .../apache/kafka/connect/util/KafkaBasedLog.java | 27 ++- .../org/apache/kafka/connect/util/RetryUtil.java | 101 +++ .../org/apache/kafka/connect/util/TopicAdmin.java | 29 +++- .../kafka/connect/util/KafkaBasedLogTest.java | 14 +- .../apache/kafka/connect/util/RetryUtilTest.java | 184 + .../apache/kafka/connect/util/TopicAdminTest.java | 59 ++- 8 files changed, 402 insertions(+), 55 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java index e7f2c07..c05e5cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java @@ -82,7 +82,6 @@ public final class MetadataOperationContext> { public static void handleMetadataErrors(MetadataResponse response) { for (TopicMetadata tm : response.topicMetadata()) { -if (shouldRefreshMetadata(tm.error())) throw tm.error().exception(); for (PartitionMetadata pm : tm.partitionMetadata()) { if (shouldRefreshMetadata(pm.error)) { throw pm.error.exception(); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 6eb972b..b485432 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -445,16 +445,12 @@ public class KafkaAdminClientTest { } private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) { -return prepareMetadataResponse(cluster, error, error); -} - -private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) { List metadata = new ArrayList<>(); for (String topic : cluster.topics()) { List pms = new ArrayList<>(); for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { MetadataResponsePartition pm = new MetadataResponsePartition() -.setErrorCode(partitionError.code()) +.setErrorCode(error.code()) .setPartitionIndex(pInfo.partition()) .setLeaderId(pInfo.leader().id()) .setLeaderEpoch(234) @@ -464,7 +460,7 @@ public class KafkaAdminClientTest { pms.add(pm); } MetadataResponseTopic tm = new MetadataResponseTopic() -.setErrorCode(topicError.code()) +.setErrorCode(error.code()) .setName(topic) .setIsInternal(false) .setPartitions(pms); @@ -3913,40 +3909,6 @@ public class KafkaAdminClientTest { } @Test -public void testListOffsetsRetriableErrorOnMetadata() throws Exception { -Node node = new Node(0, "localhost", 8120); -List nodes = Collections.singletonList(node); -final Cluster cluster = new Cluster( -"mockClusterId", -nodes, -Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), -Collections.emptySet(), -
[kafka] 02/03: KAFKA-12879: Addendum to reduce flakiness of tests (#11871)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 0b53da0bd2400d2956cf0f5445a2b077e29b6705 Author: Philip Nee AuthorDate: Wed Mar 9 12:37:48 2022 -0800 KAFKA-12879: Addendum to reduce flakiness of tests (#11871) This is an addendum to the KAFKA-12879 (#11797) to fix some tests that are somewhat flaky when a build machine is heavily loaded (when the timeouts are too small). - Add an if check to void sleep(0) - Increase timeout in the tests --- .../main/java/org/apache/kafka/connect/util/RetryUtil.java| 11 +++ .../java/org/apache/kafka/connect/util/RetryUtilTest.java | 10 ++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java index 9463f6a..64fd97c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java @@ -88,10 +88,13 @@ public class RetryUtil { lastError = e; } -long millisRemaining = Math.max(0, end - System.currentTimeMillis()); -if (millisRemaining < retryBackoffMs) { -// exit when the time remaining is less than retryBackoffMs -break; +if (retryBackoffMs > 0) { +long millisRemaining = Math.max(0, end - System.currentTimeMillis()); +if (millisRemaining < retryBackoffMs) { +// exit when the time remaining is less than retryBackoffMs +break; +} +Utils.sleep(retryBackoffMs); } Utils.sleep(retryBackoffMs); } while (System.currentTimeMillis() < end); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java index 05f0212..58c101b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java @@ -36,6 +36,8 @@ import java.util.function.Supplier; @RunWith(PowerMockRunner.class) public class RetryUtilTest { +private static final Duration TIMEOUT = Duration.ofSeconds(10); + private Callable mockCallable; private final Supplier testMsg = () -> "Test"; @@ -69,7 +71,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException()) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 1)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 1)); Mockito.verify(mockCallable, Mockito.times(4)).call(); } @@ -83,7 +85,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException("timeout")) .thenThrow(new NullPointerException("Non retriable")); NullPointerException e = assertThrows(NullPointerException.class, -() -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 0)); +() -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0)); assertEquals("Non retriable", e.getMessage()); Mockito.verify(mockCallable, Mockito.times(6)).call(); } @@ -114,7 +116,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException()) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 0)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0)); Mockito.verify(mockCallable, Mockito.times(4)).call(); } @@ -151,7 +153,7 @@ public class RetryUtilTest { Mockito.when(mockCallable.call()) .thenThrow(new TimeoutException("timeout")) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), -1)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, -1)); Mockito.verify(mockCallable, Mockito.times(2)).call(); }
[kafka] branch 2.8 updated (7fde1ef -> d652583)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git. from 7fde1ef KAFKA-13636: Fix for the group coordinator issue where the offsets are deleted for unstable groups (#11742) new 346cc48 KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797) new 0b53da0 KAFKA-12879: Addendum to reduce flakiness of tests (#11871) new d652583 KAFKA-12879: Remove extra sleep (#11872) The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 42 + .../apache/kafka/connect/util/KafkaBasedLog.java | 27 ++- .../org/apache/kafka/connect/util/RetryUtil.java | 103 .../org/apache/kafka/connect/util/TopicAdmin.java | 29 +++- .../kafka/connect/util/KafkaBasedLogTest.java | 14 +- .../apache/kafka/connect/util/RetryUtilTest.java | 186 + .../apache/kafka/connect/util/TopicAdminTest.java | 59 ++- 8 files changed, 406 insertions(+), 55 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
[kafka] 03/03: KAFKA-12879: Remove extra sleep (#11872)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 5265e1a607899bb562194eb1b0b6ff0b375d1626 Author: Randall Hauch AuthorDate: Wed Mar 9 15:11:46 2022 -0600 KAFKA-12879: Remove extra sleep (#11872) --- .../runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java index 64fd97c..84b9af1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java @@ -96,7 +96,6 @@ public class RetryUtil { } Utils.sleep(retryBackoffMs); } -Utils.sleep(retryBackoffMs); } while (System.currentTimeMillis() < end); throw new ConnectException("Fail to " + descriptionStr + " after " + attempt + " attempts. Reason: " + lastError.getMessage(), lastError);
[kafka] 01/03: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 66c2c43f6153f2529fde3ef82847ff0dda375024 Author: Philip Nee AuthorDate: Wed Mar 9 10:39:28 2022 -0800 KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797) Fixes the compatibility issue regarding KAFKA-12879 by reverting the changes to the admin client from KAFKA-12339 (#10152) that retry admin client operations, and instead perform the retries within Connect's `KafkaBasedLog` during startup via a new `TopicAdmin.retryEndOffsets(..)` method. This method delegates to the existing `TopicAdmin.endOffsets(...)` method, but will retry on `RetriableException` until the retry timeout elapses. This change should be backward compatible to the KAFKA-12339 so that when Connect's `KafkaBasedLog` starts up it will retry attempts to read the end offsets for the log's topic. The `KafkaBasedLog` existing thread already has its own retry logic, and this is not changed. Added more unit tests, and thoroughly tested the new `RetryUtil` used to encapsulate the parameterized retry logic around any supplied function. --- .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 42 + .../apache/kafka/connect/util/KafkaBasedLog.java | 27 ++- .../org/apache/kafka/connect/util/RetryUtil.java | 101 +++ .../org/apache/kafka/connect/util/TopicAdmin.java | 29 +++- .../kafka/connect/util/KafkaBasedLogTest.java | 14 +- .../apache/kafka/connect/util/RetryUtilTest.java | 184 + .../apache/kafka/connect/util/TopicAdminTest.java | 59 ++- 8 files changed, 402 insertions(+), 55 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java index e7f2c07..c05e5cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java @@ -82,7 +82,6 @@ public final class MetadataOperationContext> { public static void handleMetadataErrors(MetadataResponse response) { for (TopicMetadata tm : response.topicMetadata()) { -if (shouldRefreshMetadata(tm.error())) throw tm.error().exception(); for (PartitionMetadata pm : tm.partitionMetadata()) { if (shouldRefreshMetadata(pm.error)) { throw pm.error.exception(); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index f1ace85..5fb553f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -489,16 +489,12 @@ public class KafkaAdminClientTest { } private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) { -return prepareMetadataResponse(cluster, error, error); -} - -private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) { List metadata = new ArrayList<>(); for (String topic : cluster.topics()) { List pms = new ArrayList<>(); for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { MetadataResponsePartition pm = new MetadataResponsePartition() -.setErrorCode(partitionError.code()) +.setErrorCode(error.code()) .setPartitionIndex(pInfo.partition()) .setLeaderId(pInfo.leader().id()) .setLeaderEpoch(234) @@ -508,7 +504,7 @@ public class KafkaAdminClientTest { pms.add(pm); } MetadataResponseTopic tm = new MetadataResponseTopic() -.setErrorCode(topicError.code()) +.setErrorCode(error.code()) .setName(topic) .setIsInternal(false) .setPartitions(pms); @@ -4337,40 +4333,6 @@ public class KafkaAdminClientTest { } @Test -public void testListOffsetsRetriableErrorOnMetadata() throws Exception { -Node node = new Node(0, "localhost", 8120); -List nodes = Collections.singletonList(node); -final Cluster cluster = new Cluster( -"mockClusterId", -nodes, -Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), -Collections.emptySet(), -
[kafka] 02/03: KAFKA-12879: Addendum to reduce flakiness of tests (#11871)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 2153e4b7dc09b06a956d47795bcf7ef46c176fab Author: Philip Nee AuthorDate: Wed Mar 9 12:37:48 2022 -0800 KAFKA-12879: Addendum to reduce flakiness of tests (#11871) This is an addendum to the KAFKA-12879 (#11797) to fix some tests that are somewhat flaky when a build machine is heavily loaded (when the timeouts are too small). - Add an if check to void sleep(0) - Increase timeout in the tests --- .../main/java/org/apache/kafka/connect/util/RetryUtil.java| 11 +++ .../java/org/apache/kafka/connect/util/RetryUtilTest.java | 10 ++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java index 9463f6a..64fd97c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java @@ -88,10 +88,13 @@ public class RetryUtil { lastError = e; } -long millisRemaining = Math.max(0, end - System.currentTimeMillis()); -if (millisRemaining < retryBackoffMs) { -// exit when the time remaining is less than retryBackoffMs -break; +if (retryBackoffMs > 0) { +long millisRemaining = Math.max(0, end - System.currentTimeMillis()); +if (millisRemaining < retryBackoffMs) { +// exit when the time remaining is less than retryBackoffMs +break; +} +Utils.sleep(retryBackoffMs); } Utils.sleep(retryBackoffMs); } while (System.currentTimeMillis() < end); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java index 05f0212..58c101b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java @@ -36,6 +36,8 @@ import java.util.function.Supplier; @RunWith(PowerMockRunner.class) public class RetryUtilTest { +private static final Duration TIMEOUT = Duration.ofSeconds(10); + private Callable mockCallable; private final Supplier testMsg = () -> "Test"; @@ -69,7 +71,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException()) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 1)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 1)); Mockito.verify(mockCallable, Mockito.times(4)).call(); } @@ -83,7 +85,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException("timeout")) .thenThrow(new NullPointerException("Non retriable")); NullPointerException e = assertThrows(NullPointerException.class, -() -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 0)); +() -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0)); assertEquals("Non retriable", e.getMessage()); Mockito.verify(mockCallable, Mockito.times(6)).call(); } @@ -114,7 +116,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException()) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 0)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0)); Mockito.verify(mockCallable, Mockito.times(4)).call(); } @@ -151,7 +153,7 @@ public class RetryUtilTest { Mockito.when(mockCallable.call()) .thenThrow(new TimeoutException("timeout")) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), -1)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, -1)); Mockito.verify(mockCallable, Mockito.times(2)).call(); }
[kafka] branch 3.0 updated (f5feb3c -> 5265e1a)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git. from f5feb3c MINOR: Fix flaky test cases SocketServerTest.remoteCloseWithoutBufferedReceives and SocketServerTest.remoteCloseWithIncompleteBufferedReceive (#11861) new 66c2c43 KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797) new 2153e4b KAFKA-12879: Addendum to reduce flakiness of tests (#11871) new 5265e1a KAFKA-12879: Remove extra sleep (#11872) The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 42 + .../apache/kafka/connect/util/KafkaBasedLog.java | 27 ++- .../org/apache/kafka/connect/util/RetryUtil.java | 103 .../org/apache/kafka/connect/util/TopicAdmin.java | 29 +++- .../kafka/connect/util/KafkaBasedLogTest.java | 14 +- .../apache/kafka/connect/util/RetryUtilTest.java | 186 + .../apache/kafka/connect/util/TopicAdminTest.java | 59 ++- 8 files changed, 406 insertions(+), 55 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
[kafka] 03/03: KAFKA-12879: Remove extra sleep (#11872)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git commit c4f84c842961f076ec7962a3f89b50a8ee8cbf61 Author: Randall Hauch AuthorDate: Wed Mar 9 15:11:46 2022 -0600 KAFKA-12879: Remove extra sleep (#11872) --- .../runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java index 64fd97c..84b9af1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java @@ -96,7 +96,6 @@ public class RetryUtil { } Utils.sleep(retryBackoffMs); } -Utils.sleep(retryBackoffMs); } while (System.currentTimeMillis() < end); throw new ConnectException("Fail to " + descriptionStr + " after " + attempt + " attempts. Reason: " + lastError.getMessage(), lastError);
[kafka] 02/03: KAFKA-12879: Addendum to reduce flakiness of tests (#11871)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 020902d1649d2575ed154d675c52a23fc21d3df9 Author: Philip Nee AuthorDate: Wed Mar 9 12:37:48 2022 -0800 KAFKA-12879: Addendum to reduce flakiness of tests (#11871) This is an addendum to the KAFKA-12879 (#11797) to fix some tests that are somewhat flaky when a build machine is heavily loaded (when the timeouts are too small). - Add an if check to void sleep(0) - Increase timeout in the tests --- .../main/java/org/apache/kafka/connect/util/RetryUtil.java| 11 +++ .../java/org/apache/kafka/connect/util/RetryUtilTest.java | 10 ++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java index 9463f6a..64fd97c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java @@ -88,10 +88,13 @@ public class RetryUtil { lastError = e; } -long millisRemaining = Math.max(0, end - System.currentTimeMillis()); -if (millisRemaining < retryBackoffMs) { -// exit when the time remaining is less than retryBackoffMs -break; +if (retryBackoffMs > 0) { +long millisRemaining = Math.max(0, end - System.currentTimeMillis()); +if (millisRemaining < retryBackoffMs) { +// exit when the time remaining is less than retryBackoffMs +break; +} +Utils.sleep(retryBackoffMs); } Utils.sleep(retryBackoffMs); } while (System.currentTimeMillis() < end); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java index 05f0212..58c101b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java @@ -36,6 +36,8 @@ import java.util.function.Supplier; @RunWith(PowerMockRunner.class) public class RetryUtilTest { +private static final Duration TIMEOUT = Duration.ofSeconds(10); + private Callable mockCallable; private final Supplier testMsg = () -> "Test"; @@ -69,7 +71,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException()) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 1)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 1)); Mockito.verify(mockCallable, Mockito.times(4)).call(); } @@ -83,7 +85,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException("timeout")) .thenThrow(new NullPointerException("Non retriable")); NullPointerException e = assertThrows(NullPointerException.class, -() -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 0)); +() -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0)); assertEquals("Non retriable", e.getMessage()); Mockito.verify(mockCallable, Mockito.times(6)).call(); } @@ -114,7 +116,7 @@ public class RetryUtilTest { .thenThrow(new TimeoutException()) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 0)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0)); Mockito.verify(mockCallable, Mockito.times(4)).call(); } @@ -151,7 +153,7 @@ public class RetryUtilTest { Mockito.when(mockCallable.call()) .thenThrow(new TimeoutException("timeout")) .thenReturn("success"); -assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), -1)); +assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, -1)); Mockito.verify(mockCallable, Mockito.times(2)).call(); }
[kafka] 01/03: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git commit d95647bcc890c4f7a7572102f84d1759ef53281e Author: Philip Nee AuthorDate: Wed Mar 9 10:39:28 2022 -0800 KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797) Fixes the compatibility issue regarding KAFKA-12879 by reverting the changes to the admin client from KAFKA-12339 (#10152) that retry admin client operations, and instead perform the retries within Connect's `KafkaBasedLog` during startup via a new `TopicAdmin.retryEndOffsets(..)` method. This method delegates to the existing `TopicAdmin.endOffsets(...)` method, but will retry on `RetriableException` until the retry timeout elapses. This change should be backward compatible to the KAFKA-12339 so that when Connect's `KafkaBasedLog` starts up it will retry attempts to read the end offsets for the log's topic. The `KafkaBasedLog` existing thread already has its own retry logic, and this is not changed. Added more unit tests, and thoroughly tested the new `RetryUtil` used to encapsulate the parameterized retry logic around any supplied function. --- .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 42 + .../apache/kafka/connect/util/KafkaBasedLog.java | 27 ++- .../org/apache/kafka/connect/util/RetryUtil.java | 101 +++ .../org/apache/kafka/connect/util/TopicAdmin.java | 29 +++- .../kafka/connect/util/KafkaBasedLogTest.java | 14 +- .../apache/kafka/connect/util/RetryUtilTest.java | 184 + .../apache/kafka/connect/util/TopicAdminTest.java | 59 ++- 8 files changed, 402 insertions(+), 55 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java index e7f2c07..c05e5cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java @@ -82,7 +82,6 @@ public final class MetadataOperationContext> { public static void handleMetadataErrors(MetadataResponse response) { for (TopicMetadata tm : response.topicMetadata()) { -if (shouldRefreshMetadata(tm.error())) throw tm.error().exception(); for (PartitionMetadata pm : tm.partitionMetadata()) { if (shouldRefreshMetadata(pm.error)) { throw pm.error.exception(); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index b648b2d..f6a7352 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -489,16 +489,12 @@ public class KafkaAdminClientTest { } private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) { -return prepareMetadataResponse(cluster, error, error); -} - -private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) { List metadata = new ArrayList<>(); for (String topic : cluster.topics()) { List pms = new ArrayList<>(); for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { MetadataResponsePartition pm = new MetadataResponsePartition() -.setErrorCode(partitionError.code()) +.setErrorCode(error.code()) .setPartitionIndex(pInfo.partition()) .setLeaderId(pInfo.leader().id()) .setLeaderEpoch(234) @@ -508,7 +504,7 @@ public class KafkaAdminClientTest { pms.add(pm); } MetadataResponseTopic tm = new MetadataResponseTopic() -.setErrorCode(topicError.code()) +.setErrorCode(error.code()) .setName(topic) .setIsInternal(false) .setPartitions(pms); @@ -4340,40 +4336,6 @@ public class KafkaAdminClientTest { } @Test -public void testListOffsetsRetriableErrorOnMetadata() throws Exception { -Node node = new Node(0, "localhost", 8120); -List nodes = Collections.singletonList(node); -final Cluster cluster = new Cluster( -"mockClusterId", -nodes, -Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), -Collections.emptySet(), -
[kafka] branch 3.1 updated (7802b45 -> c4f84c8)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git. from 7802b45 MINOR: Fix flaky test cases SocketServerTest.remoteCloseWithoutBufferedReceives and SocketServerTest.remoteCloseWithIncompleteBufferedReceive (#11861) new d95647b KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797) new 020902d KAFKA-12879: Addendum to reduce flakiness of tests (#11871) new c4f84c8 KAFKA-12879: Remove extra sleep (#11872) The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 42 + .../apache/kafka/connect/util/KafkaBasedLog.java | 27 ++- .../org/apache/kafka/connect/util/RetryUtil.java | 103 .../org/apache/kafka/connect/util/TopicAdmin.java | 29 +++- .../kafka/connect/util/KafkaBasedLogTest.java | 14 +- .../apache/kafka/connect/util/RetryUtilTest.java | 186 + .../apache/kafka/connect/util/TopicAdminTest.java | 59 ++- 8 files changed, 406 insertions(+), 55 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
[kafka] branch trunk updated (ddcee81 -> d2d49f6)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from ddcee81 KAFKA-12879: Addendum to reduce flakiness of tests (#11871) add d2d49f6 KAFKA-12879: Remove extra sleep (#11872) No new revisions were added by this update. Summary of changes: .../runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java | 1 - 1 file changed, 1 deletion(-)
[kafka] branch trunk updated (28393be -> ddcee81)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 28393be KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797) add ddcee81 KAFKA-12879: Addendum to reduce flakiness of tests (#11871) No new revisions were added by this update. Summary of changes: .../main/java/org/apache/kafka/connect/util/RetryUtil.java| 11 +++ .../java/org/apache/kafka/connect/util/RetryUtilTest.java | 10 ++ 2 files changed, 13 insertions(+), 8 deletions(-)
[kafka] branch trunk updated (2367c89 -> 28393be)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 2367c89 KAFKA-13630: Reduce amount of time that producer network thread holds batch queue lock (#11722) add 28393be KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797) No new revisions were added by this update. Summary of changes: .../admin/internals/MetadataOperationContext.java | 1 - .../kafka/clients/admin/KafkaAdminClientTest.java | 42 + .../apache/kafka/connect/util/KafkaBasedLog.java | 27 ++- .../org/apache/kafka/connect/util/RetryUtil.java | 101 +++ .../org/apache/kafka/connect/util/TopicAdmin.java | 29 +++- .../kafka/connect/util/KafkaBasedLogTest.java | 14 +- .../apache/kafka/connect/util/RetryUtilTest.java | 184 + .../apache/kafka/connect/util/TopicAdminTest.java | 59 ++- 8 files changed, 402 insertions(+), 55 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
[kafka] branch 3.0 updated: MINOR: Clarify logging behavior with errors.log.include.messages property (#11758)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.0 by this push: new 10e86d4 MINOR: Clarify logging behavior with errors.log.include.messages property (#11758) 10e86d4 is described below commit 10e86d493b0c55366052c24c2574c0619784b0a0 Author: Chris Egerton AuthorDate: Mon Feb 21 08:55:04 2022 -0500 MINOR: Clarify logging behavior with errors.log.include.messages property (#11758) The docs are a little misleading and some users can be confused about the exact behavior of this property. --- .../java/org/apache/kafka/connect/runtime/ConnectorConfig.java| 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 4ba1ddd..2f83284 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -150,9 +150,11 @@ public class ConnectorConfig extends AbstractConfig { public static final String ERRORS_LOG_INCLUDE_MESSAGES_CONFIG = "errors.log.include.messages"; public static final String ERRORS_LOG_INCLUDE_MESSAGES_DISPLAY = "Log Error Details"; public static final boolean ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT = false; -public static final String ERRORS_LOG_INCLUDE_MESSAGES_DOC = "Whether to the include in the log the Connect record that resulted in " + -"a failure. This is 'false' by default, which will prevent record keys, values, and headers from being written to log files, " + -"although some information such as topic and partition number will still be logged."; +public static final String ERRORS_LOG_INCLUDE_MESSAGES_DOC = "Whether to include in the log the Connect record that resulted in a failure." + +"For sink records, the topic, partition, offset, and timestamp will be logged. " + +"For source records, the key and value (and their schemas), all headers, and the timestamp, Kafka topic, Kafka partition, source partition, " + +"and source offset will be logged. " + +"This is 'false' by default, which will prevent record keys, values, and headers from being written to log files."; public static final String CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX = "producer.override.";
[kafka] branch 3.1 updated: MINOR: Clarify logging behavior with errors.log.include.messages property (#11758)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.1 by this push: new 26f1c5f MINOR: Clarify logging behavior with errors.log.include.messages property (#11758) 26f1c5f is described below commit 26f1c5f91e6d479889dfe5a897ae2de96c576ec0 Author: Chris Egerton AuthorDate: Mon Feb 21 08:55:04 2022 -0500 MINOR: Clarify logging behavior with errors.log.include.messages property (#11758) The docs are a little misleading and some users can be confused about the exact behavior of this property. --- .../java/org/apache/kafka/connect/runtime/ConnectorConfig.java| 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 4ba1ddd..2f83284 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -150,9 +150,11 @@ public class ConnectorConfig extends AbstractConfig { public static final String ERRORS_LOG_INCLUDE_MESSAGES_CONFIG = "errors.log.include.messages"; public static final String ERRORS_LOG_INCLUDE_MESSAGES_DISPLAY = "Log Error Details"; public static final boolean ERRORS_LOG_INCLUDE_MESSAGES_DEFAULT = false; -public static final String ERRORS_LOG_INCLUDE_MESSAGES_DOC = "Whether to the include in the log the Connect record that resulted in " + -"a failure. This is 'false' by default, which will prevent record keys, values, and headers from being written to log files, " + -"although some information such as topic and partition number will still be logged."; +public static final String ERRORS_LOG_INCLUDE_MESSAGES_DOC = "Whether to include in the log the Connect record that resulted in a failure." + +"For sink records, the topic, partition, offset, and timestamp will be logged. " + +"For source records, the key and value (and their schemas), all headers, and the timestamp, Kafka topic, Kafka partition, source partition, " + +"and source offset will be logged. " + +"This is 'false' by default, which will prevent record keys, values, and headers from being written to log files."; public static final String CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX = "producer.override.";
[kafka] branch trunk updated (496aa1f -> 4d036ee)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 496aa1f MINOR: Provide valid examples in README page. (#10259) add 4d036ee MINOR: Clarify logging behavior with errors.log.include.messages property (#11758) No new revisions were added by this update. Summary of changes: .../java/org/apache/kafka/connect/runtime/ConnectorConfig.java| 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-)
[kafka] branch 3.0 updated: KAFKA-13469: Block for in-flight record delivery before end-of-life source task offset commit (#11524)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.0 by this push: new 87b3052 KAFKA-13469: Block for in-flight record delivery before end-of-life source task offset commit (#11524) 87b3052 is described below commit 87b3052c94696369156ebf6f521ea96c274a89cd Author: Chris Egerton AuthorDate: Tue Nov 30 11:35:50 2021 -0500 KAFKA-13469: Block for in-flight record delivery before end-of-life source task offset commit (#11524) Although committing source task offsets without blocking on the delivery of all in-flight records is beneficial most of the time, it can lead to duplicate record delivery if there are in-flight records at the time of the task's end-of-life offset commit. A best-effort attempt is made here to wait for any such in-flight records to be delivered before proceeding with the end-of-life offset commit for source tasks. Connect will block for up to offset.flush.timeout.ms milliseconds before calculating the latest committable offsets for the task and flushing those to the persistent offset store. Author: Chris Egerton Reviewer: Randall Hauch --- .../kafka/connect/runtime/SubmittedRecords.java| 59 -- .../kafka/connect/runtime/WorkerSourceTask.java| 6 +- .../connect/runtime/SubmittedRecordsTest.java | 94 +- 3 files changed, 149 insertions(+), 10 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java index 472a266..6cdd2c1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java @@ -26,6 +26,9 @@ import java.util.Deque; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying @@ -41,10 +44,11 @@ class SubmittedRecords { private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class); // Visible for testing -final Map, Deque> records; +final Map, Deque> records = new HashMap<>(); +private int numUnackedMessages = 0; +private CountDownLatch messageDrainLatch; public SubmittedRecords() { -this.records = new HashMap<>(); } /** @@ -68,6 +72,9 @@ class SubmittedRecords { SubmittedRecord result = new SubmittedRecord(partition, offset); records.computeIfAbsent(result.partition(), p -> new LinkedList<>()) .add(result); +synchronized (this) { +numUnackedMessages++; +} return result; } @@ -89,7 +96,9 @@ class SubmittedRecords { if (deque.isEmpty()) { records.remove(record.partition()); } -if (!result) { +if (result) { +messageAcked(); +} else { log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", record.partition()); } return result; @@ -132,6 +141,28 @@ class SubmittedRecords { return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition); } +/** + * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout. + * This method is expected to be called from the same thread that calls {@link #committableOffsets()}. + * @param timeout the maximum time to wait + * @param timeUnit the time unit of the timeout argument + * @return whether all in-flight messages were acknowledged before the timeout elapsed + */ +public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) { +// Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization +// on an instance variable when invoking CountDownLatch::await outside a synchronized block +CountDownLatch messageDrainLatch; +synchronized (this) { +messageDrainLatch = new CountDownLatch(numUnackedMessages); +this.messageDrainLatch = messageDrainLatch; +} +try { +return messageDrainLatch.await(timeout, timeUnit); +} catch (InterruptedException e) { +return false; +} +} + // Note that this will return null if either there are no com
[kafka] branch 3.1 updated: KAFKA-13469: Block for in-flight record delivery before end-of-life source task offset commit (#11524)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.1 by this push: new a7e0eb0 KAFKA-13469: Block for in-flight record delivery before end-of-life source task offset commit (#11524) a7e0eb0 is described below commit a7e0eb06734ade0d5932e496d352e208a41d3664 Author: Chris Egerton AuthorDate: Tue Nov 30 11:35:50 2021 -0500 KAFKA-13469: Block for in-flight record delivery before end-of-life source task offset commit (#11524) Although committing source task offsets without blocking on the delivery of all in-flight records is beneficial most of the time, it can lead to duplicate record delivery if there are in-flight records at the time of the task's end-of-life offset commit. A best-effort attempt is made here to wait for any such in-flight records to be delivered before proceeding with the end-of-life offset commit for source tasks. Connect will block for up to offset.flush.timeout.ms milliseconds before calculating the latest committable offsets for the task and flushing those to the persistent offset store. Author: Chris Egerton Reviewer: Randall Hauch --- .../kafka/connect/runtime/SubmittedRecords.java| 59 -- .../kafka/connect/runtime/WorkerSourceTask.java| 6 +- .../connect/runtime/SubmittedRecordsTest.java | 94 +- 3 files changed, 149 insertions(+), 10 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java index 472a266..6cdd2c1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java @@ -26,6 +26,9 @@ import java.util.Deque; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying @@ -41,10 +44,11 @@ class SubmittedRecords { private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class); // Visible for testing -final Map, Deque> records; +final Map, Deque> records = new HashMap<>(); +private int numUnackedMessages = 0; +private CountDownLatch messageDrainLatch; public SubmittedRecords() { -this.records = new HashMap<>(); } /** @@ -68,6 +72,9 @@ class SubmittedRecords { SubmittedRecord result = new SubmittedRecord(partition, offset); records.computeIfAbsent(result.partition(), p -> new LinkedList<>()) .add(result); +synchronized (this) { +numUnackedMessages++; +} return result; } @@ -89,7 +96,9 @@ class SubmittedRecords { if (deque.isEmpty()) { records.remove(record.partition()); } -if (!result) { +if (result) { +messageAcked(); +} else { log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", record.partition()); } return result; @@ -132,6 +141,28 @@ class SubmittedRecords { return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition); } +/** + * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout. + * This method is expected to be called from the same thread that calls {@link #committableOffsets()}. + * @param timeout the maximum time to wait + * @param timeUnit the time unit of the timeout argument + * @return whether all in-flight messages were acknowledged before the timeout elapsed + */ +public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) { +// Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization +// on an instance variable when invoking CountDownLatch::await outside a synchronized block +CountDownLatch messageDrainLatch; +synchronized (this) { +messageDrainLatch = new CountDownLatch(numUnackedMessages); +this.messageDrainLatch = messageDrainLatch; +} +try { +return messageDrainLatch.await(timeout, timeUnit); +} catch (InterruptedException e) { +return false; +} +} + // Note that this will return null if either there are no com
[kafka] branch trunk updated (e8dcbb9 -> f875576)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from e8dcbb9 KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation (#11526) add f875576 KAFKA-13469: Block for in-flight record delivery before end-of-life source task offset commit (#11524) No new revisions were added by this update. Summary of changes: .../kafka/connect/runtime/SubmittedRecords.java| 59 -- .../kafka/connect/runtime/WorkerSourceTask.java| 6 +- .../connect/runtime/SubmittedRecordsTest.java | 94 +- 3 files changed, 149 insertions(+), 10 deletions(-)
[kafka] branch 2.8 updated: Revert "MINOR: Remove redundant argument from TaskMetricsGroup#recordCommit (#9642)"
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.8 by this push: new 97daef3 Revert "MINOR: Remove redundant argument from TaskMetricsGroup#recordCommit (#9642)" 97daef3 is described below commit 97daef397af47da04f01540f3afe43779066cbe2 Author: Randall Hauch AuthorDate: Tue Nov 16 09:51:45 2021 -0600 Revert "MINOR: Remove redundant argument from TaskMetricsGroup#recordCommit (#9642)" This reverts commit 047ad654da7903f3903760b0e6a6a58648ca7715. --- .../main/java/org/apache/kafka/connect/runtime/WorkerTask.java| 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index a48bda1..e5489b2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -301,7 +301,7 @@ abstract class WorkerTask implements Runnable { * @param duration the length of time in milliseconds for the commit attempt to complete */ protected void recordCommitSuccess(long duration) { -taskMetricsGroup.recordCommit(duration, null); +taskMetricsGroup.recordCommit(duration, true, null); } /** @@ -311,7 +311,7 @@ abstract class WorkerTask implements Runnable { * @param error the unexpected error that occurred; may be null in the case of timeouts or interruptions */ protected void recordCommitFailure(long duration, Throwable error) { -taskMetricsGroup.recordCommit(duration, error); +taskMetricsGroup.recordCommit(duration, false, error); } /** @@ -381,8 +381,8 @@ abstract class WorkerTask implements Runnable { metricGroup.close(); } -void recordCommit(long duration, Throwable error) { -if (error == null) { +void recordCommit(long duration, boolean success, Throwable error) { +if (success) { commitTime.record(duration); commitAttempts.record(1.0d); } else {
[kafka] branch 3.1 updated: Revert "MINOR: Remove redundant argument from TaskMetricsGroup#recordCommit (#9642)"
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.1 by this push: new 3c2dab1 Revert "MINOR: Remove redundant argument from TaskMetricsGroup#recordCommit (#9642)" 3c2dab1 is described below commit 3c2dab10c0f7f3cd3ff24f3e4c116d9b7755d38e Author: Randall Hauch AuthorDate: Tue Nov 16 09:52:48 2021 -0600 Revert "MINOR: Remove redundant argument from TaskMetricsGroup#recordCommit (#9642)" This reverts commit 047ad654da7903f3903760b0e6a6a58648ca7715. --- .../main/java/org/apache/kafka/connect/runtime/WorkerTask.java| 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index e41b061..0d893f5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -307,7 +307,7 @@ abstract class WorkerTask implements Runnable { * @param duration the length of time in milliseconds for the commit attempt to complete */ protected void recordCommitSuccess(long duration) { -taskMetricsGroup.recordCommit(duration, null); +taskMetricsGroup.recordCommit(duration, true, null); } /** @@ -317,7 +317,7 @@ abstract class WorkerTask implements Runnable { * @param error the unexpected error that occurred; may be null in the case of timeouts or interruptions */ protected void recordCommitFailure(long duration, Throwable error) { -taskMetricsGroup.recordCommit(duration, error); +taskMetricsGroup.recordCommit(duration, false, error); } /** @@ -387,8 +387,8 @@ abstract class WorkerTask implements Runnable { metricGroup.close(); } -void recordCommit(long duration, Throwable error) { -if (error == null) { +void recordCommit(long duration, boolean success, Throwable error) { +if (success) { commitTime.record(duration); commitAttempts.record(1.0d); } else {
[kafka] branch trunk updated (894e520 -> 9ea9f0f)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 894e520 KAFKA-13449: Comment optimization for parameter log.cleaner.delete.retention.ms (#11505) add 9ea9f0f Revert "MINOR: Remove redundant argument from TaskMetricsGroup#recordCommit (#9642)" No new revisions were added by this update. Summary of changes: .../main/java/org/apache/kafka/connect/runtime/WorkerTask.java| 8 1 file changed, 4 insertions(+), 4 deletions(-)
[kafka] branch 3.0 updated: KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.0 by this push: new af6f29c KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323) af6f29c is described below commit af6f29c8a0632ecb5d1089dbf1e55de4dbc5b2b2 Author: Chris Egerton AuthorDate: Sun Nov 7 12:39:04 2021 -0500 KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323) Replaces the current logic for committing source offsets, which is batch-based and blocks until the entirety of the current batch is fully written to and acknowledged by the broker, with a new non-blocking approach that commits source offsets for source records that have been "fully written" by the producer. The new logic consider a record fully written only if that source record and all records before it with the same source partition have all been written to Kafka and acknowledged. This new logic uses a deque for every source partition that a source task produces records for. Each element in that deque is a SubmittedRecord with a flag to track whether the producer has ack'd the delivery of that source record to Kafka. Periodically, the worker (on the same thread that polls the source task for records and transforms, converts, and dispatches them to the producer) polls acknowledged elements from the beginning of each of these deques and collects the latest offset [...] The behavior of the `offset.flush.timeout.ms property` is retained, but essentially now only applies to the actual writing of offset data to the internal offsets topic (if running in distributed mode) or the offsets file (if running in standalone mode). No time is spent during `WorkerSourceTask::commitOffsets` waiting on the acknowledgment of records by the producer. This behavior also does not change how the records are dispatched to the producer nor how the producer sends or batches those records. It's possible that memory exhaustion may occur if, for example, a single Kafka partition is offline for an extended period. In cases like this, the collection of deques in the SubmittedRecords class may continue to grow indefinitely until the partition comes back online and the SubmittedRecords in those deques that targeted the formerly-offline Kafka partition are acknowledged and can be removed. Although this may be suboptimal, it is no worse than the existing behavior of the framewo [...] Author: Chris Egerton Reviewed: Randall Hauch --- .../kafka/connect/runtime/SubmittedRecords.java| 297 + .../kafka/connect/runtime/WorkerSourceTask.java| 185 + .../kafka/connect/storage/OffsetStorageWriter.java | 5 +- .../connect/runtime/ErrorHandlingTaskTest.java | 4 - .../connect/runtime/SubmittedRecordsTest.java | 289 .../connect/runtime/WorkerSourceTaskTest.java | 116 .../connect/storage/OffsetStorageWriterTest.java | 8 +- 7 files changed, 712 insertions(+), 192 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java new file mode 100644 index 000..472a266 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; + +/** + * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying + * source offsets. Records are tracked in the order in which they are submitted, which sh
[kafka] branch 3.1 updated: KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.1 by this push: new f59471e KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323) f59471e is described below commit f59471ee441a48997fe305d02adfd17b2cd91364 Author: Chris Egerton AuthorDate: Sun Nov 7 12:39:04 2021 -0500 KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323) Replaces the current logic for committing source offsets, which is batch-based and blocks until the entirety of the current batch is fully written to and acknowledged by the broker, with a new non-blocking approach that commits source offsets for source records that have been "fully written" by the producer. The new logic consider a record fully written only if that source record and all records before it with the same source partition have all been written to Kafka and acknowledged. This new logic uses a deque for every source partition that a source task produces records for. Each element in that deque is a SubmittedRecord with a flag to track whether the producer has ack'd the delivery of that source record to Kafka. Periodically, the worker (on the same thread that polls the source task for records and transforms, converts, and dispatches them to the producer) polls acknowledged elements from the beginning of each of these deques and collects the latest offset [...] The behavior of the `offset.flush.timeout.ms property` is retained, but essentially now only applies to the actual writing of offset data to the internal offsets topic (if running in distributed mode) or the offsets file (if running in standalone mode). No time is spent during `WorkerSourceTask::commitOffsets` waiting on the acknowledgment of records by the producer. This behavior also does not change how the records are dispatched to the producer nor how the producer sends or batches those records. It's possible that memory exhaustion may occur if, for example, a single Kafka partition is offline for an extended period. In cases like this, the collection of deques in the SubmittedRecords class may continue to grow indefinitely until the partition comes back online and the SubmittedRecords in those deques that targeted the formerly-offline Kafka partition are acknowledged and can be removed. Although this may be suboptimal, it is no worse than the existing behavior of the framewo [...] Author: Chris Egerton Reviewed: Randall Hauch --- .../kafka/connect/runtime/SubmittedRecords.java| 297 + .../kafka/connect/runtime/WorkerSourceTask.java| 185 + .../kafka/connect/storage/OffsetStorageWriter.java | 5 +- .../connect/runtime/ErrorHandlingTaskTest.java | 4 - .../connect/runtime/SubmittedRecordsTest.java | 289 .../connect/runtime/WorkerSourceTaskTest.java | 116 .../connect/storage/OffsetStorageWriterTest.java | 8 +- 7 files changed, 712 insertions(+), 192 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java new file mode 100644 index 000..472a266 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; + +/** + * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying + * source offsets. Records are tracked in the order in which they are submitted, which sh
[kafka] branch trunk updated (43bcc56 -> c1bdfa1)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 43bcc56 KAFKA-13396: Allow create topic without partition/replicaFactor (#11429) add c1bdfa1 KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323) No new revisions were added by this update. Summary of changes: .../kafka/connect/runtime/SubmittedRecords.java| 297 + .../kafka/connect/runtime/WorkerSourceTask.java| 185 + .../kafka/connect/storage/OffsetStorageWriter.java | 5 +- .../connect/runtime/ErrorHandlingTaskTest.java | 4 - .../connect/runtime/SubmittedRecordsTest.java | 289 .../connect/runtime/WorkerSourceTaskTest.java | 116 .../connect/storage/OffsetStorageWriterTest.java | 8 +- 7 files changed, 712 insertions(+), 192 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
[kafka] branch 3.0 updated: KAFKA-9887 fix failed task or connector count on startup failure (#8844)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.0 by this push: new 7f67691 KAFKA-9887 fix failed task or connector count on startup failure (#8844) 7f67691 is described below commit 7f6769110a52e07d0f406e188e570747eb941d8a Author: Michael Carter <53591940+michael-carter-instaclu...@users.noreply.github.com> AuthorDate: Wed Jul 21 08:39:26 2021 +1000 KAFKA-9887 fix failed task or connector count on startup failure (#8844) Moved the responsibility for recording task and connector startup and failure metrics from the invocation code into the status listener. The reason behind this is that the WorkerTasks (and subclasses) were either not propagating exceptions upwards, or were unable to do so easily because they were running on completely different threads. Also split out WorkerMetricsGroup from being an inner class into being a standard class. This was to make sure the Data Abstraction Count checkStyle rule was not violated. Author: Michael Carter Reviewers: Chris Egerton , Randall Hauch --- .../org/apache/kafka/connect/runtime/Worker.java | 105 + .../kafka/connect/runtime/WorkerMetricsGroup.java | 214 ++ .../kafka/connect/runtime/WorkerSinkTask.java | 3 +- .../kafka/connect/runtime/WorkerSourceTask.java| 25 ++- .../apache/kafka/connect/runtime/WorkerTask.java | 6 +- .../connect/runtime/ErrorHandlingTaskTest.java | 2 + .../connect/runtime/WorkerMetricsGroupTest.java| 249 + .../kafka/connect/runtime/WorkerSinkTaskTest.java | 3 + .../kafka/connect/runtime/WorkerTaskTest.java | 8 + .../apache/kafka/connect/runtime/WorkerTest.java | 6 +- 10 files changed, 507 insertions(+), 114 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 73201b5..11af818 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -21,13 +21,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.config.provider.ConfigProvider; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.CumulativeSum; -import org.apache.kafka.common.metrics.stats.Frequencies; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; @@ -59,8 +55,8 @@ import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; -import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.LoggingContext; import org.apache.kafka.connect.util.SinkUtils; @@ -147,7 +143,7 @@ public class Worker { this.plugins = plugins; this.config = config; this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy; -this.workerMetricsGroup = new WorkerMetricsGroup(metrics); +this.workerMetricsGroup = new WorkerMetricsGroup(this.connectors, this.tasks, metrics); Map internalConverterConfig = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"); this.internalKeyConverter = plugins.newInternalConverter(true, JsonConverter.class.getName(), internalConverterConfig); @@ -247,6 +243,7 @@ public class Worker { TargetState initialState, Callback onConnectorStateChange ) { +final ConnectorStatus.Listener connectorStatusListener = workerMetricsGroup.wrapStatusListener(statusListener); try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) { if (connectors.containsKey(connName)) { onConnectorStateChange.onCompletion( @@ -274,7 +271,7 @@ public class Worker { final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl( offsetBackingStore, connName, internalKeyConverter, internalValueConverter); worke
[kafka-site] branch asf-site updated: Add CVE-2021-38153 (#375)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new d4258bd Add CVE-2021-38153 (#375) d4258bd is described below commit d4258bd575d84a60dfb929e2f97a3f7997f0c63d Author: Randall Hauch AuthorDate: Tue Sep 21 11:17:28 2021 -0500 Add CVE-2021-38153 (#375) --- cve-list.html | 29 + 1 file changed, 29 insertions(+) diff --git a/cve-list.html b/cve-list.html index ec22cfa..de6d308 100644 --- a/cve-list.html +++ b/cve-list.html @@ -9,6 +9,35 @@ This page lists all security vulnerabilities fixed in released versions of Apache Kafka. +http://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-38153;>CVE-2021-38153 +Timing Attack Vulnerability for Apache Kafka Connect and Clients + +Some components in Apache Kafka use Arrays.equals to validate a password or key, +which is vulnerable to timing attacks that make brute force attacks for such credentials +more likely to be successful. Users should upgrade to 2.8.1 or higher, or 3.0.0 or higher +where this vulnerability has been fixed. + + + + +Versions affected +2.0.0, 2.0.1, 2.1.0, 2.1.1, 2.2.0, 2.2.1, 2.2.2, 2.3.0, 2.3.1, 2.4.0, 2.4.1, 2.5.0, 2.5.1, 2.6.0, 2.6.1, 2.6.2, 2.7.0, 2.7.1, 2.8.0. + + +Fixed versions +2.8.1, 3.0.0 and later + + +Impact +This issue could result in privilege escalation. + + +Issue announced +21 Sep 2021 + + + + http://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-12399;>CVE-2019-12399 Apache Kafka Connect REST API may expose plaintext secrets in tasks endpoint
svn commit: r50020 - in /release/kafka: 2.7.0/ 2.8.0/
Author: rhauch Date: Mon Sep 20 15:51:42 2021 New Revision: 50020 Log: Remove 2.7.0 and 2.8.0 artifacts that are now archived Removed: release/kafka/2.7.0/ release/kafka/2.8.0/
[kafka-site] branch asf-site updated: Change 2.8.0 downloads to use archives (#374)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 5cafb4f Change 2.8.0 downloads to use archives (#374) 5cafb4f is described below commit 5cafb4fdac8befc24974f42ad9d3660efe04ecaf Author: Randall Hauch AuthorDate: Mon Sep 20 10:05:52 2021 -0500 Change 2.8.0 downloads to use archives (#374) --- downloads.html | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/downloads.html b/downloads.html index 591aefc..5022c0c 100644 --- a/downloads.html +++ b/downloads.html @@ -47,16 +47,16 @@ Released April 19, 2021 -https://downloads.apache.org/kafka/2.8.0/RELEASE_NOTES.html;>Release Notes +https://archive.apache.org/dist/kafka/2.8.0/RELEASE_NOTES.html;>Release Notes -Source download: https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka-2.8.0-src.tgz;>kafka-2.8.0-src.tgz (https://downloads.apache.org/kafka/2.8.0/kafka-2.8.0-src.tgz.asc;>asc, https://downloads.apache.org/kafka/2.8.0/kafka-2.8.0-src.tgz.sha512;>sha512) +Source download: https://archive.apache.org/dist/kafka/2.8.0/kafka-2.8.0-src.tgz;>kafka-2.8.0-src.tgz (https://archive.apache.org/dist/kafka/2.8.0/kafka-2.8.0-src.tgz.asc;>asc, https://archive.apache.org/dist/kafka/2.8.0/kafka-2.8.0-src.tgz.sha512;>sha512) Binary downloads: -Scala 2.12 - https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.12-2.8.0.tgz;>kafka_2.12-2.8.0.tgz (https://downloads.apache.org/kafka/2.8.0/kafka_2.12-2.8.0.tgz.asc;>asc, https://downloads.apache.org/kafka/2.8.0/kafka_2.12-2.8.0.tgz.sha512;>sha512) -Scala 2.13 - https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.13-2.8.0.tgz;>kafka_2.13-2.8.0.tgz (https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz.asc;>asc, https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz.sha512;>sha512) +Scala 2.12 - https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz;>kafka_2.12-2.8.0.tgz (https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz.asc;>asc, https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz.sha512;>sha512) +Scala 2.13 - https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz;>kafka_2.13-2.8.0.tgz (https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz.asc;>asc, https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz.sha512;>sha512) We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.13 is recommended). @@ -85,7 +85,7 @@ -For more information, please read the detailed https://downloads.apache.org/kafka/2.8.0/RELEASE_NOTES.html;>Release Notes. +For more information, please read the detailed https://archive.apache.org/dist/kafka/2.8.0/RELEASE_NOTES.html;>Release Notes.
svn commit: r49990 - in /release/kafka: ./ 2.8.1/
Author: rhauch Date: Fri Sep 17 17:47:25 2021 New Revision: 49990 Log: Release 2.8.1 Added: release/kafka/2.8.1/ release/kafka/2.8.1/RELEASE_NOTES.html release/kafka/2.8.1/RELEASE_NOTES.html.asc release/kafka/2.8.1/RELEASE_NOTES.html.md5 release/kafka/2.8.1/RELEASE_NOTES.html.sha1 release/kafka/2.8.1/RELEASE_NOTES.html.sha512 release/kafka/2.8.1/kafka-2.8.1-src.tgz (with props) release/kafka/2.8.1/kafka-2.8.1-src.tgz.asc release/kafka/2.8.1/kafka-2.8.1-src.tgz.md5 release/kafka/2.8.1/kafka-2.8.1-src.tgz.sha1 release/kafka/2.8.1/kafka-2.8.1-src.tgz.sha512 release/kafka/2.8.1/kafka_2.12-2.8.1-site-docs.tgz (with props) release/kafka/2.8.1/kafka_2.12-2.8.1-site-docs.tgz.asc release/kafka/2.8.1/kafka_2.12-2.8.1-site-docs.tgz.md5 release/kafka/2.8.1/kafka_2.12-2.8.1-site-docs.tgz.sha1 release/kafka/2.8.1/kafka_2.12-2.8.1-site-docs.tgz.sha512 release/kafka/2.8.1/kafka_2.12-2.8.1.tgz (with props) release/kafka/2.8.1/kafka_2.12-2.8.1.tgz.asc release/kafka/2.8.1/kafka_2.12-2.8.1.tgz.md5 release/kafka/2.8.1/kafka_2.12-2.8.1.tgz.sha1 release/kafka/2.8.1/kafka_2.12-2.8.1.tgz.sha512 release/kafka/2.8.1/kafka_2.13-2.8.1-site-docs.tgz (with props) release/kafka/2.8.1/kafka_2.13-2.8.1-site-docs.tgz.asc release/kafka/2.8.1/kafka_2.13-2.8.1-site-docs.tgz.md5 release/kafka/2.8.1/kafka_2.13-2.8.1-site-docs.tgz.sha1 release/kafka/2.8.1/kafka_2.13-2.8.1-site-docs.tgz.sha512 release/kafka/2.8.1/kafka_2.13-2.8.1.tgz (with props) release/kafka/2.8.1/kafka_2.13-2.8.1.tgz.asc release/kafka/2.8.1/kafka_2.13-2.8.1.tgz.md5 release/kafka/2.8.1/kafka_2.13-2.8.1.tgz.sha1 release/kafka/2.8.1/kafka_2.13-2.8.1.tgz.sha512 Modified: release/kafka/KEYS Added: release/kafka/2.8.1/RELEASE_NOTES.html == --- release/kafka/2.8.1/RELEASE_NOTES.html (added) +++ release/kafka/2.8.1/RELEASE_NOTES.html Fri Sep 17 17:47:25 2021 @@ -0,0 +1,76 @@ +Release Notes - Kafka - Version 2.8.1 +Below is a summary of the JIRA issues addressed in the 2.8.1 release of Kafka. For full documentation of the +release, a guide to get started, and information about the project, see the https://kafka.apache.org/;>Kafka +project site. + +Note about upgrades: Please carefully review the +https://kafka.apache.org/28/documentation.html#upgrade;>upgrade documentation for this release thoroughly +before upgrading your cluster. The upgrade notes discuss any critical information about incompatibilities and breaking +changes, performance changes, and any other changes that might impact your production deployment of Kafka. + +The documentation for the most recent release can be found at +https://kafka.apache.org/documentation.html;>https://kafka.apache.org/documentation.html. +New Feature + +[https://issues.apache.org/jira/browse/KAFKA-13207;>KAFKA-13207] - Replica fetcher should not update partition state on diverging epoch if partition removed from fetcher + +Improvement + +[https://issues.apache.org/jira/browse/KAFKA-10675;>KAFKA-10675] - Error message from ConnectSchema.validateValue() should include the name of the schema. +[https://issues.apache.org/jira/browse/KAFKA-13209;>KAFKA-13209] - Upgrade jetty-server to fix CVE-2021-34429 +[https://issues.apache.org/jira/browse/KAFKA-13258;>KAFKA-13258] - AlterClientQuotas response does not include an error when it failed +[https://issues.apache.org/jira/browse/KAFKA-13259;>KAFKA-13259] - DescribeProducers response does not include an error when it failed + +Bug + +[https://issues.apache.org/jira/browse/KAFKA-8562;>KAFKA-8562] - SASL_SSL still performs reverse DNS lookup despite KAFKA-5051 +[https://issues.apache.org/jira/browse/KAFKA-9747;>KAFKA-9747] - No tasks created for a connector +[https://issues.apache.org/jira/browse/KAFKA-9887;>KAFKA-9887] - failed-task-count JMX metric not updated if task fails during startup +[https://issues.apache.org/jira/browse/KAFKA-10340;>KAFKA-10340] - Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever +[https://issues.apache.org/jira/browse/KAFKA-12252;>KAFKA-12252] - Distributed herder tick thread loops rapidly when worker loses leadership +[https://issues.apache.org/jira/browse/KAFKA-12262;>KAFKA-12262] - New session keys are never distributed when follower with key becomes leader +[https://issues.apache.org/jira/browse/KAFKA-12336;>KAFKA-12336] - custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter +[https://issues.apache.org/jira/browse/KAFKA-12619;>KAFKA-12619] - Ensure LeaderChange message is committed before initializing high watermark +[https://issues.apache.org/jira/browse/KAFKA-12667;>KAFKA-12667] - Incorrect error log on StateDirectory
[kafka] branch 2.2 updated: MINOR: Use time constant algorithms when comparing passwords or keys (#10978)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new ffa2812 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) ffa2812 is described below commit ffa2812bf0c83c924fb8960131f40c39e441f3b2 Author: Randall Hauch AuthorDate: Fri Jul 30 17:48:03 2021 -0500 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) Author: Randall Hauch Reviewers: Manikumar Reddy , Rajini Sivaram , Mickael Maison , Ismael Juma --- .../internals/PlainServerCallbackHandler.java | 4 +- .../security/scram/internals/ScramSaslClient.java | 3 +- .../security/scram/internals/ScramSaslServer.java | 3 +- .../security/token/delegation/DelegationToken.java | 3 +- .../java/org/apache/kafka/common/utils/Utils.java | 36 ++ .../org/apache/kafka/common/utils/UtilsTest.java | 44 ++ 6 files changed, 88 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java index 842f986..10f5817 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java @@ -22,9 +22,9 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.utils.Utils; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -65,7 +65,7 @@ public class PlainServerCallbackHandler implements AuthenticateCallbackHandler { String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); -return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray()); +return expectedPassword != null && Utils.isEqualConstantTime(password, expectedPassword.toCharArray()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java index c21a52e..2e6191b 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.security.scram.internals; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -204,7 +205,7 @@ public class ScramSaslClient implements SaslClient { try { byte[] serverKey = formatter.serverKey(saltedPassword); byte[] serverSignature = formatter.serverSignature(serverKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); -if (!Arrays.equals(signature, serverSignature)) +if (!MessageDigest.isEqual(signature, serverSignature)) throw new SaslException("Invalid server signature in server final message"); } catch (InvalidKeyException e) { throw new SaslException("Sasl server signature verification failed", e); diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java index f6286a6..3cc8ff0 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.security.scram.internals; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -226,7 +227,7 @@ public class ScramSaslServer implements SaslServer { byte[] expectedStoredKey = scramCredential.storedKey(); byte[] clientSignature = formatter.clientSignature(expectedStoredKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); byte[] com
[kafka] branch 2.3 updated: MINOR: Use time constant algorithms when comparing passwords or keys (#10978)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new da7a8db MINOR: Use time constant algorithms when comparing passwords or keys (#10978) da7a8db is described below commit da7a8db29e2073e3bc50f5a8192119af97ef7115 Author: Randall Hauch AuthorDate: Fri Jul 30 17:48:03 2021 -0500 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) Author: Randall Hauch Reviewers: Manikumar Reddy , Rajini Sivaram , Mickael Maison , Ismael Juma --- .../internals/PlainServerCallbackHandler.java | 4 +- .../security/scram/internals/ScramSaslClient.java | 3 +- .../security/scram/internals/ScramSaslServer.java | 3 +- .../security/token/delegation/DelegationToken.java | 3 +- .../java/org/apache/kafka/common/utils/Utils.java | 36 ++ .../org/apache/kafka/common/utils/UtilsTest.java | 44 ++ 6 files changed, 88 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java index 842f986..10f5817 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java @@ -22,9 +22,9 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.utils.Utils; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -65,7 +65,7 @@ public class PlainServerCallbackHandler implements AuthenticateCallbackHandler { String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); -return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray()); +return expectedPassword != null && Utils.isEqualConstantTime(password, expectedPassword.toCharArray()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java index c21a52e..2e6191b 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.security.scram.internals; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -204,7 +205,7 @@ public class ScramSaslClient implements SaslClient { try { byte[] serverKey = formatter.serverKey(saltedPassword); byte[] serverSignature = formatter.serverSignature(serverKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); -if (!Arrays.equals(signature, serverSignature)) +if (!MessageDigest.isEqual(signature, serverSignature)) throw new SaslException("Invalid server signature in server final message"); } catch (InvalidKeyException e) { throw new SaslException("Sasl server signature verification failed", e); diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java index f6286a6..3cc8ff0 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.security.scram.internals; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -226,7 +227,7 @@ public class ScramSaslServer implements SaslServer { byte[] expectedStoredKey = scramCredential.storedKey(); byte[] clientSignature = formatter.clientSignature(expectedStoredKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); byte[] com
[kafka] branch 2.4 updated: MINOR: Use time constant algorithms when comparing passwords or keys (#10978)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new d0488d9 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) d0488d9 is described below commit d0488d9a80ec55c4d5b5360e1381940f07d1d41e Author: Randall Hauch AuthorDate: Fri Jul 30 17:48:03 2021 -0500 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) Author: Randall Hauch Reviewers: Manikumar Reddy , Rajini Sivaram , Mickael Maison , Ismael Juma --- .../internals/PlainServerCallbackHandler.java | 4 +- .../security/scram/internals/ScramSaslClient.java | 3 +- .../security/scram/internals/ScramSaslServer.java | 3 +- .../security/token/delegation/DelegationToken.java | 3 +- .../java/org/apache/kafka/common/utils/Utils.java | 36 ++ .../org/apache/kafka/common/utils/UtilsTest.java | 43 ++ 6 files changed, 87 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java index 842f986..10f5817 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java @@ -22,9 +22,9 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.utils.Utils; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -65,7 +65,7 @@ public class PlainServerCallbackHandler implements AuthenticateCallbackHandler { String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); -return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray()); +return expectedPassword != null && Utils.isEqualConstantTime(password, expectedPassword.toCharArray()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java index c21a52e..2e6191b 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.security.scram.internals; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -204,7 +205,7 @@ public class ScramSaslClient implements SaslClient { try { byte[] serverKey = formatter.serverKey(saltedPassword); byte[] serverSignature = formatter.serverSignature(serverKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); -if (!Arrays.equals(signature, serverSignature)) +if (!MessageDigest.isEqual(signature, serverSignature)) throw new SaslException("Invalid server signature in server final message"); } catch (InvalidKeyException e) { throw new SaslException("Sasl server signature verification failed", e); diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java index f6286a6..3cc8ff0 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.security.scram.internals; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -226,7 +227,7 @@ public class ScramSaslServer implements SaslServer { byte[] expectedStoredKey = scramCredential.storedKey(); byte[] clientSignature = formatter.clientSignature(expectedStoredKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); byte[] com
[kafka] branch 2.5 updated: MINOR: Use time constant algorithms when comparing passwords or keys (#10978)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.5 by this push: new 0607661 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) 0607661 is described below commit 06076616d7a9895dc05a01dd29cb1b1cb9ab4602 Author: Randall Hauch AuthorDate: Fri Jul 30 17:48:03 2021 -0500 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) Author: Randall Hauch Reviewers: Manikumar Reddy , Rajini Sivaram , Mickael Maison , Ismael Juma --- .../internals/PlainServerCallbackHandler.java | 4 +- .../security/scram/internals/ScramSaslClient.java | 3 +- .../security/scram/internals/ScramSaslServer.java | 3 +- .../security/token/delegation/DelegationToken.java | 3 +- .../java/org/apache/kafka/common/utils/Utils.java | 36 ++ .../org/apache/kafka/common/utils/UtilsTest.java | 43 ++ 6 files changed, 87 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java index 842f986..10f5817 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java @@ -22,9 +22,9 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.utils.Utils; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -65,7 +65,7 @@ public class PlainServerCallbackHandler implements AuthenticateCallbackHandler { String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); -return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray()); +return expectedPassword != null && Utils.isEqualConstantTime(password, expectedPassword.toCharArray()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java index c21a52e..2e6191b 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.security.scram.internals; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -204,7 +205,7 @@ public class ScramSaslClient implements SaslClient { try { byte[] serverKey = formatter.serverKey(saltedPassword); byte[] serverSignature = formatter.serverSignature(serverKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); -if (!Arrays.equals(signature, serverSignature)) +if (!MessageDigest.isEqual(signature, serverSignature)) throw new SaslException("Invalid server signature in server final message"); } catch (InvalidKeyException e) { throw new SaslException("Sasl server signature verification failed", e); diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java index f6286a6..3cc8ff0 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.security.scram.internals; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -226,7 +227,7 @@ public class ScramSaslServer implements SaslServer { byte[] expectedStoredKey = scramCredential.storedKey(); byte[] clientSignature = formatter.clientSignature(expectedStoredKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); byte[] com
[kafka] branch 2.6 updated: MINOR: Use time constant algorithms when comparing passwords or keys (#10978)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.6 by this push: new 461e282 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) 461e282 is described below commit 461e2829a5acb41ec65d57ff690db7116b0f84b6 Author: Randall Hauch AuthorDate: Fri Jul 30 17:48:03 2021 -0500 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) Author: Randall Hauch Reviewers: Manikumar Reddy , Rajini Sivaram , Mickael Maison , Ismael Juma --- .../internals/PlainServerCallbackHandler.java | 4 +- .../security/scram/internals/ScramSaslClient.java | 3 +- .../security/scram/internals/ScramSaslServer.java | 3 +- .../security/token/delegation/DelegationToken.java | 3 +- .../java/org/apache/kafka/common/utils/Utils.java | 36 ++ .../org/apache/kafka/common/utils/UtilsTest.java | 43 ++ 6 files changed, 87 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java index 842f986..10f5817 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java @@ -22,9 +22,9 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.utils.Utils; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -65,7 +65,7 @@ public class PlainServerCallbackHandler implements AuthenticateCallbackHandler { String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); -return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray()); +return expectedPassword != null && Utils.isEqualConstantTime(password, expectedPassword.toCharArray()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java index c21a52e..2e6191b 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.security.scram.internals; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -204,7 +205,7 @@ public class ScramSaslClient implements SaslClient { try { byte[] serverKey = formatter.serverKey(saltedPassword); byte[] serverSignature = formatter.serverSignature(serverKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); -if (!Arrays.equals(signature, serverSignature)) +if (!MessageDigest.isEqual(signature, serverSignature)) throw new SaslException("Invalid server signature in server final message"); } catch (InvalidKeyException e) { throw new SaslException("Sasl server signature verification failed", e); diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java index f6286a6..3cc8ff0 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.security.scram.internals; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -226,7 +227,7 @@ public class ScramSaslServer implements SaslServer { byte[] expectedStoredKey = scramCredential.storedKey(); byte[] clientSignature = formatter.clientSignature(expectedStoredKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); byte[] com
[kafka] branch 2.8 updated: MINOR: Use time constant algorithms when comparing passwords or keys (#10978)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.8 by this push: new 3325342 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) 3325342 is described below commit 3325342fecba56c2f5b28d60ca37605a7ebf420a Author: Randall Hauch AuthorDate: Fri Jul 30 17:48:03 2021 -0500 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) Author: Randall Hauch Reviewers: Manikumar Reddy , Rajini Sivaram , Mickael Maison , Ismael Juma --- .../internals/PlainServerCallbackHandler.java | 4 +-- .../security/scram/internals/ScramSaslClient.java | 3 +- .../security/scram/internals/ScramSaslServer.java | 3 +- .../security/token/delegation/DelegationToken.java | 3 +- .../java/org/apache/kafka/common/utils/Utils.java | 36 +++ .../org/apache/kafka/common/utils/UtilsTest.java | 42 ++ 6 files changed, 86 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java index 842f986..10f5817 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java @@ -22,9 +22,9 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.utils.Utils; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -65,7 +65,7 @@ public class PlainServerCallbackHandler implements AuthenticateCallbackHandler { String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); -return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray()); +return expectedPassword != null && Utils.isEqualConstantTime(password, expectedPassword.toCharArray()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java index ac486b7..536e409 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.security.scram.internals; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -204,7 +205,7 @@ public class ScramSaslClient implements SaslClient { try { byte[] serverKey = formatter.serverKey(saltedPassword); byte[] serverSignature = formatter.serverSignature(serverKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); -if (!Arrays.equals(signature, serverSignature)) +if (!MessageDigest.isEqual(signature, serverSignature)) throw new SaslException("Invalid server signature in server final message"); } catch (InvalidKeyException e) { throw new SaslException("Sasl server signature verification failed", e); diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java index 1cc5b89..d5d55a6 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.security.scram.internals; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -226,7 +227,7 @@ public class ScramSaslServer implements SaslServer { byte[] expectedStoredKey = scramCredential.storedKey(); byte[] clientSignature = formatter.clientSignature(expectedStoredKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); byte[] com
[kafka] branch 2.7 updated: MINOR: Use time constant algorithms when comparing passwords or keys (#10978)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new 8436ade MINOR: Use time constant algorithms when comparing passwords or keys (#10978) 8436ade is described below commit 8436ade355c5d2ab6aa80e1791ae316c647db19f Author: Randall Hauch AuthorDate: Fri Jul 30 17:48:03 2021 -0500 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) Author: Randall Hauch Reviewers: Manikumar Reddy , Rajini Sivaram , Mickael Maison , Ismael Juma --- .../internals/PlainServerCallbackHandler.java | 4 +-- .../security/scram/internals/ScramSaslClient.java | 3 +- .../security/scram/internals/ScramSaslServer.java | 3 +- .../security/token/delegation/DelegationToken.java | 3 +- .../java/org/apache/kafka/common/utils/Utils.java | 36 +++ .../org/apache/kafka/common/utils/UtilsTest.java | 42 ++ 6 files changed, 86 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java index 842f986..10f5817 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java @@ -22,9 +22,9 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.utils.Utils; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -65,7 +65,7 @@ public class PlainServerCallbackHandler implements AuthenticateCallbackHandler { String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); -return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray()); +return expectedPassword != null && Utils.isEqualConstantTime(password, expectedPassword.toCharArray()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java index 6ee2e31..e9b6743 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.security.scram.internals; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -204,7 +205,7 @@ public class ScramSaslClient implements SaslClient { try { byte[] serverKey = formatter.serverKey(saltedPassword); byte[] serverSignature = formatter.serverSignature(serverKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); -if (!Arrays.equals(signature, serverSignature)) +if (!MessageDigest.isEqual(signature, serverSignature)) throw new SaslException("Invalid server signature in server final message"); } catch (InvalidKeyException e) { throw new SaslException("Sasl server signature verification failed", e); diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java index f9bce80..c035b28 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.security.scram.internals; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -226,7 +227,7 @@ public class ScramSaslServer implements SaslServer { byte[] expectedStoredKey = scramCredential.storedKey(); byte[] clientSignature = formatter.clientSignature(expectedStoredKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); byte[] com
[kafka] branch 3.0 updated: MINOR: Use time constant algorithms when comparing passwords or keys (#10978)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.0 by this push: new 00c086e MINOR: Use time constant algorithms when comparing passwords or keys (#10978) 00c086e is described below commit 00c086e9087c3163cb0502bf0067bae4d401d66e Author: Randall Hauch AuthorDate: Fri Jul 30 17:48:03 2021 -0500 MINOR: Use time constant algorithms when comparing passwords or keys (#10978) Author: Randall Hauch Reviewers: Manikumar Reddy , Rajini Sivaram , Mickael Maison , Ismael Juma --- .../internals/PlainServerCallbackHandler.java | 4 +- .../security/scram/internals/ScramSaslClient.java | 3 +- .../security/scram/internals/ScramSaslServer.java | 3 +- .../security/token/delegation/DelegationToken.java | 3 +- .../java/org/apache/kafka/common/utils/Utils.java | 36 ++ .../org/apache/kafka/common/utils/UtilsTest.java | 43 ++ 6 files changed, 87 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java index 842f986..10f5817 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainServerCallbackHandler.java @@ -22,9 +22,9 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.utils.Utils; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -65,7 +65,7 @@ public class PlainServerCallbackHandler implements AuthenticateCallbackHandler { String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); -return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray()); +return expectedPassword != null && Utils.isEqualConstantTime(password, expectedPassword.toCharArray()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java index ac486b7..536e409 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.security.scram.internals; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -204,7 +205,7 @@ public class ScramSaslClient implements SaslClient { try { byte[] serverKey = formatter.serverKey(saltedPassword); byte[] serverSignature = formatter.serverSignature(serverKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); -if (!Arrays.equals(signature, serverSignature)) +if (!MessageDigest.isEqual(signature, serverSignature)) throw new SaslException("Invalid server signature in server final message"); } catch (InvalidKeyException e) { throw new SaslException("Sasl server signature verification failed", e); diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java index 1cc5b89..d5d55a6 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.security.scram.internals; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; @@ -226,7 +227,7 @@ public class ScramSaslServer implements SaslServer { byte[] expectedStoredKey = scramCredential.storedKey(); byte[] clientSignature = formatter.clientSignature(expectedStoredKey, clientFirstMessage, serverFirstMessage, clientFinalMessage); byte[] com
[kafka] branch trunk updated (56eb950 -> 8a1fcee)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 56eb950 MINOR: log epoch and offset truncation similarly to HWM truncation (#11140) add 8a1fcee MINOR: Use time constant algorithms when comparing passwords or keys (#10978) No new revisions were added by this update. Summary of changes: .../internals/PlainServerCallbackHandler.java | 4 +- .../security/scram/internals/ScramSaslClient.java | 3 +- .../security/scram/internals/ScramSaslServer.java | 3 +- .../security/token/delegation/DelegationToken.java | 3 +- .../java/org/apache/kafka/common/utils/Utils.java | 36 ++ .../org/apache/kafka/common/utils/UtilsTest.java | 43 ++ 6 files changed, 87 insertions(+), 5 deletions(-)
[kafka] branch 2.7 updated: KAFKA-9887 fix failed task or connector count on startup failure (#8844)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new 8e00835 KAFKA-9887 fix failed task or connector count on startup failure (#8844) 8e00835 is described below commit 8e00835b108b3779fbd927a41f3c3c3a0606c4d7 Author: Michael Carter <53591940+michael-carter-instaclu...@users.noreply.github.com> AuthorDate: Wed Jul 21 08:39:26 2021 +1000 KAFKA-9887 fix failed task or connector count on startup failure (#8844) Moved the responsibility for recording task and connector startup and failure metrics from the invocation code into the status listener. The reason behind this is that the WorkerTasks (and subclasses) were either not propagating exceptions upwards, or were unable to do so easily because they were running on completely different threads. Also split out WorkerMetricsGroup from being an inner class into being a standard class. This was to make sure the Data Abstraction Count checkStyle rule was not violated. Author: Michael Carter Reviewers: Chris Egerton , Randall Hauch --- .../org/apache/kafka/connect/runtime/Worker.java | 105 + .../kafka/connect/runtime/WorkerMetricsGroup.java | 204 + .../kafka/connect/runtime/WorkerSinkTask.java | 3 +- .../kafka/connect/runtime/WorkerSourceTask.java| 25 ++- .../apache/kafka/connect/runtime/WorkerTask.java | 6 +- .../connect/runtime/ErrorHandlingTaskTest.java | 2 + .../ErrorHandlingTaskWithTopicCreationTest.java| 2 + .../connect/runtime/WorkerMetricsGroupTest.java| 249 + .../kafka/connect/runtime/WorkerSinkTaskTest.java | 3 + .../kafka/connect/runtime/WorkerTaskTest.java | 8 + .../apache/kafka/connect/runtime/WorkerTest.java | 6 +- .../runtime/WorkerWithTopicCreationTest.java | 7 +- 12 files changed, 503 insertions(+), 117 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index fffe71a..27f96de 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -21,13 +21,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.config.provider.ConfigProvider; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.CumulativeSum; -import org.apache.kafka.common.metrics.stats.Frequencies; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; @@ -57,8 +53,8 @@ import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; -import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.LoggingContext; import org.apache.kafka.connect.util.SinkUtils; @@ -145,7 +141,7 @@ public class Worker { this.plugins = plugins; this.config = config; this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy; -this.workerMetricsGroup = new WorkerMetricsGroup(metrics); +this.workerMetricsGroup = new WorkerMetricsGroup(this.connectors, this.tasks, metrics); // Internal converters are required properties, thus getClass won't return null. this.internalKeyConverter = plugins.newConverter( @@ -253,6 +249,7 @@ public class Worker { TargetState initialState, Callback onConnectorStateChange ) { +final ConnectorStatus.Listener connectorStatusListener = workerMetricsGroup.wrapStatusListener(statusListener); try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) { if (connectors.containsKey(connName)) { onConnectorStateChange.onCompletion( @@ -280,7 +277,7 @@ public class Worker { final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl( offsetBackingStore, connName, internalKeyConverter, internalValueCon
[kafka] branch 2.8 updated: KAFKA-9887 fix failed task or connector count on startup failure (#8844)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.8 by this push: new 8172077 KAFKA-9887 fix failed task or connector count on startup failure (#8844) 8172077 is described below commit 8172077fdfab15d3263531449ac894d22476086c Author: Michael Carter <53591940+michael-carter-instaclu...@users.noreply.github.com> AuthorDate: Wed Jul 21 08:39:26 2021 +1000 KAFKA-9887 fix failed task or connector count on startup failure (#8844) Moved the responsibility for recording task and connector startup and failure metrics from the invocation code into the status listener. The reason behind this is that the WorkerTasks (and subclasses) were either not propagating exceptions upwards, or were unable to do so easily because they were running on completely different threads. Also split out WorkerMetricsGroup from being an inner class into being a standard class. This was to make sure the Data Abstraction Count checkStyle rule was not violated. Author: Michael Carter Reviewers: Chris Egerton , Randall Hauch --- .../org/apache/kafka/connect/runtime/Worker.java | 105 + .../kafka/connect/runtime/WorkerMetricsGroup.java | 204 + .../kafka/connect/runtime/WorkerSinkTask.java | 3 +- .../kafka/connect/runtime/WorkerSourceTask.java| 25 ++- .../apache/kafka/connect/runtime/WorkerTask.java | 6 +- .../connect/runtime/ErrorHandlingTaskTest.java | 2 + .../ErrorHandlingTaskWithTopicCreationTest.java| 2 + .../connect/runtime/WorkerMetricsGroupTest.java| 249 + .../kafka/connect/runtime/WorkerSinkTaskTest.java | 3 + .../kafka/connect/runtime/WorkerTaskTest.java | 8 + .../apache/kafka/connect/runtime/WorkerTest.java | 6 +- .../runtime/WorkerWithTopicCreationTest.java | 7 +- 12 files changed, 503 insertions(+), 117 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index fffe71a..27f96de 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -21,13 +21,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.config.provider.ConfigProvider; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.CumulativeSum; -import org.apache.kafka.common.metrics.stats.Frequencies; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; @@ -57,8 +53,8 @@ import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; -import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.LoggingContext; import org.apache.kafka.connect.util.SinkUtils; @@ -145,7 +141,7 @@ public class Worker { this.plugins = plugins; this.config = config; this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy; -this.workerMetricsGroup = new WorkerMetricsGroup(metrics); +this.workerMetricsGroup = new WorkerMetricsGroup(this.connectors, this.tasks, metrics); // Internal converters are required properties, thus getClass won't return null. this.internalKeyConverter = plugins.newConverter( @@ -253,6 +249,7 @@ public class Worker { TargetState initialState, Callback onConnectorStateChange ) { +final ConnectorStatus.Listener connectorStatusListener = workerMetricsGroup.wrapStatusListener(statusListener); try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) { if (connectors.containsKey(connName)) { onConnectorStateChange.onCompletion( @@ -280,7 +277,7 @@ public class Worker { final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl( offsetBackingStore, connName, internalKeyConverter, internalValueCon
[kafka] branch trunk updated: KAFKA-9887 fix failed task or connector count on startup failure (#8844)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0314801 KAFKA-9887 fix failed task or connector count on startup failure (#8844) 0314801 is described below commit 0314801a8e67a96f8cdea85bf55cb5bed808fc34 Author: Michael Carter <53591940+michael-carter-instaclu...@users.noreply.github.com> AuthorDate: Wed Jul 21 08:39:26 2021 +1000 KAFKA-9887 fix failed task or connector count on startup failure (#8844) Moved the responsibility for recording task and connector startup and failure metrics from the invocation code into the status listener. The reason behind this is that the WorkerTasks (and subclasses) were either not propagating exceptions upwards, or were unable to do so easily because they were running on completely different threads. Also split out WorkerMetricsGroup from being an inner class into being a standard class. This was to make sure the Data Abstraction Count checkStyle rule was not violated. Author: Michael Carter Reviewers: Chris Egerton , Randall Hauch --- .../org/apache/kafka/connect/runtime/Worker.java | 105 + .../kafka/connect/runtime/WorkerMetricsGroup.java | 214 ++ .../kafka/connect/runtime/WorkerSinkTask.java | 3 +- .../kafka/connect/runtime/WorkerSourceTask.java| 25 ++- .../apache/kafka/connect/runtime/WorkerTask.java | 6 +- .../connect/runtime/ErrorHandlingTaskTest.java | 2 + .../connect/runtime/WorkerMetricsGroupTest.java| 249 + .../kafka/connect/runtime/WorkerSinkTaskTest.java | 3 + .../kafka/connect/runtime/WorkerTaskTest.java | 8 + .../apache/kafka/connect/runtime/WorkerTest.java | 6 +- 10 files changed, 507 insertions(+), 114 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 73201b5..11af818 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -21,13 +21,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.config.provider.ConfigProvider; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.CumulativeSum; -import org.apache.kafka.common.metrics.stats.Frequencies; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.Connector; @@ -59,8 +55,8 @@ import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; -import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.LoggingContext; import org.apache.kafka.connect.util.SinkUtils; @@ -147,7 +143,7 @@ public class Worker { this.plugins = plugins; this.config = config; this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy; -this.workerMetricsGroup = new WorkerMetricsGroup(metrics); +this.workerMetricsGroup = new WorkerMetricsGroup(this.connectors, this.tasks, metrics); Map internalConverterConfig = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"); this.internalKeyConverter = plugins.newInternalConverter(true, JsonConverter.class.getName(), internalConverterConfig); @@ -247,6 +243,7 @@ public class Worker { TargetState initialState, Callback onConnectorStateChange ) { +final ConnectorStatus.Listener connectorStatusListener = workerMetricsGroup.wrapStatusListener(statusListener); try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) { if (connectors.containsKey(connName)) { onConnectorStateChange.onCompletion( @@ -274,7 +271,7 @@ public class Worker { final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl( offsetBackingStore, connName, internalKeyConverter, internalValueConverter); worke
[kafka] branch trunk updated (f29c43b -> 2e89f40)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from f29c43b KAFKA-12979; Implement command to find hanging transactions (#10974) add 2e89f40 KAFKA-13035 updated documentation for connector restart REST API to … (#10975) No new revisions were added by this update. Summary of changes: docs/connect.html | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-)
[kafka] branch trunk updated (51796bc -> cad2f5e)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 51796bc KAFKA-10587; Rename kafka-mirror-maker CLI command line arguments for KIP-629 add cad2f5e KAFKA-12717: Remove internal Connect converter properties (KIP-738) (#10854) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/connect/runtime/Worker.java | 16 ++-- .../apache/kafka/connect/runtime/WorkerConfig.java | 100 - .../kafka/connect/runtime/isolation/Plugins.java | 55 +++- .../kafka/connect/runtime/ConnectMetricsTest.java | 2 - .../connect/runtime/ErrorHandlingTaskTest.java | 4 - .../kafka/connect/runtime/MockConnectMetrics.java | 2 - .../runtime/SourceTaskOffsetCommitterTest.java | 4 - .../kafka/connect/runtime/WorkerSinkTaskTest.java | 4 - .../runtime/WorkerSinkTaskThreadedTest.java| 4 - .../connect/runtime/WorkerSourceTaskTest.java | 4 - .../apache/kafka/connect/runtime/WorkerTest.java | 16 ++-- .../runtime/distributed/DistributedHerderTest.java | 2 - .../connect/runtime/isolation/PluginsTest.java | 23 ++--- .../kafka/connect/runtime/rest/RestServerTest.java | 2 - .../connect/runtime/rest/util/SSLUtilsTest.java| 6 -- .../storage/FileOffsetBackingStoreTest.java| 2 - .../storage/KafkaConfigBackingStoreTest.java | 2 - .../storage/KafkaOffsetBackingStoreTest.java | 2 - docs/upgrade.html | 7 ++ 19 files changed, 73 insertions(+), 184 deletions(-)
[kafka] branch trunk updated (7da881f -> 5652ef1)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 7da881f KAFKA-12928: Add a check whether the Task's statestore is actually a directory (#10862) add 5652ef1 KAFKA-12482 Remove deprecated rest.host.name and rest.port configs (#10841) No new revisions were added by this update. Summary of changes: config/connect-distributed.properties | 9 +-- .../apache/kafka/connect/runtime/WorkerConfig.java | 65 -- .../kafka/connect/runtime/rest/RestServer.java | 25 ++--- .../kafka/connect/runtime/WorkerConfigTest.java| 48 +++- .../kafka/connect/runtime/rest/RestServerTest.java | 31 --- .../util/clusters/EmbeddedConnectCluster.java | 8 +-- docs/upgrade.html | 4 +- 7 files changed, 93 insertions(+), 97 deletions(-)
[kafka] branch 2.6 updated: KAFKA-12904: Corrected the timeout for config validation REST API resource (#10834)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.6 by this push: new a35c503 KAFKA-12904: Corrected the timeout for config validation REST API resource (#10834) a35c503 is described below commit a35c503b2e5734d1f99ba7f72d2c00f4423e72d1 Author: Randall Hauch AuthorDate: Tue Jun 22 09:15:13 2021 -0500 KAFKA-12904: Corrected the timeout for config validation REST API resource (#10834) The constant is specified in milliseconds, and so the MILLISECOND time unit should be used instead of SECONDS. Author: Randall Hauch Reviewer: Konstantine Karantasis --- .../kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 1f6161e..0854c8f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -88,7 +88,7 @@ public class ConnectorPluginsResource { herder.validateConnectorConfig(connectorConfig, validationCallback, false); try { -return validationCallback.get(ConnectorsResource.REQUEST_TIMEOUT_MS, TimeUnit.SECONDS); +return validationCallback.get(ConnectorsResource.REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server // error is the best option
[kafka] branch 2.7 updated: KAFKA-12904: Corrected the timeout for config validation REST API resource (#10834)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new 439251b KAFKA-12904: Corrected the timeout for config validation REST API resource (#10834) 439251b is described below commit 439251be08e50a3ac2f069abef051a4c21bba601 Author: Randall Hauch AuthorDate: Tue Jun 22 09:15:13 2021 -0500 KAFKA-12904: Corrected the timeout for config validation REST API resource (#10834) The constant is specified in milliseconds, and so the MILLISECOND time unit should be used instead of SECONDS. Author: Randall Hauch Reviewer: Konstantine Karantasis --- .../kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 1f6161e..0854c8f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -88,7 +88,7 @@ public class ConnectorPluginsResource { herder.validateConnectorConfig(connectorConfig, validationCallback, false); try { -return validationCallback.get(ConnectorsResource.REQUEST_TIMEOUT_MS, TimeUnit.SECONDS); +return validationCallback.get(ConnectorsResource.REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server // error is the best option
[kafka] branch 2.8 updated (3cbd063 -> 4d09c51)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git. from 3cbd063 KAFKA-12870; Flush in progress not cleared after transaction completion (#10880) add 4d09c51 KAFKA-12904: Corrected the timeout for config validation REST API resource (#10834) No new revisions were added by this update. Summary of changes: .../kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[kafka] branch trunk updated (c8684d8 -> b285662)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from c8684d8 KAFKA-12483: Enable client overrides in connector configs by default (KIP-722) (#10336) add b285662 KAFKA-12904: Corrected the timeout for config validation REST API resource (#10834) No new revisions were added by this update. Summary of changes: .../kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[kafka] branch trunk updated (6cdde72 -> c8684d8)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 6cdde72 KAFKA-12484: Enable Connect's connector log contexts by default (KIP-721) (#10335) add c8684d8 KAFKA-12483: Enable client overrides in connector configs by default (KIP-722) (#10336) No new revisions were added by this update. Summary of changes: .../apache/kafka/connect/runtime/WorkerConfig.java| 7 --- .../ConnectorClientPolicyIntegrationTest.java | 19 ++- 2 files changed, 22 insertions(+), 4 deletions(-)
[kafka] branch trunk updated (2ad9350 -> 6cdde72)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 2ad9350 MINOR: Remove obsolete variables for metric sensors (#10912) add 6cdde72 KAFKA-12484: Enable Connect's connector log contexts by default (KIP-721) (#10335) No new revisions were added by this update. Summary of changes: config/connect-log4j.properties | 7 +++ 1 file changed, 3 insertions(+), 4 deletions(-)
[kafka] branch 2.4 updated: MINOR: Use MessageDigest equals when comparing signature (#10898)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new 0faa9e6 MINOR: Use MessageDigest equals when comparing signature (#10898) 0faa9e6 is described below commit 0faa9e6793aa8d48e05c23179a80e109d17a Author: Randall Hauch AuthorDate: Fri Jun 18 09:53:23 2021 -0500 MINOR: Use MessageDigest equals when comparing signature (#10898) --- .../apache/kafka/connect/runtime/rest/InternalRequestSignature.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java index d59425b..3cee577 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java @@ -24,6 +24,7 @@ import javax.crypto.Mac; import javax.crypto.SecretKey; import javax.ws.rs.core.HttpHeaders; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Base64; @@ -108,7 +109,7 @@ public class InternalRequestSignature { } public boolean isValid(SecretKey key) { -return Arrays.equals(sign(mac, key, requestBody), requestSignature); +return MessageDigest.isEqual(sign(mac, key, requestBody), requestSignature); } private static Mac mac(String signatureAlgorithm) throws NoSuchAlgorithmException {
[kafka] branch 2.5 updated: MINOR: Use MessageDigest equals when comparing signature (#10898)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.5 by this push: new 221a6d3 MINOR: Use MessageDigest equals when comparing signature (#10898) 221a6d3 is described below commit 221a6d35957b24004b9b48433b2ab2f2d2fe035c Author: Randall Hauch AuthorDate: Fri Jun 18 09:53:23 2021 -0500 MINOR: Use MessageDigest equals when comparing signature (#10898) --- .../apache/kafka/connect/runtime/rest/InternalRequestSignature.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java index d59425b..3cee577 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java @@ -24,6 +24,7 @@ import javax.crypto.Mac; import javax.crypto.SecretKey; import javax.ws.rs.core.HttpHeaders; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Base64; @@ -108,7 +109,7 @@ public class InternalRequestSignature { } public boolean isValid(SecretKey key) { -return Arrays.equals(sign(mac, key, requestBody), requestSignature); +return MessageDigest.isEqual(sign(mac, key, requestBody), requestSignature); } private static Mac mac(String signatureAlgorithm) throws NoSuchAlgorithmException {
[kafka] branch 2.6 updated: MINOR: Use MessageDigest equals when comparing signature (#10898)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.6 by this push: new 03bc8aa MINOR: Use MessageDigest equals when comparing signature (#10898) 03bc8aa is described below commit 03bc8aa3e65676a4e9604a5471ef8cd54825ea3c Author: Randall Hauch AuthorDate: Fri Jun 18 09:53:23 2021 -0500 MINOR: Use MessageDigest equals when comparing signature (#10898) --- .../apache/kafka/connect/runtime/rest/InternalRequestSignature.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java index d59425b..3cee577 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java @@ -24,6 +24,7 @@ import javax.crypto.Mac; import javax.crypto.SecretKey; import javax.ws.rs.core.HttpHeaders; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Base64; @@ -108,7 +109,7 @@ public class InternalRequestSignature { } public boolean isValid(SecretKey key) { -return Arrays.equals(sign(mac, key, requestBody), requestSignature); +return MessageDigest.isEqual(sign(mac, key, requestBody), requestSignature); } private static Mac mac(String signatureAlgorithm) throws NoSuchAlgorithmException {
[kafka] branch 2.7 updated: MINOR: Use MessageDigest equals when comparing signature (#10898)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new 75fd76b MINOR: Use MessageDigest equals when comparing signature (#10898) 75fd76b is described below commit 75fd76bdeb9d02cdd103c68576867581f006c5f8 Author: Randall Hauch AuthorDate: Fri Jun 18 09:53:23 2021 -0500 MINOR: Use MessageDigest equals when comparing signature (#10898) --- .../apache/kafka/connect/runtime/rest/InternalRequestSignature.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java index d59425b..3cee577 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java @@ -24,6 +24,7 @@ import javax.crypto.Mac; import javax.crypto.SecretKey; import javax.ws.rs.core.HttpHeaders; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Base64; @@ -108,7 +109,7 @@ public class InternalRequestSignature { } public boolean isValid(SecretKey key) { -return Arrays.equals(sign(mac, key, requestBody), requestSignature); +return MessageDigest.isEqual(sign(mac, key, requestBody), requestSignature); } private static Mac mac(String signatureAlgorithm) throws NoSuchAlgorithmException {
[kafka] branch 2.8 updated: MINOR: Use MessageDigest equals when comparing signature (#10898)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.8 by this push: new d7abd32 MINOR: Use MessageDigest equals when comparing signature (#10898) d7abd32 is described below commit d7abd32f3569a65a4b59c7dd8a655b17ffa1b455 Author: Randall Hauch AuthorDate: Fri Jun 18 09:53:23 2021 -0500 MINOR: Use MessageDigest equals when comparing signature (#10898) --- .../apache/kafka/connect/runtime/rest/InternalRequestSignature.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java index d59425b..3cee577 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java @@ -24,6 +24,7 @@ import javax.crypto.Mac; import javax.crypto.SecretKey; import javax.ws.rs.core.HttpHeaders; import java.security.InvalidKeyException; +import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Base64; @@ -108,7 +109,7 @@ public class InternalRequestSignature { } public boolean isValid(SecretKey key) { -return Arrays.equals(sign(mac, key, requestBody), requestSignature); +return MessageDigest.isEqual(sign(mac, key, requestBody), requestSignature); } private static Mac mac(String signatureAlgorithm) throws NoSuchAlgorithmException {
[kafka] branch 2.6 updated: KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.6 by this push: new 7795ca0 KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014) 7795ca0 is described below commit 7795ca0fee650db181cd5e83c99d4cab15b8d37e Author: Chris Egerton AuthorDate: Wed May 5 17:11:15 2021 -0400 KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014) Author: Chris Egerton Reviewers: Greg Harris , Randall Hauch --- checkstyle/suppressions.xml| 3 + .../runtime/distributed/DistributedHerder.java | 9 +- .../runtime/distributed/DistributedHerderTest.java | 140 + 3 files changed, 126 insertions(+), 26 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index f2f642c..8ecfe05 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -142,6 +142,9 @@ + + diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 93f6141..df90c45 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -409,7 +409,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { log.debug("Scheduled rebalance at: {} (now: {} nextRequestTimeoutMs: {}) ", scheduledRebalance, now, nextRequestTimeoutMs); } -if (internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) { +if (isLeader() && internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) { nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(keyExpiration - now, 0)); log.debug("Scheduled next key rotation at: {} (now: {} nextRequestTimeoutMs: {}) ", keyExpiration, now, nextRequestTimeoutMs); @@ -1620,10 +1620,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { synchronized (DistributedHerder.this) { DistributedHerder.this.sessionKey = sessionKey.key(); -// Track the expiration of the key if and only if this worker is the leader +// Track the expiration of the key. // Followers will receive rotated keys from the leader and won't be responsible for -// tracking expiration and distributing new keys themselves -if (isLeader() && keyRotationIntervalMs > 0) { +// tracking expiration and distributing new keys themselves, but may become leaders +// later on and will need to know when to update the key. +if (keyRotationIntervalMs > 0) { DistributedHerder.this.keyExpiration = sessionKey.creationTimestamp() + keyRotationIntervalMs; } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 7ededef..519cda8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -2170,6 +2170,84 @@ public class DistributedHerderTest { PowerMock.verifyAll(); } +@Test +public void testKeyRotationWhenWorkerBecomesLeader() throws Exception { +EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2); + +expectRebalance(1, Collections.emptyList(), Collections.emptyList()); +expectPostRebalanceCatchup(SNAPSHOT); +// First rebalance: poll indefinitely as no key has been read yet, so expiration doesn't come into play +member.poll(Long.MAX_VALUE); +EasyMock.expectLastCall(); + +expectRebalance(2, Collections.emptyList(), Collections.emptyList()); +SessionKey initialKey = new SessionKey(EasyMock.mock(SecretKey.class), 0); +ClusterConfigState snapshotWithKey = new ClusterConfigState(2, initialKey, Collections.singletonMap(CONN1, 3), +Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), +TASK_CONFIGS_MAP, Collections.emptySet()); +expectPostRebalanc
[kafka] branch 2.7 updated: KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new 91d41e9 KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014) 91d41e9 is described below commit 91d41e948fc1d5317d056960b99570718b3ea2c6 Author: Chris Egerton AuthorDate: Wed May 5 17:11:15 2021 -0400 KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014) Author: Chris Egerton Reviewers: Greg Harris , Randall Hauch --- checkstyle/suppressions.xml| 3 + .../runtime/distributed/DistributedHerder.java | 9 +- .../runtime/distributed/DistributedHerderTest.java | 140 + 3 files changed, 126 insertions(+), 26 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 97a96c8..1665414 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -142,6 +142,9 @@ + + diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 93f6141..df90c45 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -409,7 +409,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { log.debug("Scheduled rebalance at: {} (now: {} nextRequestTimeoutMs: {}) ", scheduledRebalance, now, nextRequestTimeoutMs); } -if (internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) { +if (isLeader() && internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) { nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(keyExpiration - now, 0)); log.debug("Scheduled next key rotation at: {} (now: {} nextRequestTimeoutMs: {}) ", keyExpiration, now, nextRequestTimeoutMs); @@ -1620,10 +1620,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { synchronized (DistributedHerder.this) { DistributedHerder.this.sessionKey = sessionKey.key(); -// Track the expiration of the key if and only if this worker is the leader +// Track the expiration of the key. // Followers will receive rotated keys from the leader and won't be responsible for -// tracking expiration and distributing new keys themselves -if (isLeader() && keyRotationIntervalMs > 0) { +// tracking expiration and distributing new keys themselves, but may become leaders +// later on and will need to know when to update the key. +if (keyRotationIntervalMs > 0) { DistributedHerder.this.keyExpiration = sessionKey.creationTimestamp() + keyRotationIntervalMs; } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 7ededef..519cda8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -2170,6 +2170,84 @@ public class DistributedHerderTest { PowerMock.verifyAll(); } +@Test +public void testKeyRotationWhenWorkerBecomesLeader() throws Exception { +EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2); + +expectRebalance(1, Collections.emptyList(), Collections.emptyList()); +expectPostRebalanceCatchup(SNAPSHOT); +// First rebalance: poll indefinitely as no key has been read yet, so expiration doesn't come into play +member.poll(Long.MAX_VALUE); +EasyMock.expectLastCall(); + +expectRebalance(2, Collections.emptyList(), Collections.emptyList()); +SessionKey initialKey = new SessionKey(EasyMock.mock(SecretKey.class), 0); +ClusterConfigState snapshotWithKey = new ClusterConfigState(2, initialKey, Collections.singletonMap(CONN1, 3), +Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), +TASK_CONFIGS_MAP, Collections.emptySet()); +expectPostRebalanc
[kafka] branch trunk updated (ebef7d0 -> be5889d)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from ebef7d0 MINOR: TestSecurityRollingUpgrade system test fixes (#10886) add be5889d MINOR: Use MessageDigest equals when comparing signature (#10898) No new revisions were added by this update. Summary of changes: .../apache/kafka/connect/runtime/rest/InternalRequestSignature.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-)
[kafka] branch 2.8 updated: KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.8 by this push: new de64b6c KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014) de64b6c is described below commit de64b6ce576ef5b0b339d5199d8fb13ac1f21e2b Author: Chris Egerton AuthorDate: Wed May 5 17:11:15 2021 -0400 KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014) Author: Chris Egerton Reviewers: Greg Harris , Randall Hauch --- checkstyle/suppressions.xml| 3 + .../runtime/distributed/DistributedHerder.java | 9 +- .../runtime/distributed/DistributedHerderTest.java | 105 - 3 files changed, 110 insertions(+), 7 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 13dc59d..db760cb 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -150,6 +150,9 @@ + + diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index c312e05..cb2c4da 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -408,7 +408,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { log.debug("Scheduled rebalance at: {} (now: {} nextRequestTimeoutMs: {}) ", scheduledRebalance, now, nextRequestTimeoutMs); } -if (internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) { +if (isLeader() && internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) { nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(keyExpiration - now, 0)); log.debug("Scheduled next key rotation at: {} (now: {} nextRequestTimeoutMs: {}) ", keyExpiration, now, nextRequestTimeoutMs); @@ -1583,10 +1583,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { synchronized (DistributedHerder.this) { DistributedHerder.this.sessionKey = sessionKey.key(); -// Track the expiration of the key if and only if this worker is the leader +// Track the expiration of the key. // Followers will receive rotated keys from the leader and won't be responsible for -// tracking expiration and distributing new keys themselves -if (isLeader() && keyRotationIntervalMs > 0) { +// tracking expiration and distributing new keys themselves, but may become leaders +// later on and will need to know when to update the key. +if (keyRotationIntervalMs > 0) { DistributedHerder.this.keyExpiration = sessionKey.creationTimestamp() + keyRotationIntervalMs; } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 65ec89c..1843868 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -2076,6 +2076,84 @@ public class DistributedHerderTest { PowerMock.verifyAll(); } +@Test +public void testKeyRotationWhenWorkerBecomesLeader() throws Exception { +EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2); + +expectRebalance(1, Collections.emptyList(), Collections.emptyList()); +expectPostRebalanceCatchup(SNAPSHOT); +// First rebalance: poll indefinitely as no key has been read yet, so expiration doesn't come into play +member.poll(Long.MAX_VALUE); +EasyMock.expectLastCall(); + +expectRebalance(2, Collections.emptyList(), Collections.emptyList()); +SessionKey initialKey = new SessionKey(EasyMock.mock(SecretKey.class), 0); +ClusterConfigState snapshotWithKey = new ClusterConfigState(2, initialKey, Collections.singletonMap(CONN1, 3), +Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), +TASK_CONFIGS_MAP, Collections.emptySet()); +expectPostRebalanc
[kafka] branch trunk updated (d881d11 -> 9ba583f)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from d881d11 MINOR: fix streams_broker_compatibility_test.py (#10632) add 9ba583f KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014) No new revisions were added by this update. Summary of changes: checkstyle/suppressions.xml| 3 + .../runtime/distributed/DistributedHerder.java | 9 +- .../runtime/distributed/DistributedHerderTest.java | 105 - 3 files changed, 110 insertions(+), 7 deletions(-)
[kafka] branch 2.8 updated: KAFKA-10340: Proactively close producer when cancelling source tasks (#10016)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.8 by this push: new c8d1a44 KAFKA-10340: Proactively close producer when cancelling source tasks (#10016) c8d1a44 is described below commit c8d1a440ef4dfef11970bfa3ee42b1f74206cf17 Author: Chris Egerton AuthorDate: Mon Mar 1 11:03:34 2021 -0500 KAFKA-10340: Proactively close producer when cancelling source tasks (#10016) Close the producer in `WorkerSourceTask` when the latter is cancelled. If the broker do not autocreate the topic, and the connector is not configured to create topics written by the source connector, then the `WorkerSourceTask` main thread will block forever until the topic is created, and will not stop if cancelled or scheduled for shutdown by the worker. Expanded an existing unit test for the WorkerSourceTask class to ensure that the producer is closed when the task is abandoned, and added a new integration test that guarantees that tasks are still shut down even when their producers are trying to write to topics that do not exist. Author: Chris Egerton Reviewed: Greg Harris , Randall Hauch --- .../org/apache/kafka/connect/runtime/Worker.java | 2 +- .../kafka/connect/runtime/WorkerSourceTask.java| 38 +- .../apache/kafka/connect/runtime/WorkerTask.java | 4 ++ .../integration/ConnectWorkerIntegrationTest.java | 45 -- .../kafka/connect/integration/ConnectorHandle.java | 8 ++-- .../integration/MonitorableSourceConnector.java| 1 + .../connect/runtime/ErrorHandlingTaskTest.java | 3 +- .../ErrorHandlingTaskWithTopicCreationTest.java| 3 +- .../connect/runtime/WorkerSourceTaskTest.java | 5 ++- .../WorkerSourceTaskWithTopicCreationTest.java | 5 ++- .../apache/kafka/connect/runtime/WorkerTest.java | 4 +- .../runtime/WorkerWithTopicCreationTest.java | 4 +- 12 files changed, 98 insertions(+), 24 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 45a93df..fffe71a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -626,7 +626,7 @@ public class Worker { // Note we pass the configState as it performs dynamic transformations under the covers return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain, producer, admin, topicCreationGroups, -offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore()); +offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore(), executor); } else if (task instanceof SinkTask) { TransformationChain transformationChain = new TransformationChain<>(connConfig.transformations(), retryWithToleranceOperator); log.info("Initializing: {}", transformationChain); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 342fa73..48660f3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -60,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -86,6 +87,7 @@ class WorkerSourceTask extends WorkerTask { private final TopicAdmin admin; private final CloseableOffsetStorageReader offsetReader; private final OffsetStorageWriter offsetWriter; +private final Executor closeExecutor; private final SourceTaskMetricsGroup sourceTaskMetricsGroup; private final AtomicReference producerSendException; private final boolean isTopicTrackingEnabled; @@ -123,7 +125,8 @@ class WorkerSourceTask extends WorkerTask { ClassLoader loader, Time time, RetryWithToleranceOperator retryWithToleranceOperator, -StatusBackingStore statusBackingStore) { +StatusBackingStore statusBackingStore, +Execut
svn commit: r47324 - /release/kafka/2.6.1/
Author: rhauch Date: Thu Apr 22 00:31:47 2021 New Revision: 47324 Log: Removed the 2.6.1 release after 2.6.2 was released Removed: release/kafka/2.6.1/
[kafka-site] branch asf-site updated: Changed the 2.6.1 links to use the archives (#350)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 2c1ad02 Changed the 2.6.1 links to use the archives (#350) 2c1ad02 is described below commit 2c1ad021bbd3ca69081e3d7e2316ed3c52bef383 Author: Randall Hauch AuthorDate: Wed Apr 21 17:16:01 2021 -0500 Changed the 2.6.1 links to use the archives (#350) Prior to removing the 2.6.1 release artifacts from the distribution SVN repo. --- downloads.html | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/downloads.html b/downloads.html index 84bdb9d..ee52290 100644 --- a/downloads.html +++ b/downloads.html @@ -137,16 +137,16 @@ Released January 07, 2021 -https://downloads.apache.org/kafka/2.6.1/RELEASE_NOTES.html;>Release Notes +https://archive.apache.org/dist/kafka/2.6.1/RELEASE_NOTES.html;>Release Notes -Source download: https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.1/kafka-2.6.1-src.tgz;>kafka-2.6.1-src.tgz (https://downloads.apache.org/kafka/2.6.1/kafka-2.6.1-src.tgz.asc;>asc, https://downloads.apache.org/kafka/2.6.1/kafka-2.6.1-src.tgz.sha512;>sha512) +Source download: https://archive.apache.org/dist/kafka/2.6.1/kafka-2.6.1-src.tgz;>kafka-2.6.1-src.tgz (https://archive.apache.org/dist/kafka/2.6.1/kafka-2.6.1-src.tgz.asc;>asc, https://archive.apache.org/dist/kafka/2.6.1/kafka-2.6.1-src.tgz.sha512;>sha512) Binary downloads: -Scala 2.12 - https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.1/kafka_2.12-2.6.1.tgz;>kafka_2.12-2.6.1.tgz (https://downloads.apache.org/kafka/2.6.1/kafka_2.12-2.6.1.tgz.asc;>asc, https://downloads.apache.org/kafka/2.6.1/kafka_2.12-2.6.1.tgz.sha512;>sha512) -Scala 2.13 - https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.1/kafka_2.13-2.6.1.tgz;>kafka_2.13-2.6.1.tgz (https://downloads.apache.org/kafka/2.6.1/kafka_2.13-2.6.1.tgz.asc;>asc, https://downloads.apache.org/kafka/2.6.1/kafka_2.13-2.6.1.tgz.sha512;>sha512) +Scala 2.12 - https://archive.apache.org/dist/kafka/2.6.1/kafka_2.12-2.6.1.tgz;>kafka_2.12-2.6.1.tgz (https://archive.apache.org/dist/kafka/2.6.1/kafka_2.12-2.6.1.tgz.asc;>asc, https://archive.apache.org/dist/kafka/2.6.1/kafka_2.12-2.6.1.tgz.sha512;>sha512) +Scala 2.13 - https://archive.apache.org/dist/kafka/2.6.1/kafka_2.13-2.6.1.tgz;>kafka_2.13-2.6.1.tgz (https://archive.apache.org/dist/kafka/2.6.1/kafka_2.13-2.6.1.tgz.asc;>asc, https://archive.apache.org/dist/kafka/2.6.1/kafka_2.13-2.6.1.tgz.sha512;>sha512) We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.13 is recommended). @@ -155,7 +155,7 @@ Kafka 2.6.1 fixes 41 issues since the 2.6.0 release. -For more information, please read the detailed https://downloads.apache.org/kafka/2.6.1/RELEASE_NOTES.html;>Release Notes. +For more information, please read the detailed https://archive.apache.org/dist/kafka/2.6.1/RELEASE_NOTES.html;>Release Notes.
svn commit: r47191 - in /release/kafka/2.6.2: ./ javadoc/ javadoc/org/ javadoc/org/apache/ javadoc/org/apache/kafka/ javadoc/org/apache/kafka/clients/ javadoc/org/apache/kafka/clients/admin/ javadoc/o
Author: rhauch Date: Tue Apr 20 02:20:52 2021 New Revision: 47191 Log: Release 2.6.2 [This commit notification would consist of 204 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r47186 - /release/kafka/KEYS
Author: rhauch Date: Mon Apr 19 19:06:50 2021 New Revision: 47186 Log: Add Anna Sophie Blee-Goldman's key Modified: release/kafka/KEYS Modified: release/kafka/KEYS == --- release/kafka/KEYS (original) +++ release/kafka/KEYS Mon Apr 19 19:06:50 2021 @@ -1473,3 +1473,70 @@ r05GDxy5ZsjgNv7QddXgea48Q7vIP6l1j2twi8ZP EsM12DbAcyLR20E/xs+YGvkOZA== =EPnI -END PGP PUBLIC KEY BLOCK- + +pub rsa4096 2021-01-29 [SC] [expires: 2029-01-27] + 562DADAF4FAF35118DB8C0DF241A59C30616924E +uid [ultimate] Anna Sophie Blee-Goldman +sub rsa4096 2021-01-29 [E] [expires: 2029-01-27] + +-BEGIN PGP PUBLIC KEY BLOCK- + +mQINBGAUXosBEAC2kZ+e/1P+6gf29Y784QLtrnSGyk67r7UDhZUq9Pb4E8B0Elf0 +ven520iX0C3vl2VxFod9aOrJmF+IfvKOHat/mJniYZ/KL0f5b2l1bQWDFYOp8p68 +LAGU2ojpWCfTwzm8WL1DQXp99SLytWjuCdMR0gmTfVv9HK+IYSEooo1+Puha+/Dm +FxTrNArb/UidH8ZUc6PrHmZtVKI1LncKI2884PRRsqlCRSTV6Z67ffVNxru3oNWR +AGSnxD3ocjVSCLY/lVN+65KcXCU1SumeVMIE6oa1UsJEMy/m/cGFDRnhQk+kcj9c +MDYDYrP3wgNeAezFkByWevVHZXwv9xhX99B7iA3tYXBLULObQODkDsuER3aaE2tC +bxJC6AL9Eg7XHSxTZuEQppwANeUcW+cu+JjgmBFdbTtQGN7U3jwKJCQsZRadH9ab +txPISP2Z9q/+6028c/fx1W4ipnMHdSa+j7HFV/XCjN89Eq2755dbx6eGpt5W9RdH +fQwPVeTv60sZnml74TGuR1kreRq2gBmnQqYkHCc24492bptL/k9oo4atwb9ajisL +qrf8X8NPS7WPvF3XeZ38wDTHOZ2ej0xL3GWo+hB77BUMpXtv2XRoU0R+lOY29vyd +LXuQn7pILXS3ZMNbFXAgA4lHCAS4osCXx51PIIiG+dTvtbH3lav5LtqeLwARAQAB +tDJBbm5hIFNvcGhpZSBCbGVlLUdvbGRtYW4gPGFibGVlZ29sZG1hbkBhcGFjaGUu +b3JnPokCVAQTAQoAPhYhBFYtra9PrzURjbjA3yQaWcMGFpJOBQJgFF6LAhsDBQkP +CZwABQsJCAcDBRUKCQgLBRYCAwEAAh4BAheAAAoJECQaWcMGFpJOatUP/1JP/qc9 +7k5PkZBvbR1qxC4gAUfglcfK0AsHquxalOtYuszdd681q4zMzpMB+qhSBloVYH9c +eDpFPd8qXn7c0hWZ1XAuLpGZfrcjV2fH0B95iPbz/yWigrSWylgVHwLcVdHS75VX +2Emdq74RDT7YxnCEeM9zr+O3sO8feivF20i4cInFk6csDSewF86pqnVMNUwVn97z +9RS2SsW7o3tEvH7lXfb0bZcFphijV6fSAzTrqC0nMqDhHCwfjpdroJvABBur4E+1 +yjzbJ+RjcRON7GKw5MMBz0L38UIeoPedfrZa0ZLjVT5d5AdgoBG25NLw3NoqiLJg +rom+64eY46pW7Oe+/admFOvQ35QKg54ggre0Yj/Df3fuuxFv7i4/Di4SUgpbayj2 +YI4zUKdWfSFWb2YnRukn1yVet4zdcvs7Jstv1JQbK1TGI6I7g0IIyeXDrx3nt6bJ +M1/AN051USUmqVvY0OvY0vipRpc1sEBeqHyqLgLrzeWE3mGk72sPg8oKe75Ef8vj +Ks0VJCSrcn9JCEshlj+fpxbjMI0QbGkPpFx2OxM7YM5UDZzOzb92tqfZ4YD3Iaf5 +oH4zRpHoIPbNL5HVT2PIajllfYrlhkqCmRu6w6rdf4JLzimjS1qdPTY7umEg16Yq +fcC0p1bz5LvsQDBUk1CmRUCi5qIFNDMiRGYAiQGzBBABCgAdFiEEGRvLb7+3Diqe +9tUctaLporUHmhoFAmAUYOEACgkQtaLporUHmhoIEQv/R6X5x0wWXZIUm/fKeySP +MKUSBt1NTGiownyQC9lhkxRvWwhaXEweh4wadWyuvjyKP0tdzdZt1Z/F3M2zknqq +2MrvQP2PlXfPgg5yVz6ulMPizkCD8IF0QNShzX45JUENq7e2vrQFm9u/baqRO5Hj +KeGR1pMFhahsSnztZeOB+Hs/E3vgSGmdnQX/UEWxnae+wegEaWGNz8+hLQOYxiKA +PZ4jaARAKNoMEd42g1DarUM8MZLx5NodCtDCuze9BfK3OIbfGnGVPhBiLbtb5kGt +J8r6nqJzGnHKrOo4A/agJmVPRlG7rLmiOzkVVgXLdBK/h9QNCrDOskZ5E8tui44T +MNE2n/xBC8Q0XvdfCyieAzohfSBsJ5ckNa9IbOlGjX9ebEzTLT7d6fHrw7KtTRPx +cLiOnlaufm5jh3rDEDki4Sdkm9Lsq2PRsMl5pbcA2zttZ1pcKlg9Vwcljny/BvX2 +ge5fdIQ1AKc8zEsAnLXgnEiQkVwmlaK7woRwKFU8n7XNuQINBGAUXosBEADdL82m +gBsBvAWjSv6wdfYfv5W04MxVgYTOKcgQdd1Lg/U0DdPXCBuVJaii+wqs1dPDnAzN +fEwrmaZ1O0vj3S2D7QBo5/6asaGAptFltY6BTuu6YBr7JZ02XSCR2mv2ms/dxRI5 +gMOU0PYJfavVavStG2hLoaTS7YAF7DFZyE8IF4n2xaBU7t/5L4UVD4bbaR4Pq7uB +HQUR5BCNF8hhtEoS6I5Gdrpp2FH3icFBusAE6E6GEU4VPTb37LukbZgMdZpzyr+Q +NmMJAs0tPJ7kvnbHdTCxNDRoHQHHRhlKBCbd1mNd5H5UMQQWSEQ2Pezq5DPaHzpz +724egzYf+JXsrjBB+QtG7nAko/FAAKuviwGt4XUN1RB1lzeJ4jEvi6n3sEX/HEjK +Bzl3phIv4znE3HxJKuOfnjLS+6fcXd3Tva4S4aLVC+hctYQ51GPzJC+gSskjcIsW +O/i8QIR3zAxqQZhp/dv1KR3tqWswwejf9u0bYcudF15E6QYSA+V9YYqskobFNT06 +ihq9iDLffEnaNfMFGrljvgV6uAF5dkfjOrk8PCW7VOl5tKYAIJcRl2QRz3QHEheV +f4y0/C7yxZU5Ot7MiGI8iE5Xg7iAyff+Mrd2Bo/aEEVBVh9PFbIpfX6p13vQhikH +WtLtWuIYpfjnPHJOHlL2nxXnfEFjMmtQdnvGKwARAQABiQI8BBgBCgAmFiEEVi2t +r0+vNRGNuMDfJBpZwwYWkk4FAmAUXosCGwwFCQ8JnAAACgkQJBpZwwYWkk4S2RAA +tGNi38PRlSiUrLqCIJV3ShKVOxNi5SNhMYQD4CbHkPeysd7bmsYka7TPvzLuCvwX +LpuCIn9UQVWIU4xLnaiKfQ8wlkPvorh1dj8pR/yDUXiCS72mCSDej6aJy5TGiBDz +Lpe0O01Mq40jtKmLbWRWWoXdS6o+tte0drazXWSewXzChEgvvrvTlTssrU7pf/fp +pVW0HzpCFCOfqIuLKpMbinOKdVDvPD8AYC3zKxCKrE2j0VU3CjJnunHuwk5ASZqV +VytfRPbF3Ii9xZolvJ99zb/VNxJzU3yJHF0vctpE5FkVRT+DxfFAarDBphnpwOwn +NbizWgUCvvuB6fPDgVEbMqsmJVxebNWtzC+LTIq1h8xNJfPFOuUVnFkJTj6pER8A +yGhzPNt6wQMBJ8QcJBlNBTZIlskMCNiCCSUdCqFoWekPHeXEKP5Z1xGQFhKRKbE7 +JPwfQWf1MnCfKS70VKezuCBmxQm07NdBJrjlavIy+9Hxr9SG2/TRDenEiYDOG85o +dN5JVTwBnkJN0xpceZX8xfNUsZVz2ecVPiefds/CC8OZpHoYqgn2GJ5ko3YkNB+X +HWTr9cAsWRC2lTUXuDx3BG/TSbA58wCrSJ15JsKmEkSi5LcJz6a/CAObL0lBpo1i +qLcArxstPLcceYk9xSRBWVeoZJLCLILuBJ6r7FdKpKs= +=Wi95 +-END PGP PUBLIC KEY BLOCK-
[kafka-site] branch asf-site updated: Add Randall Hauch as PMC member (#348)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 72e Add Randall Hauch as PMC member (#348) 72e is described below commit 72e974281e6d68d19ab25fc196fc3ca1030f Author: Randall Hauch AuthorDate: Mon Apr 19 11:59:59 2021 -0500 Add Randall Hauch as PMC member (#348) --- committers.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/committers.html b/committers.html index 6a14c29..42a5f1f 100644 --- a/committers.html +++ b/committers.html @@ -265,7 +265,7 @@ Randall Hauch - Committer + Committer, and PMC member https://www.linkedin.com/in/randallhauch/;>/in/randallhauch https://twitter.com/rhauch;>@rhauch
[kafka] branch 2.8 updated: KAFKA-12474: Handle failure to write new session keys gracefully (#10396)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.8 by this push: new d91d64f KAFKA-12474: Handle failure to write new session keys gracefully (#10396) d91d64f is described below commit d91d64f2ca1e0f39ffe14b8056edcab1be837df2 Author: Chris Egerton AuthorDate: Thu Apr 1 13:26:01 2021 -0400 KAFKA-12474: Handle failure to write new session keys gracefully (#10396) If a distributed worker fails to write (or read back) a new session key to/from the config topic, it dies. This fix softens the blow a bit by instead restarting the herder tick loop anew and forcing a read to the end of the config topic until the worker is able to successfully read to the end. At this point, if the worker was able to successfully write a new session key in its first attempt, it will have read that key back from the config topic and will not write a new key during the next tick iteration. If it was not able to write that key at all, it will try again to write a new key (if it is still the leader). Verified with new unit tests for both cases (failure to write, failure to read back after write). Author: Chris Egerton Reviewers: Greg Harris , Randall Hauch --- .../runtime/distributed/DistributedHerder.java | 20 -- .../runtime/distributed/DistributedHerderTest.java | 82 ++ 2 files changed, 97 insertions(+), 5 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 16dfbf9..c312e05 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -363,10 +363,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable { if (checkForKeyRotation(now)) { log.debug("Distributing new session key"); keyExpiration = Long.MAX_VALUE; -configBackingStore.putSessionKey(new SessionKey( -keyGenerator.generateKey(), -now -)); +try { +configBackingStore.putSessionKey(new SessionKey( +keyGenerator.generateKey(), +now +)); +} catch (Exception e) { +log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying"); +canReadConfigs = false; +return; +} } // Process any external requests @@ -1173,7 +1179,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { * @return true if successful, false if timed out */ private boolean readConfigToEnd(long timeoutMs) { -log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); +if (configState.offset() < assignment.offset()) { +log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); +} else { +log.info("Reading to end of config log; current config state offset: {}", configState.offset()); +} try { configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS); configState = configBackingStore.snapshot(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index e31a03f..65ec89c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -23,11 +23,13 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; import org.apa
[kafka] branch 2.5 updated: KAFKA-12474: Handle failure to write new session keys gracefully (#10396)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.5 by this push: new d5e85ef KAFKA-12474: Handle failure to write new session keys gracefully (#10396) d5e85ef is described below commit d5e85efb95fdf0e2a0a446d5d789402799deb148 Author: Chris Egerton AuthorDate: Thu Apr 1 13:26:01 2021 -0400 KAFKA-12474: Handle failure to write new session keys gracefully (#10396) If a distributed worker fails to write (or read back) a new session key to/from the config topic, it dies. This fix softens the blow a bit by instead restarting the herder tick loop anew and forcing a read to the end of the config topic until the worker is able to successfully read to the end. At this point, if the worker was able to successfully write a new session key in its first attempt, it will have read that key back from the config topic and will not write a new key during the next tick iteration. If it was not able to write that key at all, it will try again to write a new key (if it is still the leader). Verified with new unit tests for both cases (failure to write, failure to read back after write). Author: Chris Egerton Reviewers: Greg Harris , Randall Hauch --- .../runtime/distributed/DistributedHerder.java | 20 -- .../runtime/distributed/DistributedHerderTest.java | 82 ++ 2 files changed, 97 insertions(+), 5 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 6e8b6d7..bd0fbe7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -358,10 +358,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable { if (checkForKeyRotation(now)) { keyExpiration = Long.MAX_VALUE; -configBackingStore.putSessionKey(new SessionKey( -keyGenerator.generateKey(), -now -)); +try { +configBackingStore.putSessionKey(new SessionKey( +keyGenerator.generateKey(), +now +)); +} catch (Exception e) { +log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying"); +canReadConfigs = false; +return; +} } // Process any external requests @@ -1121,7 +1127,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { * @return true if successful, false if timed out */ private boolean readConfigToEnd(long timeoutMs) { -log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); +if (configState.offset() < assignment.offset()) { +log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); +} else { +log.info("Reading to end of config log; current config state offset: {}", configState.offset()); +} try { configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS); configState = configBackingStore.snapshot(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 2a96850..bd93e33 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -26,11 +26,13 @@ import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.MockConnectMetrics; +import org.apach
[kafka] branch 2.6 updated: KAFKA-12474: Handle failure to write new session keys gracefully (#10396)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.6 by this push: new 6a080db KAFKA-12474: Handle failure to write new session keys gracefully (#10396) 6a080db is described below commit 6a080dbe1e31819f4138c351bbd43aa1e75c2220 Author: Chris Egerton AuthorDate: Thu Apr 1 13:26:01 2021 -0400 KAFKA-12474: Handle failure to write new session keys gracefully (#10396) If a distributed worker fails to write (or read back) a new session key to/from the config topic, it dies. This fix softens the blow a bit by instead restarting the herder tick loop anew and forcing a read to the end of the config topic until the worker is able to successfully read to the end. At this point, if the worker was able to successfully write a new session key in its first attempt, it will have read that key back from the config topic and will not write a new key during the next tick iteration. If it was not able to write that key at all, it will try again to write a new key (if it is still the leader). Verified with new unit tests for both cases (failure to write, failure to read back after write). Author: Chris Egerton Reviewers: Greg Harris , Randall Hauch --- .../runtime/distributed/DistributedHerder.java | 20 -- .../runtime/distributed/DistributedHerderTest.java | 82 ++ 2 files changed, 97 insertions(+), 5 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index b6dab25..93f6141 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -364,10 +364,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable { if (checkForKeyRotation(now)) { log.debug("Distributing new session key"); keyExpiration = Long.MAX_VALUE; -configBackingStore.putSessionKey(new SessionKey( -keyGenerator.generateKey(), -now -)); +try { +configBackingStore.putSessionKey(new SessionKey( +keyGenerator.generateKey(), +now +)); +} catch (Exception e) { +log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying"); +canReadConfigs = false; +return; +} } // Process any external requests @@ -1186,7 +1192,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { * @return true if successful, false if timed out */ private boolean readConfigToEnd(long timeoutMs) { -log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); +if (configState.offset() < assignment.offset()) { +log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); +} else { +log.info("Reading to end of config log; current config state offset: {}", configState.offset()); +} try { configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS); configState = configBackingStore.snapshot(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index bed46a3..7ededef 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -23,12 +23,14 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.CloseableConnectorContext; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.ConnectorConfig; im
[kafka] branch 2.7 updated: KAFKA-12474: Handle failure to write new session keys gracefully (#10396)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new fe18043 KAFKA-12474: Handle failure to write new session keys gracefully (#10396) fe18043 is described below commit fe1804370680b965a68fdd2978e2afa450daafe4 Author: Chris Egerton AuthorDate: Thu Apr 1 13:26:01 2021 -0400 KAFKA-12474: Handle failure to write new session keys gracefully (#10396) If a distributed worker fails to write (or read back) a new session key to/from the config topic, it dies. This fix softens the blow a bit by instead restarting the herder tick loop anew and forcing a read to the end of the config topic until the worker is able to successfully read to the end. At this point, if the worker was able to successfully write a new session key in its first attempt, it will have read that key back from the config topic and will not write a new key during the next tick iteration. If it was not able to write that key at all, it will try again to write a new key (if it is still the leader). Verified with new unit tests for both cases (failure to write, failure to read back after write). Author: Chris Egerton Reviewers: Greg Harris , Randall Hauch --- .../runtime/distributed/DistributedHerder.java | 20 -- .../runtime/distributed/DistributedHerderTest.java | 82 ++ 2 files changed, 97 insertions(+), 5 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index b6dab25..93f6141 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -364,10 +364,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable { if (checkForKeyRotation(now)) { log.debug("Distributing new session key"); keyExpiration = Long.MAX_VALUE; -configBackingStore.putSessionKey(new SessionKey( -keyGenerator.generateKey(), -now -)); +try { +configBackingStore.putSessionKey(new SessionKey( +keyGenerator.generateKey(), +now +)); +} catch (Exception e) { +log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying"); +canReadConfigs = false; +return; +} } // Process any external requests @@ -1186,7 +1192,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { * @return true if successful, false if timed out */ private boolean readConfigToEnd(long timeoutMs) { -log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); +if (configState.offset() < assignment.offset()) { +log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); +} else { +log.info("Reading to end of config log; current config state offset: {}", configState.offset()); +} try { configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS); configState = configBackingStore.snapshot(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index bed46a3..7ededef 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -23,12 +23,14 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.CloseableConnectorContext; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.ConnectorConfig; im
[kafka] branch trunk updated (4ed7f2c -> aea059a)
This is an automated email from the ASF dual-hosted git repository. rhauch pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 4ed7f2c KAFKA-12593: Fix Apache License headers (#10452) add aea059a KAFKA-12474: Handle failure to write new session keys gracefully (#10396) No new revisions were added by this update. Summary of changes: .../runtime/distributed/DistributedHerder.java | 20 -- .../runtime/distributed/DistributedHerderTest.java | 82 ++ 2 files changed, 97 insertions(+), 5 deletions(-)