[kafka] branch trunk updated (a2e87fe -> e869d8f)

2018-11-27 Thread manikumar
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from a2e87fe  KAFKA-7620: Fix restart logic for TTLs in 
WorkerConfigTransformer
 add e869d8f  MINOR: Improve docs by adding ToC links to Monitoring

No new revisions were added by this update.

Summary of changes:
 docs/ops.html | 3 ---
 docs/toc.html | 9 +
 2 files changed, 9 insertions(+), 3 deletions(-)



[kafka] branch 2.0 updated: KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer

2018-11-27 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
 new e7298f4  KAFKA-7620: Fix restart logic for TTLs in 
WorkerConfigTransformer
e7298f4 is described below

commit e7298f4fc53f27f91564f60c3818fa392287ff33
Author: Robert Yokota 
AuthorDate: Tue Nov 27 22:01:21 2018 -0800

KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer

The restart logic for TTLs in `WorkerConfigTransformer` was broken when
trying to make it toggle-able. Accessing the toggle through the `Herder`
causes the same code to be called recursively. This fix accesses the
toggle by simply looking in the properties map that is passed to
`WorkerConfigTransformer`.

Author: Robert Yokota 

Reviewers: Magesh Nandakumar , Ewen 
Cheslack-Postava 

Closes #5914 from rayokota/KAFKA-7620

(cherry picked from commit a2e87feb8b1db8200ca3a34aa72b0802e8f61096)
Signed-off-by: Ewen Cheslack-Postava 
---
 .../kafka/connect/runtime/ConnectorConfig.java |  5 ++-
 .../org/apache/kafka/connect/runtime/Herder.java   |  6 ---
 .../connect/runtime/WorkerConfigTransformer.java   | 44 ++
 .../runtime/distributed/DistributedHerder.java |  8 
 .../runtime/standalone/StandaloneHerder.java   |  8 
 .../runtime/WorkerConfigTransformerTest.java   | 13 ---
 6 files changed, 39 insertions(+), 45 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 9d1a50d..d030fed 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import static 
