[kafka] branch trunk updated (ee565f5f6b -> 17637c4ad5)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from ee565f5f6b KAFKA-13939: Only track dirty keys if logging is enabled. (#12263) add 17637c4ad5 MINOR: Clean up tmp files created by tests (#12233) No new revisions were added by this update. Summary of changes: .../kafka/common/security/ssl/SslFactoryTest.java | 41 +++--- .../security/ssl/mock/TestKeyManagerFactory.java | 3 +- .../java/org/apache/kafka/test/TestSslUtils.java | 6 ++-- .../test/java/org/apache/kafka/test/TestUtils.java | 32 - .../kafka/controller/BootstrapMetadataTest.java | 22 +--- .../kafka/streams/tests/SmokeTestClient.java | 29 ++- 6 files changed, 68 insertions(+), 65 deletions(-)
[kafka] branch trunk updated: KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ee565f5f6b KAFKA-13939: Only track dirty keys if logging is enabled. (#12263) ee565f5f6b is described below commit ee565f5f6b97c84d4f7f895fcb79188822284414 Author: jnewhouse AuthorDate: Thu Jun 16 14:27:38 2022 -0700 KAFKA-13939: Only track dirty keys if logging is enabled. (#12263) InMemoryTimeOrderedKeyValueBuffer keeps a Set of keys that have been seen in order to log them for durability. This set is never used nor cleared if logging is not enabled. Having it be populated creates a memory leak. This change stops populating the set if logging is not enabled. Reviewers: Divij Vaidya , Kvicii <42023367+kvi...@users.noreply.github.com>, Guozhang Wang --- .../state/internals/InMemoryTimeOrderedKeyValueBuffer.java| 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 5894023bbe..5403f9e703 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -423,7 +423,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere delegate.remove(); index.remove(next.getKey().key()); -dirtyKeys.add(next.getKey().key()); +if (loggingEnabled) { +dirtyKeys.add(next.getKey().key()); +} memBufferSize -= computeRecordSize(next.getKey().key(), bufferValue); @@ -497,7 +499,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere serializedKey, new BufferValue(serializedPriorValue, serialChange.oldValue, 
serialChange.newValue, recordContext) ); -dirtyKeys.add(serializedKey); +if (loggingEnabled) { +dirtyKeys.add(serializedKey); +} updateBufferMetrics(); }
[kafka] branch trunk updated: MINOR: Guard against decrementing `totalCommittedSinceLastSummary` during rebalancing. (#12299)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 683d0bbc4c MINOR: Guard against decrementing `totalCommittedSinceLastSummary` during rebalancing. (#12299) 683d0bbc4c is described below commit 683d0bbc4ca7223e0ade55be58497af8aded6823 Author: James Hughes AuthorDate: Thu Jun 16 12:40:08 2022 -0400 MINOR: Guard against decrementing `totalCommittedSinceLastSummary` during rebalancing. (#12299) Reviewers: Matthias J. Sax --- .../java/org/apache/kafka/streams/processor/internals/StreamThread.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 7d3be3b1a6..64a4ff5433 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -818,10 +818,10 @@ public class StreamThread extends Thread { final long beforeCommitMs = now; final int committed = maybeCommit(); -totalCommittedSinceLastSummary += committed; final long commitLatency = Math.max(now - beforeCommitMs, 0); totalCommitLatency += commitLatency; if (committed > 0) { +totalCommittedSinceLastSummary += committed; commitSensor.record(commitLatency / (double) committed, now); if (log.isDebugEnabled()) {
[kafka] branch trunk updated: KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#12161)
This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7ed3748a46 KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#12161) 7ed3748a46 is described below commit 7ed3748a462cd1ce7c30bb9a331b94a3cd79a401 Author: James Hughes AuthorDate: Thu Jun 16 10:06:02 2022 -0400 KAFKA-13873 Add ability to Pause / Resume KafkaStreams Topologies (#12161) This PR adds the ability to pause and resume KafkaStreams instances as well as named/modular topologies (KIP-834). Co-authored-by: Bruno Cadonna Reviewers: Bonnie Varghese , Walker Carlson , Guozhang Wang , Bruno Cadonna --- checkstyle/suppressions.xml| 2 +- .../org/apache/kafka/streams/KafkaStreams.java | 48 ++- .../processor/internals/ActiveTaskCreator.java | 2 +- .../processor/internals/StoreChangelogReader.java | 55 ++- .../streams/processor/internals/StreamThread.java | 3 +- .../processor/internals/TaskExecutionMetadata.java | 14 +- .../streams/processor/internals/TaskExecutor.java | 2 +- .../streams/processor/internals/TaskManager.java | 7 +- .../kafka/streams/processor/internals/Tasks.java | 24 +- .../processor/internals/TopologyMetadata.java | 36 +- .../KafkaStreamsNamedTopologyWrapper.java | 25 ++ .../org/apache/kafka/streams/KafkaStreamsTest.java | 64 +++- .../integration/PauseResumeIntegrationTest.java| 422 + .../integration/utils/IntegrationTestUtils.java| 35 ++ .../internals/StoreChangelogReaderTest.java| 2 +- .../processor/internals/StreamThreadTest.java | 154 ++-- .../internals/TaskExecutionMetadataTest.java | 124 ++ .../processor/internals/TaskExecutorTest.java | 41 ++ .../streams/processor/internals/TasksTest.java | 65 .../processor/internals/TopologyMetadataTest.java | 49 +++ 20 files changed, 1125 insertions(+), 49 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 
f2c24541a1..033017f0fe 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -225,7 +225,7 @@ files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/> + files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest).java"/> diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index ed62f89ebc..86ab83f67d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -56,7 +56,6 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; -import org.apache.kafka.streams.processor.internals.GlobalStreamThread.State; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; @@ -65,6 +64,7 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat import org.apache.kafka.streams.processor.internals.TopologyMetadata; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology; import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.QueryConfig; @@ -111,6 +111,7 @@ import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize; import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets; +import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY; /** * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and @@ -1735,6 +1736,51 @@ public class KafkaStreams implements AutoCloseable { return queryableStoreProvider.getStore(storeQueryParameters); } +/** + * This method pauses processing for the KafkaStreams