[kafka] branch trunk updated (a2e87fe -> e869d8f)
This is an automated email from the ASF dual-hosted git repository. manikumar pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from a2e87fe KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer add e869d8f MINOR: Improve docs by adding ToC links to Monitoring No new revisions were added by this update. Summary of changes: docs/ops.html | 3 --- docs/toc.html | 9 + 2 files changed, 9 insertions(+), 3 deletions(-)
[kafka] branch 2.0 updated: KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.0 by this push: new e7298f4 KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer e7298f4 is described below commit e7298f4fc53f27f91564f60c3818fa392287ff33 Author: Robert Yokota AuthorDate: Tue Nov 27 22:01:21 2018 -0800 KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer The restart logic for TTLs in `WorkerConfigTransformer` was broken when trying to make it toggle-able. Accessing the toggle through the `Herder` causes the same code to be called recursively. This fix just accesses the toggle by simply looking in the properties map that is passed to `WorkerConfigTransformer`. Author: Robert Yokota Reviewers: Magesh Nandakumar , Ewen Cheslack-Postava Closes #5914 from rayokota/KAFKA-7620 (cherry picked from commit a2e87feb8b1db8200ca3a34aa72b0802e8f61096) Signed-off-by: Ewen Cheslack-Postava --- .../kafka/connect/runtime/ConnectorConfig.java | 5 ++- .../org/apache/kafka/connect/runtime/Herder.java | 6 --- .../connect/runtime/WorkerConfigTransformer.java | 44 ++ .../runtime/distributed/DistributedHerder.java | 8 .../runtime/standalone/StandaloneHerder.java | 8 .../runtime/WorkerConfigTransformerTest.java | 13 --- 6 files changed, 39 insertions(+), 45 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 9d1a50d..d030fed 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -35,6 +35,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars; @@ -105,8 +106,8 @@ public class ConnectorConfig extends AbstractConfig { "indicates that a configuration value will expire in the future."; private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action"; -public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.toString(); -public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.toString(); +public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.name().toLowerCase(Locale.ROOT); +public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.name().toLowerCase(Locale.ROOT); public static final String ERRORS_RETRY_TIMEOUT_CONFIG = "errors.retry.timeout"; public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout for Errors"; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index 5c7cc14..c572e20 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -149,12 +149,6 @@ public interface Herder { void restartTask(ConnectorTaskId id, Callback cb); /** - * Get the configuration reload action. - * @param connName name of the connector - */ -ConfigReloadAction connectorConfigReloadAction(final String connName); - -/** * Restart the connector. * @param connName name of the connector * @param cb callback to invoke upon completion diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java index 1b715c7..3373d5c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java @@ -16,10 +16,15 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.ConfigTransformer; import org.apache.kafka.common.config.ConfigTransformerResult; +import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -29,6 +34,8 @@ import java.util.concur
[kafka] branch 2.1 updated: KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 9951bf9 KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer 9951bf9 is described below commit 9951bf911125c51a2574ac0dbb9913bc0500b594 Author: Robert Yokota AuthorDate: Tue Nov 27 22:01:21 2018 -0800 KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer The restart logic for TTLs in `WorkerConfigTransformer` was broken when trying to make it toggle-able. Accessing the toggle through the `Herder` causes the same code to be called recursively. This fix just accesses the toggle by simply looking in the properties map that is passed to `WorkerConfigTransformer`. Author: Robert Yokota Reviewers: Magesh Nandakumar , Ewen Cheslack-Postava Closes #5914 from rayokota/KAFKA-7620 (cherry picked from commit a2e87feb8b1db8200ca3a34aa72b0802e8f61096) Signed-off-by: Ewen Cheslack-Postava --- .../kafka/connect/runtime/ConnectorConfig.java | 5 ++- .../org/apache/kafka/connect/runtime/Herder.java | 6 --- .../connect/runtime/WorkerConfigTransformer.java | 44 ++ .../runtime/distributed/DistributedHerder.java | 8 .../runtime/standalone/StandaloneHerder.java | 8 .../runtime/WorkerConfigTransformerTest.java | 13 --- 6 files changed, 39 insertions(+), 45 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 10096a5..e915843 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -35,6 +35,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars; @@ -105,8 +106,8 @@ public class ConnectorConfig extends AbstractConfig { "indicates that a configuration value will expire in the future."; private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action"; -public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.toString(); -public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.toString(); +public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.name().toLowerCase(Locale.ROOT); +public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.name().toLowerCase(Locale.ROOT); public static final String ERRORS_RETRY_TIMEOUT_CONFIG = "errors.retry.timeout"; public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout for Errors"; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index 5c7cc14..c572e20 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -149,12 +149,6 @@ public interface Herder { void restartTask(ConnectorTaskId id, Callback cb); /** - * Get the configuration reload action. - * @param connName name of the connector - */ -ConfigReloadAction connectorConfigReloadAction(final String connName); - -/** * Restart the connector. * @param connName name of the connector * @param cb callback to invoke upon completion diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java index 1b715c7..3373d5c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java @@ -16,10 +16,15 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.ConfigTransformer; import org.apache.kafka.common.config.ConfigTransformerResult; +import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -29,6 +34,8 @@ import java.util.concur
[kafka] branch trunk updated: KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer
This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a2e87fe KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer a2e87fe is described below commit a2e87feb8b1db8200ca3a34aa72b0802e8f61096 Author: Robert Yokota AuthorDate: Tue Nov 27 22:01:21 2018 -0800 KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer The restart logic for TTLs in `WorkerConfigTransformer` was broken when trying to make it toggle-able. Accessing the toggle through the `Herder` causes the same code to be called recursively. This fix just accesses the toggle by simply looking in the properties map that is passed to `WorkerConfigTransformer`. Author: Robert Yokota Reviewers: Magesh Nandakumar , Ewen Cheslack-Postava Closes #5914 from rayokota/KAFKA-7620 --- .../kafka/connect/runtime/ConnectorConfig.java | 5 ++- .../org/apache/kafka/connect/runtime/Herder.java | 6 --- .../connect/runtime/WorkerConfigTransformer.java | 44 ++ .../runtime/distributed/DistributedHerder.java | 8 .../runtime/standalone/StandaloneHerder.java | 8 .../runtime/WorkerConfigTransformerTest.java | 13 --- 6 files changed, 39 insertions(+), 45 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index efcc01d..8889aad 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -35,6 +35,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars; @@ -105,8 +106,8 @@ public class ConnectorConfig extends AbstractConfig { "indicates that a configuration value will expire in the future."; private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action"; -public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.toString(); -public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.toString(); +public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.name().toLowerCase(Locale.ROOT); +public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.name().toLowerCase(Locale.ROOT); public static final String ERRORS_RETRY_TIMEOUT_CONFIG = "errors.retry.timeout"; public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout for Errors"; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index 5c7cc14..c572e20 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -149,12 +149,6 @@ public interface Herder { void restartTask(ConnectorTaskId id, Callback cb); /** - * Get the configuration reload action. - * @param connName name of the connector - */ -ConfigReloadAction connectorConfigReloadAction(final String connName); - -/** * Restart the connector. * @param connName name of the connector * @param cb callback to invoke upon completion diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java index 1b715c7..3373d5c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java @@ -16,10 +16,15 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.ConfigTransformer; import org.apache.kafka.common.config.ConfigTransformerResult; +import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -29,6 +34,8 @@ import java.util.concurrent.ConcurrentMap; * retrieved TTL values. */ public class WorkerConfigTransformer { +private static final
[kafka] branch 2.1 updated: MINOR: Fix kafkatest/__init__.py to use dev0 instead of SNAPSHOT (#5955)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 9fa5477 MINOR: Fix kafkatest/__init__.py to use dev0 instead of SNAPSHOT (#5955) 9fa5477 is described below commit 9fa5477b7a4d980c60d729ef15e1468a3c425fcb Author: Lucas Bradstreet AuthorDate: Tue Nov 27 15:31:18 2018 -1000 MINOR: Fix kafkatest/__init__.py to use dev0 instead of SNAPSHOT (#5955) Fixes the system tests. Reviewers: Ismael Juma --- tests/kafkatest/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index 8dfb212..eb6e26b 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -22,4 +22,4 @@ # Instead, in development branches, the version should have a suffix of the form ".devN" # # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '2.1.1-SNAPSHOT' +__version__ = '2.1.1.dev0'
[kafka] branch trunk updated: KAFKA-7389: Enable spotBugs with Java 11 and disable false positive warnings (#5943)
This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0ee1635 KAFKA-7389: Enable spotBugs with Java 11 and disable false positive warnings (#5943) 0ee1635 is described below commit 0ee16350ac2e1ac7fa16f276ae61d2899eaaf28e Author: Ismael Juma AuthorDate: Tue Nov 27 14:40:17 2018 -0800 KAFKA-7389: Enable spotBugs with Java 11 and disable false positive warnings (#5943) See https://github.com/spotbugs/spotbugs/issues/756 for details on the false positives affecting try with resources. An example is: > RCN | Nullcheck of fc at line 629 of value previously dereferenced in > org.apache.kafka.common.utils.Utils.readFileAsString(String, Charset) Reviewers: Manikumar Reddy --- build.gradle| 27 --- gradle/spotbugs-exclude.xml | 11 +++ 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/build.gradle b/build.gradle index 5ce648a..d9816a1 100644 --- a/build.gradle +++ b/build.gradle @@ -149,8 +149,7 @@ subprojects { apply plugin: 'maven' apply plugin: 'signing' apply plugin: 'checkstyle' - if (!JavaVersion.current().isJava11Compatible()) -apply plugin: "com.github.spotbugs" + apply plugin: "com.github.spotbugs" sourceCompatibility = minJavaVersion targetCompatibility = minJavaVersion @@ -372,20 +371,18 @@ subprojects { } test.dependsOn('checkstyleMain', 'checkstyleTest') - if (!JavaVersion.current().isJava11Compatible()) { -spotbugs { - toolVersion = '3.1.8' - excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml") - ignoreFailures = false -} -test.dependsOn('spotbugsMain') + spotbugs { +toolVersion = '3.1.8' +excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml") +ignoreFailures = false + } + test.dependsOn('spotbugsMain') -tasks.withType(com.github.spotbugs.SpotBugsTask) { - reports { -// Continue supporting `xmlFindBugsReport` for compatibility -xml.enabled(project.hasProperty('xmlSpotBugsReport') || project.hasProperty('xmlFindBugsReport')) -html.enabled(!project.hasProperty('xmlSpotBugsReport') && !project.hasProperty('xmlFindBugsReport')) - } + tasks.withType(com.github.spotbugs.SpotBugsTask) { +reports { + // Continue supporting `xmlFindBugsReport` for compatibility + xml.enabled(project.hasProperty('xmlSpotBugsReport') || project.hasProperty('xmlFindBugsReport')) + html.enabled(!project.hasProperty('xmlSpotBugsReport') && !project.hasProperty('xmlFindBugsReport')) } } diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index d83c4c4..a954baf 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -23,6 +23,17 @@ This file dictates which categories of bugs and individual false positives that For a detailed description of spotbugs bug categories, see https://spotbugs.readthedocs.io/en/latest/bugDescriptions.html --> + + + + + + + + + + +
[kafka] branch trunk updated: KAFKA-7367: Streams should not create state store directories unless they are needed (#5696)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new de24d4a KAFKA-7367: Streams should not create state store directories unless they are needed (#5696) de24d4a is described below commit de24d4a459ccfac623e71722f8df8b1e99f2ad43 Author: Kamal Chandraprakash AuthorDate: Wed Nov 28 02:39:44 2018 +0530 KAFKA-7367: Streams should not create state store directories unless they are needed (#5696) * KAFKA-7367: Ensure stateless topologies don't require disk access * KAFKA-7367: Streams should not create state store directories unless they are needed. * Addressed the review comments. * Addressed the review-2 comments. * Fixed FileAlreadyExistsException * Addressed the review-3 comments. * Resolved the conflicts. --- .../org/apache/kafka/streams/KafkaStreams.java | 16 +- .../processor/internals/ProcessorTopology.java | 18 +++ .../processor/internals/StateDirectory.java| 37 +++-- .../org/apache/kafka/streams/KafkaStreamsTest.java | 161 - .../integration/RestoreIntegrationTest.java| 4 +- .../processor/internals/AbstractTaskTest.java | 2 +- .../internals/GlobalStateManagerImplTest.java | 12 +- .../internals/GlobalStreamThreadTest.java | 4 +- .../internals/ProcessorStateManagerTest.java | 4 +- .../processor/internals/ProcessorTopologyTest.java | 70 +++-- .../processor/internals/StandbyTaskTest.java | 2 +- .../processor/internals/StateDirectoryTest.java| 83 +++ .../processor/internals/StreamTaskTest.java| 4 +- .../processor/internals/StreamThreadTest.java | 2 +- .../StreamThreadStateStoreProviderTest.java| 2 +- .../apache/kafka/streams/TopologyTestDriver.java | 4 +- .../kafka/streams/TopologyTestDriverTest.java | 37 - 17 files changed, 375 insertions(+), 87 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index bbda11d..c29b7bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -644,12 +644,6 @@ public class KafkaStreams implements AutoCloseable { final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId)); this.log = logContext.logger(getClass()); -try { -stateDirectory = new StateDirectory(config, time); -} catch (final ProcessorStateException fatal) { -throw new StreamsException(fatal); -} - final MetricConfig metricConfig = new MetricConfig() .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) @@ -664,7 +658,7 @@ public class KafkaStreams implements AutoCloseable { internalTopologyBuilder.rewriteTopology(config); // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception -internalTopologyBuilder.build(); +final ProcessorTopology taskTopology = internalTopologyBuilder.build(); streamsMetadataState = new StreamsMetadataState( internalTopologyBuilder, @@ -680,6 +674,14 @@ public class KafkaStreams implements AutoCloseable { } final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1)); +final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() || +(globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore()); + +try { +stateDirectory = new StateDirectory(config, time, createStateDirectory); +} catch (final ProcessorStateException fatal) { +throw new StreamsException(fatal); +} final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener(); GlobalStreamThread.State globalThreadState = null; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 8fcbbb4..57af1f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -152,6 +152,24 @@ public class ProcessorTopology { return repartitionTopics.contains(topic)
[kafka] branch trunk updated: MINOR: Add system test for optimization upgrades (#5912)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new dfd5454 MINOR: Add system test for optimization upgrades (#5912) dfd5454 is described below commit dfd545485ab0399e0c129ff2d68fbb3113f3e8c9 Author: Bill Bejeck AuthorDate: Tue Nov 27 16:07:34 2018 -0500 MINOR: Add system test for optimization upgrades (#5912) This is a new system test testing for optimizing an existing topology. This test takes the following steps 1. Start a Kafka Streams application that uses a selectKey then performs 3 groupByKey() operations and 1 join creating four repartition topics 2. Verify all instances start and process data 3. Stop all instances and verify stopped 4. For each stopped instance update the config for TOPOLOGY_OPTIMIZATION to all then restart the instance and verify the instance has started successfully also verifying Kafka Streams reduced the number of repartition topics from 4 to 1 5. Verify that each instance is processing data from the aggregation, reduce, and join operation Stop all instances and verify the shut down is complete. 6. For testing I ran two passes of the system test with 25 repeats for a total of 50 test runs. All test runs passed Reviewers: Matthias J. Sax , Bill Bejeck , Guozhang Wang --- .../kafka/streams/tests/StreamsOptimizedTest.java | 156 + tests/kafkatest/services/streams.py| 26 .../tests/streams/streams_optimized_test.py| 150 3 files changed, 332 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java new file mode 100644 index 000..6bde92f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Reducer; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static java.time.Duration.ofMillis; + +public class StreamsOptimizedTest { + + +public static void main(final String[] args) throws Exception { +if (args.length < 1) { +System.err.println("StreamsOptimizedTest requires one argument (properties-file) but no provided: "); +} +final String propFileName = args[0]; + +final Properties streamsProperties = Utils.loadProps(propFileName); + +System.out.println("StreamsTest instance started StreamsOptimizedTest"); +System.out.println("props=" + streamsProperties); + +final String inputTopic = (String) Objects.requireNonNull(streamsProperties.remove("input.topic")); +final String aggregationTopic = (String) Objects.requireNonNull(streamsProperties.remove("aggregation.topic")); +final String reduceTopic = (String) Objects.requireNonNull(streamsProperties.remove("reduce.topic")); +final String joinTopic = (String) Objects.requireNonNull(
[kafka] branch trunk updated: KAFKA-7223: Suppression Buffer Metrics (#5795)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 55c77eb KAFKA-7223: Suppression Buffer Metrics (#5795) 55c77eb is described below commit 55c77ebf01ea8662b98f73f6f6c17d05163a85b8 Author: John Roesler AuthorDate: Tue Nov 27 14:57:04 2018 -0600 KAFKA-7223: Suppression Buffer Metrics (#5795) Add the final batch of metrics from KIP-328 Reviewers: Matthias J. Sax , Bill Bejeck , Guozhang Wang --- .../streams/kstream/internals/metrics/Sensors.java | 47 - .../suppress/KTableSuppressProcessor.java | 8 +- .../streams/processor/internals/ProcessorNode.java | 21 +-- .../internals/metrics/StreamsMetricsImpl.java | 3 + .../InMemoryTimeOrderedKeyValueBuffer.java | 26 ++- .../streams/state/internals/metrics/Sensors.java | 69 ++- .../KTableSuppressProcessorMetricsTest.java| 203 + .../suppress/KTableSuppressProcessorTest.java | 13 +- .../streams/processor/MockProcessorContext.java| 9 +- 9 files changed, 375 insertions(+), 24 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java index 5b0d8b5..12c4813 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java @@ -20,10 +20,17 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.Sum; +import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP; public class Sensors { private Sensors() {} @@ -38,8 +45,8 @@ public class Sensors { ); StreamsMetricsImpl.addInvocationRateAndCount( sensor, -"stream-processor-node-metrics", -metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", context.currentNode().name()), +PROCESSOR_NODE_METRICS_GROUP, +metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, context.currentNode().name()), "late-record-drop" ); return sensor; @@ -75,4 +82,40 @@ public class Sensors { ); return sensor; } + +public static Sensor suppressionEmitSensor(final InternalProcessorContext context) { +final StreamsMetricsImpl metrics = context.metrics(); + +final Sensor sensor = metrics.nodeLevelSensor( +context.taskId().toString(), +context.currentNode().name(), +"suppression-emit", +Sensor.RecordingLevel.DEBUG +); + +final Map tags = metrics.tagMap( +"task-id", context.taskId().toString(), +PROCESSOR_NODE_ID_TAG, context.currentNode().name() +); + +sensor.add( +new MetricName( +"suppression-emit-rate", +PROCESSOR_NODE_METRICS_GROUP, +"The average number of occurrence of suppression-emit operation per second.", +tags +), +new Rate(TimeUnit.SECONDS, new Sum()) +); +sensor.add( +new MetricName( +"suppression-emit-total", +PROCESSOR_NODE_METRICS_GROUP, +"The total number of occurrence of suppression-emit operations.", +tags +), +new Total() +); +return sensor; +} } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java index 50e74a3..06d5004 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -16,12 +16,14 @@ */ package org.apache.kafka.streams.kstream.internals.suppress; +import org.apache.kafka.common.metrics.Sensor; impor
[kafka] branch trunk updated: KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9368743 KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885) 9368743 is described below commit 9368743b8fd2b42a41b44860ea0f3588bb273cc8 Author: Stanislav Kozlovski AuthorDate: Tue Nov 27 20:49:53 2018 + KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885) KAFKA-7597: Add configurable transaction support to ProduceBenchWorker. In order to get support for serializing Optional<> types to JSON, add a new library: jackson-datatype-jdk8. Once Jackson 3 comes out, this library will not be needed. Reviewers: Colin McCabe , Ismael Juma --- build.gradle | 5 + gradle/dependencies.gradle | 1 + .../bin/trogdor-run-transactional-produce-bench.sh | 51 ++ .../services/trogdor/produce_bench_workload.py | 4 +- tests/kafkatest/tests/core/produce_bench_test.py | 29 +- .../org/apache/kafka/trogdor/common/JsonUtil.java | 2 + .../kafka/trogdor/workload/ProduceBenchSpec.java | 37 +++ .../kafka/trogdor/workload/ProduceBenchWorker.java | 112 + .../trogdor/workload/TransactionGenerator.java | 43 .../workload/UniformTransactionsGenerator.java | 57 +++ .../trogdor/common/JsonSerializationTest.java | 3 +- 11 files changed, 318 insertions(+), 26 deletions(-) diff --git a/build.gradle b/build.gradle index 4d514df..5ce648a 100644 --- a/build.gradle +++ b/build.gradle @@ -565,6 +565,7 @@ project(':core') { dependencies { compile project(':clients') compile libs.jacksonDatabind +compile libs.jacksonJDK8Datatypes compile libs.joptSimple compile libs.metrics compile libs.scalaLibrary @@ -830,6 +831,7 @@ project(':clients') { compile libs.snappy compile libs.slf4jApi compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing +compileOnly libs.jacksonJDK8Datatypes jacksonDatabindConfig libs.jacksonDatabind // to publish as provided scope dependency. @@ -839,6 +841,7 @@ project(':clients') { testRuntime libs.slf4jlog4j testRuntime libs.jacksonDatabind +testRuntime libs.jacksonJDK8Datatypes } task determineCommitId { @@ -918,6 +921,7 @@ project(':tools') { compile project(':log4j-appender') compile libs.argparse4j compile libs.jacksonDatabind +compile libs.jacksonJDK8Datatypes compile libs.slf4jApi compile libs.jacksonJaxrsJsonProvider @@ -1347,6 +1351,7 @@ project(':connect:json') { dependencies { compile project(':connect:api') compile libs.jacksonDatabind +compile libs.jacksonJDK8Datatypes compile libs.slf4jApi testCompile libs.easymock diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 7dd3604..59f56fc 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -103,6 +103,7 @@ libs += [ bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix", easymock: "org.easymock:easymock:$versions.easymock", jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson", + jacksonJDK8Datatypes: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson", jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson", jaxbApi: "javax.xml.bind:jaxb-api:$versions.jaxb", jaxrsApi: "javax.ws.rs:javax.ws.rs-api:$versions.jaxrs", diff --git a/tests/bin/trogdor-run-transactional-produce-bench.sh b/tests/bin/trogdor-run-transactional-produce-bench.sh new file mode 100755 index 000..fd5ff0a --- /dev/null +++ b/tests/bin/trogdor-run-transactional-produce-bench.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +COORDINATOR_ENDPOINT="localhost:8889" +TASK_ID="produce_bench_$RANDOM" +TASK_SPEC=$( +cat < transactionGenerator; private final Map producerConf; private final Map adminClientConf; private final Map common