org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
@@ -105,8 +106,8 @@ public class ConnectorConfig extends AbstractConfig {
 "indicates that a configuration value will expire in the future.";
 
 private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action";
-public static final String CONFIG_RELOAD_ACTION_NONE = 
Herder.ConfigReloadAction.NONE.toString();
-public static final String CONFIG_RELOAD_ACTION_RESTART = 
Herder.ConfigReloadAction.RESTART.toString();
+public static final String CONFIG_RELOAD_ACTION_NONE = 
Herder.ConfigReloadAction.NONE.name().toLowerCase(Locale.ROOT);
+public static final String CONFIG_RELOAD_ACTION_RESTART = 
Herder.ConfigReloadAction.RESTART.name().toLowerCase(Locale.ROOT);
 
 public static final String ERRORS_RETRY_TIMEOUT_CONFIG = 
"errors.retry.timeout";
 public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout 
for Errors";
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 5c7cc14..c572e20 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -149,12 +149,6 @@ public interface Herder {
 void restartTask(ConnectorTaskId id, Callback cb);
 
 /**
- * Get the configuration reload action.
- * @param connName name of the connector
- */
-ConfigReloadAction connectorConfigReloadAction(final String connName);
-
-/**
  * Restart the connector.
  * @param connName name of the connector
  * @param cb callback to invoke upon completion
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 1b715c7..3373d5c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigTransformerResult;
+import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -29,6 +34,8 @@ import java.util.concur

[kafka] branch 2.1 updated: KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer

2018-11-27 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new 9951bf9  KAFKA-7620: Fix restart logic for TTLs in 
WorkerConfigTransformer
9951bf9 is described below

commit 9951bf911125c51a2574ac0dbb9913bc0500b594
Author: Robert Yokota 
AuthorDate: Tue Nov 27 22:01:21 2018 -0800

KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer

The restart logic for TTLs in `WorkerConfigTransformer` was broken when
trying to make it toggle-able. Accessing the toggle through the `Herder`
causes the same code to be called recursively. This fix accesses the
toggle by simply looking in the properties map that is passed to
`WorkerConfigTransformer`.

Author: Robert Yokota 

Reviewers: Magesh Nandakumar , Ewen 
Cheslack-Postava 

Closes #5914 from rayokota/KAFKA-7620

(cherry picked from commit a2e87feb8b1db8200ca3a34aa72b0802e8f61096)
Signed-off-by: Ewen Cheslack-Postava 
---
 .../kafka/connect/runtime/ConnectorConfig.java |  5 ++-
 .../org/apache/kafka/connect/runtime/Herder.java   |  6 ---
 .../connect/runtime/WorkerConfigTransformer.java   | 44 ++
 .../runtime/distributed/DistributedHerder.java |  8 
 .../runtime/standalone/StandaloneHerder.java   |  8 
 .../runtime/WorkerConfigTransformerTest.java   | 13 ---
 6 files changed, 39 insertions(+), 45 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 10096a5..e915843 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import static 
org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
@@ -105,8 +106,8 @@ public class ConnectorConfig extends AbstractConfig {
 "indicates that a configuration value will expire in the future.";
 
 private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action";
-public static final String CONFIG_RELOAD_ACTION_NONE = 
Herder.ConfigReloadAction.NONE.toString();
-public static final String CONFIG_RELOAD_ACTION_RESTART = 
Herder.ConfigReloadAction.RESTART.toString();
+public static final String CONFIG_RELOAD_ACTION_NONE = 
Herder.ConfigReloadAction.NONE.name().toLowerCase(Locale.ROOT);
+public static final String CONFIG_RELOAD_ACTION_RESTART = 
Herder.ConfigReloadAction.RESTART.name().toLowerCase(Locale.ROOT);
 
 public static final String ERRORS_RETRY_TIMEOUT_CONFIG = 
"errors.retry.timeout";
 public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout 
for Errors";
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 5c7cc14..c572e20 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -149,12 +149,6 @@ public interface Herder {
 void restartTask(ConnectorTaskId id, Callback cb);
 
 /**
- * Get the configuration reload action.
- * @param connName name of the connector
- */
-ConfigReloadAction connectorConfigReloadAction(final String connName);
-
-/**
  * Restart the connector.
  * @param connName name of the connector
  * @param cb callback to invoke upon completion
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 1b715c7..3373d5c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigTransformerResult;
+import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -29,6 +34,8 @@ import java.util.concur

[kafka] branch trunk updated: KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer

2018-11-27 Thread ewencp
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new a2e87fe  KAFKA-7620: Fix restart logic for TTLs in 
WorkerConfigTransformer
a2e87fe is described below

commit a2e87feb8b1db8200ca3a34aa72b0802e8f61096
Author: Robert Yokota 
AuthorDate: Tue Nov 27 22:01:21 2018 -0800

KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer

The restart logic for TTLs in `WorkerConfigTransformer` was broken when
trying to make it toggle-able. Accessing the toggle through the `Herder`
causes the same code to be called recursively. This fix accesses the
toggle by simply looking in the properties map that is passed to
`WorkerConfigTransformer`.

Author: Robert Yokota 

Reviewers: Magesh Nandakumar , Ewen 
Cheslack-Postava 

Closes #5914 from rayokota/KAFKA-7620
---
 .../kafka/connect/runtime/ConnectorConfig.java |  5 ++-
 .../org/apache/kafka/connect/runtime/Herder.java   |  6 ---
 .../connect/runtime/WorkerConfigTransformer.java   | 44 ++
 .../runtime/distributed/DistributedHerder.java |  8 
 .../runtime/standalone/StandaloneHerder.java   |  8 
 .../runtime/WorkerConfigTransformerTest.java   | 13 ---
 6 files changed, 39 insertions(+), 45 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index efcc01d..8889aad 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import static 
org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
@@ -105,8 +106,8 @@ public class ConnectorConfig extends AbstractConfig {
 "indicates that a configuration value will expire in the future.";
 
 private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action";
-public static final String CONFIG_RELOAD_ACTION_NONE = 
Herder.ConfigReloadAction.NONE.toString();
-public static final String CONFIG_RELOAD_ACTION_RESTART = 
Herder.ConfigReloadAction.RESTART.toString();
+public static final String CONFIG_RELOAD_ACTION_NONE = 
Herder.ConfigReloadAction.NONE.name().toLowerCase(Locale.ROOT);
+public static final String CONFIG_RELOAD_ACTION_RESTART = 
Herder.ConfigReloadAction.RESTART.name().toLowerCase(Locale.ROOT);
 
 public static final String ERRORS_RETRY_TIMEOUT_CONFIG = 
"errors.retry.timeout";
 public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout 
for Errors";
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 5c7cc14..c572e20 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -149,12 +149,6 @@ public interface Herder {
 void restartTask(ConnectorTaskId id, Callback cb);
 
 /**
- * Get the configuration reload action.
- * @param connName name of the connector
- */
-ConfigReloadAction connectorConfigReloadAction(final String connName);
-
-/**
  * Restart the connector.
  * @param connName name of the connector
  * @param cb callback to invoke upon completion
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 1b715c7..3373d5c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigTransformerResult;
+import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -29,6 +34,8 @@ import java.util.concurrent.ConcurrentMap;
  * retrieved TTL values.
  */
 public class WorkerConfigTransformer {
+private static final

[kafka] branch 2.1 updated: MINOR: Fix kafkatest/__init__.py to use dev0 instead of SNAPSHOT (#5955)

2018-11-27 Thread ijuma
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new 9fa5477  MINOR: Fix kafkatest/__init__.py to use dev0 instead of 
SNAPSHOT (#5955)
9fa5477 is described below

commit 9fa5477b7a4d980c60d729ef15e1468a3c425fcb
Author: Lucas Bradstreet 
AuthorDate: Tue Nov 27 15:31:18 2018 -1000

MINOR: Fix kafkatest/__init__.py to use dev0 instead of SNAPSHOT (#5955)

Fixes the system tests.

Reviewers: Ismael Juma 
---
 tests/kafkatest/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index 8dfb212..eb6e26b 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -22,4 +22,4 @@
 # Instead, in development branches, the version should have a suffix of the 
form ".devN"
 #
 # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be 
something like "1.0.0.dev0"
-__version__ = '2.1.1-SNAPSHOT'
+__version__ = '2.1.1.dev0'



[kafka] branch trunk updated: KAFKA-7389: Enable spotBugs with Java 11 and disable false positive warnings (#5943)

2018-11-27 Thread ijuma
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 0ee1635  KAFKA-7389: Enable spotBugs with Java 11 and disable false 
positive warnings (#5943)
0ee1635 is described below

commit 0ee16350ac2e1ac7fa16f276ae61d2899eaaf28e
Author: Ismael Juma 
AuthorDate: Tue Nov 27 14:40:17 2018 -0800

KAFKA-7389: Enable spotBugs with Java 11 and disable false positive 
warnings (#5943)

See https://github.com/spotbugs/spotbugs/issues/756 for details on
the false positives affecting try with resources. An example is:

> RCN | Nullcheck of fc at line 629 of value previously dereferenced in
> org.apache.kafka.common.utils.Utils.readFileAsString(String, Charset)

Reviewers: Manikumar Reddy 
---
 build.gradle| 27 ---
 gradle/spotbugs-exclude.xml | 11 +++
 2 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/build.gradle b/build.gradle
index 5ce648a..d9816a1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -149,8 +149,7 @@ subprojects {
   apply plugin: 'maven'
   apply plugin: 'signing'
   apply plugin: 'checkstyle'
-  if (!JavaVersion.current().isJava11Compatible())
-apply plugin: "com.github.spotbugs"
+  apply plugin: "com.github.spotbugs"
 
   sourceCompatibility = minJavaVersion
   targetCompatibility = minJavaVersion
@@ -372,20 +371,18 @@ subprojects {
   }
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 
-  if (!JavaVersion.current().isJava11Compatible()) {
-spotbugs {
-  toolVersion = '3.1.8'
-  excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
-  ignoreFailures = false
-}
-test.dependsOn('spotbugsMain')
+  spotbugs {
+toolVersion = '3.1.8'
+excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
+ignoreFailures = false
+  }
+  test.dependsOn('spotbugsMain')
 
-tasks.withType(com.github.spotbugs.SpotBugsTask) {
-  reports {
-// Continue supporting `xmlFindBugsReport` for compatibility
-xml.enabled(project.hasProperty('xmlSpotBugsReport') || 
project.hasProperty('xmlFindBugsReport'))
-html.enabled(!project.hasProperty('xmlSpotBugsReport') && 
!project.hasProperty('xmlFindBugsReport'))
-  }
+  tasks.withType(com.github.spotbugs.SpotBugsTask) {
+reports {
+  // Continue supporting `xmlFindBugsReport` for compatibility
+  xml.enabled(project.hasProperty('xmlSpotBugsReport') || 
project.hasProperty('xmlFindBugsReport'))
+  html.enabled(!project.hasProperty('xmlSpotBugsReport') && 
!project.hasProperty('xmlFindBugsReport'))
 }
   }
 
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index d83c4c4..a954baf 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -23,6 +23,17 @@ This file dictates which categories of bugs and individual 
false positives that
 For a detailed description of spotbugs bug categories, see 
https://spotbugs.readthedocs.io/en/latest/bugDescriptions.html
 -->
 
+
+
+
+
+
+
+
+
+
+
+
 
 

[kafka] branch trunk updated: KAFKA-7367: Streams should not create state store directories unless they are needed (#5696)

2018-11-27 Thread mjsax
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new de24d4a  KAFKA-7367: Streams should not create state store directories 
unless they are needed (#5696)
de24d4a is described below

commit de24d4a459ccfac623e71722f8df8b1e99f2ad43
Author: Kamal Chandraprakash 
AuthorDate: Wed Nov 28 02:39:44 2018 +0530

KAFKA-7367: Streams should not create state store directories unless they 
are needed (#5696)

* KAFKA-7367: Ensure stateless topologies don't require disk access

* KAFKA-7367: Streams should not create state store directories unless they 
are needed.

* Addressed the review comments.

* Addressed the review-2 comments.

* Fixed FileAlreadyExistsException

* Addressed the review-3 comments.

* Resolved the conflicts.
---
 .../org/apache/kafka/streams/KafkaStreams.java |  16 +-
 .../processor/internals/ProcessorTopology.java |  18 +++
 .../processor/internals/StateDirectory.java|  37 +++--
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 161 -
 .../integration/RestoreIntegrationTest.java|   4 +-
 .../processor/internals/AbstractTaskTest.java  |   2 +-
 .../internals/GlobalStateManagerImplTest.java  |  12 +-
 .../internals/GlobalStreamThreadTest.java  |   4 +-
 .../internals/ProcessorStateManagerTest.java   |   4 +-
 .../processor/internals/ProcessorTopologyTest.java |  70 +++--
 .../processor/internals/StandbyTaskTest.java   |   2 +-
 .../processor/internals/StateDirectoryTest.java|  83 +++
 .../processor/internals/StreamTaskTest.java|   4 +-
 .../processor/internals/StreamThreadTest.java  |   2 +-
 .../StreamThreadStateStoreProviderTest.java|   2 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   4 +-
 .../kafka/streams/TopologyTestDriverTest.java  |  37 -
 17 files changed, 375 insertions(+), 87 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bbda11d..c29b7bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -644,12 +644,6 @@ public class KafkaStreams implements AutoCloseable {
 final LogContext logContext = new 
LogContext(String.format("stream-client [%s] ", clientId));
 this.log = logContext.logger(getClass());
 
-try {
-stateDirectory = new StateDirectory(config, time);
-} catch (final ProcessorStateException fatal) {
-throw new StreamsException(fatal);
-}
-
 final MetricConfig metricConfig = new MetricConfig()
 .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
 
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -664,7 +658,7 @@ public class KafkaStreams implements AutoCloseable {
 internalTopologyBuilder.rewriteTopology(config);
 
 // sanity check to fail-fast in case we cannot build a 
ProcessorTopology due to an exception
-internalTopologyBuilder.build();
+final ProcessorTopology taskTopology = internalTopologyBuilder.build();
 
 streamsMetadataState = new StreamsMetadataState(
 internalTopologyBuilder,
@@ -680,6 +674,14 @@ public class KafkaStreams implements AutoCloseable {
 }
 final ProcessorTopology globalTaskTopology = 
internalTopologyBuilder.buildGlobalStateTopology();
 final long cacheSizePerThread = totalCacheSize / (threads.length + 
(globalTaskTopology == null ? 0 : 1));
+final boolean createStateDirectory = 
taskTopology.hasPersistentLocalStore() ||
+(globalTaskTopology != null && 
globalTaskTopology.hasPersistentGlobalStore());
+
+try {
+stateDirectory = new StateDirectory(config, time, 
createStateDirectory);
+} catch (final ProcessorStateException fatal) {
+throw new StreamsException(fatal);
+}
 
 final StateRestoreListener delegatingStateRestoreListener = new 
DelegatingStateRestoreListener();
 GlobalStreamThread.State globalThreadState = null;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 8fcbbb4..57af1f3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -152,6 +152,24 @@ public class ProcessorTopology {
 return repartitionTopics.contains(topic)

[kafka] branch trunk updated: MINOR: Add system test for optimization upgrades (#5912)

2018-11-27 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new dfd5454  MINOR: Add system test for optimization upgrades (#5912)
dfd5454 is described below

commit dfd545485ab0399e0c129ff2d68fbb3113f3e8c9
Author: Bill Bejeck 
AuthorDate: Tue Nov 27 16:07:34 2018 -0500

MINOR: Add system test for optimization upgrades (#5912)

This is a new system test for optimizing an existing topology. This
test takes the following steps:

1. Start a Kafka Streams application that uses a selectKey then performs 3 
groupByKey() operations and 1 join creating four repartition topics
2. Verify all instances start and process data
3. Stop all instances and verify stopped
4. For each stopped instance, update the config for TOPOLOGY_OPTIMIZATION to
`all`, then restart the instance and verify that it has started successfully,
also verifying that Kafka Streams reduced the number of repartition topics
from 4 to 1
5. Verify that each instance is processing data from the aggregation,
reduce, and join operations.
Stop all instances and verify the shutdown is complete.
6. For testing I ran two passes of the system test with 25 repeats for a 
total of 50 test runs.

All test runs passed

Reviewers: Matthias J. Sax , Bill Bejeck 
, Guozhang Wang 
---
 .../kafka/streams/tests/StreamsOptimizedTest.java  | 156 +
 tests/kafkatest/services/streams.py|  26 
 .../tests/streams/streams_optimized_test.py| 150 
 3 files changed, 332 insertions(+)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
new file mode 100644
index 000..6bde92f
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Reducer;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.time.Duration.ofMillis;
+
+public class StreamsOptimizedTest {
+
+
+public static void main(final String[] args) throws Exception {
+if (args.length < 1) {
+System.err.println("StreamsOptimizedTest requires one argument 
(properties-file) but no provided: ");
+}
+final String propFileName = args[0];
+
+final Properties streamsProperties = Utils.loadProps(propFileName);
+
+System.out.println("StreamsTest instance started 
StreamsOptimizedTest");
+System.out.println("props=" + streamsProperties);
+
+final String inputTopic = (String) 
Objects.requireNonNull(streamsProperties.remove("input.topic"));
+final String aggregationTopic = (String) 
Objects.requireNonNull(streamsProperties.remove("aggregation.topic"));
+final String reduceTopic = (String) 
Objects.requireNonNull(streamsProperties.remove("reduce.topic"));
+final String joinTopic = (String) 
Objects.requireNonNull(

[kafka] branch trunk updated: KAFKA-7223: Suppression Buffer Metrics (#5795)

2018-11-27 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 55c77eb  KAFKA-7223: Suppression Buffer Metrics (#5795)
55c77eb is described below

commit 55c77ebf01ea8662b98f73f6f6c17d05163a85b8
Author: John Roesler 
AuthorDate: Tue Nov 27 14:57:04 2018 -0600

KAFKA-7223: Suppression Buffer Metrics (#5795)

Add the final batch of metrics from KIP-328

Reviewers: Matthias J. Sax , Bill Bejeck 
, Guozhang Wang 
---
 .../streams/kstream/internals/metrics/Sensors.java |  47 -
 .../suppress/KTableSuppressProcessor.java  |   8 +-
 .../streams/processor/internals/ProcessorNode.java |  21 +--
 .../internals/metrics/StreamsMetricsImpl.java  |   3 +
 .../InMemoryTimeOrderedKeyValueBuffer.java |  26 ++-
 .../streams/state/internals/metrics/Sensors.java   |  69 ++-
 .../KTableSuppressProcessorMetricsTest.java| 203 +
 .../suppress/KTableSuppressProcessorTest.java  |  13 +-
 .../streams/processor/MockProcessorContext.java|   9 +-
 9 files changed, 375 insertions(+), 24 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 5b0d8b5..12c4813 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -20,10 +20,17 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
 
 public class Sensors {
 private Sensors() {}
@@ -38,8 +45,8 @@ public class Sensors {
 );
 StreamsMetricsImpl.addInvocationRateAndCount(
 sensor,
-"stream-processor-node-metrics",
-metrics.tagMap("task-id", context.taskId().toString(), 
"processor-node-id", context.currentNode().name()),
+PROCESSOR_NODE_METRICS_GROUP,
+metrics.tagMap("task-id", context.taskId().toString(), 
PROCESSOR_NODE_ID_TAG, context.currentNode().name()),
 "late-record-drop"
 );
 return sensor;
@@ -75,4 +82,40 @@ public class Sensors {
 );
 return sensor;
 }
+
+public static Sensor suppressionEmitSensor(final InternalProcessorContext 
context) {
+final StreamsMetricsImpl metrics = context.metrics();
+
+final Sensor sensor = metrics.nodeLevelSensor(
+context.taskId().toString(),
+context.currentNode().name(),
+"suppression-emit",
+Sensor.RecordingLevel.DEBUG
+);
+
+final Map tags = metrics.tagMap(
+"task-id", context.taskId().toString(),
+PROCESSOR_NODE_ID_TAG, context.currentNode().name()
+);
+
+sensor.add(
+new MetricName(
+"suppression-emit-rate",
+PROCESSOR_NODE_METRICS_GROUP,
+"The average number of occurrence of suppression-emit 
operation per second.",
+tags
+),
+new Rate(TimeUnit.SECONDS, new Sum())
+);
+sensor.add(
+new MetricName(
+"suppression-emit-total",
+PROCESSOR_NODE_METRICS_GROUP,
+"The total number of occurrence of suppression-emit 
operations.",
+tags
+),
+new Total()
+);
+return sensor;
+}
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 50e74a3..06d5004 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -16,12 +16,14 @@
  */
 package org.apache.kafka.streams.kstream.internals.suppress;
 
+import org.apache.kafka.common.metrics.Sensor;
 impor

[kafka] branch trunk updated: KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885)

2018-11-27 Thread cmccabe
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 9368743  KAFKA-7597: Add transaction support to ProduceBenchWorker 
(#5885)
9368743 is described below

commit 9368743b8fd2b42a41b44860ea0f3588bb273cc8
Author: Stanislav Kozlovski 
AuthorDate: Tue Nov 27 20:49:53 2018 +

KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885)

KAFKA-7597: Add configurable transaction support to ProduceBenchWorker.  In 
order to get support for serializing Optional<> types to JSON, add a new 
library: jackson-datatype-jdk8. Once Jackson 3 comes out, this library will not 
be needed.

Reviewers: Colin McCabe , Ismael Juma 

---
 build.gradle   |   5 +
 gradle/dependencies.gradle |   1 +
 .../bin/trogdor-run-transactional-produce-bench.sh |  51 ++
 .../services/trogdor/produce_bench_workload.py |   4 +-
 tests/kafkatest/tests/core/produce_bench_test.py   |  29 +-
 .../org/apache/kafka/trogdor/common/JsonUtil.java  |   2 +
 .../kafka/trogdor/workload/ProduceBenchSpec.java   |  37 +++
 .../kafka/trogdor/workload/ProduceBenchWorker.java | 112 +
 .../trogdor/workload/TransactionGenerator.java |  43 
 .../workload/UniformTransactionsGenerator.java |  57 +++
 .../trogdor/common/JsonSerializationTest.java  |   3 +-
 11 files changed, 318 insertions(+), 26 deletions(-)

diff --git a/build.gradle b/build.gradle
index 4d514df..5ce648a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -565,6 +565,7 @@ project(':core') {
   dependencies {
 compile project(':clients')
 compile libs.jacksonDatabind
+compile libs.jacksonJDK8Datatypes
 compile libs.joptSimple
 compile libs.metrics
 compile libs.scalaLibrary
@@ -830,6 +831,7 @@ project(':clients') {
 compile libs.snappy
 compile libs.slf4jApi
 compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token 
parsing
+compileOnly libs.jacksonJDK8Datatypes
 
 jacksonDatabindConfig libs.jacksonDatabind // to publish as provided scope 
dependency.
 
@@ -839,6 +841,7 @@ project(':clients') {
 
 testRuntime libs.slf4jlog4j
 testRuntime libs.jacksonDatabind
+testRuntime libs.jacksonJDK8Datatypes
   }
 
   task determineCommitId {
@@ -918,6 +921,7 @@ project(':tools') {
 compile project(':log4j-appender')
 compile libs.argparse4j
 compile libs.jacksonDatabind
+compile libs.jacksonJDK8Datatypes
 compile libs.slf4jApi
 
 compile libs.jacksonJaxrsJsonProvider
@@ -1347,6 +1351,7 @@ project(':connect:json') {
   dependencies {
 compile project(':connect:api')
 compile libs.jacksonDatabind
+compile libs.jacksonJDK8Datatypes
 compile libs.slf4jApi
 
 testCompile libs.easymock
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 7dd3604..59f56fc 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -103,6 +103,7 @@ libs += [
   bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
   easymock: "org.easymock:easymock:$versions.easymock",
   jacksonDatabind: 
"com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
+  jacksonJDK8Datatypes: 
"com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson",
   jacksonJaxrsJsonProvider: 
"com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
   jaxbApi: "javax.xml.bind:jaxb-api:$versions.jaxb",
   jaxrsApi: "javax.ws.rs:javax.ws.rs-api:$versions.jaxrs",
diff --git a/tests/bin/trogdor-run-transactional-produce-bench.sh 
b/tests/bin/trogdor-run-transactional-produce-bench.sh
new file mode 100755
index 000..fd5ff0a
--- /dev/null
+++ b/tests/bin/trogdor-run-transactional-produce-bench.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+COORDINATOR_ENDPOINT="localhost:8889"
+TASK_ID="produce_bench_$RANDOM"
+TASK_SPEC=$(
+cat < transactionGenerator;
 private final Map producerConf;
 private final Map adminClientConf;
 private final Map common