[kafka] branch trunk updated (ee565f5f6b -> 17637c4ad5)

2022-06-16 Thread ijuma
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from ee565f5f6b KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)
 add 17637c4ad5 MINOR: Clean up tmp files created by tests (#12233)

No new revisions were added by this update.

Summary of changes:
 .../kafka/common/security/ssl/SslFactoryTest.java  | 41 +++---
 .../security/ssl/mock/TestKeyManagerFactory.java   |  3 +-
 .../java/org/apache/kafka/test/TestSslUtils.java   |  6 ++--
 .../test/java/org/apache/kafka/test/TestUtils.java | 32 -
 .../kafka/controller/BootstrapMetadataTest.java| 22 +---
 .../kafka/streams/tests/SmokeTestClient.java   | 29 ++-
 6 files changed, 68 insertions(+), 65 deletions(-)



[kafka] branch trunk updated: KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)

2022-06-16 Thread guozhang
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ee565f5f6b KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)
ee565f5f6b is described below

commit ee565f5f6b97c84d4f7f895fcb79188822284414
Author: jnewhouse 
AuthorDate: Thu Jun 16 14:27:38 2022 -0700

KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)

InMemoryTimeOrderedKeyValueBuffer keeps a Set of keys that have been seen 
in order to log them for durability. This set is never used nor cleared if 
logging is not enabled. Having it be populated creates a memory leak. This 
change stops populating the set if logging is not enabled.

Reviewers: Divij Vaidya, Kvicii <42023367+kvi...@users.noreply.github.com>, Guozhang Wang
---
 .../state/internals/InMemoryTimeOrderedKeyValueBuffer.java| 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 5894023bbe..5403f9e703 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -423,7 +423,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere
 delegate.remove();
 index.remove(next.getKey().key());
 
-dirtyKeys.add(next.getKey().key());
+if (loggingEnabled) {
+dirtyKeys.add(next.getKey().key());
+}
 
 memBufferSize -= computeRecordSize(next.getKey().key(), bufferValue);
 
@@ -497,7 +499,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere
 serializedKey,
 new BufferValue(serializedPriorValue, serialChange.oldValue, serialChange.newValue, recordContext)
 );
-dirtyKeys.add(serializedKey);
+if (loggingEnabled) {
+dirtyKeys.add(serializedKey);
+}
 updateBufferMetrics();
 }
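
The leak described in the commit message can be illustrated with a minimal, self-contained sketch. It is not the Kafka class: `DirtyKeyBufferSketch`, its methods, and the use of `String` keys are hypothetical, and it assumes (as the commit message implies) that the dirty-key set is only drained when logging is enabled.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for a time-ordered buffer; not the Kafka implementation.
class DirtyKeyBufferSketch {
    private final boolean loggingEnabled;
    private final TreeMap<Long, String> buffer = new TreeMap<>(); // timestamp -> key
    private final Set<String> dirtyKeys = new HashSet<>();

    DirtyKeyBufferSketch(final boolean loggingEnabled) {
        this.loggingEnabled = loggingEnabled;
    }

    void put(final long time, final String key) {
        buffer.put(time, key);
        // Only remember the key if something will later log it and clear the set.
        if (loggingEnabled) {
            dirtyKeys.add(key);
        }
    }

    void evictOlderThan(final long time) {
        while (!buffer.isEmpty() && buffer.firstKey() < time) {
            final String key = buffer.pollFirstEntry().getValue();
            if (loggingEnabled) {
                dirtyKeys.add(key);
            }
        }
    }

    // Assumption: flushing writes dirty keys to a changelog and then clears the set,
    // and does nothing at all when logging is disabled.
    void flush() {
        if (loggingEnabled) {
            dirtyKeys.clear();
        }
    }

    int dirtyKeyCount() {
        return dirtyKeys.size();
    }
}
```

Without the `loggingEnabled` guards in `put` and `evictOlderThan`, a buffer created with logging disabled would add every key to `dirtyKeys` and never remove it, so the set would grow for the lifetime of the store.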
 



[kafka] branch trunk updated: MINOR: Guard against decrementing `totalCommittedSinceLastSummary` during rebalancing. (#12299)

2022-06-16 Thread mjsax
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 683d0bbc4c MINOR: Guard against decrementing `totalCommittedSinceLastSummary` during rebalancing. (#12299)
683d0bbc4c is described below

commit 683d0bbc4ca7223e0ade55be58497af8aded6823
Author: James Hughes 
AuthorDate: Thu Jun 16 12:40:08 2022 -0400

MINOR: Guard against decrementing `totalCommittedSinceLastSummary` during rebalancing. (#12299)

Reviewers: Matthias J. Sax 
---
 .../java/org/apache/kafka/streams/processor/internals/StreamThread.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7d3be3b1a6..64a4ff5433 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -818,10 +818,10 @@ public class StreamThread extends Thread {
 
 final long beforeCommitMs = now;
 final int committed = maybeCommit();
-totalCommittedSinceLastSummary += committed;
 final long commitLatency = Math.max(now - beforeCommitMs, 0);
 totalCommitLatency += commitLatency;
 if (committed > 0) {
+totalCommittedSinceLastSummary += committed;
 commitSensor.record(commitLatency / (double) committed, now);
 
 if (log.isDebugEnabled()) {
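
The effect of moving the increment inside the `committed > 0` check can be sketched in isolation. The class below is hypothetical and is not `StreamThread`. It assumes that the commit call reports a negative value (for example -1) when a commit is skipped during rebalancing, which the commit title suggests but the diff does not show.

```java
// Hypothetical sketch; field names mirror the diff, but this is not StreamThread.
class CommitSummarySketch {
    private long totalCommittedSinceLastSummary = 0L;
    private long totalCommitLatency = 0L;

    // `committed` is assumed to be negative when the commit was skipped (e.g. mid-rebalance).
    void onCommitAttempt(final int committed, final long commitLatencyMs) {
        totalCommitLatency += commitLatencyMs;
        if (committed > 0) {
            // Guarded: a negative sentinel can no longer reduce the running total.
            totalCommittedSinceLastSummary += committed;
        }
    }

    long totalCommittedSinceLastSummary() {
        return totalCommittedSinceLastSummary;
    }

    long totalCommitLatency() {
        return totalCommitLatency;
    }
}
```

With the unguarded form (`total += committed` before the check), a skipped commit reported as -1 would lower the summary count by one each time it occurred.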



[kafka] branch trunk updated: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#12161)

2022-06-16 Thread cadonna
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 7ed3748a46 KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#12161)
7ed3748a46 is described below

commit 7ed3748a462cd1ce7c30bb9a331b94a3cd79a401
Author: James Hughes 
AuthorDate: Thu Jun 16 10:06:02 2022 -0400

KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#12161)

This PR adds the ability to pause and resume KafkaStreams instances as well 
as named/modular topologies (KIP-834).

Co-authored-by: Bruno Cadonna 

Reviewers: Bonnie Varghese, Walker Carlson, Guozhang Wang, Bruno Cadonna

---
 checkstyle/suppressions.xml|   2 +-
 .../org/apache/kafka/streams/KafkaStreams.java |  48 ++-
 .../processor/internals/ActiveTaskCreator.java |   2 +-
 .../processor/internals/StoreChangelogReader.java  |  55 ++-
 .../streams/processor/internals/StreamThread.java  |   3 +-
 .../processor/internals/TaskExecutionMetadata.java |  14 +-
 .../streams/processor/internals/TaskExecutor.java  |   2 +-
 .../streams/processor/internals/TaskManager.java   |   7 +-
 .../kafka/streams/processor/internals/Tasks.java   |  24 +-
 .../processor/internals/TopologyMetadata.java  |  36 +-
 .../KafkaStreamsNamedTopologyWrapper.java  |  25 ++
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  64 +++-
 .../integration/PauseResumeIntegrationTest.java| 422 +
 .../integration/utils/IntegrationTestUtils.java|  35 ++
 .../internals/StoreChangelogReaderTest.java|   2 +-
 .../processor/internals/StreamThreadTest.java  | 154 ++--
 .../internals/TaskExecutionMetadataTest.java   | 124 ++
 .../processor/internals/TaskExecutorTest.java  |  41 ++
 .../streams/processor/internals/TasksTest.java |  65 
 .../processor/internals/TopologyMetadataTest.java  |  49 +++
 20 files changed, 1125 insertions(+), 49 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f2c24541a1..033017f0fe 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -225,7 +225,7 @@
   
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
 
 
+  
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/>
 
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ed62f89ebc..86ab83f67d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -56,7 +56,6 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ClientUtils;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
-import org.apache.kafka.streams.processor.internals.GlobalStreamThread.State;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@@ -65,6 +64,7 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat
 import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.QueryConfig;
@@ -111,6 +111,7 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
 
 /**
  * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -1735,6 +1736,51 @@ public class KafkaStreams implements AutoCloseable {
 return queryableStoreProvider.getStore(storeQueryParameters);
 }
 
+/**
+ *  This method pauses processing for the KafkaStreams