[kafka] branch trunk updated (2a54347 -> 78f5da9)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 2a54347 MINOR: Improve FK Join docs and optimize null-fk case (#7536) add 78f5da9 KAFKA-9053: AssignmentInfo#encode hardcodes the LATEST_SUPPORTED_VERSION (#7537) No new revisions were added by this update. Summary of changes: .../internals/StreamsPartitionAssignor.java| 22 ++ .../internals/assignment/AssignmentInfo.java | 22 +++--- .../internals/assignment/SubscriptionInfo.java | 4 ++-- .../internals/assignment/AssignmentInfoTest.java | 9 + .../internals/assignment/SubscriptionInfoTest.java | 17 + 5 files changed, 57 insertions(+), 17 deletions(-)
[kafka] branch trunk updated (1d6d370 -> 3e30bf5)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 1d6d370 MINOR: Update Kafka Streams upgrade docs for KIP-444, KIP-470, KIP-471, KIP-474, KIP-528 (#7515) add 3e30bf5 Explain the separate upgrade paths for consumer groups and Streams (#7516) No new revisions were added by this update. Summary of changes: docs/streams/upgrade-guide.html | 17 +++-- docs/upgrade.html | 25 + 2 files changed, 36 insertions(+), 6 deletions(-)
[kafka] branch 2.4 updated: Explain the separate upgrade paths for consumer groups and Streams (#7516)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new ee4f65d Explain the separate upgrade paths for consumer groups and Streams (#7516) ee4f65d is described below commit ee4f65dfa5bd2a14e17805734be3d9e1fdfc25a2 Author: A. Sophie Blee-Goldman AuthorDate: Wed Oct 16 16:09:09 2019 -0700 Explain the separate upgrade paths for consumer groups and Streams (#7516) Document the upgrade path for the consumer and for Streams (note that they differ significantly). Needs to be cherry-picked to 2.4 Reviewers: Guozhang Wang --- docs/streams/upgrade-guide.html | 17 +++-- docs/upgrade.html | 25 + 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 1cc5b50..2de7917 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -34,10 +34,10 @@ -Upgrading from any older version to {{fullDotVersion}} is possible: (1) if you are upgrading from 2.0.x to {{fullDotVersion}} then a single rolling bounce is needed to swap in the new jar, -(2) if you are upgrading from older versions than 2.0.x in the online mode, you would need two rolling bounces where -the first rolling bounce phase you need to set config upgrade.from="older version" (possible values are "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", and "1.1") -(cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade;>KIP-268): +Upgrading from any older version to {{fullDotVersion}} is possible: you will need to do two rolling bounces, where during the first rolling bounce phase you set the config upgrade.from="older version" +(possible values are "0.10.0" - "2.3") and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager +rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to +https://cwiki.apache.org/confluence/x/vAclBg;>KIP-429: prepare your application instances for a rolling bounce and make sure that config upgrade.from is set to the version from which it is being upgrade. @@ -75,11 +75,16 @@ Streams API changes in 2.4.0 - - + +With the introduction of incremental cooperative rebalancing, Streams no longer requires all tasks be revoked at the beginning of a rebalance. Instead, at the completion of the rebalance only those tasks which are to be migrated to another consumer +for overall load balance will need to be closed and revoked. This changes the semantics of the StateListener a bit, as it will not necessarily transition to REBALANCING at the beginning of a rebalance anymore. Note that +this means IQ will now be available at all times except during state restoration, including while a rebalance is in progress. If restoration is occurring when a rebalance begins, we will continue to actively restore the state stores and/or process +standby tasks during a cooperative rebalance. Note that with this new rebalancing protocol, you may sometimes see a rebalance be followed by a second short rebalance that ensures all tasks are safely distributed. For details on please see +https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol;>KIP-429. + The 2.4.0 release contains newly added and reworked metrics. diff --git a/docs/upgrade.html b/docs/upgrade.html index 0c69336..6d9f702 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -45,6 +45,31 @@ The old overloaded functions are deprecated and we would recommend users to make their code changes to leverage the new methods (details can be found in https://cwiki.apache.org/confluence/display/KAFKA/KIP-520%3A+Add+overloaded+Consumer%23committed+for+batching+partitions;>KIP-520). +We are introducing incremental cooperative rebalancing to the clients' group protocol, which allows consumers to keep all of their assigned partitions during a rebalance +and at the end revoke only those which must be migrated to another consumer for overall cluster balance. The ConsumerCoordinator will choose the latest RebalanceProtocol +that is commonly supported by all of
[kafka] branch trunk updated (1d6d370 -> 3e30bf5)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 1d6d370 MINOR: Update Kafka Streams upgrade docs for KIP-444, KIP-470, KIP-471, KIP-474, KIP-528 (#7515) add 3e30bf5 Explain the separate upgrade paths for consumer groups and Streams (#7516) No new revisions were added by this update. Summary of changes: docs/streams/upgrade-guide.html | 17 +++-- docs/upgrade.html | 25 + 2 files changed, 36 insertions(+), 6 deletions(-)
[kafka] branch trunk updated (0725035 -> 8966d06)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 0725035 KAFKA-9032: Bypass serdes for tombstones (#7518) add 8966d06 KAFKA-9039: Optimize ReplicaFetcher fetch path (#7443) No new revisions were added by this update. Summary of changes: checkstyle/import-control-jmh-benchmarks.xml | 2 + .../apache/kafka/clients/FetchSessionHandler.java | 44 ++- .../kafka/common/internals/PartitionStates.java| 2 +- .../scala/kafka/server/AbstractFetcherThread.scala | 117 .../kafka/server/ReplicaAlterLogDirsThread.scala | 19 +- .../scala/kafka/server/ReplicaFetcherThread.scala | 30 +- .../kafka/server/AbstractFetcherManagerTest.scala | 2 +- .../kafka/server/AbstractFetcherThreadTest.scala | 15 +- .../server/ReplicaAlterLogDirsThreadTest.scala | 27 +- gradle/spotbugs-exclude.xml| 2 + .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 312 + .../jmh/fetchsession/FetchSessionBenchmark.java| 119 12 files changed, 579 insertions(+), 112 deletions(-) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java
[kafka] branch 2.4 updated: KAFKA-9032: Bypass serdes for tombstones (#7518)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new f2495e6 KAFKA-9032: Bypass serdes for tombstones (#7518) f2495e6 is described below commit f2495e6fc81a4bfcc9ce0908643fb1aa63d3dcd3 Author: John Roesler AuthorDate: Wed Oct 16 11:34:52 2019 -0500 KAFKA-9032: Bypass serdes for tombstones (#7518) In a KTable context, null record values have a special "tombstone" significance. We should always bypass the serdes for such tombstones, since otherwise the serde could violate Streams' table semantics. Added test coverage for this case and fixed the code accordingly. Reviewers: A. Sophie Blee-Goldman , Matthias J. Sax , Guozhang Wang , Bill Bejeck --- .../SubscriptionResponseWrapperSerde.java | 12 +++-- .../SubscriptionResponseWrapperSerdeTest.java | 63 ++ 2 files changed, 60 insertions(+), 15 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java index 6524b4f..4277f9a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java @@ -58,7 +58,7 @@ public class SubscriptionResponseWrapperSerde implements Serde implements Serde 0) { +final byte[] serializedValue; serializedValue = new byte[data.length - lengthSum]; buf.get(serializedValue, 0, serializedValue.length); -} else -serializedValue = null; +value = deserializer.deserialize(topic, serializedValue); +} else { +value = null; +} -final V value = deserializer.deserialize(topic, serializedValue); return new SubscriptionResponseWrapper<>(hash, value, version); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java index fde9bdd..40948e3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java @@ -17,15 +17,58 @@ package org.apache.kafka.streams.kstream.internals.foreignkeyjoin; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.state.internals.Murmur3; import org.junit.Test; -import static org.junit.Assert.assertEquals; +import java.util.Map; + +import static java.util.Objects.requireNonNull; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; public class SubscriptionResponseWrapperSerdeTest { +private static final class NonNullableSerde implements Serde, Serializer, Deserializer { +private final Serde delegate; + +NonNullableSerde(final Serde delegate) { +this.delegate = delegate; +} + +@Override +public void configure(final Map configs, final boolean isKey) { + +} + +@Override +public void close() { + +} + +@Override +public Serializer serializer() { +return this; +} + +@Override +public Deserializer deserializer() { +return this; +} + +@Override +public byte[] serialize(final String topic, final T data) { +return delegate.serializer().serialize(topic, requireNonNull(data)); +} + +@Override +public T deserialize(final String topic, final byte[] data) { +return delegate.deserializer().deserialize(topic, requireNonNull(data)); +} +} @Test @SuppressWarnings("unchecked") @@ -33,9 +76,9 @@ public class SubscriptionResponseWrapperSerdeTest { final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00}); final String foreignValue = "foreignValue"; final SubscriptionResponseWrapper srw = new Subsc
[kafka] branch trunk updated (3e24495 -> 0725035)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 3e24495 KAFKA-8897: Warn about no guaranteed backwards compatibility in RocksDBConfigSetter (#7483) add 0725035 KAFKA-9032: Bypass serdes for tombstones (#7518) No new revisions were added by this update. Summary of changes: .../SubscriptionResponseWrapperSerde.java | 12 +++-- .../SubscriptionResponseWrapperSerdeTest.java | 63 ++ 2 files changed, 60 insertions(+), 15 deletions(-)
[kafka] branch 2.4 updated: MINOR: remove unused import in QueryableStateIntegrationTest (#7521)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new 4755084 MINOR: remove unused import in QueryableStateIntegrationTest (#7521) 4755084 is described below commit 475508405eff45b149b5725e174c0f85690b9c08 Author: Lucas Bradstreet AuthorDate: Tue Oct 15 12:18:20 2019 -0700 MINOR: remove unused import in QueryableStateIntegrationTest (#7521) Reviewers: Guozhang Wang --- .../apache/kafka/streams/integration/QueryableStateIntegrationTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index de2ae27..a0d38dc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -54,7 +54,6 @@ import org.apache.kafka.streams.state.StreamsMetadata; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockMapper; -import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After;
[kafka] branch trunk updated (e66ed2d -> caf3499)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from e66ed2d MINOR: code and JavaDoc cleanup (#7462) add caf3499 MINOR: remove unused import in QueryableStateIntegrationTest (#7521) No new revisions were added by this update. Summary of changes: .../apache/kafka/streams/integration/QueryableStateIntegrationTest.java | 1 - 1 file changed, 1 deletion(-)
[kafka] branch 2.4 updated: KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to transit to RUNNING upon start (#7519)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new bb75897 KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to transit to RUNNING upon start (#7519) bb75897 is described below commit bb758977e91c10b0e56da1c51dcaee4d6a094d73 Author: Guozhang Wang AuthorDate: Tue Oct 15 11:34:48 2019 -0700 KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to transit to RUNNING upon start (#7519) I looked into the logs of the above tickets, and I think for a couple fo them it is due to the fact that the threads takes time to restore, or just stabilize the rebalance since there are multi-threads. Adding the hook to wait for state to transit to RUNNING upon starting. Reviewers: Chris Pettitt , Matthias J. Sax --- .../integration/QueryableStateIntegrationTest.java | 33 -- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index c5dbabe..de2ae27 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -54,6 +54,7 @@ import org.apache.kafka.streams.state.StreamsMetadata; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockMapper; +import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -92,6 +93,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState; @Category({IntegrationTest.class}) public class QueryableStateIntegrationTest { @@ -261,6 +263,16 @@ public class QueryableStateIntegrationTest { @Override public void run() { myStream.start(); + +try { +TestUtils.waitForCondition( +() -> stateListener.mapStates.containsKey(KafkaStreams.State.RUNNING), +"Did not start successfully after " + TestUtils.DEFAULT_MAX_WAIT_MS + " ms" +); +} catch (final InterruptedException e) { +if (!stateListener.mapStates.containsKey(KafkaStreams.State.RUNNING)) +fail("Did not start successfully"); +} } public void close() { @@ -446,7 +458,8 @@ public class QueryableStateIntegrationTest { windowStoreName, streamsConfiguration); -kafkaStreams.start(); +startKafkaStreamsAndWaitForRunningState(kafkaStreams); + producerThread.start(); try { @@ -515,7 +528,7 @@ public class QueryableStateIntegrationTest { t2.toStream().to(outputTopic); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); -kafkaStreams.start(); +startKafkaStreamsAndWaitForRunningState(kafkaStreams); waitUntilAtLeastNumRecordProcessed(outputTopic, 1); @@ -581,7 +594,7 @@ public class QueryableStateIntegrationTest { .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); -kafkaStreams.start(); +startKafkaStreamsAndWaitForRunningState(kafkaStreams); waitUntilAtLeastNumRecordProcessed(outputTopic, 5); @@ -629,7 +642,7 @@ public class QueryableStateIntegrationTest { t3.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); -kafkaStreams.start(); +startKafkaStreamsAndWaitForRunningState(kafkaStreams); waitUntilAtLeastNumRecordProcessed(outputTopic, 1); @@ -690,7 +703,7 @@ public class QueryableStateIntegrationTest { .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE))) .count(Materialized.as(windowStoreName)); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); -kafkaStreams.start(); +startKafkaStreamsAndWaitForRunningState(kafkaStreams); waitUntilAtLeastNumRecordProcessed(outputTopic, 1); @@ -716,7 +729,7 @@ public class QueryableStateIntegrationTest { final String storeName = "count-by-key"; stream.groupByKey().count(Materialized.as(storeName)
[kafka] branch trunk updated (176c934 -> 76fcabc)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 176c934 KAFKA-8813: Refresh log config if it's updated before initialization (#7305) add 76fcabc KAFKA-4422 / KAFKA-8700 / KAFKA-5566: Wait for state to transit to RUNNING upon start (#7519) No new revisions were added by this update. Summary of changes: .../integration/QueryableStateIntegrationTest.java | 33 -- 1 file changed, 24 insertions(+), 9 deletions(-)
[kafka] branch 2.4 updated: KAFKA-8942: Document RocksDB metrics
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new 2667773 KAFKA-8942: Document RocksDB metrics 2667773 is described below commit 26677731f87116154f74da421d4f440b80599da1 Author: Bruno Cadonna AuthorDate: Tue Oct 15 11:10:42 2019 -0700 KAFKA-8942: Document RocksDB metrics Author: Bruno Cadonna Reviewers: Guozhang Wang Closes #7490 from cadonna/AK8942-docs-rocksdb_metrics Minor comments (cherry picked from commit 9c80a06466dbfade96308ef26c20c6555612191e) Signed-off-by: Guozhang Wang --- docs/ops.html | 102 ++ 1 file changed, 102 insertions(+) diff --git a/docs/ops.html b/docs/ops.html index 05d7de1..d87d16a 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -1979,6 +1979,108 @@ All the following metrics have a recording level of debug: + RocksDB Metrics + All the following metrics have a recording level of debug. + The metrics are collected every minute from the RocksDB state stores. + If a state store consists of multiple RocksDB instances as it is the case for aggregations over time and session windows, + each metric reports an aggregation over the RocksDB instances of the state store. + Note that the store-scope for built-in RocksDB state stores are currently the following: + +rocksdb-state (for RocksDB backed key-value store) +rocksdb-window-state (for RocksDB backed window store) +rocksdb-session-state (for RocksDB backed session store) + + + + + + Metric/Attribute name + Description + Mbean name + + + bytes-written-rate + The average number of bytes written per second to the RocksDB state store. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + bytes-written-total + The total number of bytes written to the RocksDB state store. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + bytes-read-rate + The average number of bytes read per second from the RocksDB state store. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + bytes-read-total + The total number of bytes read from the RocksDB state store. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + memtable-bytes-flushed-rate + The average number of bytes flushed per second from the memtable to disk. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + memtable-bytes-flushed-total + The total number of bytes flushed from the memtable to disk. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + memtable-hit-ratio + The ratio of memtable hits relative to all lookups to the memtable. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + block-cache-data-hit-ratio + The ratio of block cache hits for data blocks relative to all lookups for data blocks to the block cache. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + block-cache-index-hit-ratio + The ratio of block cache hits for index blocks relative to all lookups for index blocks to the block cache. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + block-cache-filter-hit-ratio + The ratio of block cache hits for filter blocks relative to all lookups for filter blocks to the block cache. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + write-stall-duration-avg + The average duration of write stalls in ms. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + write-stall-duration-total + The total duration of write stalls in ms. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + bytes-read-compaction-rate + The average number of bytes read per second during compaction. + kafka.streams:type=stream-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) + + + bytes-written-compaction-rate + The average number of bytes written per second during compaction
[kafka] branch trunk updated (2fce203 -> 9c80a06)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 2fce203 KAFKA-8671: NullPointerException occurs if topic associated with GlobalKTable changes (#7437) add 9c80a06 KAFKA-8942: Document RocksDB metrics No new revisions were added by this update. Summary of changes: docs/ops.html | 102 ++ 1 file changed, 102 insertions(+)
[kafka] branch trunk updated (11a401d -> c9c3adc)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 11a401d delete (#7504) add c9c3adc MINOR: move "Added/Removed sensor" log messages to TRACE (#7502) No new revisions were added by this update. Summary of changes: clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[kafka] branch 2.4 updated: KAFKA-9029: Flaky Test CooperativeStickyAssignorTest.testReassignmentWithRand: bump to 4 (#7503)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new f965e79 KAFKA-9029: Flaky Test CooperativeStickyAssignorTest.testReassignmentWithRand: bump to 4 (#7503) f965e79 is described below commit f965e79d49b85591a3d5bd7b6d3562b9278da05e Author: A. Sophie Blee-Goldman AuthorDate: Sat Oct 12 11:07:12 2019 -0700 KAFKA-9029: Flaky Test CooperativeStickyAssignorTest.testReassignmentWithRand: bump to 4 (#7503) One of the sticky assignor tests involves a random change in subscriptions that the current assignor algorithm struggles to react to and in cooperative mode ends up requiring more than one followup rebalance. Apparently, in rare cases it can also require more than 2. Bumping the "allowed subsequent rebalances" to 4 (increase of 2) to allow some breathing room and reduce flakiness (technically any number is "correct", but if it turns out to ever require more than 4 we should revisit and improve the algorithm because that would be excessive (see KAFKA-8767) Reviewers: Guozhang Wang --- .../apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java index 72a6d89..aed8c09 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java @@ -68,7 +68,7 @@ public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest { assignments.putAll(assignor.assign(partitionsPerTopic, subscriptions)); ++rebalances; -assertTrue(rebalances <= 2); +assertTrue(rebalances <= 4); } // Check the validity and balance of the final assignment
[kafka] branch trunk updated (2ff8fa0 -> 6fb8bd0)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 2ff8fa0 KAFKA-8122: Fix Kafka Streams EOS integration test (#7470) add 6fb8bd0 KAFKA-9029: Flaky Test CooperativeStickyAssignorTest.testReassignmentWithRand: bump to 4 (#7503) No new revisions were added by this update. Summary of changes: .../apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[kafka] branch 2.4 updated: HOTFIX: fix checkstyle in Streams system test (#7494)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new 8e048ba HOTFIX: fix checkstyle in Streams system test (#7494) 8e048ba is described below commit 8e048bac4576b0da00d0542a9e848af95c1dea03 Author: A. Sophie Blee-Goldman AuthorDate: Thu Oct 10 19:38:14 2019 -0700 HOTFIX: fix checkstyle in Streams system test (#7494) Reviewers: Guozhang Wang --- .../kafka/streams/tests/StreamsBrokerDownResilienceTest.java | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java index 4dd4954..b605f46 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java @@ -104,7 +104,7 @@ public class StreamsBrokerDownResilienceTest { final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties); -streams.setUncaughtExceptionHandler( (t,e) -> { +streams.setUncaughtExceptionHandler((t, e) -> { System.err.println("FATAL: An unexpected exception " + e); System.err.flush(); streams.close(Duration.ofSeconds(30)); @@ -114,11 +114,11 @@ public class StreamsBrokerDownResilienceTest { System.out.println("Start Kafka Streams"); streams.start(); -Runtime.getRuntime().addShutdownHook(new Thread( () -> { -streams.close(Duration.ofSeconds(30)); -System.out.println("Complete shutdown of streams resilience test app now"); -System.out.flush(); -} +Runtime.getRuntime().addShutdownHook(new Thread(() -> { +streams.close(Duration.ofSeconds(30)); +System.out.println("Complete shutdown of streams resilience test app now"); +System.out.flush(); +} )); }
[kafka] branch trunk updated (cc6525a -> b80a572)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from cc6525a KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472) add b80a572 HOTFIX: fix checkstyle in Streams system test (#7494) No new revisions were added by this update. Summary of changes: .../kafka/streams/tests/StreamsBrokerDownResilienceTest.java | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-)
[kafka] branch 2.4 updated: KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new de202a6 KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472) de202a6 is described below commit de202a669777becd4a79fcdf6c0597e5a12d2370 Author: A. Sophie Blee-Goldman AuthorDate: Thu Oct 10 16:23:18 2019 -0700 KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472) All four flavors of the repartition/optimization tests have been reported as flaky and failed in one place or another: * RepartitionOptimizingIntegrationTest.shouldSendCorrectRecords_OPTIMIZED * RepartitionOptimizingIntegrationTest.shouldSendCorrectRecords_NO_OPTIMIZATION * RepartitionWithMergeOptimizingIntegrationTest.shouldSendCorrectRecords_OPTIMIZED * RepartitionWithMergeOptimizingIntegrationTest.shouldSendCorrectRecords_NO_OPTIMIZATION They're pretty similar so it makes sense to knock them all out at once. This PR does three things: * Switch to in-memory stores wherever possible * Name all operators and update the Topology accordingly (not really a flaky test fix, but had to update the topology names anyway because of the IM stores so figured might as well) * Port to TopologyTestDriver -- this is the "real" fix, should make a big difference as these repartition tests required multiple roundtrips with the Kafka cluster (while using only the default timeout) Reviewers: Bill Bejeck , Guozhang Wang --- .../internals/RepartitionOptimizingTest.java} | 548 +++-- .../RepartitionWithMergeOptimizingTest.java} | 328 ++-- .../tests/StreamsBrokerDownResilienceTest.java | 15 +- .../tests/streams/streams_upgrade_test.py | 2 +- 4 files changed, 465 insertions(+), 428 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java similarity index 54% rename from streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java index bea32f2..8425ad7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java @@ -15,40 +15,41 @@ * limitations under the License. */ -package org.apache.kafka.streams.integration; +package org.apache.kafka.streams.processor.internals; - -import kafka.utils.MockTime; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.StreamJoined; import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.StreamsTestUtils; -import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; -import org.junit.ClassRule; import org.junit.Test; -import org.junit.experimental.categories.Category; import java.util.ArrayList; import java.util.Arrays; @@ -57,17 +58,19 @@ import java.util.Local
[kafka] branch trunk updated (fa2a9f0 -> cc6525a)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from fa2a9f0 MINOR: ListPartitionReassignmentsResponse should not be entirely failed when a topic-partition does not exist (#7486) add cc6525a KAFKA-8743: Flaky Test Repartition{WithMerge}OptimizingIntegrationTest (#7472) No new revisions were added by this update. Summary of changes: .../internals/RepartitionOptimizingTest.java} | 548 +++-- .../RepartitionWithMergeOptimizingTest.java} | 328 ++-- .../tests/StreamsBrokerDownResilienceTest.java | 15 +- .../tests/streams/streams_upgrade_test.py | 2 +- 4 files changed, 465 insertions(+), 428 deletions(-) rename streams/src/test/java/org/apache/kafka/streams/{integration/RepartitionOptimizingIntegrationTest.java => processor/internals/RepartitionOptimizingTest.java} (54%) rename streams/src/test/java/org/apache/kafka/streams/{integration/RepartitionWithMergeOptimizingIntegrationTest.java => processor/internals/RepartitionWithMergeOptimizingTest.java} (54%)
[kafka] branch trunk updated (c3177a1 -> f41a5c2)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from c3177a1 Add wait condition for state RUNNING (#7476) add f41a5c2 KAFKA-8729, pt 3: Add broker-side logic to handle the case when there are record_errors and error_message (#7167) No new revisions were added by this update. Summary of changes: .../kafka/common/requests/ProduceRequest.java | 2 +- .../kafka/common/requests/ProduceResponse.java | 99 ++ .../resources/common/message/ProduceRequest.json | 2 +- .../resources/common/message/ProduceResponse.json | 12 +-- .../apache/kafka/common/message/MessageTest.java | 14 +-- .../kafka/common/requests/RequestResponseTest.java | 3 +- ...ption.scala => RecordValidationException.scala} | 9 +- core/src/main/scala/kafka/log/Log.scala| 18 +++- core/src/main/scala/kafka/log/LogValidator.scala | 69 ++- .../main/scala/kafka/server/ReplicaManager.scala | 37 +--- core/src/test/scala/unit/kafka/log/LogTest.scala | 10 +-- .../scala/unit/kafka/log/LogValidatorTest.scala| 58 +++-- .../unit/kafka/server/ProduceRequestTest.scala | 36 +++- 13 files changed, 265 insertions(+), 104 deletions(-) copy core/src/main/scala/kafka/common/{LogCleaningAbortedException.scala => RecordValidationException.scala} (74%)
[kafka] branch 2.4 updated: KAFKA-8729, pt 3: Add broker-side logic to handle the case when there are record_errors and error_message (#7167)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new 18cfd13 KAFKA-8729, pt 3: Add broker-side logic to handle the case when there are record_errors and error_message (#7167) 18cfd13 is described below commit 18cfd13a9b211cbe272563544904d463c0ed93bb Author: Tu V. Tran AuthorDate: Thu Oct 10 14:44:37 2019 -0700 KAFKA-8729, pt 3: Add broker-side logic to handle the case when there are record_errors and error_message (#7167) All the changes are in ReplicaManager.appendToLocalLog and ReplicaManager.appendRecords. Also, replaced LogAppendInfo.unknownLogAppendInfoWithLogStartOffset with LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo to include those 2 new fields. Reviewers: Guozhang Wang , Jason Gustafson --- .../kafka/common/requests/ProduceRequest.java | 2 +- .../kafka/common/requests/ProduceResponse.java | 99 ++ .../resources/common/message/ProduceRequest.json | 2 +- .../resources/common/message/ProduceResponse.json | 12 +-- .../apache/kafka/common/message/MessageTest.java | 14 +-- .../kafka/common/requests/RequestResponseTest.java | 3 +- .../kafka/common/RecordValidationException.scala | 25 ++ core/src/main/scala/kafka/log/Log.scala| 18 +++- core/src/main/scala/kafka/log/LogValidator.scala | 69 ++- .../main/scala/kafka/server/ReplicaManager.scala | 37 +--- core/src/test/scala/unit/kafka/log/LogTest.scala | 10 +-- .../scala/unit/kafka/log/LogValidatorTest.scala| 58 +++-- .../unit/kafka/server/ProduceRequestTest.scala | 36 +++- 13 files changed, 285 insertions(+), 100 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 932473c..7b3ae1c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -121,7 +121,7 @@ public class ProduceRequest extends AbstractRequest { private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6; /** - * V8 bumped up to add two new fields error_records offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse} + * V8 bumped up to add two new fields record_errors offset list and error_message to {@link org.apache.kafka.common.requests.ProduceResponse.PartitionResponse} * (See KIP-467) */ private static final Schema PRODUCE_REQUEST_V8 = PRODUCE_REQUEST_V7; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index a6df880..8bfc629 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -74,14 +74,14 @@ public class ProduceResponse extends AbstractResponse { private static final String BASE_OFFSET_KEY_NAME = "base_offset"; private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time"; private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset"; -private static final String ERROR_RECORDS_KEY_NAME = "error_records"; -private static final String RELATIVE_OFFSET_KEY_NAME = "relative_offset"; -private static final String RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME = "relative_offset_error_message"; +private static final String RECORD_ERRORS_KEY_NAME = "record_errors"; +private static final String BATCH_INDEX_KEY_NAME = "batch_index"; +private static final String BATCH_INDEX_ERROR_MESSAGE_KEY_NAME = "batch_index_error_message"; private static final String ERROR_MESSAGE_KEY_NAME = "error_message"; private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME, "The start offset of the log at the time this produce response was created", INVALID_OFFSET); -private static final Field.NullableStr RELATIVE_OFFSET_ERROR_MESSAGE_FIELD = new Field.NullableStr(RELATIVE_OFFSET_ERROR_MESSAGE_KEY_NAME, +private static final Field.NullableStr BATCH_INDEX_ERROR_MESSAGE_FIELD = new Field.NullableStr(BATCH_INDEX_ERROR_MESSAGE_KEY_NAME, "The error message of the record that caused the batch to be dropped"); private static final Field.NullableStr ERROR_MESSAGE_FIELD = new Field.NullableStr(ERROR_MESSAGE_KEY_NAME, "The global error message summarizing the common root cause of the records that caused the batch
[kafka] branch trunk updated (e0824e2 -> fb9b0df)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from e0824e2 MINOR: Just one put and flush to generation rocksDB File in RocksDBStoreTest (#7469) add fb9b0df MINOR: Augment log4j to add generation number in performAssign (#7451) No new revisions were added by this update. Summary of changes: .../consumer/internals/AbstractCoordinator.java| 66 ++ .../consumer/internals/ConsumerCoordinator.java| 11 ++-- .../internals/AbstractCoordinatorTest.java | 2 +- .../internals/ConsumerCoordinatorTest.java | 6 +- .../runtime/distributed/WorkerCoordinator.java | 2 +- 5 files changed, 52 insertions(+), 35 deletions(-)
[kafka] branch trunk updated (32a3dc8 -> e0824e2)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 32a3dc8 KAFKA-8991: Enable scalac optimizer (#7452) add e0824e2 MINOR: Just one put and flush to generation rocksDB File in RocksDBStoreTest (#7469) No new revisions were added by this update. Summary of changes: .../kafka/streams/state/internals/RocksDBStoreTest.java| 14 +++--- 1 file changed, 3 insertions(+), 11 deletions(-)
[kafka] branch trunk updated (f9934e7 -> e3c2148)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from f9934e7 MINOR: remove unused imports in Streams system tests (#7468) add e3c2148 KAFKA-8964: Rename tag client-id for thread-level metrics and below (#7429) No new revisions were added by this update. Summary of changes: .../streams/kstream/internals/metrics/Sensors.java | 24 ++-- .../streams/processor/internals/ProcessorNode.java | 18 +- .../streams/processor/internals/StreamTask.java| 3 +- .../internals/metrics/StreamsMetricsImpl.java | 64 -- .../AbstractRocksDBSegmentedBytesStore.java| 2 +- .../state/internals/InMemorySessionStore.java | 2 +- .../state/internals/InMemoryWindowStore.java | 4 +- .../streams/state/internals/metrics/Sensors.java | 8 +-- ...KStreamSessionWindowAggregateProcessorTest.java | 8 +-- .../internals/KStreamWindowAggregateTest.java | 8 +-- .../KTableSuppressProcessorMetricsTest.java| 18 +++--- .../processor/internals/ProcessorNodeTest.java | 4 +- .../processor/internals/StreamTaskTest.java| 8 +-- .../processor/internals/StreamThreadTest.java | 16 +++--- .../internals/metrics/StreamsMetricsImplTest.java | 46 ++-- .../AbstractRocksDBSegmentedBytesStoreTest.java| 8 +-- .../state/internals/MeteredKeyValueStoreTest.java | 8 +-- .../state/internals/MeteredSessionStoreTest.java | 6 +- .../MeteredTimestampedKeyValueStoreTest.java | 8 +-- .../state/internals/MeteredWindowStoreTest.java| 4 +- .../state/internals/SessionBytesStoreTest.java | 8 +-- .../state/internals/WindowBytesStoreTest.java | 8 +-- .../apache/kafka/streams/TopologyTestDriver.java | 4 +- 23 files changed, 146 insertions(+), 141 deletions(-)
[kafka] branch trunk updated (c49775b -> f9934e7)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from c49775b KAFKA-7190; Retain producer state until transactionalIdExpiration time passes (#7388) add f9934e7 MINOR: remove unused imports in Streams system tests (#7468) No new revisions were added by this update. Summary of changes: tests/kafkatest/tests/streams/streams_broker_bounce_test.py | 1 - tests/kafkatest/tests/streams/streams_eos_test.py | 2 -- 2 files changed, 3 deletions(-)
[kafka] branch 2.3 updated: KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922) (#7457)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 97be73e KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922) (#7457) 97be73e is described below commit 97be73ec9822cbbaeda41b958821982359bcba3e Author: Bruno Cadonna AuthorDate: Mon Oct 7 21:59:07 2019 +0200 KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922) (#7457) - Timeout occurred due to initial slow rebalancing. - Added code to wait until `KafkaStreams` instance is in state RUNNING to check registration of metrics and in state NOT_RUNNING to check deregistration of metrics. - I removed all other wait conditions, because they are not needed if `KafkaStreams` instance is in the right state. Reviewers: Guozhang Wang --- .../integration/MetricsIntegrationTest.java| 456 + 1 file changed, 205 insertions(+), 251 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index 3f6c806..c9e0b8b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; @@ -50,6 +51,9 @@ import java.util.List; import java.util.Properties; import java.util.stream.Collectors; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + @SuppressWarnings("unchecked") @Category({IntegrationTest.class}) public class MetricsIntegrationTest { @@ -187,27 +191,35 @@ public class MetricsIntegrationTest { CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4); } -private void startApplication() { +private void startApplication() throws InterruptedException { kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); kafkaStreams.start(); +final long timeout = 6; +TestUtils.waitForCondition( +() -> kafkaStreams.state() == State.RUNNING, +timeout, +() -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms"); } private void closeApplication() throws Exception { kafkaStreams.close(); kafkaStreams.cleanUp(); IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); +final long timeout = 6; +TestUtils.waitForCondition( +() -> kafkaStreams.state() == State.NOT_RUNNING, +timeout, +() -> "Kafka Streams application did not reach state NOT_RUNNING in " + timeout + " ms"); } -private void checkMetricDeregistration() throws InterruptedException { -TestUtils.waitForCondition(() -> { -final List listMetricAfterClosingApp = new ArrayList(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList()); -return listMetricAfterClosingApp.size() == 0; -}, 1, "de-registration of metrics"); +private void checkMetricDeregistration() { +final List listMetricAfterClosingApp = new ArrayList(kafkaStreams.metrics().values()).stream() +.filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList()); +assertThat(listMetricAfterClosingApp.size(), is(0)); } @Test public void testStreamMetric() throws Exception { -final StringBuilder errorMessage = new StringBuilder(); stream = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())); stream.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String())); builder.table(STREAM_OUTPUT_1, Materialized.as(Stores.inMemoryKeyValueStore(MY_STORE_IN_MEMORY)).withCachingEnabled()) @@ -222,22 +234,13 @@ public class MetricsIntegrationTest { startApplication(); -// metric level : Thread -TestUtils.waitForCondition(() -> testThreadMetric(errorMessage), 1, () -> "testThreadMetric -> " + errorMessage.toString()); - -// metric level : Task -
[kafka] branch 2.4 updated: HOTFIX: Hide built-in metrics version (#7459)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new 6859820 HOTFIX: Hide built-in metrics version (#7459) 6859820 is described below commit 6859820755332eb90f386a5b76a0c43e035637dd Author: Bruno Cadonna AuthorDate: Mon Oct 7 19:47:50 2019 +0200 HOTFIX: Hide built-in metrics version (#7459) Since currently not all refactorings on streams metrics proposed in KIP-444 has yet been implemented, this commit hides the built-in metrics version config from the user. Thus, the user cannot switch to the refactored streams metrics. Reviewers: Guozhang Wang --- .../org/apache/kafka/streams/KafkaStreams.java | 3 +- .../org/apache/kafka/streams/StreamsConfig.java| 23 --- .../internals/metrics/StreamsMetricsImpl.java | 16 -- .../apache/kafka/streams/StreamsConfigTest.java| 34 -- .../integration/MetricsIntegrationTest.java| 21 + .../internals/GlobalStreamThreadTest.java | 4 +-- .../processor/internals/MockStreamsMetrics.java| 3 +- .../processor/internals/StandbyTaskTest.java | 3 +- .../processor/internals/StreamThreadTest.java | 18 +--- .../internals/metrics/StreamsMetricsImplTest.java | 16 +- .../internals/GlobalStateStoreProviderTest.java| 3 +- .../MeteredTimestampedWindowStoreTest.java | 2 +- .../state/internals/MeteredWindowStoreTest.java| 2 +- .../kafka/test/InternalMockProcessorContext.java | 10 +++ .../apache/kafka/streams/TopologyTestDriver.java | 4 +-- .../streams/processor/MockProcessorContext.java| 3 +- 16 files changed, 47 insertions(+), 118 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 5a00fe3..146eefd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -675,8 +675,7 @@ public class KafkaStreams implements AutoCloseable { Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId)); reporters.add(new JmxReporter(JMX_PREFIX)); metrics = new Metrics(metricConfig, reporters, time); -streamsMetrics = -new StreamsMetricsImpl(metrics, clientId, config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG)); +streamsMetrics = new StreamsMetricsImpl(metrics, clientId, StreamsMetricsImpl.METRICS_0100_TO_23); streamsMetrics.setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger); ClientMetrics.addVersionMetric(streamsMetrics); ClientMetrics.addCommitIdMetric(streamsMetrics); diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index eb6faff..0a4bfd1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -287,16 +287,6 @@ public class StreamsConfig extends AbstractConfig { @SuppressWarnings("WeakerAccess") public static final String EXACTLY_ONCE = "exactly_once"; -/** - * Config value for parameter {@link #BUILT_IN_METRICS_VERSION_CONFIG "built.in.metrics.version"} for built-in metrics from version 0.10.0. to 2.3 - */ -public static final String METRICS_0100_TO_23 = "0.10.0-2.3"; - -/** - * Config value for parameter {@link #BUILT_IN_METRICS_VERSION_CONFIG "built.in.metrics.version"} for the latest built-in metrics version. - */ -public static final String METRICS_LATEST = "latest"; - /** {@code application.id} */ @SuppressWarnings("WeakerAccess") public static final String APPLICATION_ID_CONFIG = "application.id"; @@ -316,10 +306,6 @@ public class StreamsConfig extends AbstractConfig { public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition"; private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum number of records to buffer per partition."; -/** {@code built.in.metrics.version} */ -public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version"; -private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the built-in metrics to use."; - /** {@code cache.max.bytes.buffering} */ @SuppressWarnings("WeakerAccess") public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering"; @@ -625,15 +611,6 @@ public class StreamsConfig extends AbstractConfig {
[kafka] branch 2.4 updated: KAFKA-8179: Part 7, cooperative rebalancing in Streams (#7386)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new 133c33f KAFKA-8179: Part 7, cooperative rebalancing in Streams (#7386) 133c33f is described below commit 133c33fde1c4d3b79196b72522f83a75cb6b0e65 Author: A. Sophie Blee-Goldman AuthorDate: Mon Oct 7 09:27:09 2019 -0700 KAFKA-8179: Part 7, cooperative rebalancing in Streams (#7386) Key improvements with this PR: * tasks will remain available for IQ during a rebalance (but not during restore) * continue restoring and processing standby tasks during a rebalance * continue processing active tasks during rebalance until the RecordQueue is empty* * only revoked tasks must suspended/closed * StreamsPartitionAssignor tries to return tasks to their previous consumers within a client * but do not try to commit, for now (pending KAFKA-7312) Reviewers: John Roesler , Boyang Chen , Guozhang Wang --- checkstyle/suppressions.xml| 3 + .../consumer/internals/ConsumerCoordinator.java| 20 +- .../consumer/internals/SubscriptionState.java | 15 +- .../clients/consumer/internals/FetcherTest.java| 31 +- .../processor/internals/AssignedStreamsTasks.java | 15 +- .../streams/processor/internals/StandbyTask.java | 19 - .../processor/internals/StoreChangelogReader.java | 16 +- .../streams/processor/internals/StreamTask.java| 8 +- .../streams/processor/internals/StreamThread.java | 27 +- .../internals/StreamsPartitionAssignor.java| 612 +++-- .../kafka/streams/processor/internals/Task.java| 8 +- .../streams/processor/internals/TaskManager.java | 34 +- .../assignment/AssignorConfiguration.java | 12 +- .../internals/assignment/ClientState.java | 56 +- .../internals/assignment/StickyTaskAssignor.java | 8 +- .../processor/internals/AbstractTaskTest.java | 6 - .../processor/internals/StandbyTaskTest.java | 23 - .../internals/StreamsPartitionAssignorTest.java| 481 ++-- .../processor/internals/TaskManagerTest.java | 28 +- .../internals/assignment/ClientStateTest.java | 8 +- .../assignment/StickyTaskAssignorTest.java | 34 +- .../kafka/streams/tests/SmokeTestDriver.java | 2 +- .../kafka/streams/tests/StreamsUpgradeTest.java| 26 +- tests/kafkatest/tests/streams/streams_eos_test.py | 2 +- 24 files changed, 1058 insertions(+), 436 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 8927849..2f21309 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -197,6 +197,9 @@ + + diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index b5b5ce2..6b39acb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -355,26 +355,29 @@ public final class ConsumerCoordinator extends AbstractCoordinator { Set addedPartitions = new HashSet<>(assignedPartitions); addedPartitions.removeAll(ownedPartitions); -// Invoke user's revocation callback before changing assignment or updating state if (protocol == RebalanceProtocol.COOPERATIVE) { Set revokedPartitions = new HashSet<>(ownedPartitions); revokedPartitions.removeAll(assignedPartitions); -log.info("Updating with newly assigned partitions: {}, compare with already owned partitions: {}, " + -"newly added partitions: {}, revoking partitions: {}", +log.info("Updating assignment with\n" + +"now assigned partitions: {}\n" + +"compare with previously owned partitions: {}\n" + +"newly added partitions: {}\n" + +"revoked partitions: {}\n", Utils.join(assignedPartitions, ", "), Utils.join(ownedPartitions, ", "), Utils.join(addedPartitions, ", "), -Utils.join(revokedPartitions, ", ")); - +Utils.join(revokedPartitions, ", ") +); if (!revokedPartitions.isEmpty()) { -// revoke partitions that was previously owned but no longer assigned; -// note that we should only change the assignment AFTER we've triggered -// the revoke callback +// revoke partitio
[kafka] branch trunk updated (c06e45a -> d88f104)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from c06e45a KAFKA-8985; Add flexible version support to inter-broker APIs (#7453) add d88f104 KAFKA-8179: Part 7, cooperative rebalancing in Streams (#7386) No new revisions were added by this update. Summary of changes: checkstyle/suppressions.xml| 3 + .../consumer/internals/ConsumerCoordinator.java| 20 +- .../consumer/internals/SubscriptionState.java | 15 +- .../clients/consumer/internals/FetcherTest.java| 31 +- .../processor/internals/AssignedStreamsTasks.java | 15 +- .../streams/processor/internals/StandbyTask.java | 19 - .../processor/internals/StoreChangelogReader.java | 16 +- .../streams/processor/internals/StreamTask.java| 8 +- .../streams/processor/internals/StreamThread.java | 27 +- .../internals/StreamsPartitionAssignor.java| 612 +++-- .../kafka/streams/processor/internals/Task.java| 8 +- .../streams/processor/internals/TaskManager.java | 34 +- .../assignment/AssignorConfiguration.java | 12 +- .../internals/assignment/ClientState.java | 56 +- .../internals/assignment/StickyTaskAssignor.java | 8 +- .../processor/internals/AbstractTaskTest.java | 6 - .../processor/internals/StandbyTaskTest.java | 23 - .../internals/StreamsPartitionAssignorTest.java| 481 ++-- .../processor/internals/TaskManagerTest.java | 28 +- .../internals/assignment/ClientStateTest.java | 8 +- .../assignment/StickyTaskAssignorTest.java | 34 +- .../kafka/streams/tests/SmokeTestDriver.java | 2 +- .../kafka/streams/tests/StreamsUpgradeTest.java| 26 +- tests/kafkatest/tests/streams/streams_eos_test.py | 2 +- 24 files changed, 1058 insertions(+), 436 deletions(-)
[kafka] branch trunk updated (3b05dc6 -> 52007e8)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 3b05dc6 MINOR: just remove leader on trunk like we did on 2.3 (#7447) add 52007e8 KAFKA-8934: Introduce instance-level metrics for streams applications (#7416) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/KafkaStreams.java | 22 +- .../streams/internals/metrics/ClientMetrics.java | 116 + .../kstream/internals/KStreamAggregate.java| 2 +- .../kstream/internals/KStreamKStreamJoin.java | 2 +- .../internals/KStreamKTableJoinProcessor.java | 2 +- .../streams/kstream/internals/KStreamReduce.java | 2 +- .../internals/KStreamSessionWindowAggregate.java | 2 +- .../kstream/internals/KStreamWindowAggregate.java | 2 +- .../kstream/internals/KTableKTableInnerJoin.java | 2 +- .../kstream/internals/KTableKTableLeftJoin.java| 2 +- .../kstream/internals/KTableKTableOuterJoin.java | 2 +- .../kstream/internals/KTableKTableRightJoin.java | 2 +- .../streams/kstream/internals/KTableSource.java| 2 +- .../ForeignJoinSubscriptionProcessorSupplier.java | 3 +- .../SubscriptionStoreReceiveProcessorSupplier.java | 2 +- .../streams/kstream/internals/metrics/Sensors.java | 26 +- .../processor/internals/GlobalStateUpdateTask.java | 2 +- .../processor/internals/GlobalStreamThread.java| 20 +- .../streams/processor/internals/ProcessorNode.java | 48 +++- .../streams/processor/internals/RecordQueue.java | 3 +- .../streams/processor/internals/StandbyTask.java | 3 +- .../streams/processor/internals/StreamTask.java| 33 ++- .../streams/processor/internals/StreamThread.java | 64 ++--- .../internals/metrics/StreamsMetricsImpl.java | 273 +++-- .../processor/internals/metrics/ThreadMetrics.java | 170 +++-- .../AbstractRocksDBSegmentedBytesStore.java| 4 +- .../state/internals/InMemorySessionStore.java | 4 +- .../state/internals/InMemoryWindowStore.java | 4 +- .../streams/state/internals/KeyValueSegments.java | 2 +- .../state/internals/MeteredKeyValueStore.java | 119 - .../state/internals/MeteredSessionStore.java | 70 +- .../state/internals/MeteredWindowStore.java| 58 - .../kafka/streams/state/internals/NamedCache.java | 9 +- .../streams/state/internals/RocksDBStore.java | 2 +- .../state/internals/TimestampedSegments.java | 2 +- .../state/internals/metrics/NamedCacheMetrics.java | 8 +- .../state/internals/metrics/RocksDBMetrics.java| 159 +--- .../internals/metrics/RocksDBMetricsRecorder.java | 9 +- .../streams/state/internals/metrics/Sensors.java | 18 +- .../org/apache/kafka/streams/KafkaStreamsTest.java | 36 ++- .../integration/MetricsIntegrationTest.java| 57 - .../internals/metrics/ClientMetricsTest.java | 137 +++ ...KStreamSessionWindowAggregateProcessorTest.java | 11 +- .../internals/KStreamWindowAggregateTest.java | 9 +- .../KTableSuppressProcessorMetricsTest.java| 65 ++--- .../internals/GlobalStreamThreadTest.java | 11 +- .../processor/internals/ProcessorNodeTest.java | 7 +- .../processor/internals/StandbyTaskTest.java | 3 +- .../processor/internals/StreamTaskTest.java| 20 +- .../processor/internals/StreamThreadTest.java | 190 +++--- .../internals/metrics/StreamsMetricsImplTest.java | 237 +++--- .../internals/metrics/ThreadMetricsTest.java | 49 ++-- .../AbstractRocksDBSegmentedBytesStoreTest.java| 5 +- .../state/internals/KeyValueSegmentTest.java | 3 +- .../state/internals/MeteredKeyValueStoreTest.java | 6 +- .../state/internals/MeteredSessionStoreTest.java | 7 +- .../MeteredTimestampedKeyValueStoreTest.java | 7 +- .../state/internals/MeteredWindowStoreTest.java| 5 +- .../state/internals/SegmentIteratorTest.java | 3 +- .../state/internals/SessionBytesStoreTest.java | 5 +- .../state/internals/TimestampedSegmentTest.java| 3 +- .../state/internals/WindowBytesStoreTest.java | 5 +- .../internals/metrics/NamedCacheMetricsTest.java | 16 +- .../metrics/RocksDBMetricsRecorderTest.java| 5 +- .../internals/metrics/RocksDBMetricsTest.java | 21 +- .../apache/kafka/streams/TopologyTestDriver.java | 11 +- .../streams/processor/MockProcessorContext.java| 6 +- 67 files changed, 1625 insertions(+), 590 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
[kafka] branch trunk updated (586c587 -> 3b05dc6)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 586c587 KAFKA-8974: Trim whitespaces in topic names in sink connector configs (#7442) add 3b05dc6 MINOR: just remove leader on trunk like we did on 2.3 (#7447) No new revisions were added by this update. Summary of changes: .../tests/streams/streams_upgrade_test.py | 44 ++ 1 file changed, 3 insertions(+), 41 deletions(-)
[kafka] branch 2.1 updated: KAFKA-8649: send latest commonly supported version in assignment (#7427)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 8784899 KAFKA-8649: send latest commonly supported version in assignment (#7427) 8784899 is described below commit 8784899168cf3b79980384c72c7273b6679f9a22 Author: A. Sophie Blee-Goldman AuthorDate: Fri Oct 4 14:17:47 2019 -0700 KAFKA-8649: send latest commonly supported version in assignment (#7427) PR 7423 but targeted at 2.1. Reviewers: Guozhang Wang --- .gitignore | 2 + .../streams/processor/internals/StreamThread.java | 7 +- .../internals/StreamsPartitionAssignor.java| 275 + .../streams/processor/internals/TaskManager.java | 11 +- .../internals/assignment/AssignmentInfo.java | 70 +++--- .../internals/assignment/ClientState.java | 37 ++- .../internals/StreamsPartitionAssignorTest.java| 4 +- .../processor/internals/TaskManagerTest.java | 1 + .../internals/assignment/ClientStateTest.java | 4 +- .../assignment/StickyTaskAssignorTest.java | 34 +-- .../kafka/streams/tests/StreamsUpgradeTest.java| 6 + .../tests/streams/streams_upgrade_test.py | 121 - 12 files changed, 323 insertions(+), 249 deletions(-) diff --git a/.gitignore b/.gitignore index fe191ee..0c6854d 100644 --- a/.gitignore +++ b/.gitignore @@ -52,3 +52,5 @@ docs/generated/ kafkatest.egg-info/ systest/ *.swp +clients/src/generated +clients/src/generated-test \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 6c99797..8f976d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -280,9 +280,10 @@ public class StreamThread extends Thread { ); } else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) { log.debug( -"Encountered assignment error during partition assignment: {}. Skipping task initialization", -streamThread.assignmentErrorCode -); +"Encountered assignment error during partition assignment: {}. Skipping task initialization and " ++ "pausing any partitions we may have been assigned.", +streamThread.assignmentErrorCode); +taskManager.pausePartitions(); } else { log.debug("Creating tasks based on assignment."); taskManager.createTasks(assignment); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 493f56b..f99866e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -159,8 +159,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable void addConsumer(final String consumerMemberId, final SubscriptionInfo info) { consumers.add(consumerMemberId); -state.addPreviousActiveTasks(info.prevTasks()); -state.addPreviousStandbyTasks(info.standbyTasks()); +state.addPreviousActiveTasks(consumerMemberId, info.prevTasks()); +state.addPreviousStandbyTasks(consumerMemberId, info.standbyTasks()); state.incrementCapacity(); } @@ -396,6 +396,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable final Set futureConsumers = new HashSet<>(); int minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; +int minSupportedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; supportedVersions.clear(); int futureMetadataVersion = UNKNOWN; @@ -415,6 +416,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable minReceivedMetadataVersion = usedVersion; } +final int latestSupportedVersion = info.latestSupportedVersion(); +if (latestSupportedVersion < minSupportedMetadataVersion) { +minSupportedMetadataVersion = latestSupportedVersion; +} + // create the new client
[kafka] branch trunk updated (791d0d6 -> 11ab6e7)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 791d0d6 KAFKA-8804: Secure internal Connect REST endpoints (#7310) add 11ab6e7 HOTFIX: remove unsued StreamsConfig from StreamsPartitionAssignor No new revisions were added by this update. Summary of changes: .../kafka/streams/processor/internals/StreamsPartitionAssignor.java | 1 - 1 file changed, 1 deletion(-)
[kafka] branch 2.2 updated: KAFKA-8649: send latest commonly supported version in assignment (#7426)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new d3bf3cd KAFKA-8649: send latest commonly supported version in assignment (#7426) d3bf3cd is described below commit d3bf3cd3d16945c17d45cf3b2bc4ef0d699cf418 Author: A. Sophie Blee-Goldman AuthorDate: Wed Oct 2 16:01:54 2019 -0700 KAFKA-8649: send latest commonly supported version in assignment (#7426) PR 7423 but targeted at 2.2. Reviewers: Guozhang Wang --- .gitignore | 1 + .../streams/processor/internals/StreamThread.java | 7 +- .../internals/StreamsPartitionAssignor.java| 275 + .../streams/processor/internals/TaskManager.java | 11 +- .../internals/assignment/AssignmentInfo.java | 70 +++--- .../internals/assignment/ClientState.java | 36 ++- .../internals/StreamsPartitionAssignorTest.java| 4 +- .../processor/internals/TaskManagerTest.java | 1 + .../internals/assignment/ClientStateTest.java | 4 +- .../assignment/StickyTaskAssignorTest.java | 34 +-- .../kafka/streams/tests/StreamsUpgradeTest.java| 6 + .../tests/streams/streams_upgrade_test.py | 28 +-- 12 files changed, 285 insertions(+), 192 deletions(-) diff --git a/.gitignore b/.gitignore index a31643f..0c6854d 100644 --- a/.gitignore +++ b/.gitignore @@ -53,3 +53,4 @@ kafkatest.egg-info/ systest/ *.swp clients/src/generated +clients/src/generated-test \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 3083ec7..4c995e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -286,9 +286,10 @@ public class StreamThread extends Thread { ); } else if (streamThread.assignmentErrorCode.get() != StreamsPartitionAssignor.Error.NONE.code()) { log.debug( -"Encountered assignment error during partition assignment: {}. Skipping task initialization", -streamThread.assignmentErrorCode -); +"Encountered assignment error during partition assignment: {}. Skipping task initialization and " ++ "pausing any partitions we may have been assigned.", +streamThread.assignmentErrorCode); +taskManager.pausePartitions(); } else { log.debug("Creating tasks based on assignment."); taskManager.createTasks(assignment); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 89e95b6..3dd0836 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -159,8 +159,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable void addConsumer(final String consumerMemberId, final SubscriptionInfo info) { consumers.add(consumerMemberId); -state.addPreviousActiveTasks(info.prevTasks()); -state.addPreviousStandbyTasks(info.standbyTasks()); +state.addPreviousActiveTasks(consumerMemberId, info.prevTasks()); +state.addPreviousStandbyTasks(consumerMemberId, info.standbyTasks()); state.incrementCapacity(); } @@ -397,6 +397,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable final Set futureConsumers = new HashSet<>(); int minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; +int minSupportedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; supportedVersions.clear(); int futureMetadataVersion = UNKNOWN; @@ -416,6 +417,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable minReceivedMetadataVersion = usedVersion; } +final int latestSupportedVersion = info.latestSupportedVersion(); +if (latestSupportedVersion < minSupportedMetadataVersion) { +minSupportedMetadataVersion = latestSupportedVersion; +} + // create the new client metadata if nec
[kafka] branch trunk updated (8da6993 -> c7efc36)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 8da6993 KAFKA-8649: Send latest commonly supported version in assignment (#7423) add c7efc36 HOTFIX: don't throw if upgrading from very old versions (#7436) No new revisions were added by this update. Summary of changes: .../internals/StreamsPartitionAssignor.java| 69 +++--- 1 file changed, 35 insertions(+), 34 deletions(-)
[kafka] branch trunk updated (dc7dd5f -> 8da6993)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from dc7dd5f MINOR: Do not falsely log that partitions are being reassigned on controller startup (#7431) add 8da6993 KAFKA-8649: Send latest commonly supported version in assignment (#7423) No new revisions were added by this update. Summary of changes: .../internals/StreamsPartitionAssignor.java| 216 + .../internals/StreamsRebalanceListener.java| 4 +- .../streams/processor/internals/TaskManager.java | 8 +- .../internals/assignment/AssignmentInfo.java | 60 +++--- .../internals/assignment/ClientState.java | 36 +++- .../internals/StreamsPartitionAssignorTest.java| 2 +- .../processor/internals/TaskManagerTest.java | 1 + .../internals/assignment/ClientStateTest.java | 4 +- .../assignment/StickyTaskAssignorTest.java | 34 ++-- .../kafka/streams/tests/StreamsUpgradeTest.java| 7 + .../tests/streams/streams_upgrade_test.py | 27 +-- 11 files changed, 241 insertions(+), 158 deletions(-)
[kafka] branch trunk updated (9fbb0de -> f6f24c4)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 9fbb0de KAFKA-8927: Deprecate PartitionGrouper interface (#7376) add f6f24c4 KAFKA-8729, pt 2: Add error_records and error_message to PartitionResponse (#7150) No new revisions were added by this update. Summary of changes: .../kafka/clients/consumer/internals/Fetcher.java | 6 +- .../{record => }/InvalidRecordException.java | 6 +- .../org/apache/kafka/common/protocol/Errors.java | 4 +- .../common/record/AbstractLegacyRecordBatch.java | 1 + .../kafka/common/record/ControlRecordType.java | 1 + .../apache/kafka/common/record/DefaultRecord.java | 1 + .../kafka/common/record/DefaultRecordBatch.java| 6 +- .../kafka/common/record/EndTransactionMarker.java | 1 + .../apache/kafka/common/record/LegacyRecord.java | 5 +- .../kafka/common/requests/ProduceRequest.java | 12 ++- .../kafka/common/requests/ProduceResponse.java | 106 + .../resources/common/message/ProduceRequest.json | 4 +- .../resources/common/message/ProduceResponse.json | 16 +++- .../apache/kafka/common/message/MessageTest.java | 72 ++ .../record/AbstractLegacyRecordBatchTest.java | 1 + .../common/record/DefaultRecordBatchTest.java | 6 +- .../kafka/common/record/DefaultRecordTest.java | 1 + .../common/record/EndTransactionMarkerTest.java| 1 + .../kafka/common/record/LegacyRecordTest.java | 3 +- .../common/record/SimpleLegacyRecordTest.java | 6 +- .../kafka/common/requests/ProduceRequestTest.java | 2 +- .../kafka/common/requests/RequestResponseTest.java | 8 ++ .../apache/kafka/connect/util/ConnectUtils.java| 2 +- .../connect/runtime/WorkerSourceTaskTest.java | 2 +- core/src/main/scala/kafka/log/Log.scala| 4 +- core/src/main/scala/kafka/log/LogSegment.scala | 7 +- core/src/main/scala/kafka/log/LogValidator.scala | 7 +- .../scala/kafka/server/AbstractFetcherThread.scala | 4 +- core/src/test/scala/unit/kafka/log/LogTest.scala | 2 +- .../scala/unit/kafka/log/LogValidatorTest.scala| 1 + .../unit/kafka/server/ProduceRequestTest.scala | 1 + 31 files changed, 248 insertions(+), 51 deletions(-) rename clients/src/main/java/org/apache/kafka/common/{record => }/InvalidRecordException.java (85%)
[kafka] branch trunk updated (ef2fbfd -> 9fbb0de)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from ef2fbfd KAFKA-8609: Add rebalance-latency-total (#7401) add 9fbb0de KAFKA-8927: Deprecate PartitionGrouper interface (#7376) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/StreamsConfig.java| 22 +++--- .../streams/processor/DefaultPartitionGrouper.java | 3 +++ .../kafka/streams/processor/PartitionGrouper.java | 3 +++ .../internals/StreamsPartitionAssignor.java| 6 +++--- .../assignment/AssignorConfiguration.java | 10 ++ .../apache/kafka/streams/StreamsConfigTest.java| 22 -- .../processor/DefaultPartitionGrouperTest.java | 1 + .../internals/SingleGroupPartitionGrouperStub.java | 11 +-- .../internals/StreamsPartitionAssignorTest.java| 1 + 9 files changed, 57 insertions(+), 22 deletions(-)
[kafka] branch trunk updated (d53eab1 -> ef2fbfd)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from d53eab1 MINOR: Adjust logic of conditions to set number of partitions in step zero of assignment. (#7419) add ef2fbfd KAFKA-8609: Add rebalance-latency-total (#7401) No new revisions were added by this update. Summary of changes: .../apache/kafka/clients/consumer/internals/AbstractCoordinator.java | 5 + .../kafka/clients/consumer/internals/AbstractCoordinatorTest.java| 2 ++ 2 files changed, 7 insertions(+)
[kafka] branch trunk updated (45c800f -> d53eab1)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 45c800f KAFKA-8911: Using proper WindowSerdes constructors in their implicit definitions (#7352) add d53eab1 MINOR: Adjust logic of conditions to set number of partitions in step zero of assignment. (#7419) No new revisions were added by this update. Summary of changes: .../streams/processor/internals/StreamsPartitionAssignor.java| 9 + 1 file changed, 5 insertions(+), 4 deletions(-)
[kafka] branch trunk updated (22434e6 -> 863210d)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 22434e6 KAFKA-8319: Make KafkaStreamsTest a non-integration test class (#7382) add 863210d KAFKA-8934: Create version file during build for Streams (#7397) No new revisions were added by this update. Summary of changes: build.gradle | 40 1 file changed, 40 insertions(+)
[kafka] branch trunk updated (30443af -> 22434e6)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 30443af KAFKA-6883: Add toUpperCase support to sasl.kerberos.principal.to.local rule (KIP-309) add 22434e6 KAFKA-8319: Make KafkaStreamsTest a non-integration test class (#7382) No new revisions were added by this update. Summary of changes: .../kafka/clients/consumer/MockConsumer.java | 1 - .../org/apache/kafka/streams/KafkaStreams.java | 3 +- .../org/apache/kafka/streams/KafkaStreamsTest.java | 878 ++--- .../apache/kafka/streams/StreamsConfigTest.java| 26 + .../processor/internals/StreamThreadTest.java | 20 +- .../org/apache/kafka/test/MockClientSupplier.java | 2 +- 6 files changed, 461 insertions(+), 469 deletions(-)
[kafka] branch trunk updated (74f8ae1 -> d112ffd)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 74f8ae1 KAFKA-8179: do not suspend standby tasks during rebalance (#7321) add d112ffd KAFKA-8880: Docs on upgrade-guide (#7385) No new revisions were added by this update. Summary of changes: docs/upgrade.html | 5 + 1 file changed, 5 insertions(+)
[kafka] branch trunk updated (ad3b843 -> 74f8ae1)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from ad3b843 KAFKA-8580: Compute RocksDB metrics (#7263) add 74f8ae1 KAFKA-8179: do not suspend standby tasks during rebalance (#7321) No new revisions were added by this update. Summary of changes: .../consumer/ConsumerRebalanceListener.java| 16 +- .../consumer/internals/ConsumerCoordinator.java| 74 +++-- .../org/apache/kafka/streams/KafkaStreams.java | 15 +- .../org/apache/kafka/streams/StreamsConfig.java| 43 ++- .../processor/internals/AssignedStandbyTasks.java | 42 +++ .../processor/internals/AssignedStreamsTasks.java | 293 ++-- .../streams/processor/internals/AssignedTasks.java | 137 +- .../processor/internals/ChangelogReader.java | 15 +- .../processor/internals/StoreChangelogReader.java | 24 +- .../streams/processor/internals/StreamThread.java | 214 +-- .../internals/StreamsPartitionAssignor.java| 39 ++- .../internals/StreamsRebalanceListener.java| 169 .../streams/processor/internals/TaskManager.java | 297 ++--- .../assignment/AssignorConfiguration.java | 24 ++ .../integration/RegexSourceIntegrationTest.java| 4 +- .../internals/AssignedStreamsTasksTest.java| 36 ++- .../processor/internals/MockChangelogReader.java | 10 +- .../processor/internals/StreamThreadTest.java | 66 +++-- .../internals/StreamsPartitionAssignorTest.java| 2 +- .../processor/internals/TaskManagerTest.java | 114 .../kafka/streams/tests/StreamsUpgradeTest.java| 7 +- 21 files changed, 1085 insertions(+), 556 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
[kafka] branch trunk updated (bcc0237 -> ad3b843)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from bcc0237 KAFKA-8880: Add overloaded function of Consumer.committed (#7304) add ad3b843 KAFKA-8580: Compute RocksDB metrics (#7263) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/KafkaStreams.java | 72 +++-- .../processor/internals/GlobalStreamThread.java| 7 +- .../streams/processor/internals/StreamThread.java | 5 + .../internals/metrics/StreamsMetricsImpl.java | 30 +- .../streams/state/internals/KeyValueSegments.java | 8 +- .../streams/state/internals/RocksDBStore.java | 34 +-- .../state/internals/TimestampedSegments.java | 6 - .../state/internals/metrics/RocksDBMetrics.java| 3 +- .../internals/metrics/RocksDBMetricsRecorder.java | 182 .../metrics/RocksDBMetricsRecordingTrigger.java| 56 .../integration/MetricsIntegrationTest.java| 8 +- .../internals/GlobalStreamThreadTest.java | 42 +-- .../processor/internals/StreamThreadTest.java | 3 +- .../streams/state/internals/RocksDBStoreTest.java | 142 +++--- .../state/internals/SegmentIteratorTest.java | 5 +- .../metrics/RocksDBMetricsRecorderTest.java| 315 + .../RocksDBMetricsRecordingTriggerTest.java| 87 ++ .../internals/metrics/RocksDBMetricsTest.java | 22 +- .../kafka/test/InternalMockProcessorContext.java | 2 + .../apache/kafka/streams/TopologyTestDriver.java | 2 + 20 files changed, 782 insertions(+), 249 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTriggerTest.java
[kafka] branch trunk updated (1ae0956 -> bcc0237)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 1ae0956 HOTFIX: fix Kafka Streams upgrade note for broker backward compatibility (#7363) add bcc0237 KAFKA-8880: Add overloaded function of Consumer.committed (#7304) No new revisions were added by this update. Summary of changes: .../apache/kafka/clients/consumer/Consumer.java| 12 .../kafka/clients/consumer/KafkaConsumer.java | 80 -- .../kafka/clients/consumer/MockConsumer.java | 28 ++-- .../kafka/clients/consumer/KafkaConsumerTest.java | 20 +++--- .../kafka/clients/consumer/MockConsumerTest.java | 10 +-- .../integration/kafka/api/ConsumerBounceTest.scala | 6 +- .../kafka/api/PlaintextConsumerTest.scala | 46 ++--- .../integration/kafka/api/TransactionsTest.scala | 2 +- .../kafka/admin/ConsumerGroupCommandTest.scala | 14 ++-- .../test/scala/unit/kafka/utils/TestUtils.scala| 9 +-- .../streams/processor/internals/AbstractTask.java | 20 -- .../processor/internals/ProcessorStateManager.java | 9 ++- .../streams/processor/internals/StandbyTask.java | 34 + .../streams/processor/internals/StreamTask.java| 50 +++--- .../processor/internals/StandbyTaskTest.java | 8 +-- .../processor/internals/StreamTaskTest.java| 4 +- .../kafka/tools/TransactionalMessageCopier.java| 10 +-- 17 files changed, 235 insertions(+), 127 deletions(-)
[kafka] branch trunk updated (e98e239 -> b403561)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from e98e239 KAFKA-8859: Refactor cache-level metrics (#7367) add b403561 KAFKA-8086: Use 1 partition for offset topic when possible (#7356) No new revisions were added by this update. Summary of changes: .../test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala | 1 + .../scala/integration/kafka/network/DynamicConnectionQuotaTest.scala | 4 core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala | 1 + 3 files changed, 2 insertions(+), 4 deletions(-)
[kafka] branch trunk updated (8001aff -> d91a94e)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 8001aff KAFKA-8892: Display the sorted configs in Kafka Configs Help Command. add d91a94e KAFKA-8609: Add consumer rebalance metrics (#7347) No new revisions were added by this update. Summary of changes: .../consumer/internals/AbstractCoordinator.java| 136 - .../consumer/internals/ConsumerCoordinator.java| 45 ++- .../clients/consumer/internals/Heartbeat.java | 2 +- .../apache/kafka/common/metrics/JmxReporter.java | 1 + .../internals/AbstractCoordinatorTest.java | 93 +- .../internals/ConsumerCoordinatorTest.java | 54 6 files changed, 287 insertions(+), 44 deletions(-)
[kafka] branch trunk updated (8aff871 -> a047072)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 8aff871 MINOR: Fix bug where we would incorrectly load partition reassignment info from ZK (#7334) add 2d0cd2e MINOR: Murmur3 Hash with Guava dependency add a047072 MINOR: Move Murmur3 to Streams No new revisions were added by this update. Summary of changes: checkstyle/suppressions.xml| 7 + gradle/spotbugs-exclude.xml| 19 + .../kafka/streams/state/internals/Murmur3.java | 548 + .../kafka/streams/state/internals/Murmur3Test.java | 70 +++ 4 files changed, 644 insertions(+) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/Murmur3.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/Murmur3Test.java
[kafka-site] branch asf-site updated (52173cb -> 43379b5)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git. from 52173cb KAFKA-8474; Update CSS for new config layout (#211) add 43379b5 MINOR: Update the meetup list (#230) No new revisions were added by this update. Summary of changes: events.html | 294 ++-- 1 file changed, 229 insertions(+), 65 deletions(-)
[kafka] branch trunk updated (935b280 -> 4962c81)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 935b280 MINOR: Default to 5 partitions of the __consumer_offsets topic in Streams integration tests (#7331) add 4962c81 KAFKA-8839 : Improve streams debug logging (#7258) No new revisions were added by this update. Summary of changes: docs/streams/developer-guide/config-streams.html| 4 +++- streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java | 2 +- .../kafka/streams/processor/internals/AssignedStreamsTasks.java | 2 +- .../org/apache/kafka/streams/processor/internals/AssignedTasks.java | 5 +++-- .../kafka/streams/processor/internals/InternalTopicManager.java | 6 +++--- .../org/apache/kafka/streams/processor/internals/StreamTask.java| 3 +-- .../org/apache/kafka/streams/processor/internals/StreamThread.java | 4 ++-- .../kafka/streams/processor/internals/StreamsPartitionAssignor.java | 5 - .../org/apache/kafka/streams/processor/internals/TaskManager.java | 2 +- 9 files changed, 19 insertions(+), 14 deletions(-)
[kafka] branch trunk updated (bab3e08 -> 935b280)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from bab3e08 KAFKA-8859: Expose built-in streams metrics version in `StreamsMetricsImpl` (#7323) add 935b280 MINOR: Default to 5 partitions of the __consumer_offsets topic in Streams integration tests (#7331) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java | 1 + 1 file changed, 1 insertion(+)
[kafka] branch trunk updated (c5dfb90 -> bab3e08)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from c5dfb90 MINOR: Cleanup scala warnings (#7335) add bab3e08 KAFKA-8859: Expose built-in streams metrics version in `StreamsMetricsImpl` (#7323) No new revisions were added by this update. Summary of changes: .../processor/internals/GlobalStreamThread.java| 6 +++- .../streams/processor/internals/StreamThread.java | 6 +++- .../internals/metrics/StreamsMetricsImpl.java | 24 - .../processor/internals/MockStreamsMetrics.java| 3 +- .../processor/internals/StandbyTaskTest.java | 3 +- .../processor/internals/StreamThreadTest.java | 26 -- .../internals/metrics/StreamsMetricsImplTest.java | 42 ++ .../internals/GlobalStateStoreProviderTest.java| 5 ++- .../MeteredTimestampedWindowStoreTest.java | 3 +- .../state/internals/MeteredWindowStoreTest.java| 3 +- .../kafka/test/InternalMockProcessorContext.java | 26 +++--- .../apache/kafka/streams/TopologyTestDriver.java | 6 +++- .../streams/processor/MockProcessorContext.java| 2 +- 13 files changed, 116 insertions(+), 39 deletions(-)
[kafka] branch trunk updated (6530600 -> 83c7c01)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 6530600 MINOR: Add UUID type to Kafka API code generation (#7291) add 83c7c01 KAFKA-8755: Fix state restore for standby tasks with optimized topology (#7238) No new revisions were added by this update. Summary of changes: .../streams/processor/internals/AbstractTask.java | 37 +-- .../processor/internals/AssignedStandbyTasks.java | 10 + .../streams/processor/internals/StandbyTask.java | 67 - .../processor/internals/StoreChangelogReader.java | 60 ++-- .../streams/processor/internals/StreamTask.java| 42 ++- .../OptimizedKTableIntegrationTest.java| 335 + .../processor/internals/AbstractTaskTest.java | 69 + .../processor/internals/StandbyTaskTest.java | 90 ++ .../internals/StoreChangelogReaderTest.java| 58 .../processor/internals/StreamTaskTest.java| 63 +++- .../StreamThreadStateStoreProviderTest.java| 5 +- 11 files changed, 698 insertions(+), 138 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
[kafka] branch trunk updated (d3559f6 -> 23708b7)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from d3559f6 KAFKA-8875; CreateTopic API should check topic existence before replication factor (#7298) add 23708b7 KAFKA-8355: add static membership to range assignor (#7014) No new revisions were added by this update. Summary of changes: .../kafka/clients/consumer/RangeAssignor.java | 45 +++- .../kafka/clients/consumer/RoundRobinAssignor.java | 3 +- .../kafka/clients/consumer/RangeAssignorTest.java | 252 - .../clients/consumer/RoundRobinAssignorTest.java | 119 -- 4 files changed, 283 insertions(+), 136 deletions(-)
[kafka] branch trunk updated (6882b3b -> 6a3a580)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 6882b3b KAFKA-8886; Make Authorizer create/delete API asynchronous (#7316) add 6a3a580 KAFKA-8856: Add Streams config for backward-compatible metrics (#7279) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/StreamsConfig.java| 23 ++ .../apache/kafka/streams/StreamsConfigTest.java| 36 +- 2 files changed, 58 insertions(+), 1 deletion(-)
[kafka] branch trunk updated (7012fa3 -> a043edb)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 7012fa3 KAFKA-8747; Add atomic counter to fix flaky testEventQueueTime test (#7320) add a043edb KAFKA-8817: Remove timeout for the whole test (#7313) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/clients/producer/KafkaProducerTest.java | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-)
[kafka] branch trunk updated (e59e4ca -> d54285f)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from e59e4ca KAFKA-8222 & KIP-345 part 5: admin request to batch remove members (#7122) add d54285f KAFKA-8889: Log the details about error (#7317) No new revisions were added by this update. Summary of changes: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[kafka] branch 2.0 updated (e6eda88 -> 9b35df9)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git. from e6eda88 KAFKA-8861 Fix flaky RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic (#7281) add 9b35df9 KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223) No new revisions were added by this update. Summary of changes: .../processor/internals/RecordCollector.java | 2 +- .../processor/internals/RecordCollectorImpl.java | 3 ++- .../streams/processor/internals/StreamTask.java| 3 ++- .../processor/internals/RecordCollectorTest.java | 28 ++ 4 files changed, 33 insertions(+), 3 deletions(-)
[kafka] branch 2.2 updated (619729c -> ad14d92)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git. from 619729c HOTFIX: The cherry-pick for https://github.com/apache/kafka/pull/7207 to 2.2 causes a failure in AssignedStreamsTasksTest add ad14d92 KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223) No new revisions were added by this update. Summary of changes: .../processor/internals/RecordCollector.java | 2 +- .../processor/internals/RecordCollectorImpl.java | 3 ++- .../streams/processor/internals/StreamTask.java| 3 ++- .../processor/internals/RecordCollectorTest.java | 28 ++ 4 files changed, 33 insertions(+), 3 deletions(-)
[kafka] branch 2.3 updated: KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 29514bd KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223) 29514bd is described below commit 29514bd45c8c2eadfb1bee112917ec0bbc68f884 Author: cpettitt-confluent <53191309+cpettitt-conflu...@users.noreply.github.com> AuthorDate: Mon Aug 26 13:59:49 2019 -0700 KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223) Make offsets immutable to users of RecordCollector.offsets. Fix up an existing case where offsets could be modified in this way. Add a simple test to verify offsets cannot be changed externally. Reviewers: Bruno Cadonna , Guozhang Wang , Matthias J. Sax --- .../processor/internals/RecordCollector.java | 2 +- .../processor/internals/RecordCollectorImpl.java | 3 ++- .../streams/processor/internals/StreamTask.java| 3 ++- .../processor/internals/RecordCollectorTest.java | 28 ++ 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java index bbfb049..b8b99a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java @@ -63,7 +63,7 @@ public interface RecordCollector extends AutoCloseable { /** * The last acked offsets from the internal {@link Producer}. * - * @return the map from TopicPartition to offset + * @return an immutable map from TopicPartition to offset */ Map offsets(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 2e9ead8..45dda41 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Collections; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -281,7 +282,7 @@ public class RecordCollectorImpl implements RecordCollector { @Override public Map offsets() { -return offsets; +return Collections.unmodifiableMap(offsets); } // for testing only diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 3ab2524..067b36c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -504,7 +504,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator @Override protected Map activeTaskCheckpointableOffsets() { -final Map checkpointableOffsets = recordCollector.offsets(); +final Map checkpointableOffsets = +new HashMap<>(recordCollector.offsets()); for (final Map.Entry entry : consumedOffsets.entrySet()) { checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index a7da2cb..22fb2ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -50,7 +50,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -145,6 +148,31 @@ public class RecordCollectorTest { assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2))); } +@Test +public void shouldNotAllowOffsetsToBeUpdatedExternally() { +final String topic = "topic1"; +final TopicPartition topicPartition = new TopicPartition(topic, 0); + +final Rec
[kafka] branch 2.1 updated: KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 3242843 KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223) 3242843 is described below commit 3242843021933ea9a51e0633e413bc24f6c9165c Author: cpettitt-confluent <53191309+cpettitt-conflu...@users.noreply.github.com> AuthorDate: Mon Aug 26 13:59:49 2019 -0700 KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223) Make offsets immutable to users of RecordCollector.offsets. Fix up an existing case where offsets could be modified in this way. Add a simple test to verify offsets cannot be changed externally. Reviewers: Bruno Cadonna , Guozhang Wang , Matthias J. Sax --- .../processor/internals/RecordCollector.java | 2 +- .../processor/internals/RecordCollectorImpl.java | 3 ++- .../streams/processor/internals/StreamTask.java| 3 ++- .../processor/internals/RecordCollectorTest.java | 28 ++ 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java index 09de11d..d31d596 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java @@ -63,7 +63,7 @@ public interface RecordCollector { /** * The last acked offsets from the internal {@link Producer}. * - * @return the map from TopicPartition to offset + * @return an immutable map from TopicPartition to offset */ Map offsets(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 9f9e100..5294c4d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Collections; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -267,7 +268,7 @@ public class RecordCollectorImpl implements RecordCollector { @Override public Map offsets() { -return offsets; +return Collections.unmodifiableMap(offsets); } // for testing only diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 5ee00e8..4c12757 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -479,7 +479,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator @Override protected Map activeTaskCheckpointableOffsets() { -final Map checkpointableOffsets = recordCollector.offsets(); +final Map checkpointableOffsets = +new HashMap<>(recordCollector.offsets()); for (final Map.Entry entry : consumedOffsets.entrySet()) { checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 22163ce..ea58333 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -50,7 +50,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -145,6 +148,31 @@ public class RecordCollectorTest { assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2))); } +@Test +public void shouldNotAllowOffsetsToBeUpdatedExternally() { +final String topic = "topic1"; +final TopicPartition topicPartition = new TopicPartition(topic, 0); + +final RecordCollectorImpl col
[kafka] branch trunk updated (312e4db -> e59e4ca)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 312e4db MINOR. implement --expose-ports option in ducker-ak (#7269) add e59e4ca KAFKA-8222 & KIP-345 part 5: admin request to batch remove members (#7122) No new revisions were added by this update. Summary of changes: .../java/org/apache/kafka/clients/admin/Admin.java | 12 ++ .../kafka/clients/admin/KafkaAdminClient.java | 67 - ...AclsResult.java => MembershipChangeResult.java} | 25 ++-- .../RemoveMemberFromConsumerGroupOptions.java | 50 +++ .../clients/admin/RemoveMemberFromGroupResult.java | 85 .../kafka/common/requests/LeaveGroupResponse.java | 4 + .../common/message/LeaveGroupResponse.json | 2 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 133 -- .../clients/admin/MembershipChangeResultTest.java | 50 +++ .../kafka/clients/admin/MockAdminClient.java | 5 + .../RemoveMemberFromConsumerGroupOptionsTest.java | 21 ++- .../admin/RemoveMemberFromGroupResultTest.java | 154 + .../kafka/api/AdminClientIntegrationTest.scala | 85 ++-- 13 files changed, 652 insertions(+), 41 deletions(-) copy clients/src/main/java/org/apache/kafka/clients/admin/{DescribeAclsResult.java => MembershipChangeResult.java} (60%) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptions.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResult.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/admin/MembershipChangeResultTest.java copy connect/api/src/test/java/org/apache/kafka/connect/storage/ConverterTypeTest.java => clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptionsTest.java (59%) create mode 100644 clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResultTest.java
[kafka] branch trunk updated (7b62248 -> 0f177ea)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 7b62248 MINOR: Add cause to thrown exception when deleting topic in TopicCommand (#7301) add 0f177ea MINOR: Clean up partition assignment logic (#7249) No new revisions were added by this update. Summary of changes: .../streams/internals/QuietStreamsConfig.java | 0 .../processor/internals/InternalTopicConfig.java | 10 +- .../processor/internals/InternalTopicManager.java | 13 +- .../streams/processor/internals/StreamThread.java | 9 +- .../internals/StreamsPartitionAssignor.java| 548 - .../streams/processor/internals/TaskManager.java | 6 +- .../internals/assignment/AssignmentInfo.java | 7 +- .../assignment/AssignorConfiguration.java | 188 +++ .../{TaskAssignor.java => AssignorError.java} | 17 +- .../assignment/CopartitionedTopicsEnforcer.java| 110 + ...java => StreamsAssignmentProtocolVersions.java} | 13 +- .../internals/assignment/SubscriptionInfo.java | 7 +- ...t.java => CopartitionedTopicsEnforcerTest.java} | 43 +- .../internals/StreamsPartitionAssignorTest.java| 174 --- .../internals/assignment/AssignmentInfoTest.java | 16 +- .../internals/assignment/SubscriptionInfoTest.java | 22 +- .../kafka/streams/tests/StreamsUpgradeTest.java| 35 +- .../kafka/test/MockInternalTopicManager.java | 2 +- 18 files changed, 711 insertions(+), 509 deletions(-) rename streams/{test-utils => }/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java (100%) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java copy streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/{TaskAssignor.java => AssignorError.java} (77%) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/CopartitionedTopicsEnforcer.java copy streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/{TaskAssignor.java => StreamsAssignmentProtocolVersions.java} (62%) rename streams/src/test/java/org/apache/kafka/streams/processor/internals/{CopartitionedTopicsValidatorTest.java => CopartitionedTopicsEnforcerTest.java} (73%)
[kafka] branch trunk updated (deac5d9 -> ffef087)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from deac5d9 KAFKA-8724; Improve range checking when computing cleanable partitions (#7264) add ffef087 KAFKA-7149 : Reducing streams assignment data size (#7185) No new revisions were added by this update. Summary of changes: .../internals/StreamsPartitionAssignor.java| 45 +--- .../internals/assignment/AssignmentInfo.java | 118 ++--- .../internals/assignment/SubscriptionInfo.java | 34 +++--- .../internals/assignment/AssignmentInfoTest.java | 7 ++ .../kafka/streams/tests/StreamsUpgradeTest.java| 2 +- .../tests/streams/streams_upgrade_test.py | 14 +-- 6 files changed, 163 insertions(+), 57 deletions(-)
[kafka] branch trunk updated (18e6bb2 -> 8d8e2fb)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 18e6bb2 KAFKA-8861 Fix flaky RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic (#7281) add 8d8e2fb KAFKA-8729, pt 1: Add 4 new metrics to keep track of various types of invalid record rejections (#7142) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/log/Log.scala| 9 +- core/src/main/scala/kafka/log/LogValidator.scala | 119 - .../scala/kafka/server/KafkaRequestHandler.scala | 21 ++- core/src/test/scala/unit/kafka/log/LogTest.scala | 12 +- .../scala/unit/kafka/log/LogValidatorTest.scala| 197 + .../scala/unit/kafka/metrics/MetricsTest.scala | 32 ++-- .../unit/kafka/server/ProduceRequestTest.scala | 5 + .../test/scala/unit/kafka/utils/TestUtils.scala| 11 ++ 8 files changed, 300 insertions(+), 106 deletions(-)
[kafka] branch trunk updated (d18d6b0 -> 40432e3)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from d18d6b0 MINOR: Refactor tag key for store level metrics (#7257) add 40432e3 MONIR: Check for NULL in case of version probing (#7275) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/processor/internals/TaskManager.java| 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-)
[kafka] branch 2.3 updated: HOTFIX: AssignedStreamsTasksTest lacks one parameter
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 2a38ae7 HOTFIX: AssignedStreamsTasksTest lacks one parameter 2a38ae7 is described below commit 2a38ae7c492292282ed4c42845d4348e2eb166d5 Author: Guozhang Wang AuthorDate: Thu Aug 29 18:18:31 2019 -0700 HOTFIX: AssignedStreamsTasksTest lacks one parameter --- .../kafka/streams/processor/internals/AssignedStreamsTasksTest.java| 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java index ca51a3b..1833052 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java @@ -519,7 +519,8 @@ public class AssignedStreamsTasksTest { stateDirectory, null, time, -() -> producer); +() -> producer, +metrics.sensor("dummy")); assignedTasks.addNewTask(task); assignedTasks.initializeNewTasks();
[kafka] branch trunk updated (d2741e5 -> 09ad6b8)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from d2741e5 MINOR: Remove `activeTaskCheckpointableOffsets` from `AbstractTask` (#7253) add 09ad6b8 MINOR. Fix 2.3.0 streams systest dockerfile typo (#7272) No new revisions were added by this update. Summary of changes: tests/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[kafka] branch trunk updated (fcfee61 -> d2741e5)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from fcfee61 MINOR: Only send delete request if there are offsets in map (#7256) add d2741e5 MINOR: Remove `activeTaskCheckpointableOffsets` from `AbstractTask` (#7253) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/processor/internals/AbstractTask.java | 6 -- .../org/apache/kafka/streams/processor/internals/StreamTask.java | 7 ++- 2 files changed, 2 insertions(+), 11 deletions(-)
[kafka] branch 2.3 updated: MINOR: Only send delete request if there are offsets in map (#7256)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new f1244e5 MINOR: Only send delete request if there are offsets in map (#7256) f1244e5 is described below commit f1244e508d6e25c2ee603578a0c897af235fc93a Author: Bill Bejeck AuthorDate: Wed Aug 28 12:22:36 2019 -0400 MINOR: Only send delete request if there are offsets in map (#7256) Currently on commit streams will attempt to delete offsets from repartition topics. However, if a topology does not have any repartition topics, then the recordsToDelete map will be empty. This PR adds a check that the recordsToDelete is not empty before executing the AdminClient#deleteRecords() method. Reviewers: A. Sophie Blee-Goldman , Guozhang Wang --- .../org/apache/kafka/streams/processor/internals/TaskManager.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index c136fdb..3dc8404 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -449,9 +449,10 @@ public class TaskManager { for (final Map.Entry entry : active.recordsToDelete().entrySet()) { recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())); } -deleteRecordsResult = adminClient.deleteRecords(recordsToDelete); - -log.trace("Sent delete-records request: {}", recordsToDelete); +if (!recordsToDelete.isEmpty()) { +deleteRecordsResult = adminClient.deleteRecords(recordsToDelete); +log.trace("Sent delete-records request: {}", recordsToDelete); +} } }
[kafka] branch trunk updated (d32a2d1 -> fcfee61)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from d32a2d1 KAFKA-8837: KafkaMetricReporterClusterIdTest may not shutdown ZooKeeperTestHarness (#7255) add fcfee61 MINOR: Only send delete request if there are offsets in map (#7256) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/processor/internals/TaskManager.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-)
[kafka-site] branch asf-site updated: MINOR: Added Agoda and TokenAnalyst to powered-by page (#226)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new af666db MINOR: Added Agoda and TokenAnalyst to powered-by page (#226) af666db is described below commit af666dbee7956aca3688d9b1be341ec6042e7f63 Author: Hartanto Thio AuthorDate: Wed Aug 28 08:56:16 2019 -0700 MINOR: Added Agoda and TokenAnalyst to powered-by page (#226) --- images/powered-by/agoda.png| Bin 0 -> 50229 bytes images/powered-by/tokenanalyst.png | Bin 0 -> 20335 bytes powered-by.html| 10 ++ 3 files changed, 10 insertions(+) diff --git a/images/powered-by/agoda.png b/images/powered-by/agoda.png new file mode 100644 index 000..1724421 Binary files /dev/null and b/images/powered-by/agoda.png differ diff --git a/images/powered-by/tokenanalyst.png b/images/powered-by/tokenanalyst.png new file mode 100644 index 000..dcbd717 Binary files /dev/null and b/images/powered-by/tokenanalyst.png differ diff --git a/powered-by.html b/powered-by.html index e8269fb..c7d4faa 100644 --- a/powered-by.html +++ b/powered-by.html @@ -38,6 +38,11 @@ "logoBgColor": "#ff", "description": "adidas uses Kafka as the core of Fast Data Streaming Platform, integrating source systems and enabling teams to implement real-time event processing for monitoring, analytics and reporting solutions." }, { +"link": "https://www.agoda.com/;, +"logo": "agoda.png", +"logoBgColor": "#ff", +"description": "Apache Kafka powers the backbone of Agoda's data pipeline with trillions of events streaming through daily across multiple data centers. The majority of the events are destined for analytical systems and directly influence business decisions at one of the world’s fastest growing online travel booking platforms." +}, { "link": "http://www.amadeus.com;, "logo": "amadeus.jpg", "logoBgColor": "#ff", @@ -398,6 +403,11 @@ "logoBgColor": "#ff", "description": "Apache Kafka drives our new pub sub system which delivers real-time events for users in our latest game - Deckadence. It will soon be used in a host of new use cases including group chat and back end stats and log collection." }, { +"link": "https://www.tokenanalyst.io/;, +"logo": "tokenanalyst.png", +"logoBgColor": "#ff", +"description": "At TokenAnalyst, we’re using Kafka for ingestion of blockchain data—which is directly pushed from our cluster of Bitcoin and Ethereum nodes—to different streams of transformation and loading processes." +}, { "link": "https://www.tumblr.com/;, "logo": "tumblr.png", "logoBgColor": "#5eba8c",
[kafka] branch trunk updated (e23a718 -> cf32a1a)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from e23a718 KAFKA-8745: DumpLogSegments doesn't show keys, when the message is null (#7152) add cf32a1a KAFKA-8179: Part 4, add CooperativeStickyAssignor (#7130) No new revisions were added by this update. Summary of changes: .../consumer/CooperativeStickyAssignor.java| 105 +++ .../kafka/clients/consumer/StickyAssignor.java | 885 + .../AbstractStickyAssignor.java} | 331 ++-- .../consumer/internals/PartitionAssignor.java | 5 +- .../internals/PartitionAssignorAdapter.java| 6 +- .../consumer/CooperativeStickyAssignorTest.java| 105 +++ .../kafka/clients/consumer/StickyAssignorTest.java | 868 ++-- .../AbstractStickyAssignorTest.java} | 513 docs/upgrade.html | 8 +- .../streams/integration/EosIntegrationTest.java| 2 +- 10 files changed, 513 insertions(+), 2315 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java copy clients/src/main/java/org/apache/kafka/clients/consumer/{StickyAssignor.java => internals/AbstractStickyAssignor.java} (75%) create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java copy clients/src/test/java/org/apache/kafka/clients/consumer/{StickyAssignorTest.java => internals/AbstractStickyAssignorTest.java} (57%)
[kafka] branch trunk updated (add6629 -> e23a718)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from add6629 MINOR: fix ProduceBenchWorker not to fail on final produce (#7254) add e23a718 KAFKA-8745: DumpLogSegments doesn't show keys, when the message is null (#7152) No new revisions were added by this update. Summary of changes: .../main/scala/kafka/tools/DumpLogSegments.scala | 12 ++--- .../unit/kafka/tools/DumpLogSegmentsTest.scala | 61 +- 2 files changed, 43 insertions(+), 30 deletions(-)
[kafka] branch trunk updated (24547b8 -> add6629)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 24547b8 KAFKA-8579: Expose RocksDB metrics (#7209) add add6629 MINOR: fix ProduceBenchWorker not to fail on final produce (#7254) No new revisions were added by this update. Summary of changes: .../java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-)
[kafka] branch trunk updated (6b24b2e -> 24547b8)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 6b24b2e KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223) add 24547b8 KAFKA-8579: Expose RocksDB metrics (#7209) No new revisions were added by this update. Summary of changes: .../internals/metrics/StreamsMetricsImpl.java | 2 +- .../streams/state/internals/KeyValueSegment.java | 6 +- .../streams/state/internals/KeyValueSegments.java | 14 +- .../internals/RocksDBSegmentedBytesStore.java | 4 +- .../streams/state/internals/RocksDBStore.java | 50 ++- .../RocksDBTimestampedSegmentedBytesStore.java | 4 +- .../state/internals/RocksDBTimestampedStore.java | 11 +- .../RocksDbKeyValueBytesStoreSupplier.java | 4 +- .../state/internals/TimestampedSegment.java| 6 +- .../state/internals/TimestampedSegments.java | 14 +- .../state/internals/metrics/RocksDBMetrics.java| 67 ++--- .../internals/metrics/RocksDBMetricsRecorder.java | 133 + .../integration/MetricsIntegrationTest.java| 163 +++-- .../streams/state/KeyValueStoreTestDriver.java | 1 + .../state/internals/KeyValueSegmentTest.java | 33 +++-- .../state/internals/KeyValueSegmentsTest.java | 11 +- .../internals/RocksDBSegmentedBytesStoreTest.java | 6 +- .../streams/state/internals/RocksDBStoreTest.java | 89 ++- .../RocksDBTimestampedSegmentedBytesStoreTest.java | 6 +- .../internals/RocksDBTimestampedStoreTest.java | 4 +- .../state/internals/RocksDBWindowStoreTest.java| 4 +- .../state/internals/SegmentIteratorTest.java | 7 +- .../TimestampedKeyValueStoreBuilderTest.java | 4 +- .../state/internals/TimestampedSegmentTest.java| 33 +++-- .../state/internals/TimestampedSegmentsTest.java | 12 +- .../metrics/RocksDBMetricsRecorderTest.java| 144 ++ .../internals/metrics/RocksDBMetricsTest.java | 5 +- 27 files changed, 733 insertions(+), 104 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
[kafka] branch trunk updated (d08bcae -> 6b24b2e)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from d08bcae KAFKA-8669: Add security providers in kafka security config (#7090) add 6b24b2e KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223) No new revisions were added by this update. Summary of changes: .../processor/internals/RecordCollector.java | 2 +- .../processor/internals/RecordCollectorImpl.java | 3 ++- .../streams/processor/internals/StreamTask.java| 3 ++- .../processor/internals/RecordCollectorTest.java | 28 ++ 4 files changed, 33 insertions(+), 3 deletions(-)
[kafka] branch 2.1 updated: KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 8cdc2d6 KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207) 8cdc2d6 is described below commit 8cdc2d6d1b2a227f5e62f148179a52684b230bd6 Author: cpettitt-confluent <53191309+cpettitt-conflu...@users.noreply.github.com> AuthorDate: Mon Aug 26 09:53:36 2019 -0700 KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207) Prior to this change an NPE is raised when calling AssignedTasks.close under the following conditions: 1. EOS is enabled 2. The task was in a suspended state The cause for the NPE is that when a clean close is requested for a StreamTask the StreamTask tries to commit. However, in the suspended state there is no producer so ultimately an NPE is thrown for the contained RecordCollector in flush. The fix put forth in this commit is to have AssignedTasks call closeSuspended when it knows the underlying StreamTask is suspended. Note also that this test is quite involved. I could have just tested that AssignedTasks calls closeSuspended when appropriate, but that is testing, IMO, a detail of the implementation and doesn't actually verify we reproduced the original problem as it was described. I feel much more confident that we are reproducing the behavior - and we can test exactly the conditions that lead to it - when testing across AssignedTasks and StreamTask. I believe this is an additional support for the argument of eventually consolidating the state split across classes. Reviewers: Matthias J. Sax , Guozhang Wang --- checkstyle/import-control.xml | 2 + .../streams/processor/internals/AssignedTasks.java | 17 ++- .../internals/AssignedStreamsTasksTest.java| 129 +++-- .../processor/internals/StreamTaskTest.java| 10 +- 4 files changed, 136 insertions(+), 22 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index cad8f8b..636fae0 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -251,8 +251,10 @@ + + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 3cc396d..9ef225f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -392,18 +392,23 @@ abstract class AssignedTasks { void close(final boolean clean) { final AtomicReference firstException = new AtomicReference<>(null); -for (final T task : allTasks()) { + +for (final T task: allTasks()) { try { -task.close(clean, false); +if (suspended.containsKey(task.id())) { +task.closeSuspended(clean, false, null); +} else { +task.close(clean, false); +} } catch (final TaskMigratedException e) { log.info("Failed to close {} {} since it got migrated to another thread already. " + -"Closing it as zombie and move on.", taskTypeName, task.id()); +"Closing it as zombie and move on.", taskTypeName, task.id()); firstException.compareAndSet(null, closeZombieTask(task)); } catch (final RuntimeException t) { log.error("Failed while closing {} {} due to the following error:", - task.getClass().getSimpleName(), - task.id(), - t); +task.getClass().getSimpleName(), +task.id(), +t); if (clean) { if (!closeUnclean(task)) { firstException.compareAndSet(null, t); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java index fe71135..349c463 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java @@ -17,27 +17,42 @@ package org.apache.kafka.streams.processor.internals; +import static org.hamcrest.CoreMatchers.not
[kafka] branch 2.2 updated: KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new b29dde6 KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207) b29dde6 is described below commit b29dde6f6eabcf2d658f1d4055a55d139783eb4c Author: cpettitt-confluent <53191309+cpettitt-conflu...@users.noreply.github.com> AuthorDate: Mon Aug 26 09:53:36 2019 -0700 KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207) Prior to this change an NPE is raised when calling AssignedTasks.close under the following conditions: 1. EOS is enabled 2. The task was in a suspended state The cause for the NPE is that when a clean close is requested for a StreamTask the StreamTask tries to commit. However, in the suspended state there is no producer so ultimately an NPE is thrown for the contained RecordCollector in flush. The fix put forth in this commit is to have AssignedTasks call closeSuspended when it knows the underlying StreamTask is suspended. Note also that this test is quite involved. I could have just tested that AssignedTasks calls closeSuspended when appropriate, but that is testing, IMO, a detail of the implementation and doesn't actually verify we reproduced the original problem as it was described. I feel much more confident that we are reproducing the behavior - and we can test exactly the conditions that lead to it - when testing across AssignedTasks and StreamTask. I believe this is an additional support for the argument of eventually consolidating the state split across classes. Reviewers: Matthias J. Sax , Guozhang Wang --- checkstyle/import-control.xml | 2 + .../streams/processor/internals/AssignedTasks.java | 17 ++- .../internals/AssignedStreamsTasksTest.java| 129 +++-- .../processor/internals/StreamTaskTest.java| 10 +- 4 files changed, 136 insertions(+), 22 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index f4955ce..55e4cf2 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -265,8 +265,10 @@ + + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index a9baa3f..6a39df1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -332,18 +332,23 @@ abstract class AssignedTasks { void close(final boolean clean) { final AtomicReference firstException = new AtomicReference<>(null); -for (final T task : allTasks()) { + +for (final T task: allTasks()) { try { -task.close(clean, false); +if (suspended.containsKey(task.id())) { +task.closeSuspended(clean, false, null); +} else { +task.close(clean, false); +} } catch (final TaskMigratedException e) { log.info("Failed to close {} {} since it got migrated to another thread already. " + -"Closing it as zombie and move on.", taskTypeName, task.id()); +"Closing it as zombie and move on.", taskTypeName, task.id()); firstException.compareAndSet(null, closeZombieTask(task)); } catch (final RuntimeException t) { log.error("Failed while closing {} {} due to the following error:", - task.getClass().getSimpleName(), - task.id(), - t); +task.getClass().getSimpleName(), +task.id(), +t); if (clean) { if (!closeUnclean(task)) { firstException.compareAndSet(null, t); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java index ffd0f8b..ca51a3b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java @@ -17,27 +17,42 @@ package org.apache.kafka.streams.processor.internals; +import static org.hamcrest.CoreMatchers.not
[kafka] branch 2.3 updated: KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new e0e2dbc KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207) e0e2dbc is described below commit e0e2dbc9d89b0fcbf49ac9bf30a7159d3c27c9f8 Author: cpettitt-confluent <53191309+cpettitt-conflu...@users.noreply.github.com> AuthorDate: Mon Aug 26 09:53:36 2019 -0700 KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207) Prior to this change an NPE is raised when calling AssignedTasks.close under the following conditions: 1. EOS is enabled 2. The task was in a suspended state The cause for the NPE is that when a clean close is requested for a StreamTask the StreamTask tries to commit. However, in the suspended state there is no producer so ultimately an NPE is thrown for the contained RecordCollector in flush. The fix put forth in this commit is to have AssignedTasks call closeSuspended when it knows the underlying StreamTask is suspended. Note also that this test is quite involved. I could have just tested that AssignedTasks calls closeSuspended when appropriate, but that is testing, IMO, a detail of the implementation and doesn't actually verify we reproduced the original problem as it was described. I feel much more confident that we are reproducing the behavior - and we can test exactly the conditions that lead to it - when testing across AssignedTasks and StreamTask. I believe this is an additional support for the argument of eventually consolidating the state split across classes. Reviewers: Matthias J. Sax , Guozhang Wang --- checkstyle/import-control.xml | 2 + .../streams/processor/internals/AssignedTasks.java | 17 ++- .../internals/AssignedStreamsTasksTest.java| 129 +++-- .../processor/internals/StreamTaskTest.java| 10 +- 4 files changed, 136 insertions(+), 22 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index a76fd1d..dfaa3f6 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -266,8 +266,10 @@ + + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index a9baa3f..6a39df1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -332,18 +332,23 @@ abstract class AssignedTasks { void close(final boolean clean) { final AtomicReference firstException = new AtomicReference<>(null); -for (final T task : allTasks()) { + +for (final T task: allTasks()) { try { -task.close(clean, false); +if (suspended.containsKey(task.id())) { +task.closeSuspended(clean, false, null); +} else { +task.close(clean, false); +} } catch (final TaskMigratedException e) { log.info("Failed to close {} {} since it got migrated to another thread already. " + -"Closing it as zombie and move on.", taskTypeName, task.id()); +"Closing it as zombie and move on.", taskTypeName, task.id()); firstException.compareAndSet(null, closeZombieTask(task)); } catch (final RuntimeException t) { log.error("Failed while closing {} {} due to the following error:", - task.getClass().getSimpleName(), - task.id(), - t); +task.getClass().getSimpleName(), +task.id(), +t); if (clean) { if (!closeUnclean(task)) { firstException.compareAndSet(null, t); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java index ffd0f8b..ca51a3b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java @@ -17,27 +17,42 @@ package org.apache.kafka.streams.processor.internals; +import static org.hamcrest.CoreMatchers.not
[kafka] branch trunk updated (d7f8ec8 -> 7334222)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from d7f8ec8 MINOR: Fix the doc of scheduled.rebalance.max.delay.ms config property (#7242) add 7334222 KAFKA-8412: Fix nullpointer exception thrown on flushing before closing producers (#7207) No new revisions were added by this update. Summary of changes: checkstyle/import-control.xml | 2 + .../streams/processor/internals/AssignedTasks.java | 17 ++- .../internals/AssignedStreamsTasksTest.java| 129 +++-- .../processor/internals/StreamTaskTest.java| 10 +- 4 files changed, 136 insertions(+), 22 deletions(-)
[kafka] branch trunk updated (c1f2b0f -> c6664e1)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from c1f2b0f KAFKA-8753; Expose controller topic deletion metrics (KIP-503) (#7156) add c6664e1 MINOR: Move the resetting from revoked to the thread loop (#7243) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/processor/internals/StreamThread.java | 7 ++- 1 file changed, 2 insertions(+), 5 deletions(-)
[kafka] branch 2.3 updated: KAFKA-8824: bypass value serde on null (#7235)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 919 KAFKA-8824: bypass value serde on null (#7235) 919 is described below commit 9195c2685cac3ac6788fad42076a795f8085 Author: John Roesler AuthorDate: Thu Aug 22 15:35:33 2019 -0500 KAFKA-8824: bypass value serde on null (#7235) In a KTable context, we should not pass null into a user-supplied serde. Testing: I verified that the change to the test results in test failures without the patch. Reviewers: Matthias J. Sax , Bill Bejeck , Guozhang Wang , --- .../state/internals/InMemoryTimeOrderedKeyValueBuffer.java | 2 +- .../state/internals/TimeOrderedKeyValueBufferTest.java | 14 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 6c5022f..14a41ea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -448,7 +448,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere final byte[] serializedPriorValue; if (buffered == null) { final V priorValue = value.oldValue; -serializedPriorValue = valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue); +serializedPriorValue = (priorValue == null) ? null : valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue); } else { serializedPriorValue = buffered.priorValue(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java index 941832b..e8a62d0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; @@ -68,6 +70,16 @@ public class TimeOrderedKeyValueBufferTest bufferSupplier; private final String testName; +public static final class NullRejectingStringSerializer extends StringSerializer { +@Override +public byte[] serialize(final String topic, final String data) { +if (data == null) { +throw new IllegalArgumentException(); +} +return super.serialize(topic, data); +} +} + // As we add more buffer implementations/configurations, we can add them here @Parameterized.Parameters(name = "{index}: test={0}") public static Collection parameters() { @@ -76,7 +88,7 @@ public class TimeOrderedKeyValueBufferTest>) name -> new InMemoryTimeOrderedKeyValueBuffer -.Builder<>(name, Serdes.String(), Serdes.String()) +.Builder<>(name, Serdes.String(), Serdes.serdeFrom(new NullRejectingStringSerializer(), new StringDeserializer())) .build() } );
[kafka] branch trunk updated (e4215c1 -> e213608)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from e4215c1 KAFKA-8325; Remove batch from in-flight requests on MESSAGE_TOO_LARGE errors (#7176) add e213608 KAFKA-8824: bypass value serde on null (#7235) No new revisions were added by this update. Summary of changes: .../state/internals/InMemoryTimeOrderedKeyValueBuffer.java | 2 +- .../state/internals/TimeOrderedKeyValueBufferTest.java | 14 +- 2 files changed, 14 insertions(+), 2 deletions(-)
[kafka] branch trunk updated (b8605c9 -> e2d16b5)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from b8605c9 KAFKA-8600: Use RPC generation for DescribeDelegationTokens protocol add e2d16b5 KAFKA-8802: ConcurrentSkipListMap shows performance regression in cache and in-memory store (#7212) No new revisions were added by this update. Summary of changes: .../state/internals/InMemoryKeyValueStore.java | 37 +++--- .../kafka/streams/state/internals/NamedCache.java | 17 ++ .../kafka/streams/state/internals/ThreadCache.java | 24 +++--- 3 files changed, 42 insertions(+), 36 deletions(-)
[kafka] branch 2.3 updated: KAFKA-8802: ConcurrentSkipListMap shows performance regression in cache and in-memory store (#7212)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 051d290 KAFKA-8802: ConcurrentSkipListMap shows performance regression in cache and in-memory store (#7212) 051d290 is described below commit 051d29098ac7529f54475899c3441ed918a6882c Author: A. Sophie Blee-Goldman AuthorDate: Fri Aug 16 10:41:30 2019 -0700 KAFKA-8802: ConcurrentSkipListMap shows performance regression in cache and in-memory store (#7212) Reverts the TreeMap -> ConcurrentSkipListMap change that caused a performance regression in 2.3, and fixes the ConcurrentModificationException by copying (just) the key set to iterate over Reviewers: Bill Bejeck , Guozhang Wang --- .../state/internals/InMemoryKeyValueStore.java | 37 +++--- .../kafka/streams/state/internals/NamedCache.java | 17 ++ .../kafka/streams/state/internals/ThreadCache.java | 24 +++--- 3 files changed, 42 insertions(+), 36 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index 2d68214..ed3d024 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -17,8 +17,10 @@ package org.apache.kafka.streams.state.internals; import java.util.List; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; @@ -27,13 +29,12 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Iterator; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class InMemoryKeyValueStore implements KeyValueStore { private final String name; -private final ConcurrentNavigableMap map = new ConcurrentSkipListMap<>(); +private final NavigableMap map = new TreeMap<>(); private volatile boolean open = false; private long size = 0L; // SkipListMap#size is O(N) so we just do our best to track it @@ -71,12 +72,12 @@ public class InMemoryKeyValueStore implements KeyValueStore { } @Override -public byte[] get(final Bytes key) { +public synchronized byte[] get(final Bytes key) { return map.get(key); } @Override -public void put(final Bytes key, final byte[] value) { +public synchronized void put(final Bytes key, final byte[] value) { if (value == null) { size -= map.remove(key) == null ? 0 : 1; } else { @@ -85,7 +86,7 @@ public class InMemoryKeyValueStore implements KeyValueStore { } @Override -public byte[] putIfAbsent(final Bytes key, final byte[] value) { +public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) { final byte[] originalValue = get(key); if (originalValue == null) { put(key, value); @@ -101,14 +102,14 @@ public class InMemoryKeyValueStore implements KeyValueStore { } @Override -public byte[] delete(final Bytes key) { +public synchronized byte[] delete(final Bytes key) { final byte[] oldValue = map.remove(key); size -= oldValue == null ? 0 : 1; return oldValue; } @Override -public KeyValueIterator range(final Bytes from, final Bytes to) { +public synchronized KeyValueIterator range(final Bytes from, final Bytes to) { if (from.compareTo(to) > 0) { LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " @@ -119,14 +120,14 @@ public class InMemoryKeyValueStore implements KeyValueStore { return new DelegatingPeekingKeyValueIterator<>( name, -new InMemoryKeyValueIterator(map.subMap(from, true, to, true).entrySet().iterator())); +new InMemoryKeyValueIterator(map.subMap(from, true, to, true).keySet())); } @Override -public KeyValueIterator all() { +public synchronized KeyValueIterator all() { return new DelegatingPeekingKeyValueIterator<>( name, -new InMemoryKeyValueIterator(map.entrySet().iterator())); +new InMemoryKeyValueIterator(map.keySet())); } @Override @@ -146,11 +147,11 @@ public class InMemoryKeyValueStore implements KeyValueStore { open
[kafka] branch trunk updated: MINOR: remove unnecessary #remove overrides (#7178)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e07b46d MINOR: remove unnecessary #remove overrides (#7178) e07b46d is described below commit e07b46dd0424717829f5e12fb0fa91065b943fcd Author: A. Sophie Blee-Goldman AuthorDate: Tue Aug 13 16:30:48 2019 -0700 MINOR: remove unnecessary #remove overrides (#7178) Iterator#remove has a default implementation that throws UnsupportedOperatorException so there's no need to override it with the same thing. Should be cherry-picked back to whenever we switched to Java 8 Reviewers: Bill Bejeck , Matthias J. Sax , Guozhang Wang --- .../apache/kafka/streams/state/KeyValueIterator.java | 3 ++- .../AbstractMergedSortedCacheStoreIterator.java | 5 - .../state/internals/CompositeKeyValueIterator.java | 4 .../internals/DelegatingPeekingKeyValueIterator.java | 5 - .../state/internals/FilteredCacheIterator.java | 9 - .../state/internals/InMemoryKeyValueStore.java | 5 - .../streams/state/internals/KeyValueIterators.java | 3 --- .../state/internals/MemoryNavigableLRUCache.java | 5 - .../state/internals/MeteredKeyValueStore.java| 5 - .../state/internals/MeteredWindowStoreIterator.java | 5 - .../internals/MeteredWindowedKeyValueIterator.java | 5 - .../state/internals/RocksDBTimestampedStore.java | 5 - .../streams/state/internals/RocksDbIterator.java | 5 - .../streams/state/internals/SegmentIterator.java | 6 ++ .../kafka/streams/state/internals/ThreadCache.java | 5 - .../state/internals/WindowStoreIteratorWrapper.java | 10 -- .../state/internals/WrappedSessionStoreIterator.java | 5 - .../apache/kafka/streams/state/NoOpWindowStore.java | 4 .../state/internals/ReadOnlyWindowStoreStub.java | 20 .../kafka/test/GenericInMemoryKeyValueStore.java | 5 - .../GenericInMemoryTimestampedKeyValueStore.java | 5 - .../org/apache/kafka/test/KeyValueIteratorStub.java | 4 .../apache/kafka/test/ReadOnlySessionStoreStub.java | 4 23 files changed, 4 insertions(+), 128 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java index 70a142b..b1f5e2c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java @@ -23,9 +23,10 @@ import java.util.Iterator; /** * Iterator interface of {@link KeyValue}. - * + * * Users must call its {@code close} method explicitly upon completeness to release resources, * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class. + * Note that {@code remove()} is not supported. * * @param Type of keys * @param Type of values diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java index dfcd763..16bdbeb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java @@ -148,11 +148,6 @@ abstract class AbstractMergedSortedCacheStoreIterator implements K } @Override -public void remove() { -throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName()); -} - -@Override public void close() { cacheIterator.close(); storeIterator.close(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java index faccc16..4ac6fee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java @@ -67,8 +67,4 @@ class CompositeKeyValueIterator implements KeyValueIterator implements KeyValueIterator } @Override -public void remove() { -throw new UnsupportedOperationException("remove() is not supported in " + getClass().getName()); -} - -@Override public KeyValue peekNext() { if (!hasNext()) { throw new NoSuchElementException(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/FilteredCacheIterator.java b/streams/src/main/java/org/apache
[kafka] branch trunk updated: KAFKA-8179: Part 3, Add PartitionsLost API for resetGenerations and metadata/subscription change (#6884)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e867a58 KAFKA-8179: Part 3, Add PartitionsLost API for resetGenerations and metadata/subscription change (#6884) e867a58 is described below commit e867a58425876767b952e06892c72b5e13066acc Author: Guozhang Wang AuthorDate: Thu Aug 8 14:31:22 2019 -0700 KAFKA-8179: Part 3, Add PartitionsLost API for resetGenerations and metadata/subscription change (#6884) 1. Add onPartitionsLost into the RebalanceListener, which will be triggered when the consumer found that the generation is reset due to fatal errors in response handling. 2. Semantical behavior change: with COOPERATIVE protocol, if the revoked / lost partitions are empty, do not trigger the corresponding callback at all. For added partitions though, even if it is empty we would still trigger the callback as a way to notify the rebalance event; with EAGER protocol, revoked / assigned callbacks are always triggered. The ordering of the callback would be the following: a. Callback onPartitionsRevoked / onPartitionsLost triggered. b. Update the assignment (both revoked and added). c. Callback onPartitionsAssigned triggered. In this way we are assured that users can still access the partitions being revoked, whereas they can also access the partitions being added. 3. Semantical behavior change (KAFKA-4600): if the rebalance listener throws an exception, pass it along all the way to the consumer.poll caller, but still completes the rest of the actions. Also, the newly assigned partitions list does not gets affected with exception thrown since it is just for notifying the users. 4. Semantical behavior change: the ConsumerCoordinator would not try to modify assignor's returned assignments, instead it will validate that assignments and set the error code accordingly: if there are overlaps between added / revoked partitions, it is a fatal error and would be communicated to all members to stop; if revoked is not empty, it is an error indicate re-join; otherwise, it is normal. 5. Minor: with the error code removed from the Assignment, ConsumerCoordinator will request re-join if the revoked partitions list is not empty. 6. Updated ConsumerCoordinatorTest accordingly. Also found a minor bug in MetadataUpdate that removed topic would still be retained with null value of num.partitions. 6. Updated a few other flaky tests that are exposed due to this change. Reviewers: John Roesler , A. Sophie Blee-Goldman , Jason Gustafson --- .../consumer/ConsumerPartitionAssignor.java| 17 ++ .../consumer/ConsumerRebalanceListener.java| 98 +- .../kafka/clients/consumer/KafkaConsumer.java | 19 +- .../consumer/internals/AbstractCoordinator.java| 62 ++-- .../consumer/internals/ConsumerCoordinator.java| 332 +++-- .../consumer/internals/PartitionAssignor.java | 1 - .../consumer/internals/SubscriptionState.java | 55 ++-- .../kafka/clients/consumer/KafkaConsumerTest.java | 64 +++- .../clients/consumer/RoundRobinAssignorTest.java | 20 +- .../internals/ConsumerCoordinatorTest.java | 184 .../consumer/internals/SubscriptionStateTest.java | 40 ++- core/src/main/scala/kafka/tools/MirrorMaker.scala | 2 + .../kafka/api/AbstractConsumerTest.scala | 14 +- .../integration/kafka/api/ConsumerBounceTest.scala | 13 +- .../kafka/api/PlaintextConsumerTest.scala | 8 +- .../org/apache/kafka/streams/KafkaStreams.java | 5 +- .../streams/processor/internals/StreamThread.java | 5 +- .../StreamTableJoinIntegrationTest.java| 5 +- .../integration/utils/IntegrationTestUtils.java| 17 +- 19 files changed, 686 insertions(+), 275 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index 07e153e..f9a4217 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -180,6 +180,23 @@ public interface ConsumerPartitionAssignor { } } +/** + * The rebalance protocol defines partition assignment and revocation semantics. The purpose is to establish a + * consistent set of rules that all consumers in a group follow in order to transfer ownership of a partition. + * {@link ConsumerPartitionAssignor} implementors can claim supporting one or more rebalance protocols via the + * {@link ConsumerPartitionAssignor#supportedProtocols()}, and it is their responsibility
[kafka] branch 1.1 updated: KAFKA-8602: Backport bugfix for standby task creation (#7147)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new f6316c3 KAFKA-8602: Backport bugfix for standby task creation (#7147) f6316c3 is described below commit f6316c39b9d2d9630066416d15eca7ad5cec99fb Author: cadonna AuthorDate: Wed Aug 7 17:13:40 2019 +0200 KAFKA-8602: Backport bugfix for standby task creation (#7147) Backports bugfix in standby task creation from PR #7008. A separate PR is needed because some tests in the original PR use topology optimizations and mocks that were introduced afterwards. Reviewers: Bill Bejeck , Guozhang Wang --- .../streams/processor/internals/StreamThread.java | 20 ++- .../StandbyTaskCreationIntegrationTest.java| 189 + .../processor/internals/StreamThreadTest.java | 133 +-- 3 files changed, 318 insertions(+), 24 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a5199da..1cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -472,15 +472,17 @@ public class StreamThread extends Thread { final ProcessorTopology topology = builder.build(taskId.topicGroupId); -if (!topology.stateStores().isEmpty()) { -return new StandbyTask(taskId, - partitions, - topology, - consumer, - storeChangelogReader, - config, - streamsMetrics, - stateDirectory); +if (!topology.stateStores().isEmpty() && !topology.storeToChangelogTopic().isEmpty()) { +return new StandbyTask( +taskId, +partitions, +topology, +consumer, +storeChangelogReader, +config, +streamsMetrics, +stateDirectory +); } else { log.trace("Skipped standby task {} with assigned partitions {} " + "since it does not have any state stores to materialize", taskId, partitions); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java new file mode 100644 index 000..ed2781f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.Consumed; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KafkaStreams.StateListener; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ThreadMetadata; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +imp
[kafka] branch trunk updated: Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` (#7161)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7663a6c Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` (#7161) 7663a6c is described below commit 7663a6c44daae5d72f38cbba79d728416e11167d Author: cadonna AuthorDate: Tue Aug 6 17:51:08 2019 +0200 Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` (#7161) Renames method names in StreamsMetricsImpl to make them consistent. Reviewers: A. Sophie Blee-Goldman , Guozhang Wang --- .../streams/kstream/internals/metrics/Sensors.java | 2 +- .../streams/processor/internals/ProcessorNode.java | 13 +++--- .../internals/metrics/StreamsMetricsImpl.java | 50 +++--- .../processor/internals/metrics/ThreadMetrics.java | 30 ++--- .../AbstractRocksDBSegmentedBytesStore.java| 4 +- .../state/internals/InMemorySessionStore.java | 8 ++-- .../state/internals/InMemoryWindowStore.java | 4 +- .../streams/state/internals/metrics/Sensors.java | 12 +++--- .../internals/metrics/StreamsMetricsImplTest.java | 20 - .../internals/metrics/ThreadMetricsTest.java | 26 +-- 10 files changed, 85 insertions(+), 84 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java index 038b8ac..363ec6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java @@ -44,7 +44,7 @@ public class Sensors { LATE_RECORD_DROP, Sensor.RecordingLevel.INFO ); -StreamsMetricsImpl.addInvocationRateAndCount( +StreamsMetricsImpl.addInvocationRateAndCountToSensor( sensor, PROCESSOR_NODE_METRICS_GROUP, metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, context.currentNode().name()), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 01e3e56..bc66ede 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -33,8 +33,7 @@ import java.util.Set; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor; public class ProcessorNode { @@ -232,12 +231,14 @@ public class ProcessorNode { final Map taskTags, final Map nodeTags) { final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG); -addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); -addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); +addAvgAndMaxLatencyToSensor(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); +StreamsMetricsImpl +.addInvocationRateAndCountToSensor(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent); -addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); -addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); +addAvgAndMaxLatencyToSensor(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); +StreamsMetricsImpl +.addInvocationRateAndCountToSensor(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); return sensor; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index ae3d953..5ac2f33 100644 --- a/streams/src/mai
[kafka] branch trunk updated: KAFKA-8578: Add basic functionality to expose RocksDB metrics (#6979)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a7d0fdd KAFKA-8578: Add basic functionality to expose RocksDB metrics (#6979) a7d0fdd is described below commit a7d0fdd534ef55533a868ea7388bbc081ee42718 Author: cadonna AuthorDate: Fri Aug 2 18:51:03 2019 +0200 KAFKA-8578: Add basic functionality to expose RocksDB metrics (#6979) * Adds RocksDBMetrics class that provides methods to get sensors from the Kafka metrics registry and to setup the sensors to record RocksDB metrics * Extends StreamsMetricsImpl with functionality to add the required metrics to the sensors. Reviewers: Boyang Chen , Bill Bejeck , Matthias J. Sax , John Roesler , Guozhang Wang --- .../internals/metrics/StreamsMetricsImpl.java | 83 - .../state/internals/metrics/RocksDBMetrics.java| 382 + .../internals/metrics/StreamsMetricsImplTest.java | 113 +- .../internals/metrics/RocksDBMetricsTest.java | 283 +++ 4 files changed, 852 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index b6bfcc5..ae3d953 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -23,9 +23,12 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.apache.kafka.common.metrics.stats.WindowedSum; import org.apache.kafka.streams.StreamsMetrics; import java.util.Arrays; @@ -54,17 +57,21 @@ public class StreamsMetricsImpl implements StreamsMetrics { public static final String THREAD_ID_TAG = "client-id"; public static final String TASK_ID_TAG = "task-id"; +public static final String STORE_ID_TAG = "state-id"; public static final String ALL_TASKS = "all"; public static final String LATENCY_SUFFIX = "-latency"; public static final String AVG_SUFFIX = "-avg"; public static final String MAX_SUFFIX = "-max"; +public static final String MIN_SUFFIX = "-min"; public static final String RATE_SUFFIX = "-rate"; public static final String TOTAL_SUFFIX = "-total"; +public static final String RATIO_SUFFIX = "-ratio"; public static final String THREAD_LEVEL_GROUP = "stream-metrics"; public static final String TASK_LEVEL_GROUP = "stream-task-metrics"; +public static final String STATE_LEVEL_GROUP = "stream-state-metrics"; public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics"; public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id"; @@ -123,6 +130,18 @@ public class StreamsMetricsImpl implements StreamsMetrics { } } +public Map taskLevelTagMap(final String taskName) { +final Map tagMap = threadLevelTagMap(); +tagMap.put(TASK_ID_TAG, taskName); +return tagMap; +} + +public Map storeLevelTagMap(final String taskName, final String storeType, final String storeName) { +final Map tagMap = taskLevelTagMap(taskName); +tagMap.put(storeType + "-" + STORE_ID_TAG, storeName); +return tagMap; +} + public final Sensor taskLevelSensor(final String taskName, final String sensorName, final RecordingLevel recordingLevel, @@ -237,9 +256,7 @@ public class StreamsMetricsImpl implements StreamsMetrics { if (!storeLevelSensors.containsKey(key)) { storeLevelSensors.put(key, new LinkedList<>()); } - final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); storeLevelSensors.get(key).push(fullSensorName); @@ -454,12 +471,62 @@ public class StreamsMetricsImpl implements StreamsMetrics {
[kafka] branch trunk updated: KAFKA-8179: PartitionAssignorAdapter (#7110)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 6fbac3c KAFKA-8179: PartitionAssignorAdapter (#7110) 6fbac3c is described below commit 6fbac3cfa88bd3eba36b2d3e254d7ce5e11a550b Author: A. Sophie Blee-Goldman AuthorDate: Wed Jul 31 13:53:38 2019 -0700 KAFKA-8179: PartitionAssignorAdapter (#7110) Follow up to new PartitionAssignor interface merged in 7108 is merged Adds a PartitionAssignorAdapter class to maintain backwards compatibility Reviewers: Boyang Chen , Jason Gustafson , Guozhang Wang --- .../kafka/clients/admin/KafkaAdminClient.java | 4 +- .../kafka/clients/consumer/ConsumerConfig.java | 2 +- .../consumer/ConsumerPartitionAssignor.java| 10 +- .../kafka/clients/consumer/KafkaConsumer.java | 7 +- .../internals/AbstractPartitionAssignor.java | 4 +- .../consumer/internals/ConsumerCoordinator.java| 3 +- .../consumer/internals/PartitionAssignor.java | 3 + .../internals/PartitionAssignorAdapter.java| 136 .../internals/PartitionAssignorAdapterTest.java| 173 + docs/upgrade.html | 2 + .../internals/StreamsPartitionAssignor.java| 4 +- 11 files changed, 333 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index f2ee21e..227a03b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -29,7 +29,7 @@ import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; import org.apache.kafka.clients.admin.internals.AdminMetadataManager; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.Cluster; @@ -2724,7 +2724,7 @@ public class KafkaAdminClient extends AdminClient { for (DescribedGroupMember groupMember : members) { Set partitions = Collections.emptySet(); if (groupMember.memberAssignment().length > 0) { -final ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol. +final Assignment assignment = ConsumerProtocol. deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment())); partitions = new HashSet<>(assignment.partitions()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 8a18bd5..2e4507a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig { * partition.assignment.strategy */ public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy"; -private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name or class type of the assignor implementing the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used. A custom assignor that implements ConsumerPartitionAssignor can be plugged in"; +private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names or class types, ordered by preference, of supported assignors responsible for the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used. Implementing the org.apache.kafka.clients.consumer.ConsumerPartitionAssignor interface allows you to plug in a custom assignment strategy."; /** * auto.offset.reset diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index 72d5d6e..07e153e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
[kafka] branch trunk updated: MINOR: Refactor abstractConfig#configuredInstance (#7129)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 73ed9ea MINOR: Refactor abstractConfig#configuredInstance (#7129) 73ed9ea is described below commit 73ed9eac2c875a12d933af48b7a7817ea57ad447 Author: Guozhang Wang AuthorDate: Tue Jul 30 13:35:02 2019 -0700 MINOR: Refactor abstractConfig#configuredInstance (#7129) Reviewers: Jason Gustafson --- .../apache/kafka/common/config/AbstractConfig.java | 50 +++--- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index a48856f..832837d 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -355,6 +355,29 @@ public class AbstractConfig { log.warn("The configuration '{}' was supplied but isn't a known config.", key); } +private T getConfiguredInstance(Object klass, Class t, Map configPairs) { +if (klass == null) +return null; + +Object o; +if (klass instanceof String) { +try { +o = Utils.newInstance((String) klass, t); +} catch (ClassNotFoundException e) { +throw new KafkaException("Class " + klass + " cannot be found", e); +} +} else if (klass instanceof Class) { +o = Utils.newInstance((Class) klass); +} else +throw new KafkaException("Unexpected element of type " + klass.getClass().getName() + ", expected String or Class"); +if (!t.isInstance(o)) +throw new KafkaException(klass + " is not an instance of " + t.getName()); +if (o instanceof Configurable) +((Configurable) o).configure(configPairs); + +return t.cast(o); +} + /** * Get a configured instance of the give class specified by the given configuration key. If the object implements * Configurable configure it using the configuration. @@ -365,14 +388,8 @@ public class AbstractConfig { */ public T getConfiguredInstance(String key, Class t) { Class c = getClass(key); -if (c == null) -return null; -Object o = Utils.newInstance(c); -if (!t.isInstance(o)) -throw new KafkaException(c.getName() + " is not an instance of " + t.getName()); -if (o instanceof Configurable) -((Configurable) o).configure(originals()); -return t.cast(o); + +return getConfiguredInstance(c, t, originals()); } /** @@ -400,7 +417,6 @@ public class AbstractConfig { return getConfiguredInstances(getList(key), t, configOverrides); } - /** * Get a list of configured instances of the given class specified by the given configuration key. The configuration * may specify either null or an empty string to indicate no configured instances. In both cases, this method @@ -417,21 +433,7 @@ public class AbstractConfig { Map configPairs = originals(); configPairs.putAll(configOverrides); for (Object klass : classNames) { -Object o; -if (klass instanceof String) { -try { -o = Utils.newInstance((String) klass, t); -} catch (ClassNotFoundException e) { -throw new KafkaException(klass + " ClassNotFoundException exception occurred", e); -} -} else if (klass instanceof Class) { -o = Utils.newInstance((Class) klass); -} else -throw new KafkaException("List contains element of type " + klass.getClass().getName() + ", expected String or Class"); -if (!t.isInstance(o)) -throw new KafkaException(klass + " is not an instance of " + t.getName()); -if (o instanceof Configurable) -((Configurable) o).configure(configPairs); +Object o = getConfiguredInstance(klass, t, configPairs); objects.add(t.cast(o)); } return objects;
[kafka] branch trunk updated: KAFKA-8179: add public ConsumerPartitionAssignor interface (#7108)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 69d86a1 KAFKA-8179: add public ConsumerPartitionAssignor interface (#7108) 69d86a1 is described below commit 69d86a197f86ad4c6f1636b5ab4678907e30a4c0 Author: A. Sophie Blee-Goldman AuthorDate: Thu Jul 25 13:02:09 2019 -0700 KAFKA-8179: add public ConsumerPartitionAssignor interface (#7108) Main changes of this PR: * Deprecate old consumer.internal.PartitionAssignor and add public consumer.ConsumerPartitionAssignor with all OOTB assignors migrated to new interface * Refactor assignor's assignment/subscription related classes for easier to evolve API * Removed version number from classes as it is only needed for serialization/deserialization * Other previously-discussed cleanup included in this PR: * Remove Assignment.error added in pt 1 * Remove ConsumerCoordinator#adjustAssignment added in pt 2 Reviewers: Boyang Chen , Jason Gustafson , Guozhang Wang --- .../kafka/clients/admin/KafkaAdminClient.java | 4 +- .../kafka/clients/consumer/ConsumerConfig.java | 2 +- .../clients/consumer/ConsumerGroupMetadata.java| 50 + ...ssignor.java => ConsumerPartitionAssignor.java} | 210 - .../kafka/clients/consumer/KafkaConsumer.java | 7 +- .../kafka/clients/consumer/StickyAssignor.java | 11 +- .../internals/AbstractPartitionAssignor.java | 18 +- .../consumer/internals/ConsumerCoordinator.java| 86 +++-- .../consumer/internals/ConsumerProtocol.java | 95 -- .../consumer/internals/PartitionAssignor.java | 142 +- .../consumer/internals/SubscriptionState.java | 8 + .../kafka/clients/admin/KafkaAdminClientTest.java | 6 +- .../kafka/clients/consumer/KafkaConsumerTest.java | 69 --- .../kafka/clients/consumer/RangeAssignorTest.java | 2 +- .../clients/consumer/RoundRobinAssignorTest.java | 2 +- .../kafka/clients/consumer/StickyAssignorTest.java | 2 +- .../internals/ConsumerCoordinatorTest.java | 31 +-- .../consumer/internals/ConsumerProtocolTest.java | 45 + .../group/GroupMetadataManagerTest.scala | 2 +- .../internals/StreamsPartitionAssignor.java| 20 +- .../internals/StreamsPartitionAssignorTest.java| 126 +++-- .../kafka/streams/tests/StreamsUpgradeTest.java| 21 ++- 22 files changed, 363 insertions(+), 596 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 349fc2a..f2ee21e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -29,9 +29,9 @@ import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; import org.apache.kafka.clients.admin.internals.AdminMetadataManager; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.ElectionType; @@ -2724,7 +2724,7 @@ public class KafkaAdminClient extends AdminClient { for (DescribedGroupMember groupMember : members) { Set partitions = Collections.emptySet(); if (groupMember.memberAssignment().length > 0) { -final PartitionAssignor.Assignment assignment = ConsumerProtocol. +final ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol. deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment())); partitions = new HashSet<>(assignment.partitions()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 7eb34d4..8a18bd5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig { * partition.assignment.strategy */ public static final String PARTITION_ASSIGNMENT_STR
[kafka] branch trunk updated: KAFKA-8696: clean up Sum/Count/Total metrics (#7057)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a8aedc8 KAFKA-8696: clean up Sum/Count/Total metrics (#7057) a8aedc8 is described below commit a8aedc85ebfadcf1472acafe2e0311a73d3040be Author: John Roesler AuthorDate: Tue Jul 23 18:54:20 2019 -0500 KAFKA-8696: clean up Sum/Count/Total metrics (#7057) * Clean up one redundant and one misplaced metric * Clarify the relationship among these metrics to avoid future confusion Reviewers: Matthias J. Sax , Bill Bejeck , Guozhang Wang --- .../consumer/internals/AbstractCoordinator.java| 4 +- .../kafka/clients/consumer/internals/Fetcher.java | 4 +- .../kafka/common/metrics/MeasurableStat.java | 2 +- .../apache/kafka/common/metrics/stats/Count.java | 30 +++--- .../common/metrics/stats}/CumulativeCount.java | 22 +++- .../stats/{Total.java => CumulativeSum.java} | 19 --- .../apache/kafka/common/metrics/stats/Meter.java | 24 .../apache/kafka/common/metrics/stats/Rate.java| 27 ++--- .../org/apache/kafka/common/metrics/stats/Sum.java | 30 +++--- .../apache/kafka/common/metrics/stats/Total.java | 36 +++- .../stats/{Count.java => WindowedCount.java} | 26 +++-- .../metrics/stats/{Sum.java => WindowedSum.java} | 11 ++-- .../org/apache/kafka/common/network/Selector.java | 18 +++--- .../kafka/common/metrics/JmxReporterTest.java | 14 ++--- .../kafka/common/metrics/KafkaMbeanTest.java | 8 +-- .../apache/kafka/common/metrics/MetricsTest.java | 64 +++--- .../apache/kafka/common/metrics/SensorTest.java| 4 +- .../kafka/common/metrics/stats/MeterTest.java | 2 +- .../java/org/apache/kafka/test/MetricsBench.java | 4 +- .../org/apache/kafka/connect/runtime/Worker.java | 14 ++--- .../kafka/connect/runtime/WorkerSinkTask.java | 10 ++-- .../kafka/connect/runtime/WorkerSourceTask.java| 6 +- .../runtime/distributed/DistributedHerder.java | 4 +- .../runtime/errors/ErrorHandlingMetrics.java | 16 +++--- .../main/scala/kafka/network/SocketServer.scala| 5 +- .../scala/kafka/server/ClientQuotaManager.scala| 4 +- .../streams/kstream/internals/metrics/Sensors.java | 8 +-- .../streams/processor/internals/StreamTask.java| 8 +-- .../internals/metrics/StreamsMetricsImpl.java | 5 +- .../processor/internals/RecordCollectorTest.java | 4 +- .../processor/internals/StandbyTaskTest.java | 4 +- .../apache/kafka/streams/TopologyTestDriver.java | 8 +-- 32 files changed, 185 insertions(+), 260 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index d5faa6e..b92a4a6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -42,9 +42,9 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.Count; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; @@ -961,7 +961,7 @@ public abstract class AbstractCoordinator implements Closeable { } protected Meter createMeter(Metrics metrics, String groupName, String baseName, String descriptiveName) { -return new Meter(new Count(), +return new Meter(new WindowedCount(), metrics.metricName(baseName + "-rate", groupName, String.format("The number of %s per second", descriptiveName)), metrics.metricName(baseName + "-total", groupName, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 08ab9fb..d4d028d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -47,10 +47,10 @@ import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.
[kafka] branch trunk updated: KAFKA-8392: Fix old metrics leakage by brokers that have no leadership over any partition for a topic (#6977)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new aa8e850 KAFKA-8392: Fix old metrics leakage by brokers that have no leadership over any partition for a topic (#6977) aa8e850 is described below commit aa8e850afb7baa447882b493eae29f99a39e35a0 Author: Tu V. Tran AuthorDate: Fri Jul 19 15:25:18 2019 -0700 KAFKA-8392: Fix old metrics leakage by brokers that have no leadership over any partition for a topic (#6977) * Added removeOldLeaderMetrics in BrokerTopicStats to remove MessagesInPerSec, BytesInPerSec, BytesOutPerSec for any broker that is no longer a leader of any partition for a particular topic * Modified ReplicaManager to remove the metrics of any topic that the current broker has no leadership (meaning the broker either becomes a follower for all of the partitions in that topic or stops being a replica) Reviewers: Guozhang Wang , Jun Rao --- .../scala/kafka/server/KafkaRequestHandler.scala | 99 ++ .../main/scala/kafka/server/ReplicaManager.scala | 13 ++ .../scala/unit/kafka/metrics/MetricsTest.scala | 10 +- .../unit/kafka/server/ReplicaManagerTest.scala | 148 + 4 files changed, 239 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index e0ad1b6..0397795 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -146,38 +146,63 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup { case Some(topic) => Map("topic" -> topic) } - val messagesInRate = newMeter(BrokerTopicStats.MessagesInPerSec, "messages", TimeUnit.SECONDS, tags) - val bytesInRate = newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags) - val bytesOutRate = newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS, tags) - val bytesRejectedRate = newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS, tags) - private[server] val replicationBytesInRate = -if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS, tags)) + // an internal map for "lazy initialization" of certain metrics + private val metricTypeMap = new Pool[String, Meter] + + def messagesInRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.MessagesInPerSec, +newMeter(BrokerTopicStats.MessagesInPerSec, "messages", TimeUnit.SECONDS, tags)) + def bytesInRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesInPerSec, +newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags)) + def bytesOutRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesOutPerSec, +newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)) + def bytesRejectedRate = metricTypeMap.getAndMaybePut(BrokerTopicStats.BytesRejectedPerSec, +newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS, tags)) + private[server] def replicationBytesInRate = +if (name.isEmpty) Some(metricTypeMap.getAndMaybePut( + BrokerTopicStats.ReplicationBytesInPerSec, + newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS, tags))) else None - private[server] val replicationBytesOutRate = -if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)) + private[server] def replicationBytesOutRate = +if (name.isEmpty) Some(metricTypeMap.getAndMaybePut( + BrokerTopicStats.ReplicationBytesOutPerSec, + newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", TimeUnit.SECONDS, tags))) else None - val failedProduceRequestRate = newMeter(BrokerTopicStats.FailedProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags) - val failedFetchRequestRate = newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags) - val totalProduceRequestRate = newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags) - val totalFetchRequestRate = newMeter(BrokerTopicStats.TotalFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags) - val fetchMessageConversionsRate = newMeter(BrokerTopicStats.FetchMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags) - val produceMessageConversionsRate = newMeter(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests", TimeUnit.SECONDS, tags) + def failedProduceRequestRate = metricTypeMap.getAndMaybePut
[kafka] branch 2.1 updated: KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 772d764 KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054) 772d764 is described below commit 772d7647fe3a7fe3c3ff65a0348f3aa61664ef43 Author: A. Sophie Blee-Goldman AuthorDate: Thu Jul 18 13:54:46 2019 -0700 KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054) The timestamp extractor takes a previousTimestamp parameter which should be the partition time. This PR adds back in partition time tracking for the extractor, and renames previousTimestamp --> partitionTime Reviewers: Guozhang Wang , Bill Bejeck , Matthias J. Sax --- .../examples/pageview/JsonTimestampExtractor.java | 2 +- .../processor/ExtractRecordMetadataTimestamp.java | 10 +- .../streams/processor/FailOnInvalidTimestamp.java | 4 +- .../processor/LogAndSkipOnInvalidTimestamp.java| 4 +- .../streams/processor/TimestampExtractor.java | 4 +- .../UsePreviousTimeOnInvalidTimestamp.java | 10 +- .../processor/WallclockTimestampExtractor.java | 4 +- .../processor/internals/PartitionGroup.java| 36 --- .../streams/processor/internals/RecordQueue.java | 23 - .../streams/processor/internals/StreamTask.java| 10 +- .../apache/kafka/streams/StreamsConfigTest.java| 2 +- .../processor/internals/PartitionGroupTest.java| 8 +- .../processor/internals/RecordQueueTest.java | 106 ++--- .../apache/kafka/test/MockTimestampExtractor.java | 2 +- 14 files changed, 166 insertions(+), 59 deletions(-) diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java index 4f6257a..d760183 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java @@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; public class JsonTimestampExtractor implements TimestampExtractor { @Override -public long extract(final ConsumerRecord record, final long previousTimestamp) { +public long extract(final ConsumerRecord record, final long partitionTime) { if (record.value() instanceof PageViewTypedDemo.PageView) { return ((PageViewTypedDemo.PageView) record.value()).timestamp; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java index 79c8dd3..3c7428a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java @@ -50,15 +50,15 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor { * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}. * * @param record a data record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return the embedded metadata timestamp of the given {@link ConsumerRecord} */ @Override -public long extract(final ConsumerRecord record, final long previousTimestamp) { +public long extract(final ConsumerRecord record, final long partitionTime) { final long timestamp = record.timestamp(); if (timestamp < 0) { -return onInvalidTimestamp(record, timestamp, previousTimestamp); +return onInvalidTimestamp(record, timestamp, partitionTime); } return timestamp; @@ -69,10 +69,10 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor { * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return a new timestamp for the record (if negative, record will not be processed but dropped silently) */ public abstract long onInvalidTimestamp(final ConsumerRecord record, final long recordTim
[kafka] branch 2.2 updated: KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new f1300c7 KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054) f1300c7 is described below commit f1300c71c7ba9d0ef6571949587923675109ba7b Author: A. Sophie Blee-Goldman AuthorDate: Thu Jul 18 13:54:46 2019 -0700 KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054) The timestamp extractor takes a previousTimestamp parameter which should be the partition time. This PR adds back in partition time tracking for the extractor, and renames previousTimestamp --> partitionTime Reviewers: Guozhang Wang , Bill Bejeck , Matthias J. Sax --- .../examples/pageview/JsonTimestampExtractor.java | 2 +- .../processor/ExtractRecordMetadataTimestamp.java | 10 +- .../streams/processor/FailOnInvalidTimestamp.java | 4 +- .../processor/LogAndSkipOnInvalidTimestamp.java| 4 +- .../streams/processor/TimestampExtractor.java | 4 +- .../UsePreviousTimeOnInvalidTimestamp.java | 10 +- .../processor/WallclockTimestampExtractor.java | 4 +- .../processor/internals/PartitionGroup.java| 18 ++-- .../streams/processor/internals/RecordQueue.java | 23 - .../streams/processor/internals/StreamTask.java| 10 +- .../apache/kafka/streams/StreamsConfigTest.java| 2 +- .../processor/internals/PartitionGroupTest.java| 8 +- .../processor/internals/ProcessorTopologyTest.java | 2 +- .../processor/internals/RecordQueueTest.java | 106 ++--- .../apache/kafka/test/MockTimestampExtractor.java | 2 +- 15 files changed, 151 insertions(+), 58 deletions(-) diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java index 4f6257a..d760183 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java @@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; public class JsonTimestampExtractor implements TimestampExtractor { @Override -public long extract(final ConsumerRecord record, final long previousTimestamp) { +public long extract(final ConsumerRecord record, final long partitionTime) { if (record.value() instanceof PageViewTypedDemo.PageView) { return ((PageViewTypedDemo.PageView) record.value()).timestamp; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java index 79c8dd3..3c7428a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java @@ -50,15 +50,15 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor { * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}. * * @param record a data record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return the embedded metadata timestamp of the given {@link ConsumerRecord} */ @Override -public long extract(final ConsumerRecord record, final long previousTimestamp) { +public long extract(final ConsumerRecord record, final long partitionTime) { final long timestamp = record.timestamp(); if (timestamp < 0) { -return onInvalidTimestamp(record, timestamp, previousTimestamp); +return onInvalidTimestamp(record, timestamp, partitionTime); } return timestamp; @@ -69,10 +69,10 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor { * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return a new timestamp for the record (if negative, record will not be processed but dropped silently) */ public abstract long onInvalidTimestamp(final ConsumerRecord