[kafka] branch trunk updated (2a54347 -> 78f5da9)

2019-10-16 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 2a54347  MINOR: Improve FK Join docs and optimize null-fk case (#7536)
 add 78f5da9  KAFKA-9053: AssignmentInfo#encode hardcodes the 
LATEST_SUPPORTED_VERSION (#7537)

No new revisions were added by this update.

Summary of changes:
 .../internals/StreamsPartitionAssignor.java| 22 ++
 .../internals/assignment/AssignmentInfo.java   | 22 +++---
 .../internals/assignment/SubscriptionInfo.java |  4 ++--
 .../internals/assignment/AssignmentInfoTest.java   |  9 +
 .../internals/assignment/SubscriptionInfoTest.java | 17 +
 5 files changed, 57 insertions(+), 17 deletions(-)
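
The bug fixed here is that AssignmentInfo#encode always stamped LATEST_SUPPORTED_VERSION instead of the version actually negotiated for the rebalance. A minimal, self-contained sketch of the difference (the constant value and method names are illustrative, not Kafka's actual code):

```java
import java.nio.ByteBuffer;

public class VersionedEncodeSketch {
    static final int LATEST_SUPPORTED_VERSION = 5; // illustrative value only

    // Buggy shape: always stamps the latest version, so older group members
    // cannot decode the assignment during a rolling upgrade.
    static ByteBuffer encodeHardcoded(int ignoredUsedVersion) {
        return (ByteBuffer) ByteBuffer.allocate(4).putInt(LATEST_SUPPORTED_VERSION).flip();
    }

    // Fixed shape: stamp the version actually in use for this rebalance.
    static ByteBuffer encode(int usedVersion) {
        return (ByteBuffer) ByteBuffer.allocate(4).putInt(usedVersion).flip();
    }

    public static void main(String[] args) {
        System.out.println(encodeHardcoded(3).getInt());
        System.out.println(encode(3).getInt());
    }
}
```

The point is that the version written into the header must match what older members can parse, not what the newest code supports.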



[kafka] branch trunk updated (1d6d370 -> 3e30bf5)

2019-10-16 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 1d6d370  MINOR: Update Kafka Streams upgrade docs for KIP-444, 
KIP-470, KIP-471, KIP-474, KIP-528 (#7515)
 add 3e30bf5  Explain the separate upgrade paths for consumer groups and 
Streams (#7516)

No new revisions were added by this update.

Summary of changes:
 docs/streams/upgrade-guide.html | 17 +++--
 docs/upgrade.html   | 25 +
 2 files changed, 36 insertions(+), 6 deletions(-)



[kafka] branch 2.4 updated: Explain the separate upgrade paths for consumer groups and Streams (#7516)

2019-10-16 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
 new ee4f65d  Explain the separate upgrade paths for consumer groups and 
Streams (#7516)
ee4f65d is described below

commit ee4f65dfa5bd2a14e17805734be3d9e1fdfc25a2
Author: A. Sophie Blee-Goldman 
AuthorDate: Wed Oct 16 16:09:09 2019 -0700

Explain the separate upgrade paths for consumer groups and Streams (#7516)

Document the upgrade path for the consumer and for Streams (note that they 
differ significantly).

Needs to be cherry-picked to 2.4

Reviewers: Guozhang Wang 
---
 docs/streams/upgrade-guide.html | 17 +++--
 docs/upgrade.html   | 25 +
 2 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 1cc5b50..2de7917 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -34,10 +34,10 @@
 
 
 
-Upgrading from any older version to {{fullDotVersion}} is possible: 
(1) if you are upgrading from 2.0.x to {{fullDotVersion}} then a single rolling 
bounce is needed to swap in the new jar,
-(2) if you are upgrading from older versions than 2.0.x in the online 
mode, you would need two rolling bounces where
-the first rolling bounce phase you need to set config 
upgrade.from="older version" (possible values are "0.10.0", 
"0.10.1", "0.10.2", "0.11.0", "1.0", and "1.1")
-(cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade;>KIP-268):
+Upgrading from any older version to {{fullDotVersion}} is possible: 
you will need to do two rolling bounces, where during the first rolling bounce 
phase you set the config upgrade.from="older version"
+(possible values are "0.10.0" - "2.3") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will keep using 
the old eager
+rebalancing protocol if you skip or delay the second rolling bounce, 
but you can safely switch over to cooperative at any time once the entire group 
is on 2.4+ by removing the config value and bouncing. For more details please 
refer to
+https://cwiki.apache.org/confluence/x/vAclBg;>KIP-429:
 
 
  prepare your application instances for a rolling bounce and make 
sure that config upgrade.from is set to the version from which it 
is being upgrade.
@@ -75,11 +75,16 @@
 Streams API changes in 2.4.0
 
 
-
 
 
 
-
+
+With the introduction of incremental cooperative rebalancing, Streams 
no longer requires all tasks be revoked at the beginning of a rebalance. 
Instead, at the completion of the rebalance only those tasks which are to be 
migrated to another consumer
+for overall load balance will need to be closed and revoked. This 
changes the semantics of the StateListener a bit, as it will not 
necessarily transition to REBALANCING at the beginning of a 
rebalance anymore. Note that
+this means IQ will now be available at all times except during state 
restoration, including while a rebalance is in progress. If restoration is 
occurring when a rebalance begins, we will continue to actively restore the 
state stores and/or process
+standby tasks during a cooperative rebalance. Note that with this new 
rebalancing protocol, you may sometimes see a rebalance be followed by a second 
short rebalance that ensures all tasks are safely distributed. For details 
please see
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol;>KIP-429.
+
 
 
 The 2.4.0 release contains newly added and reworked metrics.
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 0c69336..6d9f702 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -45,6 +45,31 @@
 The old overloaded functions are deprecated and we would recommend 
users to make their code changes to leverage the new methods (details
 can be found in https://cwiki.apache.org/confluence/display/KAFKA/KIP-520%3A+Add+overloaded+Consumer%23committed+for+batching+partitions;>KIP-520).
 
+We are introducing incremental cooperative rebalancing to the clients' 
group protocol, which allows consumers to keep all of their assigned partitions 
during a rebalance
+and at the end revoke only those which must be migrated to another 
consumer for overall cluster balance. The ConsumerCoordinator will 
choose the latest RebalanceProtocol
+that is commonly supported by all of 



[kafka] branch trunk updated (0725035 -> 8966d06)

2019-10-16 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 0725035  KAFKA-9032: Bypass serdes for tombstones (#7518)
 add 8966d06  KAFKA-9039: Optimize ReplicaFetcher fetch path (#7443)

No new revisions were added by this update.

Summary of changes:
 checkstyle/import-control-jmh-benchmarks.xml   |   2 +
 .../apache/kafka/clients/FetchSessionHandler.java  |  44 ++-
 .../kafka/common/internals/PartitionStates.java|   2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala | 117 
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |  19 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  30 +-
 .../kafka/server/AbstractFetcherManagerTest.scala  |   2 +-
 .../kafka/server/AbstractFetcherThreadTest.scala   |  15 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala |  27 +-
 gradle/spotbugs-exclude.xml|   2 +
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 312 +
 .../jmh/fetchsession/FetchSessionBenchmark.java| 119 
 12 files changed, 579 insertions(+), 112 deletions(-)
 create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java



[kafka] branch 2.4 updated: KAFKA-9032: Bypass serdes for tombstones (#7518)

2019-10-16 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
 new f2495e6  KAFKA-9032: Bypass serdes for tombstones (#7518)
f2495e6 is described below

commit f2495e6fc81a4bfcc9ce0908643fb1aa63d3dcd3
Author: John Roesler 
AuthorDate: Wed Oct 16 11:34:52 2019 -0500

KAFKA-9032: Bypass serdes for tombstones (#7518)

In a KTable context, null record values have a special "tombstone" 
significance. We should always bypass the serdes for such tombstones, since 
otherwise the serde could violate Streams' table semantics.

Added test coverage for this case and fixed the code accordingly.

Reviewers: A. Sophie Blee-Goldman , Matthias J. Sax 
, Guozhang Wang , Bill Bejeck 

---
 .../SubscriptionResponseWrapperSerde.java  | 12 +++--
 .../SubscriptionResponseWrapperSerdeTest.java  | 63 ++
 2 files changed, 60 insertions(+), 15 deletions(-)
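
The heart of the fix, per the commit message, is that a tombstone (null value) must bypass the inner serde entirely. A self-contained sketch of that guard (the names here are illustrative, not the actual SubscriptionResponseWrapperSerde code):

```java
import java.util.function.Function;

public class TombstoneBypassSketch {
    // Only invoke the delegate deserializer for non-null payloads; a null
    // payload is a KTable tombstone and must never reach the inner serde.
    static <V> V deserializeNullable(byte[] data, Function<byte[], V> delegate) {
        if (data == null) {
            return null; // tombstone: bypass the serde
        }
        return delegate.apply(data);
    }

    public static void main(String[] args) {
        // A delegate that, like many real serdes, blows up on null input.
        Function<byte[], String> strict = bytes -> {
            if (bytes == null) {
                throw new NullPointerException("delegate must not see null");
            }
            return new String(bytes);
        };
        System.out.println(deserializeNullable(null, strict));
        System.out.println(deserializeNullable("v".getBytes(), strict));
    }
}
```

Without the guard, a user serde that rejects null could break the table semantics described above.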

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 6524b4f..4277f9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -58,7 +58,7 @@ public class SubscriptionResponseWrapperSerde<K, V> implements Serde<SubscriptionResponseWrapper<V>> {
-        final byte[] serializedValue;
+        final V value;
         if (data.length - lengthSum > 0) {
+            final byte[] serializedValue;
             serializedValue = new byte[data.length - lengthSum];
             buf.get(serializedValue, 0, serializedValue.length);
-        } else
-            serializedValue = null;
+            value = deserializer.deserialize(topic, serializedValue);
+        } else {
+            value = null;
+        }
 
-        final V value = deserializer.deserialize(topic, serializedValue);
         return new SubscriptionResponseWrapper<>(hash, value, version);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
index fde9bdd..40948e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
@@ -17,15 +17,58 @@
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.state.internals.Murmur3;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class SubscriptionResponseWrapperSerdeTest {
+    private static final class NonNullableSerde<T> implements Serde<T>, Serializer<T>, Deserializer<T> {
+        private final Serde<T> delegate;
+
+        NonNullableSerde(final Serde<T> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public Serializer<T> serializer() {
+            return this;
+        }
+
+        @Override
+        public Deserializer<T> deserializer() {
+            return this;
+        }
+
+        @Override
+        public byte[] serialize(final String topic, final T data) {
+            return delegate.serializer().serialize(topic, requireNonNull(data));
+        }
+
+        @Override
+        public T deserialize(final String topic, final byte[] data) {
+            return delegate.deserializer().deserialize(topic, requireNonNull(data));
+        }
+    }
 
 @Test
 @SuppressWarnings("unchecked")
@@ -33,9 +76,9 @@ public class SubscriptionResponseWrapperSerdeTest {
 final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, 
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
 final String foreignValue = "foreignValue";
 final SubscriptionResponseWrapper srw = new 
Subsc

[kafka] branch trunk updated (3e24495 -> 0725035)

2019-10-16 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 3e24495  KAFKA-8897: Warn about no guaranteed backwards compatibility 
in RocksDBConfigSetter (#7483)
 add 0725035  KAFKA-9032: Bypass serdes for tombstones (#7518)

No new revisions were added by this update.

Summary of changes:
 .../SubscriptionResponseWrapperSerde.java  | 12 +++--
 .../SubscriptionResponseWrapperSerdeTest.java  | 63 ++
 2 files changed, 60 insertions(+), 15 deletions(-)



[kafka] branch 2.4 updated: MINOR: remove unused import in QueryableStateIntegrationTest (#7521)

2019-10-15 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
 new 4755084  MINOR: remove unused import in QueryableStateIntegrationTest 
(#7521)
4755084 is described below

commit 475508405eff45b149b5725e174c0f85690b9c08
Author: Lucas Bradstreet 
AuthorDate: Tue Oct 15 12:18:20 2019 -0700

MINOR: remove unused import in QueryableStateIntegrationTest (#7521)

Reviewers: Guozhang Wang 
---
 .../apache/kafka/streams/integration/QueryableStateIntegrationTest.java  | 1 -
 1 file changed, 1 deletion(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index de2ae27..a0d38dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -54,7 +54,6 @@ import org.apache.kafka.streams.state.StreamsMetadata;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;



[kafka] branch trunk updated (e66ed2d -> caf3499)

2019-10-15 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from e66ed2d  MINOR: code and JavaDoc cleanup (#7462)
 add caf3499  MINOR: remove unused import in QueryableStateIntegrationTest 
(#7521)

No new revisions were added by this update.

Summary of changes:
 .../apache/kafka/streams/integration/QueryableStateIntegrationTest.java  | 1 -
 1 file changed, 1 deletion(-)



[kafka] branch 2.4 updated: KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to transit to RUNNING upon start (#7519)

2019-10-15 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
 new bb75897  KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to 
transit to RUNNING upon start (#7519)
bb75897 is described below

commit bb758977e91c10b0e56da1c51dcaee4d6a094d73
Author: Guozhang Wang 
AuthorDate: Tue Oct 15 11:34:48 2019 -0700

KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to transit to RUNNING 
upon start (#7519)

I looked into the logs of the above tickets, and I think for a couple of 
them the failures are due to the fact that the threads take time to restore, or 
to stabilize the rebalance since there are multiple threads. Adding a hook to 
wait for the state to transition to RUNNING upon starting.

Reviewers: Chris Pettitt , Matthias J. Sax 

---
 .../integration/QueryableStateIntegrationTest.java | 33 --
 1 file changed, 24 insertions(+), 9 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index c5dbabe..de2ae27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -54,6 +54,7 @@ import org.apache.kafka.streams.state.StreamsMetadata;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -92,6 +93,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
 
 @Category({IntegrationTest.class})
 public class QueryableStateIntegrationTest {
@@ -261,6 +263,16 @@ public class QueryableStateIntegrationTest {
 @Override
 public void run() {
 myStream.start();
+
+try {
+TestUtils.waitForCondition(
+() -> 
stateListener.mapStates.containsKey(KafkaStreams.State.RUNNING),
+"Did not start successfully after " + 
TestUtils.DEFAULT_MAX_WAIT_MS + " ms"
+);
+} catch (final InterruptedException e) {
+if 
(!stateListener.mapStates.containsKey(KafkaStreams.State.RUNNING))
+fail("Did not start successfully");
+}
 }
 
 public void close() {
@@ -446,7 +458,8 @@ public class QueryableStateIntegrationTest {
 windowStoreName,
 streamsConfiguration);
 
-kafkaStreams.start();
+startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
 producerThread.start();
 
 try {
@@ -515,7 +528,7 @@ public class QueryableStateIntegrationTest {
 t2.toStream().to(outputTopic);
 
 kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-kafkaStreams.start();
+startKafkaStreamsAndWaitForRunningState(kafkaStreams);
 
 waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
@@ -581,7 +594,7 @@ public class QueryableStateIntegrationTest {
 .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
 kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-kafkaStreams.start();
+startKafkaStreamsAndWaitForRunningState(kafkaStreams);
 
 waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
 
@@ -629,7 +642,7 @@ public class QueryableStateIntegrationTest {
 t3.toStream().to(outputTopic, Produced.with(Serdes.String(), 
Serdes.Long()));
 
 kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-kafkaStreams.start();
+startKafkaStreamsAndWaitForRunningState(kafkaStreams);
 
 waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
@@ -690,7 +703,7 @@ public class QueryableStateIntegrationTest {
 .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
 .count(Materialized.as(windowStoreName));
 kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-kafkaStreams.start();
+startKafkaStreamsAndWaitForRunningState(kafkaStreams);
 
 waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
 
@@ -716,7 +729,7 @@ public class QueryableStateIntegrationTest {
 final String storeName = "count-by-key";
 stream.groupByKey().count(Materialized.as(storeName)

[kafka] branch trunk updated (176c934 -> 76fcabc)

2019-10-15 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 176c934  KAFKA-8813: Refresh log config if it's updated before 
initialization (#7305)
 add 76fcabc  KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to 
transit to RUNNING upon start (#7519)

No new revisions were added by this update.

Summary of changes:
 .../integration/QueryableStateIntegrationTest.java | 33 --
 1 file changed, 24 insertions(+), 9 deletions(-)



[kafka] branch 2.4 updated: KAFKA-8942: Document RocksDB metrics

2019-10-15 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
 new 2667773  KAFKA-8942: Document RocksDB metrics
2667773 is described below

commit 26677731f87116154f74da421d4f440b80599da1
Author: Bruno Cadonna 
AuthorDate: Tue Oct 15 11:10:42 2019 -0700

KAFKA-8942: Document RocksDB metrics

Author: Bruno Cadonna 

Reviewers: Guozhang Wang 

Closes #7490 from cadonna/AK8942-docs-rocksdb_metrics

Minor comments

(cherry picked from commit 9c80a06466dbfade96308ef26c20c6555612191e)
Signed-off-by: Guozhang Wang 
---
 docs/ops.html | 102 ++
 1 file changed, 102 insertions(+)

diff --git a/docs/ops.html b/docs/ops.html
index 05d7de1..d87d16a 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1979,6 +1979,108 @@ All the following metrics have a recording level of 
debug:
 
  
 
+  RocksDB Metrics
+  All the following metrics have a recording level of debug.
+  The metrics are collected every minute from the RocksDB state stores.
+  If a state store consists of multiple RocksDB instances, as is the case 
for aggregations over time and session windows,
+  each metric reports an aggregation over the RocksDB instances of the state 
store.
+  Note that the store-scope for built-in RocksDB state stores are 
currently the following:
+  
+rocksdb-state (for RocksDB backed key-value store)
+rocksdb-window-state (for RocksDB backed window 
store)
+rocksdb-session-state (for RocksDB backed session 
store)
+  
+
+  
+
+
+  Metric/Attribute name
+  Description
+  Mbean name
+
+
+  bytes-written-rate
+  The average number of bytes written per second to the RocksDB state 
store.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  bytes-written-total
+  The total number of bytes written to the RocksDB state store.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  bytes-read-rate
+  The average number of bytes read per second from the RocksDB state 
store.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  bytes-read-total
+  The total number of bytes read from the RocksDB state store.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  memtable-bytes-flushed-rate
+  The average number of bytes flushed per second from the memtable to 
disk.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  memtable-bytes-flushed-total
+  The total number of bytes flushed from the memtable to disk.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  memtable-hit-ratio
+  The ratio of memtable hits relative to all lookups to the 
memtable.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  block-cache-data-hit-ratio
+  The ratio of block cache hits for data blocks relative to all 
lookups for data blocks to the block cache.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  block-cache-index-hit-ratio
+  The ratio of block cache hits for index blocks relative to all 
lookups for index blocks to the block cache.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  block-cache-filter-hit-ratio
+  The ratio of block cache hits for filter blocks relative to all 
lookups for filter blocks to the block cache.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  write-stall-duration-avg
+  The average duration of write stalls in ms.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  write-stall-duration-total
+  The total duration of write stalls in ms.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  bytes-read-compaction-rate
+  The average number of bytes read per second during compaction.
+  
kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+
+
+  bytes-written-compaction-rate
+  The average number of bytes written per second during 
compaction

[kafka] branch trunk updated (2fce203 -> 9c80a06)

2019-10-15 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 2fce203  KAFKA-8671: NullPointerException occurs if topic associated 
with GlobalKTable changes (#7437)
 add 9c80a06  KAFKA-8942: Document RocksDB metrics

No new revisions were added by this update.

Summary of changes:
 docs/ops.html | 102 ++
 1 file changed, 102 insertions(+)



[kafka] branch trunk updated (11a401d -> c9c3adc)

2019-10-14 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 11a401d  delete (#7504)
 add c9c3adc  MINOR: move "Added/Removed sensor" log messages to TRACE 
(#7502)

No new revisions were added by this update.

Summary of changes:
 clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[kafka] branch 2.4 updated: KAFKA-9029: Flaky Test CooperativeStickyAssignorTest.testReassignmentWithRand: bump to 4 (#7503)

2019-10-12 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
 new f965e79  KAFKA-9029: Flaky Test 
CooperativeStickyAssignorTest.testReassignmentWithRand: bump to 4 (#7503)
f965e79 is described below

commit f965e79d49b85591a3d5bd7b6d3562b9278da05e
Author: A. Sophie Blee-Goldman 
AuthorDate: Sat Oct 12 11:07:12 2019 -0700

KAFKA-9029: Flaky Test 
CooperativeStickyAssignorTest.testReassignmentWithRand: bump to 4 (#7503)

One of the sticky assignor tests involves a random change in subscriptions 
that the current assignor algorithm struggles to react to, and in cooperative 
mode it ends up requiring more than one follow-up rebalance.

Apparently, in rare cases it can also require more than 2. Bumping the 
"allowed subsequent rebalances" to 4 (an increase of 2) to allow some breathing 
room and reduce flakiness. Technically any number is "correct", but if it turns 
out to ever require more than 4 we should revisit and improve the algorithm, 
because that would be excessive (see KAFKA-8767).

    Reviewers: Guozhang Wang 
---
 .../apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
index 72a6d89..aed8c09 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
@@ -68,7 +68,7 @@ public class CooperativeStickyAssignorTest extends 
AbstractStickyAssignorTest {
 assignments.putAll(assignor.assign(partitionsPerTopic, 
subscriptions));
 ++rebalances;
 
-assertTrue(rebalances <= 2);
+assertTrue(rebalances <= 4);
 }
 
 // Check the validity and balance of the final assignment
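
The bound being bumped here caps how many follow-up rebalances are allowed before the assignment stabilizes. A toy, self-contained sketch of asserting convergence within such a cap (purely illustrative; this is not the assignor's real logic):

```java
import java.util.function.UnaryOperator;

public class BoundedRebalanceSketch {
    // Repeat an "assignment" step until it reaches a fixed point, failing if
    // it takes more than maxRounds follow-up rounds (the commit above bumps
    // that allowance from 2 to 4 in the real test).
    static int roundsUntilStable(int start, UnaryOperator<Integer> step, int maxRounds) {
        int current = start;
        int rounds = 0;
        while (true) {
            int next = step.apply(current);
            if (next == current) { // fixed point: assignment no longer changes
                return rounds;
            }
            current = next;
            rounds++;
            if (rounds > maxRounds) {
                throw new AssertionError("needed more than " + maxRounds + " rebalances");
            }
        }
    }

    public static void main(String[] args) {
        // A toy "assignment" that converges to 0 in a few steps.
        System.out.println(roundsUntilStable(3, x -> x / 2, 4));
    }
}
```

Any cap is "correct" in the same sense the commit message describes: it only bounds how long convergence may take, not whether the final state is valid.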



[kafka] branch trunk updated (2ff8fa0 -> 6fb8bd0)

2019-10-12 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 2ff8fa0  KAFKA-8122: Fix Kafka Streams EOS integration test (#7470)
 add 6fb8bd0  KAFKA-9029: Flaky Test 
CooperativeStickyAssignorTest.testReassignmentWithRand: bump to 4 (#7503)

No new revisions were added by this update.

Summary of changes:
 .../apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[kafka] branch 2.4 updated: HOTFIX: fix checkstyle in Streams system test (#7494)

2019-10-10 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
 new 8e048ba  HOTFIX: fix checkstyle in Streams system test (#7494)
8e048ba is described below

commit 8e048bac4576b0da00d0542a9e848af95c1dea03
Author: A. Sophie Blee-Goldman 
AuthorDate: Thu Oct 10 19:38:14 2019 -0700

HOTFIX: fix checkstyle in Streams system test (#7494)

Reviewers: Guozhang Wang 
---
 .../kafka/streams/tests/StreamsBrokerDownResilienceTest.java | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index 4dd4954..b605f46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -104,7 +104,7 @@ public class StreamsBrokerDownResilienceTest {
 
 final KafkaStreams streams = new KafkaStreams(builder.build(), 
streamsProperties);
 
-streams.setUncaughtExceptionHandler( (t,e) -> {
+streams.setUncaughtExceptionHandler((t, e) -> {
 System.err.println("FATAL: An unexpected exception " + e);
 System.err.flush();
 streams.close(Duration.ofSeconds(30));
@@ -114,11 +114,11 @@ public class StreamsBrokerDownResilienceTest {
 System.out.println("Start Kafka Streams");
 streams.start();
 
-Runtime.getRuntime().addShutdownHook(new Thread( () -> {
-streams.close(Duration.ofSeconds(30));
-System.out.println("Complete shutdown of streams resilience 
test app now");
-System.out.flush();
-}
+Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+streams.close(Duration.ofSeconds(30));
+System.out.println("Complete shutdown of streams resilience test 
app now");
+System.out.flush();
+}
 ));
 }
 



[kafka] branch trunk updated (cc6525a -> b80a572)

2019-10-10 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from cc6525a  KAFKA-8743: Flaky Test 
Repartition{WithMerge}OptimizingIntegrationTest (#7472)
 add b80a572  HOTFIX: fix checkstyle in Streams system test (#7494)

No new revisions were added by this update.

Summary of changes:
 .../kafka/streams/tests/StreamsBrokerDownResilienceTest.java | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)



[kafka] branch 2.4 updated: KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472)

2019-10-10 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
 new de202a6  KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472)
de202a6 is described below

commit de202a669777becd4a79fcdf6c0597e5a12d2370
Author: A. Sophie Blee-Goldman 
AuthorDate: Thu Oct 10 16:23:18 2019 -0700

KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472)

All four flavors of the repartition/optimization tests have been reported as flaky and failed in one place or another:
* RepartitionOptimizingIntegrationTest.shouldSendCorrectRecords_OPTIMIZED
* RepartitionOptimizingIntegrationTest.shouldSendCorrectRecords_NO_OPTIMIZATION
* RepartitionWithMergeOptimizingIntegrationTest.shouldSendCorrectRecords_OPTIMIZED
* RepartitionWithMergeOptimizingIntegrationTest.shouldSendCorrectRecords_NO_OPTIMIZATION

They're pretty similar so it makes sense to knock them all out at once. This PR does three things:

* Switch to in-memory stores wherever possible
* Name all operators and update the Topology accordingly (not really a flaky test fix, but had to update the topology names anyway because of the in-memory stores so figured might as well)
* Port to TopologyTestDriver -- this is the "real" fix; it should make a big difference, as these repartition tests required multiple roundtrips with the Kafka cluster (while using only the default timeout)

Reviewers: Bill Bejeck , Guozhang Wang 

---
 .../internals/RepartitionOptimizingTest.java}  | 548 +++--
 .../RepartitionWithMergeOptimizingTest.java}   | 328 ++--
 .../tests/StreamsBrokerDownResilienceTest.java |  15 +-
 .../tests/streams/streams_upgrade_test.py  |   2 +-
 4 files changed, 465 insertions(+), 428 deletions(-)
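The third bullet above — porting from embedded-cluster round trips to TopologyTestDriver — can be sketched as follows. This is a minimal, hypothetical example (the `input`/`output` topic names, the upper-casing topology, and the `pipeOnce` helper are illustrative, not taken from the PR):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TopologyTestDriverSketch {

    // Drive a trivial topology entirely in-process: no broker, no round trips.
    static String pipeOnce(final String value) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> v.toUpperCase())
               .to("output", Produced.with(Serdes.String(), Serdes.String()));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ttd-sketch");
        // bootstrap.servers is required by the config but never contacted
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> in =
                driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> out =
                driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("key", value);
            return out.readValue();
        }
    }

    public static void main(final String[] args) {
        System.out.println(pipeOnce("hello")); // prints HELLO
    }
}
```

Because TopologyTestDriver processes each record synchronously in-process, there are no broker round trips and no timeouts to tune, which is what makes the ported tests deterministic.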

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
similarity index 54%
rename from 
streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
rename to 
streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index bea32f2..8425ad7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -15,40 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.streams.integration;
+package org.apache.kafka.streams.processor.internals;
 
-
-import kafka.utils.MockTime;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.StreamJoined;
 import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -57,17 +58,19 @@ import java.util.Local

[kafka] branch trunk updated (fa2a9f0 -> cc6525a)

2019-10-10 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from fa2a9f0  MINOR: ListPartitionReassignmentsResponse should not be entirely failed when a topic-partition does not exist (#7486)
 add cc6525a  KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472)

No new revisions were added by this update.

Summary of changes:
 .../internals/RepartitionOptimizingTest.java}  | 548 +++--
 .../RepartitionWithMergeOptimizingTest.java}   | 328 ++--
 .../tests/StreamsBrokerDownResilienceTest.java |  15 +-
 .../tests/streams/streams_upgrade_test.py  |   2 +-
 4 files changed, 465 insertions(+), 428 deletions(-)
 rename 
streams/src/test/java/org/apache/kafka/streams/{integration/RepartitionOptimizingIntegrationTest.java
 => processor/internals/RepartitionOptimizingTest.java} (54%)
 rename 
streams/src/test/java/org/apache/kafka/streams/{integration/RepartitionWithMergeOptimizingIntegrationTest.java
 => processor/internals/RepartitionWithMergeOptimizingTest.java} (54%)



[kafka] branch trunk updated (c3177a1 -> f41a5c2)

2019-10-10 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from c3177a1  Add wait condition for state RUNNING (#7476)
 add f41a5c2  KAFKA-8729, pt 3: Add broker-side logic to handle the case when there are record_errors and error_message (#7167)

No new revisions were added by this update.

Summary of changes:
 .../kafka/common/requests/ProduceRequest.java  |  2 +-
 .../kafka/common/requests/ProduceResponse.java | 99 ++
 .../resources/common/message/ProduceRequest.json   |  2 +-
 .../resources/common/message/ProduceResponse.json  | 12 +--
 .../apache/kafka/common/message/MessageTest.java   | 14 +--
 .../kafka/common/requests/RequestResponseTest.java |  3 +-
 ...ption.scala => RecordValidationException.scala} |  9 +-
 core/src/main/scala/kafka/log/Log.scala| 18 +++-
 core/src/main/scala/kafka/log/LogValidator.scala   | 69 ++-
 .../main/scala/kafka/server/ReplicaManager.scala   | 37 +---
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 10 +--
 .../scala/unit/kafka/log/LogValidatorTest.scala| 58 +++--
 .../unit/kafka/server/ProduceRequestTest.scala | 36 +++-
 13 files changed, 265 insertions(+), 104 deletions(-)
 copy core/src/main/scala/kafka/common/{LogCleaningAbortedException.scala => RecordValidationException.scala} (74%)



[kafka] branch 2.4 updated: KAFKA-8729, pt 3: Add broker-side logic to handle the case when there are record_errors and error_message (#7167)

2019-10-10 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
 new 18cfd13  KAFKA-8729, pt 3: Add broker-side logic to handle the case when there are record_errors and error_message (#7167)
18cfd13 is described below

commit 18cfd13a9b211cbe272563544904d463c0ed93bb
Author: Tu V. Tran 
AuthorDate: Thu Oct 10 14:44:37 2019 -0700

KAFKA-8729, pt 3: Add broker-side logic to handle the case when there are record_errors and error_message (#7167)

All the changes are in ReplicaManager.appendToLocalLog and ReplicaManager.appendRecords. Also, replaced LogAppendInfo.unknownLogAppendInfoWithLogStartOffset with LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo to include those 2 new fields.

Reviewers: Guozhang Wang , Jason Gustafson 
---
 .../kafka/common/requests/ProduceRequest.java  |  2 +-
 .../kafka/common/requests/ProduceResponse.java | 99 ++
 .../resources/common/message/ProduceRequest.json   |  2 +-
 .../resources/common/message/ProduceResponse.json  | 12 +--
 .../apache/kafka/common/message/MessageTest.java   | 14 +--
 .../kafka/common/requests/RequestResponseTest.java |  3 +-
 .../kafka/common/RecordValidationException.scala   | 25 ++
 core/src/main/scala/kafka/log/Log.scala| 18 +++-
 core/src/main/scala/kafka/log/LogValidator.scala   | 69 ++-
 .../main/scala/kafka/server/ReplicaManager.scala   | 37 +---
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 10 +--
 .../scala/unit/kafka/log/LogValidatorTest.scala| 58 +++--
 .../unit/kafka/server/ProduceRequestTest.scala | 36 +++-
 13 files changed, 285 insertions(+), 100 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 932473c..7b3ae1c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -121,7 +121,7 @@ public class ProduceRequest extends AbstractRequest {
 private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6;
 
 /**
- * V8 bumped up to add two new fields error_records offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse}
+ * V8 bumped up to add two new fields record_errors offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse}
  * (See KIP-467)
  */
 private static final Schema PRODUCE_REQUEST_V8 = PRODUCE_REQUEST_V7;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index a6df880..8bfc629 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -74,14 +74,14 @@ public class ProduceResponse extends AbstractResponse {
 private static final String BASE_OFFSET_KEY_NAME = "base_offset";
 private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time";
 private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
-private static final String ERROR_RECORDS_KEY_NAME = "error_records";
-private static final String RELATIVE_OFFSET_KEY_NAME = "relative_offset";
-private static final String RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME = "relative_offset_error_message";
+private static final String RECORD_ERRORS_KEY_NAME = "record_errors";
+private static final String BATCH_INDEX_KEY_NAME = "batch_index";
+private static final String BATCH_INDEX_ERROR_MESSAGE_KEY_NAME = "batch_index_error_message";
 private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
 
 private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME,
 "The start offset of the log at the time this produce response was created", INVALID_OFFSET);
-private static final Field.NullableStr RELATIVE_OFFSET_ERROR_MESSAGE_FIELD = new Field.NullableStr(RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME,
+private static final Field.NullableStr BATCH_INDEX_ERROR_MESSAGE_FIELD = new Field.NullableStr(BATCH_INDEX_ERROR_MESSAGE_KEY_NAME,
 "The error message of the record that caused the batch to be dropped");
 private static final Field.NullableStr ERROR_MESSAGE_FIELD = new Field.NullableStr(ERROR_MESSAGE_KEY_NAME,
 "The global error message summarizing the common root cause of the 
records that caused the batch 

[kafka] branch trunk updated (e0824e2 -> fb9b0df)

2019-10-09 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from e0824e2  MINOR: Just one put and flush to generation rocksDB File in RocksDBStoreTest (#7469)
 add fb9b0df  MINOR: Augment log4j to add generation number in performAssign (#7451)

No new revisions were added by this update.

Summary of changes:
 .../consumer/internals/AbstractCoordinator.java| 66 ++
 .../consumer/internals/ConsumerCoordinator.java| 11 ++--
 .../internals/AbstractCoordinatorTest.java |  2 +-
 .../internals/ConsumerCoordinatorTest.java |  6 +-
 .../runtime/distributed/WorkerCoordinator.java |  2 +-
 5 files changed, 52 insertions(+), 35 deletions(-)



[kafka] branch trunk updated (32a3dc8 -> e0824e2)

2019-10-09 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 32a3dc8  KAFKA-8991: Enable scalac optimizer (#7452)
 add e0824e2  MINOR: Just one put and flush to generation rocksDB File in RocksDBStoreTest (#7469)

No new revisions were added by this update.

Summary of changes:
 .../kafka/streams/state/internals/RocksDBStoreTest.java| 14 +++---
 1 file changed, 3 insertions(+), 11 deletions(-)



[kafka] branch trunk updated (f9934e7 -> e3c2148)

2019-10-08 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from f9934e7  MINOR: remove unused imports in Streams system tests (#7468)
 add e3c2148  KAFKA-8964: Rename tag client-id for thread-level metrics and below (#7429)

No new revisions were added by this update.

Summary of changes:
 .../streams/kstream/internals/metrics/Sensors.java | 24 ++--
 .../streams/processor/internals/ProcessorNode.java | 18 +-
 .../streams/processor/internals/StreamTask.java|  3 +-
 .../internals/metrics/StreamsMetricsImpl.java  | 64 --
 .../AbstractRocksDBSegmentedBytesStore.java|  2 +-
 .../state/internals/InMemorySessionStore.java  |  2 +-
 .../state/internals/InMemoryWindowStore.java   |  4 +-
 .../streams/state/internals/metrics/Sensors.java   |  8 +--
 ...KStreamSessionWindowAggregateProcessorTest.java |  8 +--
 .../internals/KStreamWindowAggregateTest.java  |  8 +--
 .../KTableSuppressProcessorMetricsTest.java| 18 +++---
 .../processor/internals/ProcessorNodeTest.java |  4 +-
 .../processor/internals/StreamTaskTest.java|  8 +--
 .../processor/internals/StreamThreadTest.java  | 16 +++---
 .../internals/metrics/StreamsMetricsImplTest.java  | 46 ++--
 .../AbstractRocksDBSegmentedBytesStoreTest.java|  8 +--
 .../state/internals/MeteredKeyValueStoreTest.java  |  8 +--
 .../state/internals/MeteredSessionStoreTest.java   |  6 +-
 .../MeteredTimestampedKeyValueStoreTest.java   |  8 +--
 .../state/internals/MeteredWindowStoreTest.java|  4 +-
 .../state/internals/SessionBytesStoreTest.java |  8 +--
 .../state/internals/WindowBytesStoreTest.java  |  8 +--
 .../apache/kafka/streams/TopologyTestDriver.java   |  4 +-
 23 files changed, 146 insertions(+), 141 deletions(-)



[kafka] branch trunk updated (c49775b -> f9934e7)

2019-10-08 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from c49775b  KAFKA-7190; Retain producer state until transactionalIdExpiration time passes (#7388)
 add f9934e7  MINOR: remove unused imports in Streams system tests (#7468)

No new revisions were added by this update.

Summary of changes:
 tests/kafkatest/tests/streams/streams_broker_bounce_test.py | 1 -
 tests/kafkatest/tests/streams/streams_eos_test.py   | 2 --
 2 files changed, 3 deletions(-)



[kafka] branch 2.3 updated: KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922) (#7457)

2019-10-07 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new 97be73e  KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922) (#7457)
97be73e is described below

commit 97be73ec9822cbbaeda41b958821982359bcba3e
Author: Bruno Cadonna 
AuthorDate: Mon Oct 7 21:59:07 2019 +0200

KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922) (#7457)

- Timeout occurred due to initial slow rebalancing.
- Added code to wait until the `KafkaStreams` instance is in state RUNNING to check registration of metrics and in state NOT_RUNNING to check deregistration of metrics.
- I removed all other wait conditions, because they are not needed if the `KafkaStreams` instance is in the right state.

Reviewers: Guozhang Wang 
---
 .../integration/MetricsIntegrationTest.java| 456 +
 1 file changed, 205 insertions(+), 251 deletions(-)
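The fix described above boils down to polling the KafkaStreams state until it reaches RUNNING (or NOT_RUNNING) before asserting on metrics. A minimal, self-contained sketch of such a wait-for-condition helper, using plain JDK types rather than Kafka's actual TestUtils (the helper name and poll interval are illustrative assumptions):

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

public class WaitForConditionSketch {

    // Minimal stand-in for a TestUtils.waitForCondition-style helper:
    // poll until the condition holds or the timeout elapses, then fail
    // with the lazily built message.
    static void waitForCondition(final BooleanSupplier condition,
                                 final long timeoutMs,
                                 final Supplier<String> conditionDetails) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Condition not met within timeout: " + conditionDetails.get());
            }
            Thread.sleep(10); // poll interval; a real implementation may back off
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final long start = System.currentTimeMillis();
        // condition becomes true ~50 ms after start, well inside the timeout
        waitForCondition(() -> System.currentTimeMillis() - start >= 50,
                         1_000,
                         () -> "example condition");
        System.out.println("condition met");
    }
}
```

Waiting on the instance state instead of on each individual metric is what lets the commit delete the per-metric wait conditions: once the state machine reports RUNNING, all thread/task/store metrics are guaranteed to be registered.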

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 3f6c806..c9e0b8b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -50,6 +51,9 @@ import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
 @SuppressWarnings("unchecked")
 @Category({IntegrationTest.class})
 public class MetricsIntegrationTest {
@@ -187,27 +191,35 @@ public class MetricsIntegrationTest {
 CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, 
STREAM_OUTPUT_3, STREAM_OUTPUT_4);
 }
 
-private void startApplication() {
+private void startApplication() throws InterruptedException {
 kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
 kafkaStreams.start();
+final long timeout = 6;
+TestUtils.waitForCondition(
+() -> kafkaStreams.state() == State.RUNNING,
+timeout,
+() -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms");
 }
 
 private void closeApplication() throws Exception {
 kafkaStreams.close();
 kafkaStreams.cleanUp();
 IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+final long timeout = 6;
+TestUtils.waitForCondition(
+() -> kafkaStreams.state() == State.NOT_RUNNING,
+timeout,
+() -> "Kafka Streams application did not reach state NOT_RUNNING in " + timeout + " ms");
 }
 
-private void checkMetricDeregistration() throws InterruptedException {
-TestUtils.waitForCondition(() -> {
-final List listMetricAfterClosingApp = new ArrayList(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList());
-return listMetricAfterClosingApp.size() == 0;
-}, 1, "de-registration of metrics");
+private void checkMetricDeregistration() {
+final List listMetricAfterClosingApp = new 
ArrayList(kafkaStreams.metrics().values()).stream()
+.filter(m -> 
m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList());
+assertThat(listMetricAfterClosingApp.size(), is(0));
 }
 
 @Test
 public void testStreamMetric() throws Exception {
-final StringBuilder errorMessage = new StringBuilder();
 stream = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()));
 stream.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), 
Serdes.String()));
 builder.table(STREAM_OUTPUT_1, 
Materialized.as(Stores.inMemoryKeyValueStore(MY_STORE_IN_MEMORY)).withCachingEnabled())
@@ -222,22 +234,13 @@ public class MetricsIntegrationTest {
 
 startApplication();
 
-// metric level : Thread
-TestUtils.waitForCondition(() -> testThreadMetric(errorMessage), 
1, () -> "testThreadMetric -> " + errorMessage.toString());
-
-// metric level : Task
-

[kafka] branch 2.4 updated: HOTFIX: Hide built-in metrics version (#7459)

2019-10-07 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
 new 6859820  HOTFIX: Hide built-in metrics version (#7459)
6859820 is described below

commit 6859820755332eb90f386a5b76a0c43e035637dd
Author: Bruno Cadonna 
AuthorDate: Mon Oct 7 19:47:50 2019 +0200

HOTFIX: Hide built-in metrics version (#7459)

Since not all of the streams metrics refactorings proposed in KIP-444 have been implemented yet, this commit hides the built-in metrics version config from the user. Thus, the user cannot switch to the refactored streams metrics.

Reviewers: Guozhang Wang 
---
 .../org/apache/kafka/streams/KafkaStreams.java |  3 +-
 .../org/apache/kafka/streams/StreamsConfig.java| 23 ---
 .../internals/metrics/StreamsMetricsImpl.java  | 16 --
 .../apache/kafka/streams/StreamsConfigTest.java| 34 --
 .../integration/MetricsIntegrationTest.java| 21 +
 .../internals/GlobalStreamThreadTest.java  |  4 +--
 .../processor/internals/MockStreamsMetrics.java|  3 +-
 .../processor/internals/StandbyTaskTest.java   |  3 +-
 .../processor/internals/StreamThreadTest.java  | 18 +---
 .../internals/metrics/StreamsMetricsImplTest.java  | 16 +-
 .../internals/GlobalStateStoreProviderTest.java|  3 +-
 .../MeteredTimestampedWindowStoreTest.java |  2 +-
 .../state/internals/MeteredWindowStoreTest.java|  2 +-
 .../kafka/test/InternalMockProcessorContext.java   | 10 +++
 .../apache/kafka/streams/TopologyTestDriver.java   |  4 +--
 .../streams/processor/MockProcessorContext.java|  3 +-
 16 files changed, 47 insertions(+), 118 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 5a00fe3..146eefd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -675,8 +675,7 @@ public class KafkaStreams implements AutoCloseable {
 Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, 
clientId));
 reporters.add(new JmxReporter(JMX_PREFIX));
 metrics = new Metrics(metricConfig, reporters, time);
-streamsMetrics =
-new StreamsMetricsImpl(metrics, clientId, 
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG));
+streamsMetrics = new StreamsMetricsImpl(metrics, clientId, 
StreamsMetricsImpl.METRICS_0100_TO_23);
 
streamsMetrics.setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger);
 ClientMetrics.addVersionMetric(streamsMetrics);
 ClientMetrics.addCommitIdMetric(streamsMetrics);
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index eb6faff..0a4bfd1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -287,16 +287,6 @@ public class StreamsConfig extends AbstractConfig {
 @SuppressWarnings("WeakerAccess")
 public static final String EXACTLY_ONCE = "exactly_once";
 
-/**
- * Config value for parameter {@link #BUILT_IN_METRICS_VERSION_CONFIG 
"built.in.metrics.version"} for built-in metrics from version 0.10.0. to 2.3
- */
-public static final String METRICS_0100_TO_23 = "0.10.0-2.3";
-
-/**
- * Config value for parameter {@link #BUILT_IN_METRICS_VERSION_CONFIG 
"built.in.metrics.version"} for the latest built-in metrics version.
- */
-public static final String METRICS_LATEST = "latest";
-
 /** {@code application.id} */
 @SuppressWarnings("WeakerAccess")
 public static final String APPLICATION_ID_CONFIG = "application.id";
@@ -316,10 +306,6 @@ public class StreamsConfig extends AbstractConfig {
 public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = 
"buffered.records.per.partition";
 private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum 
number of records to buffer per partition.";
 
-/** {@code built.in.metrics.version} */
-public static final String BUILT_IN_METRICS_VERSION_CONFIG = 
"built.in.metrics.version";
-private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the 
built-in metrics to use.";
-
 /** {@code cache.max.bytes.buffering} */
 @SuppressWarnings("WeakerAccess")
 public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = 
"cache.max.bytes.buffering";
@@ -625,15 +611,6 @@ public class StreamsConfig extends AbstractConfig {
 

[kafka] branch 2.4 updated: KAFKA-8179: Part 7, cooperative rebalancing in Streams (#7386)

2019-10-07 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
 new 133c33f  KAFKA-8179: Part 7, cooperative rebalancing in Streams (#7386)
133c33f is described below

commit 133c33fde1c4d3b79196b72522f83a75cb6b0e65
Author: A. Sophie Blee-Goldman 
AuthorDate: Mon Oct 7 09:27:09 2019 -0700

KAFKA-8179: Part 7, cooperative rebalancing in Streams (#7386)

Key improvements with this PR:

* tasks will remain available for IQ during a rebalance (but not during restore)
* continue restoring and processing standby tasks during a rebalance
* continue processing active tasks during rebalance until the RecordQueue is empty*
* only revoked tasks must be suspended/closed
* StreamsPartitionAssignor tries to return tasks to their previous consumers within a client
* but do not try to commit, for now (pending KAFKA-7312)


Reviewers: John Roesler , Boyang Chen 
, Guozhang Wang 
---
 checkstyle/suppressions.xml|   3 +
 .../consumer/internals/ConsumerCoordinator.java|  20 +-
 .../consumer/internals/SubscriptionState.java  |  15 +-
 .../clients/consumer/internals/FetcherTest.java|  31 +-
 .../processor/internals/AssignedStreamsTasks.java  |  15 +-
 .../streams/processor/internals/StandbyTask.java   |  19 -
 .../processor/internals/StoreChangelogReader.java  |  16 +-
 .../streams/processor/internals/StreamTask.java|   8 +-
 .../streams/processor/internals/StreamThread.java  |  27 +-
 .../internals/StreamsPartitionAssignor.java| 612 +++--
 .../kafka/streams/processor/internals/Task.java|   8 +-
 .../streams/processor/internals/TaskManager.java   |  34 +-
 .../assignment/AssignorConfiguration.java  |  12 +-
 .../internals/assignment/ClientState.java  |  56 +-
 .../internals/assignment/StickyTaskAssignor.java   |   8 +-
 .../processor/internals/AbstractTaskTest.java  |   6 -
 .../processor/internals/StandbyTaskTest.java   |  23 -
 .../internals/StreamsPartitionAssignorTest.java| 481 ++--
 .../processor/internals/TaskManagerTest.java   |  28 +-
 .../internals/assignment/ClientStateTest.java  |   8 +-
 .../assignment/StickyTaskAssignorTest.java |  34 +-
 .../kafka/streams/tests/SmokeTestDriver.java   |   2 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java|  26 +-
 tests/kafkatest/tests/streams/streams_eos_test.py  |   2 +-
 24 files changed, 1058 insertions(+), 436 deletions(-)
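At the heart of the cooperative protocol changes in this PR (visible in the ConsumerCoordinator diff below) is a pair of set differences: partitions in the new assignment that were not previously owned are "added", and previously owned partitions missing from the new assignment are "revoked". A self-contained sketch using plain strings in place of TopicPartition objects (the partition names are illustrative):

```java
import java.util.HashSet;
import java.util.Set;

public class RebalanceDiffSketch {

    // Partitions in the new assignment that the consumer did not own before.
    static Set<String> added(final Set<String> owned, final Set<String> assigned) {
        final Set<String> result = new HashSet<>(assigned);
        result.removeAll(owned);
        return result;
    }

    // Previously owned partitions that are absent from the new assignment;
    // under the COOPERATIVE protocol only these must be revoked and closed.
    static Set<String> revoked(final Set<String> owned, final Set<String> assigned) {
        final Set<String> result = new HashSet<>(owned);
        result.removeAll(assigned);
        return result;
    }

    public static void main(final String[] args) {
        final Set<String> owned = Set.of("topic-0", "topic-1", "topic-2");
        final Set<String> assigned = Set.of("topic-1", "topic-2", "topic-3");
        System.out.println("added: " + added(owned, assigned));     // added: [topic-3]
        System.out.println("revoked: " + revoked(owned, assigned)); // revoked: [topic-0]
    }
}
```

Everything in the intersection stays owned and keeps processing, which is why tasks for unchanged partitions remain available during the rebalance instead of being suspended wholesale as under the eager protocol.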

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 8927849..2f21309 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -197,6 +197,9 @@
 
 
+
+
 
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b5b5ce2..6b39acb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -355,26 +355,29 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 Set addedPartitions = new HashSet<>(assignedPartitions);
 addedPartitions.removeAll(ownedPartitions);
 
-// Invoke user's revocation callback before changing assignment or updating state
 if (protocol == RebalanceProtocol.COOPERATIVE) {
 Set revokedPartitions = new HashSet<>(ownedPartitions);
 revokedPartitions.removeAll(assignedPartitions);
 
-log.info("Updating with newly assigned partitions: {}, compare with already owned partitions: {}, " +
-"newly added partitions: {}, revoking partitions: {}",
+log.info("Updating assignment with\n" +
+"now assigned partitions: {}\n" +
+"compare with previously owned partitions: {}\n" +
+"newly added partitions: {}\n" +
+"revoked partitions: {}\n",
 Utils.join(assignedPartitions, ", "),
 Utils.join(ownedPartitions, ", "),
 Utils.join(addedPartitions, ", "),
-Utils.join(revokedPartitions, ", "));
-
+Utils.join(revokedPartitions, ", ")
+);
 
 if (!revokedPartitions.isEmpty()) {
-// revoke partitions that was previously owned but no longer assigned;
-// note that we should only change the assignment AFTER we've triggered
-// the revoke callback
+// revoke partitio

[kafka] branch trunk updated (c06e45a -> d88f104)

2019-10-07 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from c06e45a  KAFKA-8985; Add flexible version support to inter-broker APIs (#7453)
 add d88f104  KAFKA-8179: Part 7, cooperative rebalancing in Streams (#7386)

No new revisions were added by this update.

Summary of changes:
 checkstyle/suppressions.xml|   3 +
 .../consumer/internals/ConsumerCoordinator.java|  20 +-
 .../consumer/internals/SubscriptionState.java  |  15 +-
 .../clients/consumer/internals/FetcherTest.java|  31 +-
 .../processor/internals/AssignedStreamsTasks.java  |  15 +-
 .../streams/processor/internals/StandbyTask.java   |  19 -
 .../processor/internals/StoreChangelogReader.java  |  16 +-
 .../streams/processor/internals/StreamTask.java|   8 +-
 .../streams/processor/internals/StreamThread.java  |  27 +-
 .../internals/StreamsPartitionAssignor.java| 612 +++--
 .../kafka/streams/processor/internals/Task.java|   8 +-
 .../streams/processor/internals/TaskManager.java   |  34 +-
 .../assignment/AssignorConfiguration.java  |  12 +-
 .../internals/assignment/ClientState.java  |  56 +-
 .../internals/assignment/StickyTaskAssignor.java   |   8 +-
 .../processor/internals/AbstractTaskTest.java  |   6 -
 .../processor/internals/StandbyTaskTest.java   |  23 -
 .../internals/StreamsPartitionAssignorTest.java| 481 ++--
 .../processor/internals/TaskManagerTest.java   |  28 +-
 .../internals/assignment/ClientStateTest.java  |   8 +-
 .../assignment/StickyTaskAssignorTest.java |  34 +-
 .../kafka/streams/tests/SmokeTestDriver.java   |   2 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java|  26 +-
 tests/kafkatest/tests/streams/streams_eos_test.py  |   2 +-
 24 files changed, 1058 insertions(+), 436 deletions(-)



[kafka] branch trunk updated (3b05dc6 -> 52007e8)

2019-10-04 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 3b05dc6  MINOR: just remove leader on trunk like we did on 2.3 (#7447)
 add 52007e8  KAFKA-8934: Introduce instance-level metrics for streams applications (#7416)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/KafkaStreams.java |  22 +-
 .../streams/internals/metrics/ClientMetrics.java   | 116 +
 .../kstream/internals/KStreamAggregate.java|   2 +-
 .../kstream/internals/KStreamKStreamJoin.java  |   2 +-
 .../internals/KStreamKTableJoinProcessor.java  |   2 +-
 .../streams/kstream/internals/KStreamReduce.java   |   2 +-
 .../internals/KStreamSessionWindowAggregate.java   |   2 +-
 .../kstream/internals/KStreamWindowAggregate.java  |   2 +-
 .../kstream/internals/KTableKTableInnerJoin.java   |   2 +-
 .../kstream/internals/KTableKTableLeftJoin.java|   2 +-
 .../kstream/internals/KTableKTableOuterJoin.java   |   2 +-
 .../kstream/internals/KTableKTableRightJoin.java   |   2 +-
 .../streams/kstream/internals/KTableSource.java|   2 +-
 .../ForeignJoinSubscriptionProcessorSupplier.java  |   3 +-
 .../SubscriptionStoreReceiveProcessorSupplier.java |   2 +-
 .../streams/kstream/internals/metrics/Sensors.java |  26 +-
 .../processor/internals/GlobalStateUpdateTask.java |   2 +-
 .../processor/internals/GlobalStreamThread.java|  20 +-
 .../streams/processor/internals/ProcessorNode.java |  48 +++-
 .../streams/processor/internals/RecordQueue.java   |   3 +-
 .../streams/processor/internals/StandbyTask.java   |   3 +-
 .../streams/processor/internals/StreamTask.java|  33 ++-
 .../streams/processor/internals/StreamThread.java  |  64 ++---
 .../internals/metrics/StreamsMetricsImpl.java  | 273 +++--
 .../processor/internals/metrics/ThreadMetrics.java | 170 +++--
 .../AbstractRocksDBSegmentedBytesStore.java|   4 +-
 .../state/internals/InMemorySessionStore.java  |   4 +-
 .../state/internals/InMemoryWindowStore.java   |   4 +-
 .../streams/state/internals/KeyValueSegments.java  |   2 +-
 .../state/internals/MeteredKeyValueStore.java  | 119 -
 .../state/internals/MeteredSessionStore.java   |  70 +-
 .../state/internals/MeteredWindowStore.java|  58 -
 .../kafka/streams/state/internals/NamedCache.java  |   9 +-
 .../streams/state/internals/RocksDBStore.java  |   2 +-
 .../state/internals/TimestampedSegments.java   |   2 +-
 .../state/internals/metrics/NamedCacheMetrics.java |   8 +-
 .../state/internals/metrics/RocksDBMetrics.java| 159 +---
 .../internals/metrics/RocksDBMetricsRecorder.java  |   9 +-
 .../streams/state/internals/metrics/Sensors.java   |  18 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  36 ++-
 .../integration/MetricsIntegrationTest.java|  57 -
 .../internals/metrics/ClientMetricsTest.java   | 137 +++
 ...KStreamSessionWindowAggregateProcessorTest.java |  11 +-
 .../internals/KStreamWindowAggregateTest.java  |   9 +-
 .../KTableSuppressProcessorMetricsTest.java|  65 ++---
 .../internals/GlobalStreamThreadTest.java  |  11 +-
 .../processor/internals/ProcessorNodeTest.java |   7 +-
 .../processor/internals/StandbyTaskTest.java   |   3 +-
 .../processor/internals/StreamTaskTest.java|  20 +-
 .../processor/internals/StreamThreadTest.java  | 190 +++---
 .../internals/metrics/StreamsMetricsImplTest.java  | 237 +++---
 .../internals/metrics/ThreadMetricsTest.java   |  49 ++--
 .../AbstractRocksDBSegmentedBytesStoreTest.java|   5 +-
 .../state/internals/KeyValueSegmentTest.java   |   3 +-
 .../state/internals/MeteredKeyValueStoreTest.java  |   6 +-
 .../state/internals/MeteredSessionStoreTest.java   |   7 +-
 .../MeteredTimestampedKeyValueStoreTest.java   |   7 +-
 .../state/internals/MeteredWindowStoreTest.java|   5 +-
 .../state/internals/SegmentIteratorTest.java   |   3 +-
 .../state/internals/SessionBytesStoreTest.java |   5 +-
 .../state/internals/TimestampedSegmentTest.java|   3 +-
 .../state/internals/WindowBytesStoreTest.java  |   5 +-
 .../internals/metrics/NamedCacheMetricsTest.java   |  16 +-
 .../metrics/RocksDBMetricsRecorderTest.java|   5 +-
 .../internals/metrics/RocksDBMetricsTest.java  |  21 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  11 +-
 .../streams/processor/MockProcessorContext.java|   6 +-
 67 files changed, 1625 insertions(+), 590 deletions(-)
 create mode 100644 streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
 create mode 100644 streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java



[kafka] branch trunk updated (586c587 -> 3b05dc6)

2019-10-04 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 586c587  KAFKA-8974: Trim whitespaces in topic names in sink connector configs (#7442)
 add 3b05dc6  MINOR: just remove leader on trunk like we did on 2.3 (#7447)

No new revisions were added by this update.

Summary of changes:
 .../tests/streams/streams_upgrade_test.py  | 44 ++
 1 file changed, 3 insertions(+), 41 deletions(-)



[kafka] branch 2.1 updated: KAFKA-8649: send latest commonly supported version in assignment (#7427)

2019-10-04 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new 8784899  KAFKA-8649: send latest commonly supported version in assignment (#7427)
8784899 is described below

commit 8784899168cf3b79980384c72c7273b6679f9a22
Author: A. Sophie Blee-Goldman 
AuthorDate: Fri Oct 4 14:17:47 2019 -0700

KAFKA-8649: send latest commonly supported version in assignment (#7427)

PR 7423 but targeted at 2.1.

Reviewers: Guozhang Wang 
---
 .gitignore |   2 +
 .../streams/processor/internals/StreamThread.java  |   7 +-
 .../internals/StreamsPartitionAssignor.java| 275 +
 .../streams/processor/internals/TaskManager.java   |  11 +-
 .../internals/assignment/AssignmentInfo.java   |  70 +++---
 .../internals/assignment/ClientState.java  |  37 ++-
 .../internals/StreamsPartitionAssignorTest.java|   4 +-
 .../processor/internals/TaskManagerTest.java   |   1 +
 .../internals/assignment/ClientStateTest.java  |   4 +-
 .../assignment/StickyTaskAssignorTest.java |  34 +--
 .../kafka/streams/tests/StreamsUpgradeTest.java|   6 +
 .../tests/streams/streams_upgrade_test.py  | 121 -
 12 files changed, 323 insertions(+), 249 deletions(-)

diff --git a/.gitignore b/.gitignore
index fe191ee..0c6854d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -52,3 +52,5 @@ docs/generated/
 kafkatest.egg-info/
 systest/
 *.swp
+clients/src/generated
+clients/src/generated-test
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 6c99797..8f976d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -280,9 +280,10 @@ public class StreamThread extends Thread {
 );
 } else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) {
 log.debug(
-"Encountered assignment error during partition assignment: {}. Skipping task initialization",
-streamThread.assignmentErrorCode
-);
+"Encountered assignment error during partition assignment: {}. Skipping task initialization and "
++ "pausing any partitions we may have been assigned.",
+streamThread.assignmentErrorCode);
+taskManager.pausePartitions();
 } else {
 log.debug("Creating tasks based on assignment.");
 taskManager.createTasks(assignment);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 493f56b..f99866e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -159,8 +159,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 void addConsumer(final String consumerMemberId,
  final SubscriptionInfo info) {
 consumers.add(consumerMemberId);
-state.addPreviousActiveTasks(info.prevTasks());
-state.addPreviousStandbyTasks(info.standbyTasks());
+state.addPreviousActiveTasks(consumerMemberId, info.prevTasks());
+state.addPreviousStandbyTasks(consumerMemberId, info.standbyTasks());
 state.incrementCapacity();
 }
 
@@ -396,6 +396,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 final Set futureConsumers = new HashSet<>();
 
 int minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+int minSupportedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
 
 supportedVersions.clear();
 int futureMetadataVersion = UNKNOWN;
@@ -415,6 +416,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 minReceivedMetadataVersion = usedVersion;
 }
 
+final int latestSupportedVersion = info.latestSupportedVersion();
+if (latestSupportedVersion < minSupportedMetadataVersion) {
+minSupportedMetadataVersion = latestSupportedVersion;
+}
+
 // create the new client 

[kafka] branch trunk updated (791d0d6 -> 11ab6e7)

2019-10-02 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 791d0d6  KAFKA-8804: Secure internal Connect REST endpoints (#7310)
 add 11ab6e7  HOTFIX: remove unsued StreamsConfig from StreamsPartitionAssignor

No new revisions were added by this update.

Summary of changes:
 .../kafka/streams/processor/internals/StreamsPartitionAssignor.java  | 1 -
 1 file changed, 1 deletion(-)



[kafka] branch 2.2 updated: KAFKA-8649: send latest commonly supported version in assignment (#7426)

2019-10-02 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
 new d3bf3cd  KAFKA-8649: send latest commonly supported version in assignment (#7426)
d3bf3cd is described below

commit d3bf3cd3d16945c17d45cf3b2bc4ef0d699cf418
Author: A. Sophie Blee-Goldman 
AuthorDate: Wed Oct 2 16:01:54 2019 -0700

KAFKA-8649: send latest commonly supported version in assignment (#7426)

PR 7423 but targeted at 2.2.

Reviewers: Guozhang Wang 
---
 .gitignore |   1 +
 .../streams/processor/internals/StreamThread.java  |   7 +-
 .../internals/StreamsPartitionAssignor.java| 275 +
 .../streams/processor/internals/TaskManager.java   |  11 +-
 .../internals/assignment/AssignmentInfo.java   |  70 +++---
 .../internals/assignment/ClientState.java  |  36 ++-
 .../internals/StreamsPartitionAssignorTest.java|   4 +-
 .../processor/internals/TaskManagerTest.java   |   1 +
 .../internals/assignment/ClientStateTest.java  |   4 +-
 .../assignment/StickyTaskAssignorTest.java |  34 +--
 .../kafka/streams/tests/StreamsUpgradeTest.java|   6 +
 .../tests/streams/streams_upgrade_test.py  |  28 +--
 12 files changed, 285 insertions(+), 192 deletions(-)

diff --git a/.gitignore b/.gitignore
index a31643f..0c6854d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,3 +53,4 @@ kafkatest.egg-info/
 systest/
 *.swp
 clients/src/generated
+clients/src/generated-test
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3083ec7..4c995e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -286,9 +286,10 @@ public class StreamThread extends Thread {
 );
 } else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) {
 log.debug(
-"Encountered assignment error during partition assignment: {}. Skipping task initialization",
-streamThread.assignmentErrorCode
-);
+"Encountered assignment error during partition assignment: {}. Skipping task initialization and "
++ "pausing any partitions we may have been assigned.",
+streamThread.assignmentErrorCode);
+taskManager.pausePartitions();
 } else {
 log.debug("Creating tasks based on assignment.");
 taskManager.createTasks(assignment);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 89e95b6..3dd0836 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -159,8 +159,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 void addConsumer(final String consumerMemberId,
  final SubscriptionInfo info) {
 consumers.add(consumerMemberId);
-state.addPreviousActiveTasks(info.prevTasks());
-state.addPreviousStandbyTasks(info.standbyTasks());
+state.addPreviousActiveTasks(consumerMemberId, info.prevTasks());
+state.addPreviousStandbyTasks(consumerMemberId, info.standbyTasks());
 state.incrementCapacity();
 }
 
@@ -397,6 +397,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 final Set futureConsumers = new HashSet<>();
 
 int minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+int minSupportedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
 
 supportedVersions.clear();
 int futureMetadataVersion = UNKNOWN;
@@ -416,6 +417,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 minReceivedMetadataVersion = usedVersion;
 }
 
+final int latestSupportedVersion = info.latestSupportedVersion();
+if (latestSupportedVersion < minSupportedMetadataVersion) {
+minSupportedMetadataVersion = latestSupportedVersion;
+}
+
 // create the new client metadata if nec
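The hunk above tracks the minimum of the members' latest supported metadata versions, so the group leader can encode its assignment at a version every member understands. The core of that negotiation can be sketched as a minimal standalone example (hypothetical class and method names, not the actual Kafka Streams code):

```java
import java.util.List;

public class VersionNegotiationSketch {
    // Given each member's latest supported metadata version, pick the
    // highest version the whole group can decode: the minimum of the
    // members' maxima, capped at the leader's own latest version.
    static int commonVersion(List<Integer> memberLatestVersions, int leaderLatestVersion) {
        int min = leaderLatestVersion;
        for (int v : memberLatestVersions) {
            if (v < min) {
                min = v;
            }
        }
        return min;
    }

    public static void main(String[] args) {
        // A mixed group: one member that only supports up to version 4
        // forces a leader supporting version 5 to encode at version 4.
        System.out.println(commonVersion(List.of(5, 4, 5), 5));
    }
}
```

During a rolling upgrade this lets old and new instances coexist: once every member reports the new version, the computed common version rises and the leader upgrades the assignment encoding automatically.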

[kafka] branch trunk updated (8da6993 -> c7efc36)

2019-10-02 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 8da6993  KAFKA-8649: Send latest commonly supported version in assignment (#7423)
 add c7efc36  HOTFIX: don't throw if upgrading from very old versions (#7436)

No new revisions were added by this update.

Summary of changes:
 .../internals/StreamsPartitionAssignor.java| 69 +++---
 1 file changed, 35 insertions(+), 34 deletions(-)



[kafka] branch trunk updated (dc7dd5f -> 8da6993)

2019-10-02 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from dc7dd5f  MINOR: Do not falsely log that partitions are being reassigned on controller startup (#7431)
 add 8da6993  KAFKA-8649: Send latest commonly supported version in assignment (#7423)

No new revisions were added by this update.

Summary of changes:
 .../internals/StreamsPartitionAssignor.java| 216 +
 .../internals/StreamsRebalanceListener.java|   4 +-
 .../streams/processor/internals/TaskManager.java   |   8 +-
 .../internals/assignment/AssignmentInfo.java   |  60 +++---
 .../internals/assignment/ClientState.java  |  36 +++-
 .../internals/StreamsPartitionAssignorTest.java|   2 +-
 .../processor/internals/TaskManagerTest.java   |   1 +
 .../internals/assignment/ClientStateTest.java  |   4 +-
 .../assignment/StickyTaskAssignorTest.java |  34 ++--
 .../kafka/streams/tests/StreamsUpgradeTest.java|   7 +
 .../tests/streams/streams_upgrade_test.py  |  27 +--
 11 files changed, 241 insertions(+), 158 deletions(-)



[kafka] branch trunk updated (9fbb0de -> f6f24c4)

2019-09-30 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 9fbb0de  KAFKA-8927: Deprecate PartitionGrouper interface (#7376)
 add f6f24c4  KAFKA-8729, pt 2: Add error_records and error_message to PartitionResponse (#7150)

No new revisions were added by this update.

Summary of changes:
 .../kafka/clients/consumer/internals/Fetcher.java  |   6 +-
 .../{record => }/InvalidRecordException.java   |   6 +-
 .../org/apache/kafka/common/protocol/Errors.java   |   4 +-
 .../common/record/AbstractLegacyRecordBatch.java   |   1 +
 .../kafka/common/record/ControlRecordType.java |   1 +
 .../apache/kafka/common/record/DefaultRecord.java  |   1 +
 .../kafka/common/record/DefaultRecordBatch.java|   6 +-
 .../kafka/common/record/EndTransactionMarker.java  |   1 +
 .../apache/kafka/common/record/LegacyRecord.java   |   5 +-
 .../kafka/common/requests/ProduceRequest.java  |  12 ++-
 .../kafka/common/requests/ProduceResponse.java | 106 +
 .../resources/common/message/ProduceRequest.json   |   4 +-
 .../resources/common/message/ProduceResponse.json  |  16 +++-
 .../apache/kafka/common/message/MessageTest.java   |  72 ++
 .../record/AbstractLegacyRecordBatchTest.java  |   1 +
 .../common/record/DefaultRecordBatchTest.java  |   6 +-
 .../kafka/common/record/DefaultRecordTest.java |   1 +
 .../common/record/EndTransactionMarkerTest.java|   1 +
 .../kafka/common/record/LegacyRecordTest.java  |   3 +-
 .../common/record/SimpleLegacyRecordTest.java  |   6 +-
 .../kafka/common/requests/ProduceRequestTest.java  |   2 +-
 .../kafka/common/requests/RequestResponseTest.java |   8 ++
 .../apache/kafka/connect/util/ConnectUtils.java|   2 +-
 .../connect/runtime/WorkerSourceTaskTest.java  |   2 +-
 core/src/main/scala/kafka/log/Log.scala|   4 +-
 core/src/main/scala/kafka/log/LogSegment.scala |   7 +-
 core/src/main/scala/kafka/log/LogValidator.scala   |   7 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |   4 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |   2 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala|   1 +
 .../unit/kafka/server/ProduceRequestTest.scala |   1 +
 31 files changed, 248 insertions(+), 51 deletions(-)
 rename clients/src/main/java/org/apache/kafka/common/{record => }/InvalidRecordException.java (85%)



[kafka] branch trunk updated (ef2fbfd -> 9fbb0de)

2019-09-30 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from ef2fbfd  KAFKA-8609: Add rebalance-latency-total (#7401)
 add 9fbb0de  KAFKA-8927: Deprecate PartitionGrouper interface (#7376)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/StreamsConfig.java| 22 +++---
 .../streams/processor/DefaultPartitionGrouper.java |  3 +++
 .../kafka/streams/processor/PartitionGrouper.java  |  3 +++
 .../internals/StreamsPartitionAssignor.java|  6 +++---
 .../assignment/AssignorConfiguration.java  | 10 ++
 .../apache/kafka/streams/StreamsConfigTest.java| 22 --
 .../processor/DefaultPartitionGrouperTest.java |  1 +
 .../internals/SingleGroupPartitionGrouperStub.java | 11 +--
 .../internals/StreamsPartitionAssignorTest.java|  1 +
 9 files changed, 57 insertions(+), 22 deletions(-)



[kafka] branch trunk updated (d53eab1 -> ef2fbfd)

2019-09-30 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from d53eab1  MINOR: Adjust logic of conditions to set number of partitions in step zero of assignment. (#7419)
 add ef2fbfd  KAFKA-8609: Add rebalance-latency-total (#7401)

No new revisions were added by this update.

Summary of changes:
 .../apache/kafka/clients/consumer/internals/AbstractCoordinator.java | 5 +
 .../kafka/clients/consumer/internals/AbstractCoordinatorTest.java| 2 ++
 2 files changed, 7 insertions(+)



[kafka] branch trunk updated (45c800f -> d53eab1)

2019-09-30 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 45c800f  KAFKA-8911: Using proper WindowSerdes constructors in their implicit definitions (#7352)
 add d53eab1  MINOR: Adjust logic of conditions to set number of partitions in step zero of assignment. (#7419)

No new revisions were added by this update.

Summary of changes:
 .../streams/processor/internals/StreamsPartitionAssignor.java| 9 +
 1 file changed, 5 insertions(+), 4 deletions(-)



[kafka] branch trunk updated (22434e6 -> 863210d)

2019-09-27 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 22434e6  KAFKA-8319: Make KafkaStreamsTest a non-integration test class (#7382)
 add 863210d  KAFKA-8934: Create version file during build for Streams (#7397)

No new revisions were added by this update.

Summary of changes:
 build.gradle | 40 
 1 file changed, 40 insertions(+)



[kafka] branch trunk updated (30443af -> 22434e6)

2019-09-27 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 30443af  KAFKA-6883: Add toUpperCase support to sasl.kerberos.principal.to.local rule (KIP-309)
 add 22434e6  KAFKA-8319: Make KafkaStreamsTest a non-integration test class (#7382)

No new revisions were added by this update.

Summary of changes:
 .../kafka/clients/consumer/MockConsumer.java   |   1 -
 .../org/apache/kafka/streams/KafkaStreams.java |   3 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 878 ++---
 .../apache/kafka/streams/StreamsConfigTest.java|  26 +
 .../processor/internals/StreamThreadTest.java  |  20 +-
 .../org/apache/kafka/test/MockClientSupplier.java  |   2 +-
 6 files changed, 461 insertions(+), 469 deletions(-)



[kafka] branch trunk updated (74f8ae1 -> d112ffd)

2019-09-24 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 74f8ae1  KAFKA-8179: do not suspend standby tasks during rebalance (#7321)
 add d112ffd  KAFKA-8880: Docs on upgrade-guide (#7385)

No new revisions were added by this update.

Summary of changes:
 docs/upgrade.html | 5 +
 1 file changed, 5 insertions(+)



[kafka] branch trunk updated (ad3b843 -> 74f8ae1)

2019-09-24 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from ad3b843  KAFKA-8580: Compute RocksDB metrics (#7263)
 add 74f8ae1  KAFKA-8179: do not suspend standby tasks during rebalance (#7321)

No new revisions were added by this update.

Summary of changes:
 .../consumer/ConsumerRebalanceListener.java|  16 +-
 .../consumer/internals/ConsumerCoordinator.java|  74 +++--
 .../org/apache/kafka/streams/KafkaStreams.java |  15 +-
 .../org/apache/kafka/streams/StreamsConfig.java|  43 ++-
 .../processor/internals/AssignedStandbyTasks.java  |  42 +++
 .../processor/internals/AssignedStreamsTasks.java  | 293 ++--
 .../streams/processor/internals/AssignedTasks.java | 137 +-
 .../processor/internals/ChangelogReader.java   |  15 +-
 .../processor/internals/StoreChangelogReader.java  |  24 +-
 .../streams/processor/internals/StreamThread.java  | 214 +--
 .../internals/StreamsPartitionAssignor.java|  39 ++-
 .../internals/StreamsRebalanceListener.java| 169 
 .../streams/processor/internals/TaskManager.java   | 297 ++---
 .../assignment/AssignorConfiguration.java  |  24 ++
 .../integration/RegexSourceIntegrationTest.java|   4 +-
 .../internals/AssignedStreamsTasksTest.java|  36 ++-
 .../processor/internals/MockChangelogReader.java   |  10 +-
 .../processor/internals/StreamThreadTest.java  |  66 +++--
 .../internals/StreamsPartitionAssignorTest.java|   2 +-
 .../processor/internals/TaskManagerTest.java   | 114 
 .../kafka/streams/tests/StreamsUpgradeTest.java|   7 +-
 21 files changed, 1085 insertions(+), 556 deletions(-)
 create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java



[kafka] branch trunk updated (bcc0237 -> ad3b843)

2019-09-24 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from bcc0237  KAFKA-8880: Add overloaded function of Consumer.committed (#7304)
 add ad3b843  KAFKA-8580: Compute RocksDB metrics (#7263)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/KafkaStreams.java |  72 +++--
 .../processor/internals/GlobalStreamThread.java|   7 +-
 .../streams/processor/internals/StreamThread.java  |   5 +
 .../internals/metrics/StreamsMetricsImpl.java  |  30 +-
 .../streams/state/internals/KeyValueSegments.java  |   8 +-
 .../streams/state/internals/RocksDBStore.java  |  34 +--
 .../state/internals/TimestampedSegments.java   |   6 -
 .../state/internals/metrics/RocksDBMetrics.java|   3 +-
 .../internals/metrics/RocksDBMetricsRecorder.java  | 182 
 .../metrics/RocksDBMetricsRecordingTrigger.java|  56 
 .../integration/MetricsIntegrationTest.java|   8 +-
 .../internals/GlobalStreamThreadTest.java  |  42 +--
 .../processor/internals/StreamThreadTest.java  |   3 +-
 .../streams/state/internals/RocksDBStoreTest.java  | 142 +++---
 .../state/internals/SegmentIteratorTest.java   |   5 +-
 .../metrics/RocksDBMetricsRecorderTest.java| 315 +
 .../RocksDBMetricsRecordingTriggerTest.java|  87 ++
 .../internals/metrics/RocksDBMetricsTest.java  |  22 +-
 .../kafka/test/InternalMockProcessorContext.java   |   2 +
 .../apache/kafka/streams/TopologyTestDriver.java   |   2 +
 20 files changed, 782 insertions(+), 249 deletions(-)
 create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java
 create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTriggerTest.java



[kafka] branch trunk updated (1ae0956 -> bcc0237)

2019-09-24 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 1ae0956  HOTFIX: fix Kafka Streams upgrade note for broker backward compatibility (#7363)
 add bcc0237  KAFKA-8880: Add overloaded function of Consumer.committed (#7304)

No new revisions were added by this update.

Summary of changes:
 .../apache/kafka/clients/consumer/Consumer.java| 12 
 .../kafka/clients/consumer/KafkaConsumer.java  | 80 --
 .../kafka/clients/consumer/MockConsumer.java   | 28 ++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 20 +++---
 .../kafka/clients/consumer/MockConsumerTest.java   | 10 +--
 .../integration/kafka/api/ConsumerBounceTest.scala |  6 +-
 .../kafka/api/PlaintextConsumerTest.scala  | 46 ++---
 .../integration/kafka/api/TransactionsTest.scala   |  2 +-
 .../kafka/admin/ConsumerGroupCommandTest.scala | 14 ++--
 .../test/scala/unit/kafka/utils/TestUtils.scala|  9 +--
 .../streams/processor/internals/AbstractTask.java  | 20 --
 .../processor/internals/ProcessorStateManager.java |  9 ++-
 .../streams/processor/internals/StandbyTask.java   | 34 +
 .../streams/processor/internals/StreamTask.java| 50 +++---
 .../processor/internals/StandbyTaskTest.java   |  8 +--
 .../processor/internals/StreamTaskTest.java|  4 +-
 .../kafka/tools/TransactionalMessageCopier.java| 10 +--
 17 files changed, 235 insertions(+), 127 deletions(-)



[kafka] branch trunk updated (e98e239 -> b403561)

2019-09-23 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from e98e239  KAFKA-8859: Refactor cache-level metrics (#7367)
 add b403561  KAFKA-8086: Use 1 partition for offset topic when possible (#7356)

No new revisions were added by this update.

Summary of changes:
 .../test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala  | 1 +
 .../scala/integration/kafka/network/DynamicConnectionQuotaTest.scala  | 4 
 core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala   | 1 +
 3 files changed, 2 insertions(+), 4 deletions(-)



[kafka] branch trunk updated (8001aff -> d91a94e)

2019-09-20 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 8001aff  KAFKA-8892: Display the sorted configs in Kafka Configs Help Command.
 add d91a94e  KAFKA-8609: Add consumer rebalance metrics (#7347)

No new revisions were added by this update.

Summary of changes:
 .../consumer/internals/AbstractCoordinator.java| 136 -
 .../consumer/internals/ConsumerCoordinator.java|  45 ++-
 .../clients/consumer/internals/Heartbeat.java  |   2 +-
 .../apache/kafka/common/metrics/JmxReporter.java   |   1 +
 .../internals/AbstractCoordinatorTest.java |  93 +-
 .../internals/ConsumerCoordinatorTest.java |  54 
 6 files changed, 287 insertions(+), 44 deletions(-)



[kafka] branch trunk updated (8aff871 -> a047072)

2019-09-19 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 8aff871  MINOR: Fix bug where we would incorrectly load partition reassignment info from ZK (#7334)
 add 2d0cd2e  MINOR: Murmur3 Hash with Guava dependency
 add a047072  MINOR: Move Murmur3 to Streams

No new revisions were added by this update.

Summary of changes:
 checkstyle/suppressions.xml|   7 +
 gradle/spotbugs-exclude.xml|  19 +
 .../kafka/streams/state/internals/Murmur3.java | 548 +
 .../kafka/streams/state/internals/Murmur3Test.java |  70 +++
 4 files changed, 644 insertions(+)
 create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/Murmur3.java
 create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java



[kafka-site] branch asf-site updated (52173cb -> 43379b5)

2019-09-18 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git.


from 52173cb  KAFKA-8474; Update CSS for new config layout (#211)
 add 43379b5  MINOR: Update the meetup list (#230)

No new revisions were added by this update.

Summary of changes:
 events.html | 294 ++--
 1 file changed, 229 insertions(+), 65 deletions(-)



[kafka] branch trunk updated (935b280 -> 4962c81)

2019-09-17 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 935b280  MINOR: Default to 5 partitions of the __consumer_offsets 
topic in Streams integration tests (#7331)
 add 4962c81  KAFKA-8839 : Improve streams debug logging (#7258)

No new revisions were added by this update.

Summary of changes:
 docs/streams/developer-guide/config-streams.html| 4 +++-
 streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java   | 2 +-
 .../kafka/streams/processor/internals/AssignedStreamsTasks.java | 2 +-
 .../org/apache/kafka/streams/processor/internals/AssignedTasks.java | 5 +++--
 .../kafka/streams/processor/internals/InternalTopicManager.java | 6 +++---
 .../org/apache/kafka/streams/processor/internals/StreamTask.java| 3 +--
 .../org/apache/kafka/streams/processor/internals/StreamThread.java  | 4 ++--
 .../kafka/streams/processor/internals/StreamsPartitionAssignor.java | 5 -
 .../org/apache/kafka/streams/processor/internals/TaskManager.java   | 2 +-
 9 files changed, 19 insertions(+), 14 deletions(-)



[kafka] branch trunk updated (bab3e08 -> 935b280)

2019-09-17 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from bab3e08  KAFKA-8859: Expose built-in streams metrics version in 
`StreamsMetricsImpl` (#7323)
 add 935b280  MINOR: Default to 5 partitions of the __consumer_offsets 
topic in Streams integration tests (#7331)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java | 1 +
 1 file changed, 1 insertion(+)



[kafka] branch trunk updated (c5dfb90 -> bab3e08)

2019-09-16 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from c5dfb90  MINOR: Cleanup scala warnings (#7335)
 add bab3e08  KAFKA-8859: Expose built-in streams metrics version in 
`StreamsMetricsImpl` (#7323)

No new revisions were added by this update.

Summary of changes:
 .../processor/internals/GlobalStreamThread.java|  6 +++-
 .../streams/processor/internals/StreamThread.java  |  6 +++-
 .../internals/metrics/StreamsMetricsImpl.java  | 24 -
 .../processor/internals/MockStreamsMetrics.java|  3 +-
 .../processor/internals/StandbyTaskTest.java   |  3 +-
 .../processor/internals/StreamThreadTest.java  | 26 --
 .../internals/metrics/StreamsMetricsImplTest.java  | 42 ++
 .../internals/GlobalStateStoreProviderTest.java|  5 ++-
 .../MeteredTimestampedWindowStoreTest.java |  3 +-
 .../state/internals/MeteredWindowStoreTest.java|  3 +-
 .../kafka/test/InternalMockProcessorContext.java   | 26 +++---
 .../apache/kafka/streams/TopologyTestDriver.java   |  6 +++-
 .../streams/processor/MockProcessorContext.java|  2 +-
 13 files changed, 116 insertions(+), 39 deletions(-)



[kafka] branch trunk updated (6530600 -> 83c7c01)

2019-09-13 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 6530600  MINOR: Add UUID type to Kafka API code generation (#7291)
 add 83c7c01  KAFKA-8755: Fix state restore for standby tasks with 
optimized topology (#7238)

No new revisions were added by this update.

Summary of changes:
 .../streams/processor/internals/AbstractTask.java  |  37 +--
 .../processor/internals/AssignedStandbyTasks.java  |  10 +
 .../streams/processor/internals/StandbyTask.java   |  67 -
 .../processor/internals/StoreChangelogReader.java  |  60 ++--
 .../streams/processor/internals/StreamTask.java|  42 ++-
 .../OptimizedKTableIntegrationTest.java| 335 +
 .../processor/internals/AbstractTaskTest.java  |  69 +
 .../processor/internals/StandbyTaskTest.java   |  90 ++
 .../internals/StoreChangelogReaderTest.java|  58 
 .../processor/internals/StreamTaskTest.java|  63 +++-
 .../StreamThreadStateStoreProviderTest.java|   5 +-
 11 files changed, 698 insertions(+), 138 deletions(-)
 create mode 100644 
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java



[kafka] branch trunk updated (d3559f6 -> 23708b7)

2019-09-12 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from d3559f6  KAFKA-8875; CreateTopic API should check topic existence 
before replication factor (#7298)
 add 23708b7  KAFKA-8355: add static membership to range assignor (#7014)

No new revisions were added by this update.

Summary of changes:
 .../kafka/clients/consumer/RangeAssignor.java  |  45 +++-
 .../kafka/clients/consumer/RoundRobinAssignor.java |   3 +-
 .../kafka/clients/consumer/RangeAssignorTest.java  | 252 -
 .../clients/consumer/RoundRobinAssignorTest.java   | 119 --
 4 files changed, 283 insertions(+), 136 deletions(-)



[kafka] branch trunk updated (6882b3b -> 6a3a580)

2019-09-11 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 6882b3b  KAFKA-8886; Make Authorizer create/delete API asynchronous 
(#7316)
 add 6a3a580  KAFKA-8856: Add Streams config for backward-compatible 
metrics (#7279)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/StreamsConfig.java| 23 ++
 .../apache/kafka/streams/StreamsConfigTest.java| 36 +-
 2 files changed, 58 insertions(+), 1 deletion(-)



[kafka] branch trunk updated (7012fa3 -> a043edb)

2019-09-10 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 7012fa3  KAFKA-8747; Add atomic counter to fix flaky 
testEventQueueTime test (#7320)
 add a043edb  KAFKA-8817: Remove timeout for the whole test (#7313)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/clients/producer/KafkaProducerTest.java | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)



[kafka] branch trunk updated (e59e4ca -> d54285f)

2019-09-09 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from e59e4ca  KAFKA-8222 & KIP-345 part 5: admin request to batch remove 
members (#7122)
 add d54285f  KAFKA-8889: Log the details about error (#7317)

No new revisions were added by this update.

Summary of changes:
 clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[kafka] branch 2.0 updated (e6eda88 -> 9b35df9)

2019-09-09 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from e6eda88  KAFKA-8861 Fix flaky 
RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic 
(#7281)
 add 9b35df9  KAFKA-8816: Make offsets immutable to users of 
RecordCollector.offsets (#7223)

No new revisions were added by this update.

Summary of changes:
 .../processor/internals/RecordCollector.java   |  2 +-
 .../processor/internals/RecordCollectorImpl.java   |  3 ++-
 .../streams/processor/internals/StreamTask.java|  3 ++-
 .../processor/internals/RecordCollectorTest.java   | 28 ++
 4 files changed, 33 insertions(+), 3 deletions(-)



[kafka] branch 2.2 updated (619729c -> ad14d92)

2019-09-09 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 619729c  HOTFIX: The cherry-pick for 
https://github.com/apache/kafka/pull/7207 to 2.2 causes a failure in 
AssignedStreamsTasksTest
 add ad14d92  KAFKA-8816: Make offsets immutable to users of 
RecordCollector.offsets (#7223)

No new revisions were added by this update.

Summary of changes:
 .../processor/internals/RecordCollector.java   |  2 +-
 .../processor/internals/RecordCollectorImpl.java   |  3 ++-
 .../streams/processor/internals/StreamTask.java|  3 ++-
 .../processor/internals/RecordCollectorTest.java   | 28 ++
 4 files changed, 33 insertions(+), 3 deletions(-)



[kafka] branch 2.3 updated: KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223)

2019-09-09 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new 29514bd  KAFKA-8816: Make offsets immutable to users of 
RecordCollector.offsets (#7223)
29514bd is described below

commit 29514bd45c8c2eadfb1bee112917ec0bbc68f884
Author: cpettitt-confluent <53191309+cpettitt-conflu...@users.noreply.github.com>
AuthorDate: Mon Aug 26 13:59:49 2019 -0700

KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets 
(#7223)

Make offsets immutable to users of RecordCollector.offsets. Fix up an
existing case where offsets could be modified in this way. Add a simple
test to verify offsets cannot be changed externally.
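The immutability guarantee described above can be sketched in isolation. The snippet below is a minimal, self-contained illustration — it uses plain String keys in place of Kafka's TopicPartition and a standalone class rather than RecordCollectorImpl — showing why a `Collections.unmodifiableMap` view rejects external mutation while still reflecting the internal state:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableOffsetsSketch {
    public static void main(String[] args) {
        // Internal, mutable offset map (stand-in for RecordCollectorImpl's field).
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic1-0", 42L);

        // What the fix returns to callers: a read-only view over the live map.
        Map<String, Long> view = Collections.unmodifiableMap(offsets);

        boolean rejected = false;
        try {
            view.put("topic1-1", 0L); // any external write attempt...
        } catch (UnsupportedOperationException e) {
            rejected = true;          // ...fails at runtime
        }
        System.out.println("external mutation rejected: " + rejected);
        System.out.println("internal value still visible: " + view.get("topic1-0"));
    }
}
```

Callers that need a mutable copy (as StreamTask does for its checkpointable offsets) must now copy explicitly, e.g. `new HashMap<>(recordCollector.offsets())`.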

Reviewers: Bruno Cadonna, Guozhang Wang, Matthias J. Sax
---
 .../processor/internals/RecordCollector.java   |  2 +-
 .../processor/internals/RecordCollectorImpl.java   |  3 ++-
 .../streams/processor/internals/StreamTask.java|  3 ++-
 .../processor/internals/RecordCollectorTest.java   | 28 ++
 4 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index bbfb049..b8b99a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -63,7 +63,7 @@ public interface RecordCollector extends AutoCloseable {
     /**
      * The last acked offsets from the internal {@link Producer}.
      *
-     * @return the map from TopicPartition to offset
+     * @return an immutable map from TopicPartition to offset
      */
    Map<TopicPartition, Long> offsets();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 2e9ead8..45dda41 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.Collections;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -281,7 +282,7 @@ public class RecordCollectorImpl implements RecordCollector {
 
     @Override
     public Map<TopicPartition, Long> offsets() {
-        return offsets;
+        return Collections.unmodifiableMap(offsets);
     }
 
 // for testing only
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3ab2524..067b36c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -504,7 +504,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
     @Override
     protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
-        final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
+        final Map<TopicPartition, Long> checkpointableOffsets =
+            new HashMap<>(recordCollector.offsets());
         for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
             checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index a7da2cb..22fb2ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -50,7 +50,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -145,6 +148,31 @@ public class RecordCollectorTest {
 assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
 }
 
+    @Test
+    public void shouldNotAllowOffsetsToBeUpdatedExternally() {
+        final String topic = "topic1";
+        final TopicPartition topicPartition = new TopicPartition(topic, 0);
+
+        final Rec

[kafka] branch 2.1 updated: KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223)

2019-09-09 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new 3242843  KAFKA-8816: Make offsets immutable to users of 
RecordCollector.offsets (#7223)
3242843 is described below

commit 3242843021933ea9a51e0633e413bc24f6c9165c
Author: cpettitt-confluent <53191309+cpettitt-conflu...@users.noreply.github.com>
AuthorDate: Mon Aug 26 13:59:49 2019 -0700

KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets 
(#7223)

Make offsets immutable to users of RecordCollector.offsets. Fix up an
existing case where offsets could be modified in this way. Add a simple
test to verify offsets cannot be changed externally.

Reviewers: Bruno Cadonna, Guozhang Wang, Matthias J. Sax
---
 .../processor/internals/RecordCollector.java   |  2 +-
 .../processor/internals/RecordCollectorImpl.java   |  3 ++-
 .../streams/processor/internals/StreamTask.java|  3 ++-
 .../processor/internals/RecordCollectorTest.java   | 28 ++
 4 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 09de11d..d31d596 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -63,7 +63,7 @@ public interface RecordCollector {
     /**
      * The last acked offsets from the internal {@link Producer}.
      *
-     * @return the map from TopicPartition to offset
+     * @return an immutable map from TopicPartition to offset
      */
    Map<TopicPartition, Long> offsets();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 9f9e100..5294c4d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.Collections;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -267,7 +268,7 @@ public class RecordCollectorImpl implements RecordCollector {
 
     @Override
     public Map<TopicPartition, Long> offsets() {
-        return offsets;
+        return Collections.unmodifiableMap(offsets);
     }
 
 // for testing only
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 5ee00e8..4c12757 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -479,7 +479,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
     @Override
     protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
-        final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
+        final Map<TopicPartition, Long> checkpointableOffsets =
+            new HashMap<>(recordCollector.offsets());
         for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
             checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 22163ce..ea58333 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -50,7 +50,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -145,6 +148,31 @@ public class RecordCollectorTest {
 assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
 }
 
+    @Test
+    public void shouldNotAllowOffsetsToBeUpdatedExternally() {
+        final String topic = "topic1";
+        final TopicPartition topicPartition = new TopicPartition(topic, 0);
+
+        final RecordCollectorImpl col

[kafka] branch trunk updated (312e4db -> e59e4ca)

2019-09-09 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 312e4db  MINOR. implement --expose-ports option in ducker-ak (#7269)
 add e59e4ca  KAFKA-8222 & KIP-345 part 5: admin request to batch remove 
members (#7122)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/kafka/clients/admin/Admin.java |  12 ++
 .../kafka/clients/admin/KafkaAdminClient.java  |  67 -
 ...AclsResult.java => MembershipChangeResult.java} |  25 ++--
 .../RemoveMemberFromConsumerGroupOptions.java  |  50 +++
 .../clients/admin/RemoveMemberFromGroupResult.java |  85 
 .../kafka/common/requests/LeaveGroupResponse.java  |   4 +
 .../common/message/LeaveGroupResponse.json |   2 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 133 --
 .../clients/admin/MembershipChangeResultTest.java  |  50 +++
 .../kafka/clients/admin/MockAdminClient.java   |   5 +
 .../RemoveMemberFromConsumerGroupOptionsTest.java  |  21 ++-
 .../admin/RemoveMemberFromGroupResultTest.java | 154 +
 .../kafka/api/AdminClientIntegrationTest.scala |  85 ++--
 13 files changed, 652 insertions(+), 41 deletions(-)
 copy 
clients/src/main/java/org/apache/kafka/clients/admin/{DescribeAclsResult.java 
=> MembershipChangeResult.java} (60%)
 create mode 100644 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptions.java
 create mode 100644 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResult.java
 create mode 100644 
clients/src/test/java/org/apache/kafka/clients/admin/MembershipChangeResultTest.java
 copy 
connect/api/src/test/java/org/apache/kafka/connect/storage/ConverterTypeTest.java
 => 
clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptionsTest.java
 (59%)
 create mode 100644 
clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResultTest.java



[kafka] branch trunk updated (7b62248 -> 0f177ea)

2019-09-08 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 7b62248  MINOR: Add cause to thrown exception when deleting topic in 
TopicCommand (#7301)
 add 0f177ea  MINOR: Clean up partition assignment logic (#7249)

No new revisions were added by this update.

Summary of changes:
 .../streams/internals/QuietStreamsConfig.java  |   0
 .../processor/internals/InternalTopicConfig.java   |  10 +-
 .../processor/internals/InternalTopicManager.java  |  13 +-
 .../streams/processor/internals/StreamThread.java  |   9 +-
 .../internals/StreamsPartitionAssignor.java| 548 -
 .../streams/processor/internals/TaskManager.java   |   6 +-
 .../internals/assignment/AssignmentInfo.java   |   7 +-
 .../assignment/AssignorConfiguration.java  | 188 +++
 .../{TaskAssignor.java => AssignorError.java}  |  17 +-
 .../assignment/CopartitionedTopicsEnforcer.java| 110 +
 ...java => StreamsAssignmentProtocolVersions.java} |  13 +-
 .../internals/assignment/SubscriptionInfo.java |   7 +-
 ...t.java => CopartitionedTopicsEnforcerTest.java} |  43 +-
 .../internals/StreamsPartitionAssignorTest.java| 174 ---
 .../internals/assignment/AssignmentInfoTest.java   |  16 +-
 .../internals/assignment/SubscriptionInfoTest.java |  22 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java|  35 +-
 .../kafka/test/MockInternalTopicManager.java   |   2 +-
 18 files changed, 711 insertions(+), 509 deletions(-)
 rename streams/{test-utils => 
}/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java 
(100%)
 create mode 100644 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
 copy 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/{TaskAssignor.java
 => AssignorError.java} (77%)
 create mode 100644 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/CopartitionedTopicsEnforcer.java
 copy 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/{TaskAssignor.java
 => StreamsAssignmentProtocolVersions.java} (62%)
 rename 
streams/src/test/java/org/apache/kafka/streams/processor/internals/{CopartitionedTopicsValidatorTest.java
 => CopartitionedTopicsEnforcerTest.java} (73%)



[kafka] branch trunk updated (deac5d9 -> ffef087)

2019-09-05 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from deac5d9  KAFKA-8724; Improve range checking when computing cleanable 
partitions (#7264)
 add ffef087  KAFKA-7149 : Reducing streams assignment data size  (#7185)

No new revisions were added by this update.

Summary of changes:
 .../internals/StreamsPartitionAssignor.java|  45 +---
 .../internals/assignment/AssignmentInfo.java   | 118 ++---
 .../internals/assignment/SubscriptionInfo.java |  34 +++---
 .../internals/assignment/AssignmentInfoTest.java   |   7 ++
 .../kafka/streams/tests/StreamsUpgradeTest.java|   2 +-
 .../tests/streams/streams_upgrade_test.py  |  14 +--
 6 files changed, 163 insertions(+), 57 deletions(-)



[kafka] branch trunk updated (18e6bb2 -> 8d8e2fb)

2019-09-04 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 18e6bb2  KAFKA-8861 Fix flaky 
RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic 
(#7281)
 add 8d8e2fb  KAFKA-8729, pt 1: Add 4 new metrics to keep track of various 
types of invalid record rejections (#7142)

No new revisions were added by this update.

Summary of changes:
 core/src/main/scala/kafka/log/Log.scala|   9 +-
 core/src/main/scala/kafka/log/LogValidator.scala   | 119 -
 .../scala/kafka/server/KafkaRequestHandler.scala   |  21 ++-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  12 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala| 197 +
 .../scala/unit/kafka/metrics/MetricsTest.scala |  32 ++--
 .../unit/kafka/server/ProduceRequestTest.scala |   5 +
 .../test/scala/unit/kafka/utils/TestUtils.scala|  11 ++
 8 files changed, 300 insertions(+), 106 deletions(-)



[kafka] branch trunk updated (d18d6b0 -> 40432e3)

2019-08-30 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from d18d6b0  MINOR: Refactor tag key for store level metrics (#7257)
 add 40432e3  MINOR: Check for NULL in case of version probing (#7275)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/processor/internals/TaskManager.java| 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)



[kafka] branch 2.3 updated: HOTFIX: AssignedStreamsTasksTest lacks one parameter

2019-08-29 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new 2a38ae7  HOTFIX: AssignedStreamsTasksTest lacks one parameter
2a38ae7 is described below

commit 2a38ae7c492292282ed4c42845d4348e2eb166d5
Author: Guozhang Wang 
AuthorDate: Thu Aug 29 18:18:31 2019 -0700

HOTFIX: AssignedStreamsTasksTest lacks one parameter
---
 .../kafka/streams/processor/internals/AssignedStreamsTasksTest.java| 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index ca51a3b..1833052 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -519,7 +519,8 @@ public class AssignedStreamsTasksTest {
             stateDirectory,
             null,
             time,
-            () -> producer);
+            () -> producer,
+            metrics.sensor("dummy"));
 
         assignedTasks.addNewTask(task);
         assignedTasks.initializeNewTasks();



[kafka] branch trunk updated (d2741e5 -> 09ad6b8)

2019-08-29 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from d2741e5  MINOR: Remove `activeTaskCheckpointableOffsets` from 
`AbstractTask` (#7253)
 add 09ad6b8  MINOR. Fix 2.3.0 streams systest dockerfile typo (#7272)

No new revisions were added by this update.

Summary of changes:
 tests/docker/Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[kafka] branch trunk updated (fcfee61 -> d2741e5)

2019-08-29 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from fcfee61  MINOR: Only send delete request if there are offsets in map 
(#7256)
 add d2741e5  MINOR: Remove `activeTaskCheckpointableOffsets` from 
`AbstractTask` (#7253)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/processor/internals/AbstractTask.java | 6 --
 .../org/apache/kafka/streams/processor/internals/StreamTask.java   | 7 ++-
 2 files changed, 2 insertions(+), 11 deletions(-)



[kafka] branch 2.3 updated: MINOR: Only send delete request if there are offsets in map (#7256)

2019-08-28 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new f1244e5  MINOR: Only send delete request if there are offsets in map 
(#7256)
f1244e5 is described below

commit f1244e508d6e25c2ee603578a0c897af235fc93a
Author: Bill Bejeck 
AuthorDate: Wed Aug 28 12:22:36 2019 -0400

MINOR: Only send delete request if there are offsets in map (#7256)

Currently on commit streams will attempt to delete offsets from repartition 
topics. However, if a topology does not have any repartition topics, then the 
recordsToDelete map will be empty.

This PR adds a check that the recordsToDelete is not empty before executing 
the AdminClient#deleteRecords() method.
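The guard described above can be sketched in isolation. The snippet below is a minimal, hypothetical stand-in — a counter replaces the real AdminClient#deleteRecords call, and plain String keys replace TopicPartition — showing only the empty-map check the fix introduces:

```java
import java.util.HashMap;
import java.util.Map;

public class DeleteRecordsGuardSketch {
    // Hypothetical stand-in for AdminClient#deleteRecords: just count requests.
    static int deleteRequestsSent = 0;

    static void maybeDeleteRecords(Map<String, Long> recordsToDelete) {
        // The fix: only issue the admin request when there is something to delete.
        if (!recordsToDelete.isEmpty()) {
            deleteRequestsSent++;
        }
    }

    public static void main(String[] args) {
        // Topology without repartition topics: empty map, so no request is sent.
        maybeDeleteRecords(new HashMap<>());

        // Topology with a repartition topic: exactly one request is sent.
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("my-app-repartition-0", 100L);
        maybeDeleteRecords(offsets);

        System.out.println("delete requests sent: " + deleteRequestsSent);
    }
}
```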

Reviewers: A. Sophie Blee-Goldman, Guozhang Wang
---
 .../org/apache/kafka/streams/processor/internals/TaskManager.java  | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c136fdb..3dc8404 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -449,9 +449,10 @@ public class TaskManager {
             for (final Map.Entry<TopicPartition, Long> entry : active.recordsToDelete().entrySet()) {
                 recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()));
             }
-            deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
-
-            log.trace("Sent delete-records request: {}", recordsToDelete);
+            if (!recordsToDelete.isEmpty()) {
+                deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
+                log.trace("Sent delete-records request: {}", recordsToDelete);
+            }
         }
     }
 



[kafka] branch trunk updated (d32a2d1 -> fcfee61)

2019-08-28 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from d32a2d1  KAFKA-8837: KafkaMetricReporterClusterIdTest may not shutdown 
ZooKeeperTestHarness (#7255)
 add fcfee61  MINOR: Only send delete request if there are offsets in map 
(#7256)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/processor/internals/TaskManager.java  | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)



[kafka-site] branch asf-site updated: MINOR: Added Agoda and TokenAnalyst to powered-by page (#226)

2019-08-28 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new af666db  MINOR: Added Agoda and TokenAnalyst to powered-by page (#226)
af666db is described below

commit af666dbee7956aca3688d9b1be341ec6042e7f63
Author: Hartanto Thio 
AuthorDate: Wed Aug 28 08:56:16 2019 -0700

MINOR: Added Agoda and TokenAnalyst to powered-by page (#226)
---
 images/powered-by/agoda.png| Bin 0 -> 50229 bytes
 images/powered-by/tokenanalyst.png | Bin 0 -> 20335 bytes
 powered-by.html|  10 ++
 3 files changed, 10 insertions(+)

diff --git a/images/powered-by/agoda.png b/images/powered-by/agoda.png
new file mode 100644
index 0000000..1724421
Binary files /dev/null and b/images/powered-by/agoda.png differ
diff --git a/images/powered-by/tokenanalyst.png b/images/powered-by/tokenanalyst.png
new file mode 100644
index 0000000..dcbd717
Binary files /dev/null and b/images/powered-by/tokenanalyst.png differ
diff --git a/powered-by.html b/powered-by.html
index e8269fb..c7d4faa 100644
--- a/powered-by.html
+++ b/powered-by.html
@@ -38,6 +38,11 @@
 "logoBgColor": "#ff",
 "description": "adidas uses Kafka as the core of Fast Data Streaming 
Platform, integrating source systems and enabling teams to implement real-time 
event processing for monitoring, analytics and reporting solutions."
 }, {
+"link": "https://www.agoda.com/",
+"logo": "agoda.png",
+"logoBgColor": "#ff",
+"description": "Apache Kafka powers the backbone of Agoda's data 
pipeline with trillions of events streaming through daily across multiple data 
centers. The majority of the events are destined for analytical systems and 
directly influence business decisions at one of the world's fastest growing 
online travel booking platforms."
+}, {
"link": "http://www.amadeus.com",
 "logo": "amadeus.jpg",
 "logoBgColor": "#ff",
@@ -398,6 +403,11 @@
 "logoBgColor": "#ff",
 "description": "Apache Kafka drives our new pub sub system which 
delivers real-time events for users in our latest game - Deckadence. It will 
soon be used in a host of new use cases including group chat and back end stats 
and log collection."
 }, {
+"link": "https://www.tokenanalyst.io/",
+"logo": "tokenanalyst.png",
+"logoBgColor": "#ff",
+"description": "At TokenAnalyst, we're using Kafka for ingestion of 
blockchain data, which is directly pushed from our cluster of Bitcoin and 
Ethereum nodes, to different streams of transformation and loading processes."
+}, {
"link": "https://www.tumblr.com/",
 "logo": "tumblr.png",
 "logoBgColor": "#5eba8c",



[kafka] branch trunk updated (e23a718 -> cf32a1a)

2019-08-27 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from e23a718  KAFKA-8745: DumpLogSegments doesn't show keys, when the 
message is null (#7152)
 add cf32a1a  KAFKA-8179: Part 4, add CooperativeStickyAssignor (#7130)

No new revisions were added by this update.

Summary of changes:
 .../consumer/CooperativeStickyAssignor.java| 105 +++
 .../kafka/clients/consumer/StickyAssignor.java | 885 +
 .../AbstractStickyAssignor.java}   | 331 ++--
 .../consumer/internals/PartitionAssignor.java  |   5 +-
 .../internals/PartitionAssignorAdapter.java|   6 +-
 .../consumer/CooperativeStickyAssignorTest.java| 105 +++
 .../kafka/clients/consumer/StickyAssignorTest.java | 868 ++--
 .../AbstractStickyAssignorTest.java}   | 513 
 docs/upgrade.html  |   8 +-
 .../streams/integration/EosIntegrationTest.java|   2 +-
 10 files changed, 513 insertions(+), 2315 deletions(-)
 create mode 100644 
clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
 copy 
clients/src/main/java/org/apache/kafka/clients/consumer/{StickyAssignor.java => 
internals/AbstractStickyAssignor.java} (75%)
 create mode 100644 
clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
 copy 
clients/src/test/java/org/apache/kafka/clients/consumer/{StickyAssignorTest.java
 => internals/AbstractStickyAssignorTest.java} (57%)
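The copy of StickyAssignor into AbstractStickyAssignor factors out the "sticky" core that CooperativeStickyAssignor now reuses. A toy JDK-only model of that core (single topic, no rack or quota logic; this illustrates the idea, not the real algorithm): a partition stays with its previous owner when that owner is still in the group, and only unowned partitions move, each going to the least-loaded member.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of sticky assignment: minimize partition movement across rebalances.
public class StickySketch {
    static Map<String, List<Integer>> assign(List<String> consumers,
                                             int numPartitions,
                                             Map<Integer, String> previousOwner) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        List<Integer> unowned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            String owner = previousOwner.get(p);
            if (owner != null && assignment.containsKey(owner)) {
                assignment.get(owner).add(p); // sticky: no partition movement
            } else {
                unowned.add(p);               // new partition or departed owner
            }
        }
        // Place the leftovers on the currently least-loaded consumer.
        for (int p : unowned) {
            String least = consumers.get(0);
            for (String c : consumers) {
                if (assignment.get(c).size() < assignment.get(least).size()) {
                    least = c;
                }
            }
            assignment.get(least).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<Integer, String> prev = new LinkedHashMap<>();
        prev.put(0, "a");
        prev.put(1, "a");
        prev.put(2, "b");
        // "a" keeps 0 and 1, "b" keeps 2; new partition 3 goes to "b".
        System.out.println(assign(List.of("a", "b"), 4, prev)); // {a=[0, 1], b=[2, 3]}
    }
}
```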



[kafka] branch trunk updated (add6629 -> e23a718)

2019-08-27 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from add6629  MINOR: fix ProduceBenchWorker not to fail on final produce 
(#7254)
 add e23a718  KAFKA-8745: DumpLogSegments doesn't show keys, when the 
message is null (#7152)

No new revisions were added by this update.

Summary of changes:
 .../main/scala/kafka/tools/DumpLogSegments.scala   | 12 ++---
 .../unit/kafka/tools/DumpLogSegmentsTest.scala | 61 +-
 2 files changed, 43 insertions(+), 30 deletions(-)



[kafka] branch trunk updated (24547b8 -> add6629)

2019-08-27 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 24547b8  KAFKA-8579: Expose RocksDB metrics (#7209)
 add add6629  MINOR: fix ProduceBenchWorker not to fail on final produce 
(#7254)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java  | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)



[kafka] branch trunk updated (6b24b2e -> 24547b8)

2019-08-26 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from 6b24b2e  KAFKA-8816: Make offsets immutable to users of 
RecordCollector.offsets (#7223)
 add 24547b8  KAFKA-8579: Expose RocksDB metrics (#7209)

No new revisions were added by this update.

Summary of changes:
 .../internals/metrics/StreamsMetricsImpl.java  |   2 +-
 .../streams/state/internals/KeyValueSegment.java   |   6 +-
 .../streams/state/internals/KeyValueSegments.java  |  14 +-
 .../internals/RocksDBSegmentedBytesStore.java  |   4 +-
 .../streams/state/internals/RocksDBStore.java  |  50 ++-
 .../RocksDBTimestampedSegmentedBytesStore.java |   4 +-
 .../state/internals/RocksDBTimestampedStore.java   |  11 +-
 .../RocksDbKeyValueBytesStoreSupplier.java |   4 +-
 .../state/internals/TimestampedSegment.java|   6 +-
 .../state/internals/TimestampedSegments.java   |  14 +-
 .../state/internals/metrics/RocksDBMetrics.java|  67 ++---
 .../internals/metrics/RocksDBMetricsRecorder.java  | 133 +
 .../integration/MetricsIntegrationTest.java| 163 +++--
 .../streams/state/KeyValueStoreTestDriver.java |   1 +
 .../state/internals/KeyValueSegmentTest.java   |  33 +++--
 .../state/internals/KeyValueSegmentsTest.java  |  11 +-
 .../internals/RocksDBSegmentedBytesStoreTest.java  |   6 +-
 .../streams/state/internals/RocksDBStoreTest.java  |  89 ++-
 .../RocksDBTimestampedSegmentedBytesStoreTest.java |   6 +-
 .../internals/RocksDBTimestampedStoreTest.java |   4 +-
 .../state/internals/RocksDBWindowStoreTest.java|   4 +-
 .../state/internals/SegmentIteratorTest.java   |   7 +-
 .../TimestampedKeyValueStoreBuilderTest.java   |   4 +-
 .../state/internals/TimestampedSegmentTest.java|  33 +++--
 .../state/internals/TimestampedSegmentsTest.java   |  12 +-
 .../metrics/RocksDBMetricsRecorderTest.java| 144 ++
 .../internals/metrics/RocksDBMetricsTest.java  |   5 +-
 27 files changed, 733 insertions(+), 104 deletions(-)
 create mode 100644 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
 create mode 100644 
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java



[kafka] branch trunk updated (d08bcae -> 6b24b2e)

2019-08-26 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from d08bcae  KAFKA-8669: Add security providers in kafka security config 
(#7090)
 add 6b24b2e  KAFKA-8816: Make offsets immutable to users of 
RecordCollector.offsets (#7223)

No new revisions were added by this update.

Summary of changes:
 .../processor/internals/RecordCollector.java   |  2 +-
 .../processor/internals/RecordCollectorImpl.java   |  3 ++-
 .../streams/processor/internals/StreamTask.java|  3 ++-
 .../processor/internals/RecordCollectorTest.java   | 28 ++
 4 files changed, 33 insertions(+), 3 deletions(-)
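Making offsets immutable to callers of RecordCollector.offsets follows a standard JDK pattern: hand out an unmodifiable view so callers cannot mutate collector state. A minimal sketch (OffsetsView and its fields are illustrative, not the real RecordCollector):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of the read-only-view pattern behind KAFKA-8816.
public class OffsetsView {
    private final Map<String, Long> offsets = new HashMap<>();

    void record(String topicPartition, long offset) {
        offsets.put(topicPartition, offset);
    }

    // Callers get a live but unmodifiable view; attempted writes throw
    // UnsupportedOperationException instead of silently corrupting state.
    Map<String, Long> offsets() {
        return Collections.unmodifiableMap(offsets);
    }

    public static void main(String[] args) {
        OffsetsView collector = new OffsetsView();
        collector.record("topic-0", 5L);
        Map<String, Long> view = collector.offsets();
        System.out.println(view.get("topic-0")); // prints 5
        try {
            view.put("topic-1", 1L);
        } catch (UnsupportedOperationException e) {
            System.out.println("view is read-only");
        }
    }
}
```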



[kafka] branch 2.1 updated: KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207)

2019-08-26 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new 8cdc2d6  KAFKA-8412: Fix nullpointer exception thrown on flushing 
before closing producers (#7207)
8cdc2d6 is described below

commit 8cdc2d6d1b2a227f5e62f148179a52684b230bd6
Author: cpettitt-confluent 
<53191309+cpettitt-conflu...@users.noreply.github.com>
AuthorDate: Mon Aug 26 09:53:36 2019 -0700

KAFKA-8412: Fix nullpointer exception thrown on flushing before closing 
producers (#7207)

Prior to this change an NPE is raised when calling AssignedTasks.close
under the following conditions:

1. EOS is enabled
2. The task was in a suspended state

The cause for the NPE is that when a clean close is requested for a
StreamTask the StreamTask tries to commit. However, in the suspended
state there is no producer so ultimately an NPE is thrown for the
contained RecordCollector in flush.

The fix put forth in this commit is to have AssignedTasks call
closeSuspended when it knows the underlying StreamTask is suspended.

Note also that this test is quite involved. I could have just tested
that AssignedTasks calls closeSuspended when appropriate, but that is
testing, IMO, a detail of the implementation and doesn't actually verify
we reproduced the original problem as it was described. I feel much more
confident that we are reproducing the behavior - and we can test exactly
the conditions that lead to it - when testing across AssignedTasks and
StreamTask. I believe this is an additional support for the argument of
eventually consolidating the state split across classes.

Reviewers: Matthias J. Sax , Guozhang Wang 

---
 checkstyle/import-control.xml  |   2 +
 .../streams/processor/internals/AssignedTasks.java |  17 ++-
 .../internals/AssignedStreamsTasksTest.java| 129 +++--
 .../processor/internals/StreamTaskTest.java|  10 +-
 4 files changed, 136 insertions(+), 22 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index cad8f8b..636fae0 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -251,8 +251,10 @@
   
 
 
+
 
 
+
 
   
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 3cc396d..9ef225f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -392,18 +392,23 @@ abstract class AssignedTasks {
 
 void close(final boolean clean) {
 final AtomicReference firstException = new 
AtomicReference<>(null);
-for (final T task : allTasks()) {
+
+for (final T task: allTasks()) {
 try {
-task.close(clean, false);
+if (suspended.containsKey(task.id())) {
+task.closeSuspended(clean, false, null);
+} else {
+task.close(clean, false);
+}
 } catch (final TaskMigratedException e) {
 log.info("Failed to close {} {} since it got migrated to 
another thread already. " +
-"Closing it as zombie and move on.", taskTypeName, 
task.id());
+"Closing it as zombie and move on.", taskTypeName, 
task.id());
 firstException.compareAndSet(null, closeZombieTask(task));
 } catch (final RuntimeException t) {
 log.error("Failed while closing {} {} due to the following 
error:",
-  task.getClass().getSimpleName(),
-  task.id(),
-  t);
+task.getClass().getSimpleName(),
+task.id(),
+t);
 if (clean) {
 if (!closeUnclean(task)) {
 firstException.compareAndSet(null, t);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index fe71135..349c463 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -17,27 +17,42 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import static org.hamcrest.CoreMatchers.not

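The diff above routes suspended tasks through closeSuspended() so that close() never commits against a torn-down producer. A toy JDK-only model of that branch (Task here is a stand-in, not the real StreamTask):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the KAFKA-8412 fix: during shutdown, tasks known to be
// suspended take the closeSuspended() path, which skips the commit that
// would dereference a missing producer.
public class CloseSuspendedSketch {
    static class Task {
        final String id;
        boolean closedSuspended = false;
        boolean closedNormally = false;

        Task(String id) { this.id = id; }
        void close() { closedNormally = true; }           // commits first in the real code
        void closeSuspended() { closedSuspended = true; } // no commit, no producer access
    }

    static void closeAll(List<Task> tasks, Set<String> suspendedIds) {
        for (Task task : tasks) {
            if (suspendedIds.contains(task.id)) {
                task.closeSuspended();
            } else {
                task.close();
            }
        }
    }

    public static void main(String[] args) {
        Task active = new Task("0_0");
        Task suspended = new Task("0_1");
        closeAll(Arrays.asList(active, suspended),
                 new HashSet<>(Arrays.asList("0_1")));
        System.out.println(active.closedNormally + " " + suspended.closedSuspended); // true true
    }
}
```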
[kafka] branch 2.2 updated: KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207)

2019-08-26 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
 new b29dde6  KAFKA-8412: Fix nullpointer exception thrown on flushing 
before closing producers (#7207)
b29dde6 is described below

commit b29dde6f6eabcf2d658f1d4055a55d139783eb4c
Author: cpettitt-confluent 
<53191309+cpettitt-conflu...@users.noreply.github.com>
AuthorDate: Mon Aug 26 09:53:36 2019 -0700

KAFKA-8412: Fix nullpointer exception thrown on flushing before closing 
producers (#7207)

Prior to this change an NPE is raised when calling AssignedTasks.close
under the following conditions:

1. EOS is enabled
2. The task was in a suspended state

The cause for the NPE is that when a clean close is requested for a
StreamTask the StreamTask tries to commit. However, in the suspended
state there is no producer so ultimately an NPE is thrown for the
contained RecordCollector in flush.

The fix put forth in this commit is to have AssignedTasks call
closeSuspended when it knows the underlying StreamTask is suspended.

Note also that this test is quite involved. I could have just tested
that AssignedTasks calls closeSuspended when appropriate, but that is
testing, IMO, a detail of the implementation and doesn't actually verify
we reproduced the original problem as it was described. I feel much more
confident that we are reproducing the behavior - and we can test exactly
the conditions that lead to it - when testing across AssignedTasks and
StreamTask. I believe this is an additional support for the argument of
eventually consolidating the state split across classes.

Reviewers: Matthias J. Sax , Guozhang Wang 

---
 checkstyle/import-control.xml  |   2 +
 .../streams/processor/internals/AssignedTasks.java |  17 ++-
 .../internals/AssignedStreamsTasksTest.java| 129 +++--
 .../processor/internals/StreamTaskTest.java|  10 +-
 4 files changed, 136 insertions(+), 22 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f4955ce..55e4cf2 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -265,8 +265,10 @@
   
 
 
+
 
 
+
 
   
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index a9baa3f..6a39df1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -332,18 +332,23 @@ abstract class AssignedTasks {
 
 void close(final boolean clean) {
 final AtomicReference firstException = new 
AtomicReference<>(null);
-for (final T task : allTasks()) {
+
+for (final T task: allTasks()) {
 try {
-task.close(clean, false);
+if (suspended.containsKey(task.id())) {
+task.closeSuspended(clean, false, null);
+} else {
+task.close(clean, false);
+}
 } catch (final TaskMigratedException e) {
 log.info("Failed to close {} {} since it got migrated to 
another thread already. " +
-"Closing it as zombie and move on.", taskTypeName, 
task.id());
+"Closing it as zombie and move on.", taskTypeName, 
task.id());
 firstException.compareAndSet(null, closeZombieTask(task));
 } catch (final RuntimeException t) {
 log.error("Failed while closing {} {} due to the following 
error:",
-  task.getClass().getSimpleName(),
-  task.id(),
-  t);
+task.getClass().getSimpleName(),
+task.id(),
+t);
 if (clean) {
 if (!closeUnclean(task)) {
 firstException.compareAndSet(null, t);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index ffd0f8b..ca51a3b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -17,27 +17,42 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import static org.hamcrest.CoreMatchers.not

[kafka] branch 2.3 updated: KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207)

2019-08-26 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new e0e2dbc  KAFKA-8412: Fix nullpointer exception thrown on flushing 
before closing producers (#7207)
e0e2dbc is described below

commit e0e2dbc9d89b0fcbf49ac9bf30a7159d3c27c9f8
Author: cpettitt-confluent 
<53191309+cpettitt-conflu...@users.noreply.github.com>
AuthorDate: Mon Aug 26 09:53:36 2019 -0700

KAFKA-8412: Fix nullpointer exception thrown on flushing before closing 
producers (#7207)

Prior to this change an NPE is raised when calling AssignedTasks.close
under the following conditions:

1. EOS is enabled
2. The task was in a suspended state

The cause for the NPE is that when a clean close is requested for a
StreamTask the StreamTask tries to commit. However, in the suspended
state there is no producer so ultimately an NPE is thrown for the
contained RecordCollector in flush.

The fix put forth in this commit is to have AssignedTasks call
closeSuspended when it knows the underlying StreamTask is suspended.

Note also that this test is quite involved. I could have just tested
that AssignedTasks calls closeSuspended when appropriate, but that is
testing, IMO, a detail of the implementation and doesn't actually verify
we reproduced the original problem as it was described. I feel much more
confident that we are reproducing the behavior - and we can test exactly
the conditions that lead to it - when testing across AssignedTasks and
StreamTask. I believe this is an additional support for the argument of
eventually consolidating the state split across classes.

Reviewers: Matthias J. Sax , Guozhang Wang 

---
 checkstyle/import-control.xml  |   2 +
 .../streams/processor/internals/AssignedTasks.java |  17 ++-
 .../internals/AssignedStreamsTasksTest.java| 129 +++--
 .../processor/internals/StreamTaskTest.java|  10 +-
 4 files changed, 136 insertions(+), 22 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index a76fd1d..dfaa3f6 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -266,8 +266,10 @@
   
 
 
+
 
 
+
 
   
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index a9baa3f..6a39df1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -332,18 +332,23 @@ abstract class AssignedTasks {
 
 void close(final boolean clean) {
 final AtomicReference firstException = new 
AtomicReference<>(null);
-for (final T task : allTasks()) {
+
+for (final T task: allTasks()) {
 try {
-task.close(clean, false);
+if (suspended.containsKey(task.id())) {
+task.closeSuspended(clean, false, null);
+} else {
+task.close(clean, false);
+}
 } catch (final TaskMigratedException e) {
 log.info("Failed to close {} {} since it got migrated to 
another thread already. " +
-"Closing it as zombie and move on.", taskTypeName, 
task.id());
+"Closing it as zombie and move on.", taskTypeName, 
task.id());
 firstException.compareAndSet(null, closeZombieTask(task));
 } catch (final RuntimeException t) {
 log.error("Failed while closing {} {} due to the following 
error:",
-  task.getClass().getSimpleName(),
-  task.id(),
-  t);
+task.getClass().getSimpleName(),
+task.id(),
+t);
 if (clean) {
 if (!closeUnclean(task)) {
 firstException.compareAndSet(null, t);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index ffd0f8b..ca51a3b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -17,27 +17,42 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import static org.hamcrest.CoreMatchers.not

[kafka] branch trunk updated (d7f8ec8 -> 7334222)

2019-08-26 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from d7f8ec8  MINOR: Fix the doc of scheduled.rebalance.max.delay.ms config 
property (#7242)
 add 7334222  KAFKA-8412: Fix nullpointer exception thrown on flushing 
before closing producers (#7207)

No new revisions were added by this update.

Summary of changes:
 checkstyle/import-control.xml  |   2 +
 .../streams/processor/internals/AssignedTasks.java |  17 ++-
 .../internals/AssignedStreamsTasksTest.java| 129 +++--
 .../processor/internals/StreamTaskTest.java|  10 +-
 4 files changed, 136 insertions(+), 22 deletions(-)



[kafka] branch trunk updated (c1f2b0f -> c6664e1)

2019-08-23 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from c1f2b0f  KAFKA-8753; Expose controller topic deletion metrics 
(KIP-503) (#7156)
 add c6664e1  MINOR: Move the resetting from revoked to the thread loop 
(#7243)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/kafka/streams/processor/internals/StreamThread.java | 7 ++-
 1 file changed, 2 insertions(+), 5 deletions(-)



[kafka] branch 2.3 updated: KAFKA-8824: bypass value serde on null (#7235)

2019-08-22 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new 919  KAFKA-8824: bypass value serde on null (#7235)
919 is described below

commit 9195c2685cac3ac6788fad42076a795f8085
Author: John Roesler 
AuthorDate: Thu Aug 22 15:35:33 2019 -0500

KAFKA-8824: bypass value serde on null (#7235)

In a KTable context, we should not pass null into a user-supplied serde.

Testing: I verified that the change to the test results in test failures 
without the patch.

Reviewers: Matthias J. Sax , Bill Bejeck 
, Guozhang Wang ,
---
 .../state/internals/InMemoryTimeOrderedKeyValueBuffer.java |  2 +-
 .../state/internals/TimeOrderedKeyValueBufferTest.java | 14 +-
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 6c5022f..14a41ea 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -448,7 +448,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer 
implements TimeOrdere
 final byte[] serializedPriorValue;
 if (buffered == null) {
 final V priorValue = value.oldValue;
-serializedPriorValue = 
valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
+serializedPriorValue = (priorValue == null) ? null : 
valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
 } else {
 serializedPriorValue = buffered.priorValue();
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 941832b..e8a62d0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -68,6 +70,16 @@ public class TimeOrderedKeyValueBufferTest bufferSupplier;
 private final String testName;
 
+public static final class NullRejectingStringSerializer extends 
StringSerializer {
+@Override
+public byte[] serialize(final String topic, final String data) {
+if (data == null) {
+throw new IllegalArgumentException();
+}
+return super.serialize(topic, data);
+}
+}
+
 // As we add more buffer implementations/configurations, we can add them 
here
 @Parameterized.Parameters(name = "{index}: test={0}")
 public static Collection parameters() {
@@ -76,7 +88,7 @@ public class TimeOrderedKeyValueBufferTest>) name ->
 new InMemoryTimeOrderedKeyValueBuffer
-.Builder<>(name, Serdes.String(), Serdes.String())
+.Builder<>(name, Serdes.String(), Serdes.serdeFrom(new 
NullRejectingStringSerializer(), new StringDeserializer()))
 .build()
 }
 );

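The one-line fix above never hands null to a user-supplied serde, which the new NullRejectingStringSerializer in the test would reject. A self-contained sketch of the same guard (the Serializer interface below is a stand-in for Kafka's, reduced to one method so a lambda works):

```java
// Sketch of the KAFKA-8824 null bypass: the serde is consulted only for
// non-null prior values, mirroring `(priorValue == null) ? null : ...serialize(...)`.
public class NullBypass {
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    static <T> byte[] serializePrior(Serializer<T> serde, String topic, T priorValue) {
        return (priorValue == null) ? null : serde.serialize(topic, priorValue);
    }

    public static void main(String[] args) {
        // A serializer that, like the test's NullRejectingStringSerializer,
        // refuses nulls outright.
        Serializer<String> strict = (topic, data) -> {
            if (data == null) {
                throw new IllegalArgumentException("null payload");
            }
            return data.getBytes();
        };
        System.out.println(serializePrior(strict, "changelog", null));         // null, no exception
        System.out.println(serializePrior(strict, "changelog", "v").length);   // 1
    }
}
```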


[kafka] branch trunk updated (e4215c1 -> e213608)

2019-08-22 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from e4215c1  KAFKA-8325; Remove batch from in-flight requests on 
MESSAGE_TOO_LARGE errors (#7176)
 add e213608  KAFKA-8824: bypass value serde on null (#7235)

No new revisions were added by this update.

Summary of changes:
 .../state/internals/InMemoryTimeOrderedKeyValueBuffer.java |  2 +-
 .../state/internals/TimeOrderedKeyValueBufferTest.java | 14 +-
 2 files changed, 14 insertions(+), 2 deletions(-)



[kafka] branch trunk updated (b8605c9 -> e2d16b5)

2019-08-16 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


from b8605c9  KAFKA-8600: Use RPC generation for DescribeDelegationTokens 
protocol
 add e2d16b5  KAFKA-8802: ConcurrentSkipListMap shows performance 
regression in cache and in-memory store (#7212)

No new revisions were added by this update.

Summary of changes:
 .../state/internals/InMemoryKeyValueStore.java | 37 +++---
 .../kafka/streams/state/internals/NamedCache.java  | 17 ++
 .../kafka/streams/state/internals/ThreadCache.java | 24 +++---
 3 files changed, 42 insertions(+), 36 deletions(-)



[kafka] branch 2.3 updated: KAFKA-8802: ConcurrentSkipListMap shows performance regression in cache and in-memory store (#7212)

2019-08-16 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new 051d290  KAFKA-8802: ConcurrentSkipListMap shows performance 
regression in cache and in-memory store (#7212)
051d290 is described below

commit 051d29098ac7529f54475899c3441ed918a6882c
Author: A. Sophie Blee-Goldman 
AuthorDate: Fri Aug 16 10:41:30 2019 -0700

KAFKA-8802: ConcurrentSkipListMap shows performance regression in cache and 
in-memory store (#7212)

Reverts the TreeMap -> ConcurrentSkipListMap change that caused a 
performance regression in 2.3, and fixes the ConcurrentModificationException by 
copying (just) the key set to iterate over

Reviewers: Bill Bejeck , Guozhang Wang 

---
 .../state/internals/InMemoryKeyValueStore.java | 37 +++---
 .../kafka/streams/state/internals/NamedCache.java  | 17 ++
 .../kafka/streams/state/internals/ThreadCache.java | 24 +++---
 3 files changed, 42 insertions(+), 36 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 2d68214..ed3d024 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -17,8 +17,10 @@
 package org.apache.kafka.streams.state.internals;
 
 import java.util.List;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -27,13 +29,12 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Iterator;
-import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class InMemoryKeyValueStore implements KeyValueStore {
 private final String name;
-private final ConcurrentNavigableMap map = new 
ConcurrentSkipListMap<>();
+private final NavigableMap map = new TreeMap<>();
 private volatile boolean open = false;
 private long size = 0L; // SkipListMap#size is O(N) so we just do our best 
to track it
 
@@ -71,12 +72,12 @@ public class InMemoryKeyValueStore implements 
KeyValueStore {
 }
 
 @Override
-public byte[] get(final Bytes key) {
+public synchronized byte[] get(final Bytes key) {
 return map.get(key);
 }
 
 @Override
-public void put(final Bytes key, final byte[] value) {
+public synchronized void put(final Bytes key, final byte[] value) {
 if (value == null) {
 size -= map.remove(key) == null ? 0 : 1;
 } else {
@@ -85,7 +86,7 @@ public class InMemoryKeyValueStore implements 
KeyValueStore {
 }
 
 @Override
-public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+public synchronized byte[] putIfAbsent(final Bytes key, final byte[] 
value) {
 final byte[] originalValue = get(key);
 if (originalValue == null) {
 put(key, value);
@@ -101,14 +102,14 @@ public class InMemoryKeyValueStore implements 
KeyValueStore {
 }
 
 @Override
-public byte[] delete(final Bytes key) {
+public synchronized byte[] delete(final Bytes key) {
 final byte[] oldValue = map.remove(key);
 size -= oldValue == null ? 0 : 1;
 return oldValue;
 }
 
 @Override
-public KeyValueIterator range(final Bytes from, final Bytes 
to) {
+public synchronized KeyValueIterator range(final Bytes 
from, final Bytes to) {
 
 if (from.compareTo(to) > 0) {
 LOG.warn("Returning empty iterator for fetch with invalid key 
range: from > to. "
@@ -119,14 +120,14 @@ public class InMemoryKeyValueStore implements 
KeyValueStore {
 
 return new DelegatingPeekingKeyValueIterator<>(
 name,
-new InMemoryKeyValueIterator(map.subMap(from, true, to, 
true).entrySet().iterator()));
+new InMemoryKeyValueIterator(map.subMap(from, true, to, 
true).keySet()));
 }
 
 @Override
-public KeyValueIterator all() {
+public synchronized KeyValueIterator all() {
 return new DelegatingPeekingKeyValueIterator<>(
 name,
-new InMemoryKeyValueIterator(map.entrySet().iterator()));
+new InMemoryKeyValueIterator(map.keySet()));
 }
 
 @Override
@@ -146,11 +147,11 @@ public class InMemoryKeyValueStore implements 
KeyValueStore {
 open 

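The revert above trades ConcurrentSkipListMap for a synchronized TreeMap, and avoids ConcurrentModificationException by copying just the key set before iterating. A minimal sketch of that strategy (class and method names are illustrative, not the real InMemoryKeyValueStore API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the KAFKA-8802 approach: plain TreeMap guarded by
// synchronization, with range scans iterating over a snapshot of the keys
// so later writes cannot invalidate the iteration.
public class SnapshotIteration {
    private final NavigableMap<String, byte[]> map = new TreeMap<>();

    synchronized void put(String key, byte[] value) {
        map.put(key, value);
    }

    // Copy (just) the keys; values would be fetched lazily per key,
    // as the store's iterator does.
    synchronized List<String> keysInRange(String from, String to) {
        return new ArrayList<>(map.subMap(from, true, to, true).keySet());
    }

    public static void main(String[] args) {
        SnapshotIteration store = new SnapshotIteration();
        store.put("a", new byte[]{1});
        store.put("b", new byte[]{2});
        store.put("c", new byte[]{3});
        // Mutating mid-scan is safe: the key list is a copy, not a live view.
        for (String key : store.keysInRange("a", "c")) {
            store.put(key + "!", new byte[0]);
        }
        System.out.println(store.keysInRange("a", "z")); // [a, a!, b, b!, c, c!]
    }
}
```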
[kafka] branch trunk updated: MINOR: remove unnecessary #remove overrides (#7178)

2019-08-13 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new e07b46d  MINOR: remove unnecessary #remove overrides (#7178)
e07b46d is described below

commit e07b46dd0424717829f5e12fb0fa91065b943fcd
Author: A. Sophie Blee-Goldman 
AuthorDate: Tue Aug 13 16:30:48 2019 -0700

MINOR: remove unnecessary #remove overrides (#7178)

Iterator#remove has a default implementation that throws 
UnsupportedOperationException, so there's no need to override it with the same 
thing.

Should be cherry-picked back to whenever we switched to Java 8

Reviewers: Bill Bejeck , Matthias J. Sax 
, Guozhang Wang 
---
 .../apache/kafka/streams/state/KeyValueIterator.java |  3 ++-
 .../AbstractMergedSortedCacheStoreIterator.java  |  5 -
 .../state/internals/CompositeKeyValueIterator.java   |  4 
 .../internals/DelegatingPeekingKeyValueIterator.java |  5 -
 .../state/internals/FilteredCacheIterator.java   |  9 -
 .../state/internals/InMemoryKeyValueStore.java   |  5 -
 .../streams/state/internals/KeyValueIterators.java   |  3 ---
 .../state/internals/MemoryNavigableLRUCache.java |  5 -
 .../state/internals/MeteredKeyValueStore.java|  5 -
 .../state/internals/MeteredWindowStoreIterator.java  |  5 -
 .../internals/MeteredWindowedKeyValueIterator.java   |  5 -
 .../state/internals/RocksDBTimestampedStore.java |  5 -
 .../streams/state/internals/RocksDbIterator.java |  5 -
 .../streams/state/internals/SegmentIterator.java |  6 ++
 .../kafka/streams/state/internals/ThreadCache.java   |  5 -
 .../state/internals/WindowStoreIteratorWrapper.java  | 10 --
 .../state/internals/WrappedSessionStoreIterator.java |  5 -
 .../apache/kafka/streams/state/NoOpWindowStore.java  |  4 
 .../state/internals/ReadOnlyWindowStoreStub.java | 20 
 .../kafka/test/GenericInMemoryKeyValueStore.java |  5 -
 .../GenericInMemoryTimestampedKeyValueStore.java |  5 -
 .../org/apache/kafka/test/KeyValueIteratorStub.java  |  4 
 .../apache/kafka/test/ReadOnlySessionStoreStub.java  |  4 
 23 files changed, 4 insertions(+), 128 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java 
b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
index 70a142b..b1f5e2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -23,9 +23,10 @@ import java.util.Iterator;
 
 /**
  * Iterator interface of {@link KeyValue}.
- *
+ * <p>
  * Users must call its {@code close} method explicitly upon completeness to 
release resources,
  * or use try-with-resources statement (available since JDK7) for this {@link 
Closeable} class.
+ * Note that {@code remove()} is not supported.
  *
  * @param  Type of keys
  * @param  Type of values
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
index dfcd763..16bdbeb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
@@ -148,11 +148,6 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements KeyValueIterator<K, V> {
 }
 
 @Override
-public void remove() {
-throw new UnsupportedOperationException("remove() is not supported in 
" + getClass().getName());
-}
-
-@Override
 public void close() {
 cacheIterator.close();
 storeIterator.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java
index faccc16..4ac6fee 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java
@@ -67,8 +67,4 @@ class CompositeKeyValueIterator<K, V> implements KeyValueIterator<K, V> {
 }
 
 @Override
-public void remove() {
-throw new UnsupportedOperationException("remove() is not supported in 
" + getClass().getName());
-}
-
-@Override
 public KeyValue peekNext() {
 if (!hasNext()) {
 throw new NoSuchElementException();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java
 
b/streams/src/main/java/org/apache
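The commit above relies on the Java 8 default body of `Iterator#remove`. A minimal pure-JDK illustration (the iterator class here is made up for the example) confirms that an iterator which does not override `remove()` inherits an implementation that throws `UnsupportedOperationException`:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Minimal iterator that, like the Kafka iterators after this commit, does NOT
// override remove(): callers fall through to the Java 8 default method on
// Iterator, which throws UnsupportedOperationException.
public class SingletonIterator implements Iterator<String> {
    private boolean done = false;

    @Override
    public boolean hasNext() {
        return !done;
    }

    @Override
    public String next() {
        if (done) {
            throw new NoSuchElementException();
        }
        done = true;
        return "only-element";
    }

    public static boolean removeThrows() {
        final Iterator<String> it = new SingletonIterator();
        it.next();
        try {
            it.remove();  // inherited default implementation: throws
            return false;
        } catch (final UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(removeThrows());  // true
    }
}
```

This is exactly why the overrides deleted in the diff were redundant: removing them changes no observable behavior.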

[kafka] branch trunk updated: KAFKA-8179: Part 3, Add PartitionsLost API for resetGenerations and metadata/subscription change (#6884)

2019-08-08 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new e867a58  KAFKA-8179: Part 3, Add PartitionsLost API for 
resetGenerations and metadata/subscription change (#6884)
e867a58 is described below

commit e867a58425876767b952e06892c72b5e13066acc
Author: Guozhang Wang 
AuthorDate: Thu Aug 8 14:31:22 2019 -0700

KAFKA-8179: Part 3, Add PartitionsLost API for resetGenerations and 
metadata/subscription change (#6884)

1. Add onPartitionsLost into the RebalanceListener, which will be triggered 
when the consumer found that the generation is reset due to fatal errors in 
response handling.

2. Semantic behavior change: with COOPERATIVE protocol, if the revoked / 
lost partitions are empty, do not trigger the corresponding callback at all. 
For added partitions though, even if it is empty we would still trigger the 
callback as a way to notify the rebalance event; with EAGER protocol, revoked / 
assigned callbacks are always triggered.

The ordering of the callback would be the following:

a. Callback onPartitionsRevoked / onPartitionsLost triggered.
b. Update the assignment (both revoked and added).
c. Callback onPartitionsAssigned triggered.

In this way we are assured that users can still access the partitions being 
revoked, whereas they can also access the partitions being added.

3. Semantic behavior change (KAFKA-4600): if the rebalance listener 
throws an exception, pass it along all the way to the consumer.poll caller, but 
still complete the rest of the actions. Also, the newly assigned partitions 
list is not affected by a thrown exception, since it is just for 
notifying the users.

4. Semantic behavior change: the ConsumerCoordinator no longer tries to 
modify the assignor's returned assignments; instead it validates the 
assignments and sets the error code accordingly: if there are overlaps between 
added / revoked partitions, it is a fatal error and is communicated to 
all members to stop; if the revoked list is not empty, it is an error 
indicating re-join; otherwise, it is normal.

5. Minor: with the error code removed from the Assignment, 
ConsumerCoordinator will request re-join if the revoked partitions list is not 
empty.

6. Updated ConsumerCoordinatorTest accordingly. Also fixed a minor bug in 
MetadataUpdate where a removed topic would still be retained with a null value 
for num.partitions.

7. Updated a few other flaky tests that were exposed by this change.

Reviewers: John Roesler , A. Sophie 
Blee-Goldman , Jason Gustafson 
---
 .../consumer/ConsumerPartitionAssignor.java|  17 ++
 .../consumer/ConsumerRebalanceListener.java|  98 +-
 .../kafka/clients/consumer/KafkaConsumer.java  |  19 +-
 .../consumer/internals/AbstractCoordinator.java|  62 ++--
 .../consumer/internals/ConsumerCoordinator.java| 332 +++--
 .../consumer/internals/PartitionAssignor.java  |   1 -
 .../consumer/internals/SubscriptionState.java  |  55 ++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  64 +++-
 .../clients/consumer/RoundRobinAssignorTest.java   |  20 +-
 .../internals/ConsumerCoordinatorTest.java | 184 
 .../consumer/internals/SubscriptionStateTest.java  |  40 ++-
 core/src/main/scala/kafka/tools/MirrorMaker.scala  |   2 +
 .../kafka/api/AbstractConsumerTest.scala   |  14 +-
 .../integration/kafka/api/ConsumerBounceTest.scala |  13 +-
 .../kafka/api/PlaintextConsumerTest.scala  |   8 +-
 .../org/apache/kafka/streams/KafkaStreams.java |   5 +-
 .../streams/processor/internals/StreamThread.java  |   5 +-
 .../StreamTableJoinIntegrationTest.java|   5 +-
 .../integration/utils/IntegrationTestUtils.java|  17 +-
 19 files changed, 686 insertions(+), 275 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index 07e153e..f9a4217 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -180,6 +180,23 @@ public interface ConsumerPartitionAssignor {
 }
 }
 
+/**
+ * The rebalance protocol defines partition assignment and revocation 
semantics. The purpose is to establish a
+ * consistent set of rules that all consumers in a group follow in order 
to transfer ownership of a partition.
+ * {@link ConsumerPartitionAssignor} implementors can claim supporting one 
or more rebalance protocols via the
+ * {@link ConsumerPartitionAssignor#supportedProtocols()}, and it is their 
responsibility
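The callback ordering (a) revoke, (b) update assignment, (c) assign described in the commit message can be sketched as a toy model. This is not the real ConsumerCoordinator — the interface and partition representation below are simplified stand-ins — but it shows why the listener can still access revoked partitions in step (a) and already sees added ones in step (c), and why an empty revocation is skipped under the COOPERATIVE protocol while the assigned callback always fires.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the rebalance callback ordering: revoke first, then update
// the owned set, then notify about additions.
public class RebalanceOrder {
    interface Listener {
        void onPartitionsRevoked(Set<Integer> revoked);
        void onPartitionsAssigned(Set<Integer> added);
    }

    public static List<String> rebalance(final Set<Integer> owned,
                                         final Set<Integer> target,
                                         final Listener listener) {
        final List<String> log = new ArrayList<>();
        final Set<Integer> revoked = new HashSet<>(owned);
        revoked.removeAll(target);
        final Set<Integer> added = new HashSet<>(target);
        added.removeAll(owned);

        if (!revoked.isEmpty()) {              // COOPERATIVE: skip empty revocation
            listener.onPartitionsRevoked(revoked);   // (a)
            log.add("revoked" + revoked);
        }
        owned.removeAll(revoked);              // (b) update the assignment
        owned.addAll(added);
        log.add("updated" + owned);
        listener.onPartitionsAssigned(added);  // (c) always triggered, even if empty
        log.add("assigned" + added);
        return log;
    }

    public static List<String> demoLog() {
        final Listener noop = new Listener() {
            @Override
            public void onPartitionsRevoked(final Set<Integer> revoked) { }
            @Override
            public void onPartitionsAssigned(final Set<Integer> added) { }
        };
        // Owned {1, 2} rebalances to {2, 3}: revoke {1}, then add {3}.
        return rebalance(new HashSet<>(Set.of(1, 2)), Set.of(2, 3), noop);
    }

    public static void main(final String[] args) {
        System.out.println(demoLog());
    }
}
```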

[kafka] branch 1.1 updated: KAFKA-8602: Backport bugfix for standby task creation (#7147)

2019-08-07 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
 new f6316c3  KAFKA-8602: Backport bugfix for standby task creation (#7147)
f6316c3 is described below

commit f6316c39b9d2d9630066416d15eca7ad5cec99fb
Author: cadonna 
AuthorDate: Wed Aug 7 17:13:40 2019 +0200

KAFKA-8602: Backport bugfix for standby task creation (#7147)

Backports bugfix in standby task creation from PR #7008.
A separate PR is needed because some tests in the original PR
use topology optimizations and mocks that were introduced afterwards.

Reviewers: Bill Bejeck , Guozhang Wang 

---
 .../streams/processor/internals/StreamThread.java  |  20 ++-
 .../StandbyTaskCreationIntegrationTest.java| 189 +
 .../processor/internals/StreamThreadTest.java  | 133 +--
 3 files changed, 318 insertions(+), 24 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a5199da..1cc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -472,15 +472,17 @@ public class StreamThread extends Thread {
 
 final ProcessorTopology topology = 
builder.build(taskId.topicGroupId);
 
-if (!topology.stateStores().isEmpty()) {
-return new StandbyTask(taskId,
-   partitions,
-   topology,
-   consumer,
-   storeChangelogReader,
-   config,
-   streamsMetrics,
-   stateDirectory);
+if (!topology.stateStores().isEmpty() && !topology.storeToChangelogTopic().isEmpty()) {
+return new StandbyTask(
+taskId,
+partitions,
+topology,
+consumer,
+storeChangelogReader,
+config,
+streamsMetrics,
+stateDirectory
+);
 } else {
 log.trace("Skipped standby task {} with assigned partitions {} 
" +
 "since it does not have any state stores to materialize", 
taskId, partitions);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
new file mode 100644
index 000..ed2781f
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.KafkaStreams.StateListener;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+imp
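The guard added in the diff above can be reduced to a small predicate. The names below are illustrative (not the real `ProcessorTopology` API): a standby task is only worth creating when the sub-topology has state stores and at least one of them is backed by a changelog topic to restore from — a store built with logging disabled has no changelog and nothing for a standby to replicate.

```java
import java.util.Map;
import java.util.Set;

// Simplified sketch of the standby-task creation guard: both conditions from
// the patched StreamThread must hold.
public class StandbyGuard {
    public static boolean shouldCreateStandby(final Set<String> stateStores,
                                              final Map<String, String> storeToChangelogTopic) {
        return !stateStores.isEmpty() && !storeToChangelogTopic.isEmpty();
    }

    public static void main(final String[] args) {
        // A store with logging disabled has no changelog topic: skip the standby.
        System.out.println(shouldCreateStandby(Set.of("counts"), Map.of()));  // false
        System.out.println(shouldCreateStandby(
            Set.of("counts"), Map.of("counts", "app-counts-changelog")));     // true
    }
}
```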

[kafka] branch trunk updated: Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` (#7161)

2019-08-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 7663a6c  Minor: Refactor methods to add metrics to sensor in 
`StreamsMetricsImpl` (#7161)
7663a6c is described below

commit 7663a6c44daae5d72f38cbba79d728416e11167d
Author: cadonna 
AuthorDate: Tue Aug 6 17:51:08 2019 +0200

Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` 
(#7161)

Renames method names in StreamsMetricsImpl to make them consistent.

Reviewers: A. Sophie Blee-Goldman , Guozhang Wang 

---
 .../streams/kstream/internals/metrics/Sensors.java |  2 +-
 .../streams/processor/internals/ProcessorNode.java | 13 +++---
 .../internals/metrics/StreamsMetricsImpl.java  | 50 +++---
 .../processor/internals/metrics/ThreadMetrics.java | 30 ++---
 .../AbstractRocksDBSegmentedBytesStore.java|  4 +-
 .../state/internals/InMemorySessionStore.java  |  8 ++--
 .../state/internals/InMemoryWindowStore.java   |  4 +-
 .../streams/state/internals/metrics/Sensors.java   | 12 +++---
 .../internals/metrics/StreamsMetricsImplTest.java  | 20 -
 .../internals/metrics/ThreadMetricsTest.java   | 26 +--
 10 files changed, 85 insertions(+), 84 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 038b8ac..363ec6e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -44,7 +44,7 @@ public class Sensors {
 LATE_RECORD_DROP,
 Sensor.RecordingLevel.INFO
 );
-StreamsMetricsImpl.addInvocationRateAndCount(
+StreamsMetricsImpl.addInvocationRateAndCountToSensor(
 sensor,
 PROCESSOR_NODE_METRICS_GROUP,
 metrics.tagMap("task-id", context.taskId().toString(), 
PROCESSOR_NODE_ID_TAG, context.currentNode().name()),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 01e3e56..bc66ede 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -33,8 +33,7 @@ import java.util.Set;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
-import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
-import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
 
 public class ProcessorNode {
 
@@ -232,12 +231,14 @@ public class ProcessorNode {

final Map<String, String> taskTags,
final Map<String, String> nodeTags) {
 final Sensor parent = metrics.taskLevelSensor(taskName, operation, 
Sensor.RecordingLevel.DEBUG);
-addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, 
operation);
-addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, 
taskTags, operation);
+addAvgAndMaxLatencyToSensor(parent, PROCESSOR_NODE_METRICS_GROUP, 
taskTags, operation);
+StreamsMetricsImpl
+.addInvocationRateAndCountToSensor(parent, 
PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
 
 final Sensor sensor = metrics.nodeLevelSensor(taskName, 
processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
-addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, 
operation);
-addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, 
nodeTags, operation);
+addAvgAndMaxLatencyToSensor(sensor, PROCESSOR_NODE_METRICS_GROUP, 
nodeTags, operation);
+StreamsMetricsImpl
+.addInvocationRateAndCountToSensor(sensor, 
PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
 
 return sensor;
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index ae3d953..5ac2f33 100644
--- 
a/streams/src/mai

[kafka] branch trunk updated: KAFKA-8578: Add basic functionality to expose RocksDB metrics (#6979)

2019-08-02 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new a7d0fdd  KAFKA-8578: Add basic functionality to expose RocksDB metrics 
(#6979)
a7d0fdd is described below

commit a7d0fdd534ef55533a868ea7388bbc081ee42718
Author: cadonna 
AuthorDate: Fri Aug 2 18:51:03 2019 +0200

KAFKA-8578: Add basic functionality to expose RocksDB metrics (#6979)

* Adds RocksDBMetrics class that provides methods to get sensors from the 
Kafka metrics registry and to setup the sensors to record RocksDB metrics

* Extends StreamsMetricsImpl with functionality to add the required metrics 
to the sensors.

Reviewers: Boyang Chen , Bill Bejeck 
, Matthias J. Sax , John Roesler 
, Guozhang Wang 
---
 .../internals/metrics/StreamsMetricsImpl.java  |  83 -
 .../state/internals/metrics/RocksDBMetrics.java| 382 +
 .../internals/metrics/StreamsMetricsImplTest.java  | 113 +-
 .../internals/metrics/RocksDBMetricsTest.java  | 283 +++
 4 files changed, 852 insertions(+), 9 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index b6bfcc5..ae3d953 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -23,9 +23,12 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
 import org.apache.kafka.streams.StreamsMetrics;
 
 import java.util.Arrays;
@@ -54,17 +57,21 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
 public static final String THREAD_ID_TAG = "client-id";
 public static final String TASK_ID_TAG = "task-id";
+public static final String STORE_ID_TAG = "state-id";
 
 public static final String ALL_TASKS = "all";
 
 public static final String LATENCY_SUFFIX = "-latency";
 public static final String AVG_SUFFIX = "-avg";
 public static final String MAX_SUFFIX = "-max";
+public static final String MIN_SUFFIX = "-min";
 public static final String RATE_SUFFIX = "-rate";
 public static final String TOTAL_SUFFIX = "-total";
+public static final String RATIO_SUFFIX = "-ratio";
 
 public static final String THREAD_LEVEL_GROUP = "stream-metrics";
 public static final String TASK_LEVEL_GROUP = "stream-task-metrics";
+public static final String STATE_LEVEL_GROUP = "stream-state-metrics";
 
 public static final String PROCESSOR_NODE_METRICS_GROUP = 
"stream-processor-node-metrics";
 public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
@@ -123,6 +130,18 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 }
 }
 
+public Map<String, String> taskLevelTagMap(final String taskName) {
+final Map<String, String> tagMap = threadLevelTagMap();
+tagMap.put(TASK_ID_TAG, taskName);
+return tagMap;
+}
+
+public Map<String, String> storeLevelTagMap(final String taskName, final 
String storeType, final String storeName) {
+final Map<String, String> tagMap = taskLevelTagMap(taskName);
+tagMap.put(storeType + "-" + STORE_ID_TAG, storeName);
+return tagMap;
+}
+
 public final Sensor taskLevelSensor(final String taskName,
 final String sensorName,
 final RecordingLevel recordingLevel,
@@ -237,9 +256,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 if (!storeLevelSensors.containsKey(key)) {
 storeLevelSensors.put(key, new LinkedList<>());
 }
-
 final String fullSensorName = key + SENSOR_NAME_DELIMITER + 
sensorName;
-
 final Sensor sensor = metrics.sensor(fullSensorName, 
recordingLevel, parents);
 
 storeLevelSensors.get(key).push(fullSensorName);
@@ -454,12 +471,62 @@ public class StreamsMetricsImpl implements StreamsMetrics 
{
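The tag-map methods added to StreamsMetricsImpl form a small hierarchy: each level copies its parent's tags and adds one of its own, so a store-level metric carries the client id, the task id, and a `<store-type>-state-id` tag. A self-contained sketch of that pattern (tag keys follow the constants in the diff; the thread name argument is an illustrative simplification of the real class, which holds it as a field):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the thread -> task -> store tag-map hierarchy.
public class TagMaps {
    public static Map<String, String> threadLevelTagMap(final String threadName) {
        final Map<String, String> tagMap = new LinkedHashMap<>();
        tagMap.put("client-id", threadName);   // THREAD_ID_TAG
        return tagMap;
    }

    public static Map<String, String> taskLevelTagMap(final String threadName,
                                                      final String taskName) {
        final Map<String, String> tagMap = threadLevelTagMap(threadName);
        tagMap.put("task-id", taskName);       // TASK_ID_TAG
        return tagMap;
    }

    public static Map<String, String> storeLevelTagMap(final String threadName,
                                                       final String taskName,
                                                       final String storeType,
                                                       final String storeName) {
        final Map<String, String> tagMap = taskLevelTagMap(threadName, taskName);
        tagMap.put(storeType + "-state-id", storeName);  // storeType + "-" + STORE_ID_TAG
        return tagMap;
    }

    public static void main(final String[] args) {
        System.out.println(storeLevelTagMap("thread-1", "0_0", "rocksdb", "counts"));
        // {client-id=thread-1, task-id=0_0, rocksdb-state-id=counts}
    }
}
```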
 

[kafka] branch trunk updated: KAFKA-8179: PartitionAssignorAdapter (#7110)

2019-07-31 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 6fbac3c   KAFKA-8179: PartitionAssignorAdapter (#7110)
6fbac3c is described below

commit 6fbac3cfa88bd3eba36b2d3e254d7ce5e11a550b
Author: A. Sophie Blee-Goldman 
AuthorDate: Wed Jul 31 13:53:38 2019 -0700

 KAFKA-8179: PartitionAssignorAdapter (#7110)

Follow-up to the new PartitionAssignor interface merged in #7108

Adds a PartitionAssignorAdapter class to maintain backwards compatibility

Reviewers: Boyang Chen , Jason Gustafson 
, Guozhang Wang 
---
 .../kafka/clients/admin/KafkaAdminClient.java  |   4 +-
 .../kafka/clients/consumer/ConsumerConfig.java |   2 +-
 .../consumer/ConsumerPartitionAssignor.java|  10 +-
 .../kafka/clients/consumer/KafkaConsumer.java  |   7 +-
 .../internals/AbstractPartitionAssignor.java   |   4 +-
 .../consumer/internals/ConsumerCoordinator.java|   3 +-
 .../consumer/internals/PartitionAssignor.java  |   3 +
 .../internals/PartitionAssignorAdapter.java| 136 
 .../internals/PartitionAssignorAdapterTest.java| 173 +
 docs/upgrade.html  |   2 +
 .../internals/StreamsPartitionAssignor.java|   4 +-
 11 files changed, 333 insertions(+), 15 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f2ee21e..227a03b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -29,7 +29,7 @@ import 
org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import 
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
 import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
-import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.Cluster;
@@ -2724,7 +2724,7 @@ public class KafkaAdminClient extends AdminClient {
 for (DescribedGroupMember groupMember : members) {
Set<TopicPartition> partitions = Collections.emptySet();
 if (groupMember.memberAssignment().length > 0) {
-final ConsumerPartitionAssignor.Assignment 
assignment = ConsumerProtocol.
+final Assignment assignment = ConsumerProtocol.
 
deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
 partitions = new 
HashSet<>(assignment.partitions());
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 8a18bd5..2e4507a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig {
  * partition.assignment.strategy
  */
 public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = 
"partition.assignment.strategy";
-private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class 
name or class type of the assignor implementing the partition assignment 
strategy that the client will use to distribute partition ownership amongst 
consumer instances when group management is used. A custom assignor that 
implements ConsumerPartitionAssignor can be plugged in";
+private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of 
class names or class types, ordered by preference, of supported assignors 
responsible for the partition assignment strategy that the client will use to 
distribute partition ownership amongst consumer instances when group management 
is used. Implementing the 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 
interface allows you to plug in a custom assignment strategy.";
 
 /**
  * auto.offset.reset
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index 72d5d6e..07e153e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
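The reworded `partition.assignment.strategy` doc above describes an ordered preference list of assignor class names. A hedged configuration sketch using only plain `java.util.Properties` (the broker address and group id are illustrative; handing the properties to a real `KafkaConsumer` requires the kafka-clients jar):

```java
import java.util.Properties;

// Example of the ordered assignor preference list from the updated doc string.
public class AssignorConfig {
    public static Properties consumerProps() {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // illustrative address
        props.put("group.id", "demo-group");               // illustrative group
        // Ordered by preference: sticky first, range as a fallback.
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.StickyAssignor,"
                + "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(consumerProps().getProperty("partition.assignment.strategy"));
    }
}
```

Class objects (or custom classes implementing `ConsumerPartitionAssignor`) may be supplied instead of class-name strings, per the doc change in this commit.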

[kafka] branch trunk updated: MINOR: Refactor abstractConfig#configuredInstance (#7129)

2019-07-30 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 73ed9ea  MINOR: Refactor abstractConfig#configuredInstance (#7129)
73ed9ea is described below

commit 73ed9eac2c875a12d933af48b7a7817ea57ad447
Author: Guozhang Wang 
AuthorDate: Tue Jul 30 13:35:02 2019 -0700

MINOR: Refactor abstractConfig#configuredInstance (#7129)

Reviewers: Jason Gustafson 
---
 .../apache/kafka/common/config/AbstractConfig.java | 50 +++---
 1 file changed, 26 insertions(+), 24 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index a48856f..832837d 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -355,6 +355,29 @@ public class AbstractConfig {
 log.warn("The configuration '{}' was supplied but isn't a known 
config.", key);
 }
 
+private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) {
+if (klass == null)
+return null;
+
+Object o;
+if (klass instanceof String) {
+try {
+o = Utils.newInstance((String) klass, t);
+} catch (ClassNotFoundException e) {
+throw new KafkaException("Class " + klass + " cannot be 
found", e);
+}
+} else if (klass instanceof Class) {
+o = Utils.newInstance((Class) klass);
+} else
+throw new KafkaException("Unexpected element of type " + 
klass.getClass().getName() + ", expected String or Class");
+if (!t.isInstance(o))
+throw new KafkaException(klass + " is not an instance of " + 
t.getName());
+if (o instanceof Configurable)
+((Configurable) o).configure(configPairs);
+
+return t.cast(o);
+}
+
 /**
  * Get a configured instance of the give class specified by the given 
configuration key. If the object implements
  * Configurable configure it using the configuration.
@@ -365,14 +388,8 @@ public class AbstractConfig {
  */
public <T> T getConfiguredInstance(String key, Class<T> t) {
Class<?> c = getClass(key);
-if (c == null)
-return null;
-Object o = Utils.newInstance(c);
-if (!t.isInstance(o))
-throw new KafkaException(c.getName() + " is not an instance of " + 
t.getName());
-if (o instanceof Configurable)
-((Configurable) o).configure(originals());
-return t.cast(o);
+
+return getConfiguredInstance(c, t, originals());
 }
 
 /**
@@ -400,7 +417,6 @@ public class AbstractConfig {
 return getConfiguredInstances(getList(key), t, configOverrides);
 }
 
-
 /**
  * Get a list of configured instances of the given class specified by the 
given configuration key. The configuration
  * may specify either null or an empty string to indicate no configured 
instances. In both cases, this method
@@ -417,21 +433,7 @@ public class AbstractConfig {
Map<String, Object> configPairs = originals();
 configPairs.putAll(configOverrides);
 for (Object klass : classNames) {
-Object o;
-if (klass instanceof String) {
-try {
-o = Utils.newInstance((String) klass, t);
-} catch (ClassNotFoundException e) {
-throw new KafkaException(klass + " ClassNotFoundException 
exception occurred", e);
-}
-} else if (klass instanceof Class) {
-o = Utils.newInstance((Class) klass);
-} else
-throw new KafkaException("List contains element of type " + 
klass.getClass().getName() + ", expected String or Class");
-if (!t.isInstance(o))
-throw new KafkaException(klass + " is not an instance of " + 
t.getName());
-if (o instanceof Configurable)
-((Configurable) o).configure(configPairs);
+Object o = getConfiguredInstance(klass, t, configPairs);
 objects.add(t.cast(o));
 }
 return objects;
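The helper extracted in this refactor accepts either a class name (String) or a Class object, instantiates it reflectively, and verifies the result against an expected type. A generic pure-JDK sketch of the same pattern (the Configurable-style post-construction hook and Kafka's `Utils.newInstance` are replaced here with plain reflection for self-containment):

```java
import java.util.List;

// Sketch of a String-or-Class "configured instance" factory, mirroring the
// branches of the private helper added in this commit.
public class Instantiator {
    public static <T> T configuredInstance(final Object klass, final Class<T> t) {
        final Object o;
        try {
            if (klass instanceof String) {
                // Resolve by name, then invoke the no-arg constructor.
                o = Class.forName((String) klass).getDeclaredConstructor().newInstance();
            } else if (klass instanceof Class) {
                o = ((Class<?>) klass).getDeclaredConstructor().newInstance();
            } else {
                throw new IllegalArgumentException(
                    "Unexpected element of type " + klass.getClass().getName()
                    + ", expected String or Class");
            }
        } catch (final ReflectiveOperationException e) {
            throw new IllegalArgumentException("Class " + klass + " cannot be instantiated", e);
        }
        if (!t.isInstance(o)) {
            throw new IllegalArgumentException(klass + " is not an instance of " + t.getName());
        }
        return t.cast(o);
    }

    public static void main(final String[] args) {
        // Both spellings resolve to the same concrete class.
        final List<?> byName = configuredInstance("java.util.ArrayList", List.class);
        final List<?> byClass = configuredInstance(java.util.ArrayList.class, List.class);
        System.out.println(byName.getClass() == byClass.getClass());  // true
    }
}
```

Centralizing the two branches this way is exactly what lets `getConfiguredInstance` and `getConfiguredInstances` in the diff share one code path.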



[kafka] branch trunk updated: KAFKA-8179: add public ConsumerPartitionAssignor interface (#7108)

2019-07-25 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 69d86a1  KAFKA-8179: add public ConsumerPartitionAssignor interface 
(#7108)
69d86a1 is described below

commit 69d86a197f86ad4c6f1636b5ab4678907e30a4c0
Author: A. Sophie Blee-Goldman 
AuthorDate: Thu Jul 25 13:02:09 2019 -0700

KAFKA-8179: add public ConsumerPartitionAssignor interface (#7108)

Main changes of this PR:

* Deprecate old consumer.internal.PartitionAssignor and add public 
consumer.ConsumerPartitionAssignor with all OOTB assignors migrated to new 
interface
* Refactor assignor's assignment/subscription related classes for easier to 
evolve API
* Removed version number from classes as it is only needed for 
serialization/deserialization
* Other previously-discussed cleanup included in this PR:

* Remove Assignment.error added in pt 1
* Remove ConsumerCoordinator#adjustAssignment added in pt 2

Reviewers: Boyang Chen , Jason Gustafson 
, Guozhang Wang 
---
 .../kafka/clients/admin/KafkaAdminClient.java  |   4 +-
 .../kafka/clients/consumer/ConsumerConfig.java |   2 +-
 .../clients/consumer/ConsumerGroupMetadata.java|  50 +
 ...ssignor.java => ConsumerPartitionAssignor.java} | 210 -
 .../kafka/clients/consumer/KafkaConsumer.java  |   7 +-
 .../kafka/clients/consumer/StickyAssignor.java |  11 +-
 .../internals/AbstractPartitionAssignor.java   |  18 +-
 .../consumer/internals/ConsumerCoordinator.java|  86 +++--
 .../consumer/internals/ConsumerProtocol.java   |  95 --
 .../consumer/internals/PartitionAssignor.java  | 142 +-
 .../consumer/internals/SubscriptionState.java  |   8 +
 .../kafka/clients/admin/KafkaAdminClientTest.java  |   6 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  69 ---
 .../kafka/clients/consumer/RangeAssignorTest.java  |   2 +-
 .../clients/consumer/RoundRobinAssignorTest.java   |   2 +-
 .../kafka/clients/consumer/StickyAssignorTest.java |   2 +-
 .../internals/ConsumerCoordinatorTest.java |  31 +--
 .../consumer/internals/ConsumerProtocolTest.java   |  45 +
 .../group/GroupMetadataManagerTest.scala   |   2 +-
 .../internals/StreamsPartitionAssignor.java|  20 +-
 .../internals/StreamsPartitionAssignorTest.java| 126 +++--
 .../kafka/streams/tests/StreamsUpgradeTest.java|  21 ++-
 22 files changed, 363 insertions(+), 596 deletions(-)
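Whatever interface an assignor implements, its core job is mapping group members to partitions. A toy, dependency-free sketch of range-style assignment for a single topic (hypothetical types and method names, not the actual ConsumerPartitionAssignor API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RangeSketch {
    // Assign partitions 0..numPartitions-1 to members in contiguous ranges,
    // roughly the way a per-topic range assignor does: members are sorted for
    // determinism, and earlier members receive one extra partition when the
    // count does not divide evenly.
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);                         // deterministic order
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        int per = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = per + (i < extra ? 1 : 0);        // spread the remainder
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++)
                parts.add(next++);
            out.put(sorted.get(i), parts);
        }
        return out;
    }
}
```

The real interface additionally carries subscription userdata and rebalance-protocol metadata; this sketch shows only the assignment arithmetic.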

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 349fc2a..f2ee21e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -29,9 +29,9 @@ import 
org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import 
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
 import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.ElectionType;
@@ -2724,7 +2724,7 @@ public class KafkaAdminClient extends AdminClient {
 for (DescribedGroupMember groupMember : members) {
 Set<TopicPartition> partitions = Collections.emptySet();
 if (groupMember.memberAssignment().length > 0) {
-final PartitionAssignor.Assignment assignment = ConsumerProtocol.
+final ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.
 deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
 partitions = new HashSet<>(assignment.partitions());
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 7eb34d4..8a18bd5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig {
  * partition.assignment.strategy
  */
 public static final String PARTITION_ASSIGNMENT_STR

[kafka] branch trunk updated: KAFKA-8696: clean up Sum/Count/Total metrics (#7057)

2019-07-23 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new a8aedc8  KAFKA-8696: clean up Sum/Count/Total metrics (#7057)
a8aedc8 is described below

commit a8aedc85ebfadcf1472acafe2e0311a73d3040be
Author: John Roesler 
AuthorDate: Tue Jul 23 18:54:20 2019 -0500

KAFKA-8696: clean up Sum/Count/Total metrics (#7057)

* Clean up one redundant and one misplaced metric
* Clarify the relationship among these metrics to avoid future confusion

Reviewers: Matthias J. Sax , Bill Bejeck 
, Guozhang Wang 
---
 .../consumer/internals/AbstractCoordinator.java|  4 +-
 .../kafka/clients/consumer/internals/Fetcher.java  |  4 +-
 .../kafka/common/metrics/MeasurableStat.java   |  2 +-
 .../apache/kafka/common/metrics/stats/Count.java   | 30 +++---
 .../common/metrics/stats}/CumulativeCount.java | 22 +++-
 .../stats/{Total.java => CumulativeSum.java}   | 19 ---
 .../apache/kafka/common/metrics/stats/Meter.java   | 24 
 .../apache/kafka/common/metrics/stats/Rate.java| 27 ++---
 .../org/apache/kafka/common/metrics/stats/Sum.java | 30 +++---
 .../apache/kafka/common/metrics/stats/Total.java   | 36 +++-
 .../stats/{Count.java => WindowedCount.java}   | 26 +++--
 .../metrics/stats/{Sum.java => WindowedSum.java}   | 11 ++--
 .../org/apache/kafka/common/network/Selector.java  | 18 +++---
 .../kafka/common/metrics/JmxReporterTest.java  | 14 ++---
 .../kafka/common/metrics/KafkaMbeanTest.java   |  8 +--
 .../apache/kafka/common/metrics/MetricsTest.java   | 64 +++---
 .../apache/kafka/common/metrics/SensorTest.java|  4 +-
 .../kafka/common/metrics/stats/MeterTest.java  |  2 +-
 .../java/org/apache/kafka/test/MetricsBench.java   |  4 +-
 .../org/apache/kafka/connect/runtime/Worker.java   | 14 ++---
 .../kafka/connect/runtime/WorkerSinkTask.java  | 10 ++--
 .../kafka/connect/runtime/WorkerSourceTask.java|  6 +-
 .../runtime/distributed/DistributedHerder.java |  4 +-
 .../runtime/errors/ErrorHandlingMetrics.java   | 16 +++---
 .../main/scala/kafka/network/SocketServer.scala|  5 +-
 .../scala/kafka/server/ClientQuotaManager.scala|  4 +-
 .../streams/kstream/internals/metrics/Sensors.java |  8 +--
 .../streams/processor/internals/StreamTask.java|  8 +--
 .../internals/metrics/StreamsMetricsImpl.java  |  5 +-
 .../processor/internals/RecordCollectorTest.java   |  4 +-
 .../processor/internals/StandbyTaskTest.java   |  4 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  8 +--
 32 files changed, 185 insertions(+), 260 deletions(-)
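The renames in this diffstat (Count to WindowedCount, Total to CumulativeSum) pin down the distinction the old names blurred: one stat is measured over a sliding time window, the other accumulates forever. A minimal sketch of the two behaviors (hypothetical classes, not Kafka's metrics library):

```java
import java.util.ArrayDeque;
import java.util.Deque;

class Stats {
    // Accumulates every recorded value since creation, never expiring anything.
    static class CumulativeSum {
        private double total = 0;
        void record(double v) { total += v; }
        double measure() { return total; }
    }

    // Sums only the values recorded within the last windowMs milliseconds.
    static class WindowedSum {
        private final long windowMs;
        private final Deque<long[]> samples = new ArrayDeque<>(); // {timeMs, doubleBits}
        WindowedSum(long windowMs) { this.windowMs = windowMs; }
        void record(double v, long nowMs) {
            samples.addLast(new long[]{nowMs, Double.doubleToLongBits(v)});
        }
        double measure(long nowMs) {
            // Expire samples that fell out of the window before summing.
            while (!samples.isEmpty() && samples.peekFirst()[0] < nowMs - windowMs)
                samples.removeFirst();
            double sum = 0;
            for (long[] s : samples)
                sum += Double.longBitsToDouble(s[1]);
            return sum;
        }
    }
}
```

A "-total" metric wants cumulative semantics while a "-rate" metric wants windowed ones; naming the classes accordingly is the cleanup this commit describes.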

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d5faa6e..b92a4a6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -42,9 +42,9 @@ import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
@@ -961,7 +961,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
 }
 
 protected Meter createMeter(Metrics metrics, String groupName, String 
baseName, String descriptiveName) {
-return new Meter(new Count(),
+return new Meter(new WindowedCount(),
 metrics.metricName(baseName + "-rate", groupName,
 String.format("The number of %s per second", 
descriptiveName)),
 metrics.metricName(baseName + "-total", groupName,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 08ab9fb..d4d028d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -47,10 +47,10 @@ import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.

[kafka] branch trunk updated: KAFKA-8392: Fix old metrics leakage by brokers that have no leadership over any partition for a topic (#6977)

2019-07-19 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new aa8e850  KAFKA-8392: Fix old metrics leakage by brokers that have no 
leadership over any partition for a topic (#6977)
aa8e850 is described below

commit aa8e850afb7baa447882b493eae29f99a39e35a0
Author: Tu V. Tran 
AuthorDate: Fri Jul 19 15:25:18 2019 -0700

KAFKA-8392: Fix old metrics leakage by brokers that have no leadership over 
any partition for a topic (#6977)

* Added removeOldLeaderMetrics in BrokerTopicStats to remove 
MessagesInPerSec, BytesInPerSec, BytesOutPerSec for any broker that is no 
longer a leader of any partition for a particular topic

* Modified ReplicaManager to remove the metrics of any topic that the 
current broker has no leadership (meaning the broker either becomes a follower 
for all of the partitions in that topic or stops being a replica)

Reviewers: Guozhang Wang , Jun Rao 
---
 .../scala/kafka/server/KafkaRequestHandler.scala   |  99 ++
 .../main/scala/kafka/server/ReplicaManager.scala   |  13 ++
 .../scala/unit/kafka/metrics/MetricsTest.scala |  10 +-
 .../unit/kafka/server/ReplicaManagerTest.scala | 148 +
 4 files changed, 239 insertions(+), 31 deletions(-)
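The fix replaces eagerly created meters with a lazily populated map, so a per-topic metric exists only after first use and can be dropped once the broker stops leading any partition of that topic. A dependency-free sketch of that getAndMaybePut/remove pattern (hypothetical names; `computeIfAbsent` plays the role of `Pool.getAndMaybePut`):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

class TopicMetrics {
    private final ConcurrentMap<String, LongAdder> meters = new ConcurrentHashMap<>();

    // Lazily create the meter on first access instead of at construction time.
    LongAdder messagesInRate(String topic) {
        return meters.computeIfAbsent(topic, t -> new LongAdder());
    }

    // Remove the topic's meters when the broker no longer leads any of its
    // partitions, so stale per-topic metrics are not reported forever.
    void removeOldLeaderMetrics(String topic) {
        meters.remove(topic);
    }

    boolean hasMetrics(String topic) {
        return meters.containsKey(topic);
    }
}
```

Lazy creation is what makes removal safe: a later access simply re-registers the metric if the broker becomes a leader again.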

diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala 
b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index e0ad1b6..0397795 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -146,38 +146,63 @@ class BrokerTopicMetrics(name: Option[String]) extends 
KafkaMetricsGroup {
 case Some(topic) => Map("topic" -> topic)
   }
 
-  val messagesInRate = newMeter(BrokerTopicStats.MessagesInPerSec, "messages", 
TimeUnit.SECONDS, tags)
-  val bytesInRate = newMeter(BrokerTopicStats.BytesInPerSec, "bytes", 
TimeUnit.SECONDS, tags)
-  val bytesOutRate = newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", 
TimeUnit.SECONDS, tags)
-  val bytesRejectedRate = newMeter(BrokerTopicStats.BytesRejectedPerSec, 
"bytes", TimeUnit.SECONDS, tags)
-  private[server] val replicationBytesInRate =
-if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesInPerSec, 
"bytes", TimeUnit.SECONDS, tags))
+  // an internal map for "lazy initialization" of certain metrics
+  private val metricTypeMap = new Pool[String, Meter]
+
+  def messagesInRate = 
metricTypeMap.getAndMaybePut(BrokerTopicStats.MessagesInPerSec,
+newMeter(BrokerTopicStats.MessagesInPerSec, "messages", TimeUnit.SECONDS, 
tags))
+  def bytesInRate = 
metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesInPerSec,
+newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags))
+  def bytesOutRate = 
metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesOutPerSec,
+newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS, tags))
+  def bytesRejectedRate = 
metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesRejectedPerSec,
+newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS, 
tags))
+  private[server] def replicationBytesInRate =
+if (name.isEmpty) Some(metricTypeMap.getAndMaybePut(
+  BrokerTopicStats.ReplicationBytesInPerSec,
+  newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", 
TimeUnit.SECONDS, tags)))
 else None
-  private[server] val replicationBytesOutRate =
-if (name.isEmpty) 
Some(newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", 
TimeUnit.SECONDS, tags))
+  private[server] def replicationBytesOutRate =
+if (name.isEmpty) Some(metricTypeMap.getAndMaybePut(
+  BrokerTopicStats.ReplicationBytesOutPerSec,
+  newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", 
TimeUnit.SECONDS, tags)))
 else None
-  val failedProduceRequestRate = 
newMeter(BrokerTopicStats.FailedProduceRequestsPerSec, "requests", 
TimeUnit.SECONDS, tags)
-  val failedFetchRequestRate = 
newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", 
TimeUnit.SECONDS, tags)
-  val totalProduceRequestRate = 
newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", 
TimeUnit.SECONDS, tags)
-  val totalFetchRequestRate = 
newMeter(BrokerTopicStats.TotalFetchRequestsPerSec, "requests", 
TimeUnit.SECONDS, tags)
-  val fetchMessageConversionsRate = 
newMeter(BrokerTopicStats.FetchMessageConversionsPerSec, "requests", 
TimeUnit.SECONDS, tags)
-  val produceMessageConversionsRate = 
newMeter(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests", 
TimeUnit.SECONDS, tags)
+  def failedProduceRequestRate = 
metricTypeMap.getAndMaybePut

[kafka] branch 2.1 updated: KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054)

2019-07-18 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new 772d764  KAFKA-8615: Change to track partition time breaks 
TimestampExtractor (#7054)
772d764 is described below

commit 772d7647fe3a7fe3c3ff65a0348f3aa61664ef43
Author: A. Sophie Blee-Goldman 
AuthorDate: Thu Jul 18 13:54:46 2019 -0700

KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054)

The timestamp extractor takes a previousTimestamp parameter which should be 
the partition time. This PR adds back in partition time tracking for the 
extractor, and renames previousTimestamp --> partitionTime

Reviewers: Guozhang Wang , Bill Bejeck 
, Matthias J. Sax 
---
 .../examples/pageview/JsonTimestampExtractor.java  |   2 +-
 .../processor/ExtractRecordMetadataTimestamp.java  |  10 +-
 .../streams/processor/FailOnInvalidTimestamp.java  |   4 +-
 .../processor/LogAndSkipOnInvalidTimestamp.java|   4 +-
 .../streams/processor/TimestampExtractor.java  |   4 +-
 .../UsePreviousTimeOnInvalidTimestamp.java |  10 +-
 .../processor/WallclockTimestampExtractor.java |   4 +-
 .../processor/internals/PartitionGroup.java|  36 ---
 .../streams/processor/internals/RecordQueue.java   |  23 -
 .../streams/processor/internals/StreamTask.java|  10 +-
 .../apache/kafka/streams/StreamsConfigTest.java|   2 +-
 .../processor/internals/PartitionGroupTest.java|   8 +-
 .../processor/internals/RecordQueueTest.java   | 106 ++---
 .../apache/kafka/test/MockTimestampExtractor.java  |   2 +-
 14 files changed, 166 insertions(+), 59 deletions(-)
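The extractor's second argument is now explicitly the partition time: the highest valid timestamp seen so far on the record's partition, available as a fallback when a record carries an invalid timestamp. A minimal sketch of that contract (hypothetical helper, not the Streams API; the fallback mirrors what `UsePreviousTimeOnInvalidTimestamp` in the file list does):

```java
class PartitionTimeSketch {
    static final long UNKNOWN = -1L;

    // Fall back to partition time when the record's own timestamp is
    // negative (i.e. invalid); fail only if partition time is also unknown.
    static long extract(long recordTimestamp, long partitionTime) {
        if (recordTimestamp >= 0)
            return recordTimestamp;
        if (partitionTime >= 0)
            return partitionTime;
        throw new IllegalStateException(
            "Invalid record timestamp and no partition time to fall back on");
    }

    // Partition time only moves forward as valid timestamps are observed,
    // which is why it is the "highest", not merely the "previous", timestamp.
    static long advance(long partitionTime, long extracted) {
        return Math.max(partitionTime, extracted);
    }
}
```

Tracking this value in the record queue, rather than passing whatever timestamp happened to come last, is the behavior this commit restores.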

diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
index 4f6257a..d760183 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 public class JsonTimestampExtractor implements TimestampExtractor {
 
 @Override
-public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
 if (record.value() instanceof PageViewTypedDemo.PageView) {
 return ((PageViewTypedDemo.PageView) record.value()).timestamp;
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
index 79c8dd3..3c7428a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
@@ -50,15 +50,15 @@ abstract class ExtractRecordMetadataTimestamp implements 
TimestampExtractor {
  * Extracts the embedded metadata timestamp from the given {@link 
ConsumerRecord}.
  *
  * @param record a data record
- * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
  * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
  */
 @Override
-public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
 final long timestamp = record.timestamp();
 
 if (timestamp < 0) {
-return onInvalidTimestamp(record, timestamp, previousTimestamp);
+return onInvalidTimestamp(record, timestamp, partitionTime);
 }
 
 return timestamp;
@@ -69,10 +69,10 @@ abstract class ExtractRecordMetadataTimestamp implements 
TimestampExtractor {
  *
  * @param record a data record
  * @param recordTimestamp the timestamp extractor from the record
- * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
  * @return a new timestamp for the record (if negative, record will not be processed but dropped silently)
  */
 public abstract long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
 final long recordTim

[kafka] branch 2.2 updated: KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054)

2019-07-18 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
 new f1300c7  KAFKA-8615: Change to track partition time breaks 
TimestampExtractor (#7054)
f1300c7 is described below

commit f1300c71c7ba9d0ef6571949587923675109ba7b
Author: A. Sophie Blee-Goldman 
AuthorDate: Thu Jul 18 13:54:46 2019 -0700

KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054)

The timestamp extractor takes a previousTimestamp parameter which should be 
the partition time. This PR adds back in partition time tracking for the 
extractor, and renames previousTimestamp --> partitionTime

Reviewers: Guozhang Wang , Bill Bejeck 
, Matthias J. Sax 
---
 .../examples/pageview/JsonTimestampExtractor.java  |   2 +-
 .../processor/ExtractRecordMetadataTimestamp.java  |  10 +-
 .../streams/processor/FailOnInvalidTimestamp.java  |   4 +-
 .../processor/LogAndSkipOnInvalidTimestamp.java|   4 +-
 .../streams/processor/TimestampExtractor.java  |   4 +-
 .../UsePreviousTimeOnInvalidTimestamp.java |  10 +-
 .../processor/WallclockTimestampExtractor.java |   4 +-
 .../processor/internals/PartitionGroup.java|  18 ++--
 .../streams/processor/internals/RecordQueue.java   |  23 -
 .../streams/processor/internals/StreamTask.java|  10 +-
 .../apache/kafka/streams/StreamsConfigTest.java|   2 +-
 .../processor/internals/PartitionGroupTest.java|   8 +-
 .../processor/internals/ProcessorTopologyTest.java |   2 +-
 .../processor/internals/RecordQueueTest.java   | 106 ++---
 .../apache/kafka/test/MockTimestampExtractor.java  |   2 +-
 15 files changed, 151 insertions(+), 58 deletions(-)

diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
index 4f6257a..d760183 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 public class JsonTimestampExtractor implements TimestampExtractor {
 
 @Override
-public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
 if (record.value() instanceof PageViewTypedDemo.PageView) {
 return ((PageViewTypedDemo.PageView) record.value()).timestamp;
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
index 79c8dd3..3c7428a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
@@ -50,15 +50,15 @@ abstract class ExtractRecordMetadataTimestamp implements 
TimestampExtractor {
  * Extracts the embedded metadata timestamp from the given {@link 
ConsumerRecord}.
  *
  * @param record a data record
- * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
  * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
  */
 @Override
-public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
 final long timestamp = record.timestamp();
 
 if (timestamp < 0) {
-return onInvalidTimestamp(record, timestamp, previousTimestamp);
+return onInvalidTimestamp(record, timestamp, partitionTime);
 }
 
 return timestamp;
@@ -69,10 +69,10 @@ abstract class ExtractRecordMetadataTimestamp implements 
TimestampExtractor {
  *
  * @param record a data record
  * @param recordTimestamp the timestamp extractor from the record
- * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
  * @return a new timestamp for the record (if negative, record will not be processed but dropped silently)
  */
 public abstract long onInvalidTimestamp(final ConsumerRecord 
