[kafka] branch trunk updated (82d5720aae7 -> 322ac86ba28)
This is an automated email from the ASF dual-hosted git repository. showuon pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 82d5720aae7 KAFKA-14253 - More informative logging (#13253) add 322ac86ba28 KAFKA-14706: Move/rewrite ShutdownableThread to server-common module. (#13234) No new revisions were added by this update. Summary of changes: build.gradle | 2 +- checkstyle/import-control-core.xml | 1 + .../scala/kafka/common/InterBrokerSendThread.scala | 8 +- .../common/ZkNodeChangeNotificationListener.scala | 5 +- .../controller/ControllerChannelManager.scala | 6 +- .../kafka/controller/ControllerEventManager.scala | 11 +- core/src/main/scala/kafka/log/LogCleaner.scala | 6 +- .../scala/kafka/log/remote/RemoteIndexCache.scala | 4 +- core/src/main/scala/kafka/raft/RaftManager.scala | 11 +- .../kafka/raft/TimingWheelExpirationService.scala | 6 +- .../scala/kafka/server/AbstractFetcherThread.scala | 7 +- .../scala/kafka/server/ClientQuotaManager.scala| 3 +- .../main/scala/kafka/server/DelayedOperation.scala | 2 +- .../server/FinalizedFeatureChangeListener.scala| 8 +- .../main/scala/kafka/server/ReplicaManager.scala | 2 +- .../kafka/tools/ReplicaVerificationTool.scala | 6 +- .../main/scala/kafka/tools/TestRaftServer.scala| 6 +- .../scala/kafka/utils/ShutdownableThread.scala | 113 .../kafka/api/AbstractConsumerTest.scala | 3 +- .../integration/kafka/api/ConsumerBounceTest.scala | 3 +- .../kafka/api/TransactionsBounceTest.scala | 3 +- .../server/DynamicBrokerReconfigurationTest.scala | 5 +- .../other/kafka/TestPurgatoryPerformance.scala | 6 +- .../server/ReplicaManagerConcurrencyTest.scala | 11 +- .../unit/kafka/utils/ShutdownableThreadTest.scala | 2 +- .../kafka/server/util/ShutdownableThread.java | 146 + 26 files changed, 222 insertions(+), 164 deletions(-) delete mode 100644 core/src/main/scala/kafka/utils/ShutdownableThread.scala create mode 100644 server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java
[kafka] branch trunk updated: KAFKA-14253 - More informative logging (#13253)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 82d5720aae7 KAFKA-14253 - More informative logging (#13253) 82d5720aae7 is described below commit 82d5720aae78c9e17606c8345dfc208557f9a8f2 Author: Philip Nee AuthorDate: Thu Feb 16 16:54:50 2023 -0800 KAFKA-14253 - More informative logging (#13253) Includes 2 requirements from the ticket: * Include the number of members in the group (I.e., "15 members participating" and "to 15 clients as") * Sort the member ids (to help compare the membership and assignment across rebalances) Reviewers: Guozhang Wang --- .../internals/StreamsPartitionAssignor.java| 22 ++ 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 1875f57b649..46c1e41e6c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -76,9 +76,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; - +import static java.util.Map.Entry.comparingByKey; import static java.util.UUID.randomUUID; - import static org.apache.kafka.common.utils.Utils.filterMap; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture; @@ -619,10 +618,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf final boolean lagComputationSuccessful = populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogTopics); -log.info("All members participating in this rebalance: \n{}.", - clientStates.entrySet().stream() - .map(entry -> entry.getKey() + ": " + entry.getValue().consumers()) - .collect(Collectors.joining(Utils.NL))); +log.info("{} members participating in this rebalance: \n{}.", +clientStates.size(), +clientStates.entrySet().stream() +.sorted(comparingByKey()) +.map(entry -> entry.getKey() + ": " + entry.getValue().consumers()) +.collect(Collectors.joining(Utils.NL))); final Set allTasks = partitionsForTask.keySet(); statefulTasks.addAll(changelogTopics.statefulTaskIds()); @@ -637,8 +638,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf statefulTasks, assignmentConfigs); -log.info("Assigned tasks {} including stateful {} to clients as: \n{}.", -allTasks, statefulTasks, clientStates.entrySet().stream() +log.info("{} assigned tasks {} including stateful {} to {} clients as: \n{}.", +allTasks.size(), +allTasks, +statefulTasks, +clientStates.size(), +clientStates.entrySet().stream() +.sorted(comparingByKey()) .map(entry -> entry.getKey() + "=" + entry.getValue().currentAssignment()) .collect(Collectors.joining(Utils.NL)));
[kafka] branch 3.4 updated: KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.4 by this push: new c8d0168b1fb KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262) c8d0168b1fb is described below commit c8d0168b1fba07f1f782ba862058765df9df6797 Author: Greg Harris AuthorDate: Thu Feb 16 15:51:34 2023 -0800 KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262) Reviewers: Chris Egerton --- .../connect/runtime/AbstractWorkerSourceTask.java | 4 +- .../runtime/ExactlyOnceWorkerSourceTask.java | 20 ++--- .../kafka/connect/storage/OffsetStorageWriter.java | 7 --- .../runtime/ExactlyOnceWorkerSourceTaskTest.java | 51 +- 4 files changed, 39 insertions(+), 43 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index ff15f631a73..fb3c04be6cf 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -353,8 +353,10 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask { recordPollReturned(toSend.size(), time.milliseconds() - start); } } -if (toSend == null) +if (toSend == null) { +batchDispatched(); continue; +} log.trace("{} About to send {} records to Kafka", this, toSend.size()); if (sendRecords()) { batchDispatched(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java index 2642ae776ac..21f6bd4f59d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java @@ -255,10 +255,6 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { long started = time.milliseconds(); -// We might have just aborted a transaction, in which case we'll have to begin a new one -// in order to commit offsets -maybeBeginTransaction(); - AtomicReference flushError = new AtomicReference<>(); boolean shouldFlush = false; try { @@ -269,6 +265,20 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { } catch (Throwable e) { flushError.compareAndSet(null, e); } +if (flushError.get() == null && !transactionOpen && !shouldFlush) { +// There is no contents on the framework side to commit, so skip the offset flush and producer commit +long durationMillis = time.milliseconds() - started; +recordCommitSuccess(durationMillis); +log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis); + +commitSourceTask(); +return; +} + +// We might have just aborted a transaction, in which case we'll have to begin a new one +// in order to commit offsets +maybeBeginTransaction(); + if (shouldFlush) { // Now we can actually write the offsets to the internal topic. // No need to track the flush future here since it's guaranteed to complete by the time @@ -393,7 +403,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { } private void maybeCommitTransaction(boolean shouldCommit) { -if (shouldCommit && (transactionOpen || offsetWriter.willFlush())) { +if (shouldCommit) { try (LoggingContext loggingContext = LoggingContext.forOffsets(id)) { commitTransaction(); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java index 692669e7544..cb944034db1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java @@ -150,13 +150,6 @@ public class OffsetStorageWriter { } } -/** - * @return whether there's anything to flush right now. - */ -public synchronized boolean willFlush() { -return !data.isEmpty(); -} - /** * Flush the current offsets and clear them
[kafka] branch trunk updated: KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new aea6090ce47 KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262) aea6090ce47 is described below commit aea6090ce479a06ea5489a54aeecf9b40233a3a1 Author: Greg Harris AuthorDate: Thu Feb 16 15:51:34 2023 -0800 KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262) Reviewers: Chris Egerton --- .../connect/runtime/AbstractWorkerSourceTask.java | 4 +- .../runtime/ExactlyOnceWorkerSourceTask.java | 20 ++--- .../kafka/connect/storage/OffsetStorageWriter.java | 7 --- .../runtime/ExactlyOnceWorkerSourceTaskTest.java | 51 +- 4 files changed, 39 insertions(+), 43 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index ff15f631a73..fb3c04be6cf 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -353,8 +353,10 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask { recordPollReturned(toSend.size(), time.milliseconds() - start); } } -if (toSend == null) +if (toSend == null) { +batchDispatched(); continue; +} log.trace("{} About to send {} records to Kafka", this, toSend.size()); if (sendRecords()) { batchDispatched(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java index d4ef5ba8106..8b4a8d3c9cc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java @@ -255,10 +255,6 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { long started = time.milliseconds(); -// We might have just aborted a transaction, in which case we'll have to begin a new one -// in order to commit offsets -maybeBeginTransaction(); - AtomicReference flushError = new AtomicReference<>(); boolean shouldFlush = false; try { @@ -269,6 +265,20 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { } catch (Throwable e) { flushError.compareAndSet(null, e); } +if (flushError.get() == null && !transactionOpen && !shouldFlush) { +// There is no contents on the framework side to commit, so skip the offset flush and producer commit +long durationMillis = time.milliseconds() - started; +recordCommitSuccess(durationMillis); +log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis); + +commitSourceTask(); +return; +} + +// We might have just aborted a transaction, in which case we'll have to begin a new one +// in order to commit offsets +maybeBeginTransaction(); + if (shouldFlush) { // Now we can actually write the offsets to the internal topic. // No need to track the flush future here since it's guaranteed to complete by the time @@ -393,7 +403,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { } private void maybeCommitTransaction(boolean shouldCommit) { -if (shouldCommit && (transactionOpen || offsetWriter.willFlush())) { +if (shouldCommit) { try (LoggingContext loggingContext = LoggingContext.forOffsets(id)) { commitTransaction(); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java index d3141d4758e..89e7824a65b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java @@ -150,13 +150,6 @@ public class OffsetStorageWriter { } } -/** - * @return whether there's anything to flush right now. - */ -public synchronized boolean willFlush() { -return !data.isEmpty(); -} - /** * Flush the current offsets and clear
[kafka] branch 3.3 updated: Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.3 by this push: new 5ec2fed20e5 Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168) 5ec2fed20e5 is described below commit 5ec2fed20e51bfa4653f44424bc198cc7cdbf1f6 Author: Terry AuthorDate: Thu Feb 16 12:39:24 2023 -0500 Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168) Reviewers: Chris Egerton --- .../apache/kafka/common/config/AbstractConfig.java | 101 + .../kafka/clients/consumer/KafkaConsumerTest.java | 27 ++ .../kafka/clients/producer/KafkaProducerTest.java | 24 + .../kafka/common/config/AbstractConfigTest.java| 26 +- .../apache/kafka/test/MockConsumerInterceptor.java | 14 +++ .../apache/kafka/test/MockProducerInterceptor.java | 14 +++ 6 files changed, 166 insertions(+), 40 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index e3fda4d9f54..13637163311 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -66,44 +66,43 @@ public class AbstractConfig { * Construct a configuration with a ConfigDef and the configuration properties, which can include properties * for zero or more {@link ConfigProvider} that will be used to resolve variables in configuration property * values. - * + * * The originals is a name-value pair configuration properties and optional config provider configs. The * value of the configuration can be a variable as defined below or the actual value. This constructor will * first instantiate the ConfigProviders using the config provider configs, then it will find all the * variables in the values of the originals configurations, attempt to resolve the variables using the named * ConfigProviders, and then parse and validate the configurations. - * + * * ConfigProvider configs can be passed either as configs in the originals map or in the separate * configProviderProps map. If config providers properties are passed in the configProviderProps any config * provider properties in originals map will be ignored. If ConfigProvider properties are not provided, the * constructor will skip the variable substitution step and will simply validate and parse the supplied * configuration. - * + * * The "{@code config.providers}" configuration property and all configuration properties that begin with the * "{@code config.providers.}" prefix are reserved. The "{@code config.providers}" configuration property * specifies the names of the config providers, and properties that begin with the "{@code config.providers..}" * prefix correspond to the properties for that named provider. For example, the "{@code config.providers..class}" * property specifies the name of the {@link ConfigProvider} implementation class that should be used for * the provider. - * + * * The keys for ConfigProvider configs in both originals and configProviderProps will start with the above * mentioned "{@code config.providers.}" prefix. - * + * * Variables have the form "${providerName:[path:]key}", where "providerName" is the name of a ConfigProvider, * "path" is an optional string, and "key" is a required string. This variable is resolved by passing the "key" * and optional "path" to a ConfigProvider with the specified name, and the result from the ConfigProvider is * then used in place of the variable. Variables that cannot be resolved by the AbstractConfig constructor will * be left unchanged in the configuration. * - * - * @param definition the definition of the configurations; may not be null - * @param originals the configuration properties plus any optional config provider properties; + * @param definition the definition of the configurations; may not be null + * @param originals the configuration properties plus any optional config provider properties; * @param configProviderProps the map of properties of config providers which will be instantiated by - *the constructor to resolve any variables in {@code originals}; may be null or empty - * @param doLog whether the configurations should be logged + *the constructor to resolve any variables in {@code originals}; may be null or empty + * @param doLog whether the configurations should be logged */
[kafka] branch 3.4 updated: Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.4 by this push: new 2da64826fbc Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168) 2da64826fbc is described below commit 2da64826fbc39567c424a1114e5777d6b84d184f Author: Terry AuthorDate: Thu Feb 16 12:39:24 2023 -0500 Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168) Reviewers: Chris Egerton --- .../apache/kafka/common/config/AbstractConfig.java | 101 + .../kafka/clients/consumer/KafkaConsumerTest.java | 27 ++ .../kafka/clients/producer/KafkaProducerTest.java | 24 + .../kafka/common/config/AbstractConfigTest.java| 26 +- .../apache/kafka/test/MockConsumerInterceptor.java | 14 +++ .../apache/kafka/test/MockProducerInterceptor.java | 14 +++ 6 files changed, 166 insertions(+), 40 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index e3fda4d9f54..13637163311 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -66,44 +66,43 @@ public class AbstractConfig { * Construct a configuration with a ConfigDef and the configuration properties, which can include properties * for zero or more {@link ConfigProvider} that will be used to resolve variables in configuration property * values. - * + * * The originals is a name-value pair configuration properties and optional config provider configs. The * value of the configuration can be a variable as defined below or the actual value. This constructor will * first instantiate the ConfigProviders using the config provider configs, then it will find all the * variables in the values of the originals configurations, attempt to resolve the variables using the named * ConfigProviders, and then parse and validate the configurations. - * + * * ConfigProvider configs can be passed either as configs in the originals map or in the separate * configProviderProps map. If config providers properties are passed in the configProviderProps any config * provider properties in originals map will be ignored. If ConfigProvider properties are not provided, the * constructor will skip the variable substitution step and will simply validate and parse the supplied * configuration. - * + * * The "{@code config.providers}" configuration property and all configuration properties that begin with the * "{@code config.providers.}" prefix are reserved. The "{@code config.providers}" configuration property * specifies the names of the config providers, and properties that begin with the "{@code config.providers..}" * prefix correspond to the properties for that named provider. For example, the "{@code config.providers..class}" * property specifies the name of the {@link ConfigProvider} implementation class that should be used for * the provider. - * + * * The keys for ConfigProvider configs in both originals and configProviderProps will start with the above * mentioned "{@code config.providers.}" prefix. - * + * * Variables have the form "${providerName:[path:]key}", where "providerName" is the name of a ConfigProvider, * "path" is an optional string, and "key" is a required string. This variable is resolved by passing the "key" * and optional "path" to a ConfigProvider with the specified name, and the result from the ConfigProvider is * then used in place of the variable. Variables that cannot be resolved by the AbstractConfig constructor will * be left unchanged in the configuration. * - * - * @param definition the definition of the configurations; may not be null - * @param originals the configuration properties plus any optional config provider properties; + * @param definition the definition of the configurations; may not be null + * @param originals the configuration properties plus any optional config provider properties; * @param configProviderProps the map of properties of config providers which will be instantiated by - *the constructor to resolve any variables in {@code originals}; may be null or empty - * @param doLog whether the configurations should be logged + *the constructor to resolve any variables in {@code originals}; may be null or empty + * @param doLog whether the configurations should be logged */
[kafka] branch trunk updated (ba0c5b0902d -> f3dc3f0dad1)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from ba0c5b0902d MINOR: Simplify JUnit assertions in tests; remove accidental unnecessary code in tests (#13219) add f3dc3f0dad1 Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168) No new revisions were added by this update. Summary of changes: .../apache/kafka/common/config/AbstractConfig.java | 101 + .../kafka/clients/consumer/KafkaConsumerTest.java | 27 ++ .../kafka/clients/producer/KafkaProducerTest.java | 24 + .../kafka/common/config/AbstractConfigTest.java| 26 +- .../apache/kafka/test/MockConsumerInterceptor.java | 14 +++ .../apache/kafka/test/MockProducerInterceptor.java | 14 +++ 6 files changed, 166 insertions(+), 40 deletions(-)
[kafka] branch trunk updated: MINOR: Simplify JUnit assertions in tests; remove accidental unnecessary code in tests (#13219)
This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ba0c5b0902d MINOR: Simplify JUnit assertions in tests; remove accidental unnecessary code in tests (#13219) ba0c5b0902d is described below commit ba0c5b0902d4b259505cf4a7c2a45e98182a372b Author: Christo Lolov AuthorDate: Thu Feb 16 15:13:31 2023 + MINOR: Simplify JUnit assertions in tests; remove accidental unnecessary code in tests (#13219) * assertEquals called on array * Method is identical to its super method * Simplifiable assertions * Unused imports Reviewers: Mickael Maison , Divij Vaidya --- .../kafka/clients/admin/KafkaAdminClientTest.java | 3 +- .../consumer/ConsumerPartitionAssignorTest.java| 22 -- .../common/header/internals/RecordHeadersTest.java | 4 +- .../apache/kafka/common/network/SelectorTest.java | 3 -- .../utils/ImplicitLinkedHashCollectionTest.java| 7 ++-- .../ImplicitLinkedHashMultiCollectionTest.java | 8 ++-- .../auth/extension/JaasBasicAuthFilterTest.java| 3 +- .../connect/integration/BlockingConnectorTest.java | 10 - .../runtime/AbstractWorkerSourceTaskTest.java | 9 ++-- .../runtime/ExactlyOnceWorkerSourceTaskTest.java | 4 +- .../runtime/rest/entities/PluginInfoTest.java | 12 +++--- .../controller/BrokerHeartbeatManagerTest.java | 3 +- .../kafka/controller/QuorumControllerTest.java | 4 +- .../controller/ReplicationControlManagerTest.java | 4 +- .../apache/kafka/controller/ResultOrErrorTest.java | 20 + .../org/apache/kafka/image/TopicsImageTest.java| 5 ++- .../kafka/metadata/KafkaConfigSchemaTest.java | 5 ++- .../authorizer/StandardAuthorizerTest.java | 33 +++ .../apache/kafka/metalog/LocalLogManagerTest.java | 7 ++-- .../apache/kafka/timeline/BaseHashTableTest.java | 25 +-- .../kafka/timeline/SnapshottableHashTableTest.java | 38 + .../apache/kafka/timeline/TimelineHashMapTest.java | 14 --- .../org/apache/kafka/shell/GlobComponentTest.java | 7 ++-- .../org/apache/kafka/shell/MetadataNodeTest.java | 6 +-- .../storage/RemoteLogSegmentLifecycleTest.java | 4 -- .../org/apache/kafka/streams/KeyValueTest.java | 48 +++--- .../integration/StoreUpgradeIntegrationTest.java | 8 .../internals/KStreamNewProcessorApiTest.java | 4 -- .../internals/graph/GraphGraceSearchUtilTest.java | 9 .../internals/graph/TableProcessorNodeTest.java| 7 .../CachingInMemoryKeyValueStoreTest.java | 6 --- .../MeteredTimestampedKeyValueStoreTest.java | 3 +- .../streams/state/internals/ThreadCacheTest.java | 4 +- .../internals/TimeOrderedWindowStoreTest.java | 3 -- .../kafka/streams/TopologyTestDriverTest.java | 17 35 files changed, 155 insertions(+), 214 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index d050f08e320..bad3e391bfa 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -229,7 +229,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; @@ -5725,7 +5724,7 @@ public class KafkaAdminClientTest { } private ClientQuotaEntity newClientQuotaEntity(String... args) { -assertTrue(args.length % 2 == 0); +assertEquals(0, args.length % 2); Map entityMap = new HashMap<>(args.length / 2); for (int index = 0; index < args.length; index += 2) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java index 1298f8c23bd..635f38f7f38 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java @@ -22,12 +22,10 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.Test; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.Set; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static
[kafka] branch trunk updated (6d640752304 -> 38662bb52de)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 6d640752304 KAFKA-14708: Use Java thread instead of kafka library for example purpose (#13238) add 38662bb52de MINOR Added assertion checks in tests for ProducerStateManager.activeProducers usages (#13235) No new revisions were added by this update. Summary of changes: .../scala/integration/kafka/api/TransactionsTest.scala| 15 ++- .../scala/unit/kafka/log/ProducerStateManagerTest.scala | 10 -- 2 files changed, 18 insertions(+), 7 deletions(-)