[kafka] branch trunk updated (82d5720aae7 -> 322ac86ba28)

2023-02-16 Thread showuon
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 82d5720aae7 KAFKA-14253 - More informative logging (#13253)
 add 322ac86ba28 KAFKA-14706: Move/rewrite ShutdownableThread to 
server-common module. (#13234)

No new revisions were added by this update.

Summary of changes:
 build.gradle   |   2 +-
 checkstyle/import-control-core.xml |   1 +
 .../scala/kafka/common/InterBrokerSendThread.scala |   8 +-
 .../common/ZkNodeChangeNotificationListener.scala  |   5 +-
 .../controller/ControllerChannelManager.scala  |   6 +-
 .../kafka/controller/ControllerEventManager.scala  |  11 +-
 core/src/main/scala/kafka/log/LogCleaner.scala |   6 +-
 .../scala/kafka/log/remote/RemoteIndexCache.scala  |   4 +-
 core/src/main/scala/kafka/raft/RaftManager.scala   |  11 +-
 .../kafka/raft/TimingWheelExpirationService.scala  |   6 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |   7 +-
 .../scala/kafka/server/ClientQuotaManager.scala|   3 +-
 .../main/scala/kafka/server/DelayedOperation.scala |   2 +-
 .../server/FinalizedFeatureChangeListener.scala|   8 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   2 +-
 .../kafka/tools/ReplicaVerificationTool.scala  |   6 +-
 .../main/scala/kafka/tools/TestRaftServer.scala|   6 +-
 .../scala/kafka/utils/ShutdownableThread.scala | 113 
 .../kafka/api/AbstractConsumerTest.scala   |   3 +-
 .../integration/kafka/api/ConsumerBounceTest.scala |   3 +-
 .../kafka/api/TransactionsBounceTest.scala |   3 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |   5 +-
 .../other/kafka/TestPurgatoryPerformance.scala |   6 +-
 .../server/ReplicaManagerConcurrencyTest.scala |  11 +-
 .../unit/kafka/utils/ShutdownableThreadTest.scala  |   2 +-
 .../kafka/server/util/ShutdownableThread.java  | 146 +
 26 files changed, 222 insertions(+), 164 deletions(-)
 delete mode 100644 core/src/main/scala/kafka/utils/ShutdownableThread.scala
 create mode 100644 
server-common/src/main/java/org/apache/kafka/server/util/ShutdownableThread.java



[kafka] branch trunk updated: KAFKA-14253 - More informative logging (#13253)

2023-02-16 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 82d5720aae7 KAFKA-14253 - More informative logging (#13253)
82d5720aae7 is described below

commit 82d5720aae78c9e17606c8345dfc208557f9a8f2
Author: Philip Nee 
AuthorDate: Thu Feb 16 16:54:50 2023 -0800

KAFKA-14253 - More informative logging (#13253)

Includes 2 requirements from the ticket:
* Include the number of members in the group (i.e., "15 members 
participating" and "to 15 clients as")
* Sort the member ids (to help compare the membership and assignment across 
rebalances)

Reviewers: Guozhang Wang 
---
 .../internals/StreamsPartitionAssignor.java| 22 ++
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 1875f57b649..46c1e41e6c1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -76,9 +76,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-
+import static java.util.Map.Entry.comparingByKey;
 import static java.util.UUID.randomUUID;
-
 import static org.apache.kafka.common.utils.Utils.filterMap;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture;
@@ -619,10 +618,12 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
 final boolean lagComputationSuccessful =
 populateClientStatesMap(clientStates, clientMetadataMap, 
taskForPartition, changelogTopics);
 
-log.info("All members participating in this rebalance: \n{}.",
- clientStates.entrySet().stream()
- .map(entry -> entry.getKey() + ": " + 
entry.getValue().consumers())
- .collect(Collectors.joining(Utils.NL)));
+log.info("{} members participating in this rebalance: \n{}.",
+clientStates.size(),
+clientStates.entrySet().stream()
+.sorted(comparingByKey())
+.map(entry -> entry.getKey() + ": " + 
entry.getValue().consumers())
+.collect(Collectors.joining(Utils.NL)));
 
 final Set allTasks = partitionsForTask.keySet();
 statefulTasks.addAll(changelogTopics.statefulTaskIds());
@@ -637,8 +638,13 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf

statefulTasks,

assignmentConfigs);
 
-log.info("Assigned tasks {} including stateful {} to clients as: 
\n{}.",
-allTasks, statefulTasks, clientStates.entrySet().stream()
+log.info("{} assigned tasks {} including stateful {} to {} clients as: 
\n{}.",
+allTasks.size(),
+allTasks,
+statefulTasks,
+clientStates.size(),
+clientStates.entrySet().stream()
+.sorted(comparingByKey())
 .map(entry -> entry.getKey() + "=" + 
entry.getValue().currentAssignment())
 .collect(Collectors.joining(Utils.NL)));
 



[kafka] branch 3.4 updated: KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)

2023-02-16 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
 new c8d0168b1fb KAFKA-14727: Enable periodic offset commits for EOS source 
tasks (#13262)
c8d0168b1fb is described below

commit c8d0168b1fba07f1f782ba862058765df9df6797
Author: Greg Harris 
AuthorDate: Thu Feb 16 15:51:34 2023 -0800

KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)

Reviewers: Chris Egerton 
---
 .../connect/runtime/AbstractWorkerSourceTask.java  |  4 +-
 .../runtime/ExactlyOnceWorkerSourceTask.java   | 20 ++---
 .../kafka/connect/storage/OffsetStorageWriter.java |  7 ---
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   | 51 +-
 4 files changed, 39 insertions(+), 43 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index ff15f631a73..fb3c04be6cf 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -353,8 +353,10 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask {
 recordPollReturned(toSend.size(), time.milliseconds() 
- start);
 }
 }
-if (toSend == null)
+if (toSend == null) {
+batchDispatched();
 continue;
+}
 log.trace("{} About to send {} records to Kafka", this, 
toSend.size());
 if (sendRecords()) {
 batchDispatched();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 2642ae776ac..21f6bd4f59d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -255,10 +255,6 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
 
 long started = time.milliseconds();
 
-// We might have just aborted a transaction, in which case we'll have 
to begin a new one
-// in order to commit offsets
-maybeBeginTransaction();
-
 AtomicReference flushError = new AtomicReference<>();
 boolean shouldFlush = false;
 try {
@@ -269,6 +265,20 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
 } catch (Throwable e) {
 flushError.compareAndSet(null, e);
 }
+if (flushError.get() == null && !transactionOpen && !shouldFlush) {
+// There is no contents on the framework side to commit, so skip 
the offset flush and producer commit
+long durationMillis = time.milliseconds() - started;
+recordCommitSuccess(durationMillis);
+log.debug("{} Finished commitOffsets successfully in {} ms", this, 
durationMillis);
+
+commitSourceTask();
+return;
+}
+
+// We might have just aborted a transaction, in which case we'll have 
to begin a new one
+// in order to commit offsets
+maybeBeginTransaction();
+
 if (shouldFlush) {
 // Now we can actually write the offsets to the internal topic.
 // No need to track the flush future here since it's guaranteed to 
complete by the time
@@ -393,7 +403,7 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
 }
 
 private void maybeCommitTransaction(boolean shouldCommit) {
-if (shouldCommit && (transactionOpen || offsetWriter.willFlush())) 
{
+if (shouldCommit) {
 try (LoggingContext loggingContext = 
LoggingContext.forOffsets(id)) {
 commitTransaction();
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index 692669e7544..cb944034db1 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -150,13 +150,6 @@ public class OffsetStorageWriter {
 }
 }
 
-/**
- * @return whether there's anything to flush right now.
- */
-public synchronized boolean willFlush() {
-return !data.isEmpty();
-}
-
 /**
  * Flush the current offsets and clear them 

[kafka] branch trunk updated: KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)

2023-02-16 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new aea6090ce47 KAFKA-14727: Enable periodic offset commits for EOS source 
tasks (#13262)
aea6090ce47 is described below

commit aea6090ce479a06ea5489a54aeecf9b40233a3a1
Author: Greg Harris 
AuthorDate: Thu Feb 16 15:51:34 2023 -0800

KAFKA-14727: Enable periodic offset commits for EOS source tasks (#13262)

Reviewers: Chris Egerton 
---
 .../connect/runtime/AbstractWorkerSourceTask.java  |  4 +-
 .../runtime/ExactlyOnceWorkerSourceTask.java   | 20 ++---
 .../kafka/connect/storage/OffsetStorageWriter.java |  7 ---
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   | 51 +-
 4 files changed, 39 insertions(+), 43 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index ff15f631a73..fb3c04be6cf 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -353,8 +353,10 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask {
 recordPollReturned(toSend.size(), time.milliseconds() 
- start);
 }
 }
-if (toSend == null)
+if (toSend == null) {
+batchDispatched();
 continue;
+}
 log.trace("{} About to send {} records to Kafka", this, 
toSend.size());
 if (sendRecords()) {
 batchDispatched();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index d4ef5ba8106..8b4a8d3c9cc 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -255,10 +255,6 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
 
 long started = time.milliseconds();
 
-// We might have just aborted a transaction, in which case we'll have 
to begin a new one
-// in order to commit offsets
-maybeBeginTransaction();
-
 AtomicReference flushError = new AtomicReference<>();
 boolean shouldFlush = false;
 try {
@@ -269,6 +265,20 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
 } catch (Throwable e) {
 flushError.compareAndSet(null, e);
 }
+if (flushError.get() == null && !transactionOpen && !shouldFlush) {
+// There is no contents on the framework side to commit, so skip 
the offset flush and producer commit
+long durationMillis = time.milliseconds() - started;
+recordCommitSuccess(durationMillis);
+log.debug("{} Finished commitOffsets successfully in {} ms", this, 
durationMillis);
+
+commitSourceTask();
+return;
+}
+
+// We might have just aborted a transaction, in which case we'll have 
to begin a new one
+// in order to commit offsets
+maybeBeginTransaction();
+
 if (shouldFlush) {
 // Now we can actually write the offsets to the internal topic.
 // No need to track the flush future here since it's guaranteed to 
complete by the time
@@ -393,7 +403,7 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
 }
 
 private void maybeCommitTransaction(boolean shouldCommit) {
-if (shouldCommit && (transactionOpen || offsetWriter.willFlush())) 
{
+if (shouldCommit) {
 try (LoggingContext loggingContext = 
LoggingContext.forOffsets(id)) {
 commitTransaction();
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index d3141d4758e..89e7824a65b 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -150,13 +150,6 @@ public class OffsetStorageWriter {
 }
 }
 
-/**
- * @return whether there's anything to flush right now.
- */
-public synchronized boolean willFlush() {
-return !data.isEmpty();
-}
-
 /**
  * Flush the current offsets and clear 

[kafka] branch 3.3 updated: Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168)

2023-02-16 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
 new 5ec2fed20e5 Kafka 14565: On failure, close AutoCloseable objects 
instantiated and configured by AbstractConfig (#13168)
5ec2fed20e5 is described below

commit 5ec2fed20e51bfa4653f44424bc198cc7cdbf1f6
Author: Terry 
AuthorDate: Thu Feb 16 12:39:24 2023 -0500

Kafka 14565: On failure, close AutoCloseable objects instantiated and 
configured by AbstractConfig (#13168)

Reviewers: Chris Egerton 
---
 .../apache/kafka/common/config/AbstractConfig.java | 101 +
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  27 ++
 .../kafka/clients/producer/KafkaProducerTest.java  |  24 +
 .../kafka/common/config/AbstractConfigTest.java|  26 +-
 .../apache/kafka/test/MockConsumerInterceptor.java |  14 +++
 .../apache/kafka/test/MockProducerInterceptor.java |  14 +++
 6 files changed, 166 insertions(+), 40 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index e3fda4d9f54..13637163311 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -66,44 +66,43 @@ public class AbstractConfig {
  * Construct a configuration with a ConfigDef and the configuration 
properties, which can include properties
  * for zero or more {@link ConfigProvider} that will be used to resolve 
variables in configuration property
  * values.
- *
+ * 
  * The originals is a name-value pair configuration properties and 
optional config provider configs. The
  * value of the configuration can be a variable as defined below or the 
actual value. This constructor will
  * first instantiate the ConfigProviders using the config provider 
configs, then it will find all the
  * variables in the values of the originals configurations, attempt to 
resolve the variables using the named
  * ConfigProviders, and then parse and validate the configurations.
- *
+ * 
  * ConfigProvider configs can be passed either as configs in the originals 
map or in the separate
  * configProviderProps map. If config providers properties are passed in 
the configProviderProps any config
  * provider properties in originals map will be ignored. If ConfigProvider 
properties are not provided, the
  * constructor will skip the variable substitution step and will simply 
validate and parse the supplied
  * configuration.
- *
+ * 
  * The "{@code config.providers}" configuration property and all 
configuration properties that begin with the
  * "{@code config.providers.}" prefix are reserved. The "{@code 
config.providers}" configuration property
  * specifies the names of the config providers, and properties that begin 
with the "{@code config.providers..}"
  * prefix correspond to the properties for that named provider. For 
example, the "{@code config.providers..class}"
  * property specifies the name of the {@link ConfigProvider} 
implementation class that should be used for
  * the provider.
- *
+ * 
  * The keys for ConfigProvider configs in both originals and 
configProviderProps will start with the above
  * mentioned "{@code config.providers.}" prefix.
- *
+ * 
  * Variables have the form "${providerName:[path:]key}", where 
"providerName" is the name of a ConfigProvider,
  * "path" is an optional string, and "key" is a required string. This 
variable is resolved by passing the "key"
  * and optional "path" to a ConfigProvider with the specified name, and 
the result from the ConfigProvider is
  * then used in place of the variable. Variables that cannot be resolved 
by the AbstractConfig constructor will
  * be left unchanged in the configuration.
  *
- *
- * @param definition the definition of the configurations; may not be null
- * @param originals the configuration properties plus any optional config 
provider properties;
+ * @param definition  the definition of the configurations; may 
not be null
+ * @param originals   the configuration properties plus any 
optional config provider properties;
  * @param configProviderProps the map of properties of config providers 
which will be instantiated by
- *the constructor to resolve any variables in {@code originals}; 
may be null or empty
- * @param doLog whether the configurations should be logged
+ *the constructor to resolve any variables in 
{@code originals}; may be null or empty
+ * @param doLog   whether the configurations should be logged
  */

[kafka] branch 3.4 updated: Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168)

2023-02-16 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
 new 2da64826fbc Kafka 14565: On failure, close AutoCloseable objects 
instantiated and configured by AbstractConfig (#13168)
2da64826fbc is described below

commit 2da64826fbc39567c424a1114e5777d6b84d184f
Author: Terry 
AuthorDate: Thu Feb 16 12:39:24 2023 -0500

Kafka 14565: On failure, close AutoCloseable objects instantiated and 
configured by AbstractConfig (#13168)

Reviewers: Chris Egerton 
---
 .../apache/kafka/common/config/AbstractConfig.java | 101 +
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  27 ++
 .../kafka/clients/producer/KafkaProducerTest.java  |  24 +
 .../kafka/common/config/AbstractConfigTest.java|  26 +-
 .../apache/kafka/test/MockConsumerInterceptor.java |  14 +++
 .../apache/kafka/test/MockProducerInterceptor.java |  14 +++
 6 files changed, 166 insertions(+), 40 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index e3fda4d9f54..13637163311 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -66,44 +66,43 @@ public class AbstractConfig {
  * Construct a configuration with a ConfigDef and the configuration 
properties, which can include properties
  * for zero or more {@link ConfigProvider} that will be used to resolve 
variables in configuration property
  * values.
- *
+ * 
  * The originals is a name-value pair configuration properties and 
optional config provider configs. The
  * value of the configuration can be a variable as defined below or the 
actual value. This constructor will
  * first instantiate the ConfigProviders using the config provider 
configs, then it will find all the
  * variables in the values of the originals configurations, attempt to 
resolve the variables using the named
  * ConfigProviders, and then parse and validate the configurations.
- *
+ * 
  * ConfigProvider configs can be passed either as configs in the originals 
map or in the separate
  * configProviderProps map. If config providers properties are passed in 
the configProviderProps any config
  * provider properties in originals map will be ignored. If ConfigProvider 
properties are not provided, the
  * constructor will skip the variable substitution step and will simply 
validate and parse the supplied
  * configuration.
- *
+ * 
  * The "{@code config.providers}" configuration property and all 
configuration properties that begin with the
  * "{@code config.providers.}" prefix are reserved. The "{@code 
config.providers}" configuration property
  * specifies the names of the config providers, and properties that begin 
with the "{@code config.providers..}"
  * prefix correspond to the properties for that named provider. For 
example, the "{@code config.providers..class}"
  * property specifies the name of the {@link ConfigProvider} 
implementation class that should be used for
  * the provider.
- *
+ * 
  * The keys for ConfigProvider configs in both originals and 
configProviderProps will start with the above
  * mentioned "{@code config.providers.}" prefix.
- *
+ * 
  * Variables have the form "${providerName:[path:]key}", where 
"providerName" is the name of a ConfigProvider,
  * "path" is an optional string, and "key" is a required string. This 
variable is resolved by passing the "key"
  * and optional "path" to a ConfigProvider with the specified name, and 
the result from the ConfigProvider is
  * then used in place of the variable. Variables that cannot be resolved 
by the AbstractConfig constructor will
  * be left unchanged in the configuration.
  *
- *
- * @param definition the definition of the configurations; may not be null
- * @param originals the configuration properties plus any optional config 
provider properties;
+ * @param definition  the definition of the configurations; may 
not be null
+ * @param originals   the configuration properties plus any 
optional config provider properties;
  * @param configProviderProps the map of properties of config providers 
which will be instantiated by
- *the constructor to resolve any variables in {@code originals}; 
may be null or empty
- * @param doLog whether the configurations should be logged
+ *the constructor to resolve any variables in 
{@code originals}; may be null or empty
+ * @param doLog   whether the configurations should be logged
  */

[kafka] branch trunk updated (ba0c5b0902d -> f3dc3f0dad1)

2023-02-16 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from ba0c5b0902d MINOR: Simplify JUnit assertions in tests; remove 
accidental unnecessary code in tests (#13219)
 add f3dc3f0dad1 Kafka 14565: On failure, close AutoCloseable objects 
instantiated and configured by AbstractConfig (#13168)

No new revisions were added by this update.

Summary of changes:
 .../apache/kafka/common/config/AbstractConfig.java | 101 +
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  27 ++
 .../kafka/clients/producer/KafkaProducerTest.java  |  24 +
 .../kafka/common/config/AbstractConfigTest.java|  26 +-
 .../apache/kafka/test/MockConsumerInterceptor.java |  14 +++
 .../apache/kafka/test/MockProducerInterceptor.java |  14 +++
 6 files changed, 166 insertions(+), 40 deletions(-)



[kafka] branch trunk updated: MINOR: Simplify JUnit assertions in tests; remove accidental unnecessary code in tests (#13219)

2023-02-16 Thread mimaison
This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ba0c5b0902d MINOR: Simplify JUnit assertions in tests; remove 
accidental unnecessary code in tests (#13219)
ba0c5b0902d is described below

commit ba0c5b0902d4b259505cf4a7c2a45e98182a372b
Author: Christo Lolov 
AuthorDate: Thu Feb 16 15:13:31 2023 +

MINOR: Simplify JUnit assertions in tests; remove accidental unnecessary 
code in tests (#13219)

* assertEquals called on array
* Method is identical to its super method
* Simplifiable assertions
* Unused imports

Reviewers: Mickael Maison , Divij Vaidya 

---
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  3 +-
 .../consumer/ConsumerPartitionAssignorTest.java| 22 --
 .../common/header/internals/RecordHeadersTest.java |  4 +-
 .../apache/kafka/common/network/SelectorTest.java  |  3 --
 .../utils/ImplicitLinkedHashCollectionTest.java|  7 ++--
 .../ImplicitLinkedHashMultiCollectionTest.java |  8 ++--
 .../auth/extension/JaasBasicAuthFilterTest.java|  3 +-
 .../connect/integration/BlockingConnectorTest.java | 10 -
 .../runtime/AbstractWorkerSourceTaskTest.java  |  9 ++--
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   |  4 +-
 .../runtime/rest/entities/PluginInfoTest.java  | 12 +++---
 .../controller/BrokerHeartbeatManagerTest.java |  3 +-
 .../kafka/controller/QuorumControllerTest.java |  4 +-
 .../controller/ReplicationControlManagerTest.java  |  4 +-
 .../apache/kafka/controller/ResultOrErrorTest.java | 20 +
 .../org/apache/kafka/image/TopicsImageTest.java|  5 ++-
 .../kafka/metadata/KafkaConfigSchemaTest.java  |  5 ++-
 .../authorizer/StandardAuthorizerTest.java | 33 +++
 .../apache/kafka/metalog/LocalLogManagerTest.java  |  7 ++--
 .../apache/kafka/timeline/BaseHashTableTest.java   | 25 +--
 .../kafka/timeline/SnapshottableHashTableTest.java | 38 +
 .../apache/kafka/timeline/TimelineHashMapTest.java | 14 ---
 .../org/apache/kafka/shell/GlobComponentTest.java  |  7 ++--
 .../org/apache/kafka/shell/MetadataNodeTest.java   |  6 +--
 .../storage/RemoteLogSegmentLifecycleTest.java |  4 --
 .../org/apache/kafka/streams/KeyValueTest.java | 48 +++---
 .../integration/StoreUpgradeIntegrationTest.java   |  8 
 .../internals/KStreamNewProcessorApiTest.java  |  4 --
 .../internals/graph/GraphGraceSearchUtilTest.java  |  9 
 .../internals/graph/TableProcessorNodeTest.java|  7 
 .../CachingInMemoryKeyValueStoreTest.java  |  6 ---
 .../MeteredTimestampedKeyValueStoreTest.java   |  3 +-
 .../streams/state/internals/ThreadCacheTest.java   |  4 +-
 .../internals/TimeOrderedWindowStoreTest.java  |  3 --
 .../kafka/streams/TopologyTestDriverTest.java  | 17 
 35 files changed, 155 insertions(+), 214 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d050f08e320..bad3e391bfa 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -229,7 +229,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
@@ -5725,7 +5724,7 @@ public class KafkaAdminClientTest {
 }
 
 private ClientQuotaEntity newClientQuotaEntity(String... args) {
-assertTrue(args.length % 2 == 0);
+assertEquals(0, args.length % 2);
 
 Map entityMap = new HashMap<>(args.length / 2);
 for (int index = 0; index < args.length; index += 2) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
index 1298f8c23bd..635f38f7f38 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
@@ -22,12 +22,10 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.jupiter.api.Test;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
-import java.util.Set;
 
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static 

[kafka] branch trunk updated (6d640752304 -> 38662bb52de)

2023-02-16 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 6d640752304 KAFKA-14708: Use Java thread instead of kafka library for 
example purpose (#13238)
 add 38662bb52de MINOR Added assertion checks in tests for 
ProducerStateManager.activeProducers usages (#13235)

No new revisions were added by this update.

Summary of changes:
 .../scala/integration/kafka/api/TransactionsTest.scala| 15 ++-
 .../scala/unit/kafka/log/ProducerStateManagerTest.scala   | 10 --
 2 files changed, 18 insertions(+), 7 deletions(-)