[GitHub] [kafka] showuon opened a new pull request #11234: [WIP] KAFKA-13212: add support infinite query for session store
showuon opened a new pull request #11234: URL: https://github.com/apache/kafka/pull/11234

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id
[ https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401490#comment-17401490 ] Yanwen Lin commented on KAFKA-10038:

Hi [~apompeev], are you still working on this one, or may I take over? Thanks!

> ConsumerPerformance.scala supports the setting of client.id
> ---
> Key: KAFKA-10038
> URL: https://issues.apache.org/jira/browse/KAFKA-10038
> Project: Kafka
> Issue Type: Improvement
> Components: consumer, core
> Affects Versions: 2.1.1
> Environment: Trunk branch
> Reporter: tigertan
> Priority: Minor
> Labels: newbie, performance
>
> ConsumerPerformance.scala should support the setting of "client.id", which is a reasonable requirement, and the way "console consumer" and "console producer" handle "client.id" can be unified. "client.id" defaults to "perf-consumer-client".
> We often use client.id in quotas. If the kafka-producer-perf-test.sh script supported setting "client.id", we could do quota testing through scripts without writing our own consumer programs.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12766) Consider Disabling WAL-related Options in RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-12766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tomer Wizman reassigned KAFKA-12766:

Assignee: Tomer Wizman

> Consider Disabling WAL-related Options in RocksDB
> -
> Key: KAFKA-12766
> URL: https://issues.apache.org/jira/browse/KAFKA-12766
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Bruno Cadonna
> Assignee: Tomer Wizman
> Priority: Minor
> Labels: newbie, newbie++
> Fix For: 3.1.0
>
> Streams disables the write-ahead log (WAL) provided by RocksDB since it replicates the data in changelog topics. Hence, it does not make much sense to set WAL-related configs for RocksDB instances within Streams.
> Streams could:
> - disable WAL-related options
> - ignore WAL-related options
> - throw an exception when a WAL-related option is set.
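Of the three options the ticket lists, the strictest one ("throw an exception when a WAL-related option is set") can be sketched as a simple guard over user-supplied RocksDB settings before they are applied. This is an illustrative sketch only: the `WalOptionGuard` class and the set of option names are hypothetical, not Streams or RocksDB API.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical guard that rejects WAL-related RocksDB settings, since
// Streams disables the WAL and replicates state via changelog topics.
// The option names below are illustrative examples, not an exhaustive list.
public class WalOptionGuard {
    private static final Set<String> WAL_OPTIONS = Set.of(
        "wal_dir", "wal_ttl_seconds", "wal_size_limit_mb", "manual_wal_flush");

    public static boolean isWalOption(String key) {
        return WAL_OPTIONS.contains(key);
    }

    // Throw on any WAL-related key; a real implementation could instead
    // ignore the key and log a warning (the ticket's second option).
    public static void check(Map<String, Object> userRocksDbOptions) {
        for (String key : userRocksDbOptions.keySet()) {
            if (isWalOption(key)) {
                throw new IllegalArgumentException(
                    "WAL-related RocksDB option has no effect in Streams: " + key);
            }
        }
    }

    public static void main(String[] args) {
        check(Map.of("max_write_buffer_number", 4)); // non-WAL option passes
        System.out.println(isWalOption("wal_ttl_seconds")); // prints "true"
    }
}
```

The same set-membership check could back either behavior the ticket proposes: throwing (as here) or silently dropping the offending keys.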
[GitHub] [kafka] kkonstantine merged pull request #11232: MINOR: Add missing licenses and update versions in LICENSE-binary for 3.0
kkonstantine merged pull request #11232: URL: https://github.com/apache/kafka/pull/11232
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r691775378

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java

@@ -1121,4 +1125,60 @@ public void shouldResetTransactionInitializedOnResetProducer() {
         verify(mockedProducer);
     }

+    @Test
+    public void shouldComputeTotalBlockedTime() {
+        setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7);
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(nonEosStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));
+    }
+
+    @Test
+    public void shouldComputeTotalBlockedTimeAfterReset() {
+        setProducerMetrics(eosBetaMockProducer, 1, 2, 3, 4, 5, 6, 7);
+        eosBetaStreamsProducer.resetProducer();
+
+        final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7;
+        assertThat(eosBetaStreamsProducer.totalBlockedTime(), greaterThan(2 * expectedTotalBlocked));

Review comment: ah somehow I thought we couldn't use the hamcrest matchers. thanks!
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r691768579

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer consumer;
+    final Consumer restoreConsumer;
+    final Supplier producerTotalBlockedTime;
+
+    StreamsThreadTotalBlockedTime(
+        final Consumer consumer,
+        final Consumer restoreConsumer,
+        final Supplier producerTotalBlockedTime) {
+        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
+        this.producerTotalBlockedTime = producerTotalBlockedTime;
+    }
+
+    final double getMetricValue(

Review comment: typo - i'll fix
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r691767731

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java

@@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
     }
 }

+    public void addThreadLevelImmutableMetric(final String name,
+                                              final String description,
+                                              final String threadId,
+                                              final T value) {
+        final MetricName metricName = metrics.metricName(
+            name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
+        synchronized (threadLevelMetrics) {

Review comment: Ah, I wasn't aware of the external vs internal sensor names. Now that I read through this again, it seems that an external caller with a reference to `StreamsMetrics` can add its own sensors, which don't get cleaned up when the thread goes away. In this case we don't have external callers adding any thread-level metrics to the map, so we don't really need the prefix. Happy to include it to keep things consistent.
[jira] [Created] (KAFKA-13218) kafka deleted unexpired message unexpectedly
leiminghany created KAFKA-13218:
---
Summary: kafka deleted unexpired message unexpectedly
Key: KAFKA-13218
URL: https://issues.apache.org/jira/browse/KAFKA-13218
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 2.7.0
Environment:

docker file:
{code}
from openjdk:11-jre-slim-buster
RUN apt-get update
RUN apt-get -y install net-tools iputils-ping curl procps
RUN curl -OL https://mirrors.bfsu.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz && tar -xzf kafka_2.13-2.7.0.tgz && rm -f kafka_2.13-2.7.0.tgz
ENV PATH "$PATH:/kafka_2.13-2.7.0/bin"
RUN mkdir /etc/kafka
COPY server.properties /etc/kafka/server.properties
CMD ["kafka-server-start.sh", "/etc/kafka/server.properties"]
{code}

configure file:
{code}
broker.id=2
log.dirs=/var/lib/kafka
log.segment.bytes=10485760
zookeeper.connect=zk-cs.default.svc.cluster.local:2181
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
inter.broker.listener.name=INTERNAL
listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
listeners=INTERNAL://:9092,EXTERNAL://:30101
advertised.listeners=INTERNAL://kafka-2.kafka.default.svc.cluster.local:9092,EXTERNAL://192.168.0.13:30101
{code}

Reporter: leiminghany

I created a topic like this:
{code:java}
kafka-topics.sh --create --zookeeper zk-cs.default.svc.cluster.local:2181 --partitions 64 --replication-factor 2 --topic signal --config retention.ms=6048000{code}
I then sent several messages into partition 2 of this topic. After that, I tried to consume the messages from this partition, but I couldn't get any messages.

When I looked at the Kafka data directory, I found the log file had been rolled. Here are the files:
{code:java}
root@kafka-2:/var/lib/kafka/signal-2# ls
0005.index  0005.log  0005.snapshot  0005.timeindex  leader-epoch-checkpoint
{code}
and the dump info is:
{code:java}
root@kafka-2:/var/lib/kafka/signal-2# kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files 0005.log
Dumping 0005.log
Starting offset: 5
root@kafka-2:/var/lib/kafka/signal-2# kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files 0005.index
Dumping 0005.index
root@kafka-2:/var/lib/kafka/signal-2# kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files 0005.snapshot
Dumping 0005.snapshot
root@kafka-2:/var/lib/kafka/signal-2# kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files 0005.timeindex
Dumping 0005.timeindex
timestamp: 0 offset: 5
The following indexed offsets are not found in the log.
Indexed offset: 5, found log offset: -1
root@kafka-2:/var/lib/kafka/signal-2# cat leader-epoch-checkpoint
0
1
0 5
{code}
here is the kafka console log about this partition:
{code:java}
[2021-08-18 12:04:57,652] INFO [ProducerStateManager partition=signal-2] Writing producer snapshot at offset 5 (kafka.log.ProducerStateManager)
[2021-08-18 12:04:57,653] INFO [Log partition=signal-2, dir=/var/lib/kafka] Rolled new log segment at offset 5 in 7 ms. (kafka.log.Log)
[2021-08-18 12:04:57,653] INFO [Log partition=signal-2, dir=/var/lib/kafka] Deleting segment LogSegment(baseOffset=0, size=318, lastModifiedTime=1629288220552, largestRecordTimestamp=Some(0)) due to retention time 6048000ms breach based on the largest record timestamp in the segment (kafka.log.Log)
[2021-08-18 12:04:57,653] INFO [Log partition=signal-2, dir=/var/lib/kafka] Incremented log start offset to 5 due to segment deletion (kafka.log.Log)
[2021-08-18 12:05:57,671] INFO [Log partition=signal-2, dir=/var/lib/kafka] Deleting segment files LogSegment(baseOffset=0, size=318, lastModifiedTime=1629288220552, largestRecordTimestamp=Some(0)) (kafka.log.Log)
[2021-08-18 12:05:57,672] INFO Deleted log /var/lib/kafka/signal-2/.log.deleted. (kafka.log.LogSegment)
[2021-08-18 12:05:57,672] INFO Deleted offset index /var/lib/kafka/signal-2/.index.deleted. (kafka.log.LogSegment)
[2021-08-18 12:05:57,673] INFO Deleted time index /var/lib/kafka/signal-2/.timeindex.deleted. (kafka.log.LogSegment)
{code}
I think `largestRecordTimestamp=Some(0)` may be the clue to track this problem, but I cannot find the exact reason. Can anyone help me? This problem happens occasionally.
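For context on why `largestRecordTimestamp=Some(0)` matters: when a record timestamp is available, Kafka's time-based retention compares the segment's largest record timestamp (not the file's modification time) against `retention.ms`. Records produced with a timestamp of 0 make the segment look decades old, so it is deleted on the next retention check. A minimal sketch of that decision (the helper class and method names are hypothetical, not Kafka's actual code):

```java
// Hypothetical sketch of Kafka's time-based retention decision for a log
// segment; names are illustrative, not from the Kafka codebase.
public class RetentionCheck {
    // A segment breaches time-based retention when the age of its newest
    // record exceeds retention.ms.
    public static boolean breachesRetention(long largestRecordTimestampMs,
                                            long nowMs,
                                            long retentionMs) {
        return nowMs - largestRecordTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        // Records written with timestamp 0 (as in the report above) look
        // ~51 years old, so even a short retention.ms=6048000 (~1.7 hours;
        // possibly a typo for 604800000 = 7 days) deletes them immediately.
        long now = 1_629_288_297_653L; // 2021-08-18, from the broker log
        System.out.println(breachesRetention(0L, now, 6_048_000L)); // prints "true"
    }
}
```

Under this reading, the likely root cause is a producer sending records with an explicit timestamp of 0 rather than a broker-side bug.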
[GitHub] [kafka] rhauch commented on a change in pull request #11232: MINOR: Add missing licenses and update versions in LICENSE-binary for 3.0
rhauch commented on a change in pull request #11232: URL: https://github.com/apache/kafka/pull/11232#discussion_r691743181

## File path: LICENSE-binary

@@ -208,52 +208,52 @@ License Version 2.0:
 audience-annotations-0.5.0
 commons-cli-1.4
 commons-lang3-3.8.1
-jackson-annotations-2.10.5
-jackson-core-2.10.5
-jackson-databind-2.10.5.1
-jackson-dataformat-csv-2.10.5
-jackson-datatype-jdk8-2.10.5
-jackson-jaxrs-base-2.10.5
-jackson-jaxrs-json-provider-2.10.5
-jackson-module-jaxb-annotations-2.10.5
+jackson-annotations-2.12.3
+jackson-core-2.12.3
+jackson-databind-2.12.3
+jackson-dataformat-csv-2.12.3
+jackson-datatype-jdk8-2.12.3
+jackson-jaxrs-base-2.12.3
+jackson-jaxrs-json-provider-2.12.3
+jackson-module-jaxb-annotations-2.12.3
 jackson-module-paranamer-2.10.5
-jackson-module-scala_2.13-2.10.5
+jackson-module-scala_2.13-2.12.3
 jakarta.validation-api-2.0.2
 javassist-3.27.0-GA
-jetty-client-9.4.38.v20210224
-jetty-continuation-9.4.38.v20210224
-jetty-http-9.4.38.v20210224
-jetty-io-9.4.38.v20210224
-jetty-security-9.4.38.v20210224
-jetty-server-9.4.38.v20210224
-jetty-servlet-9.4.38.v20210224
-jetty-servlets-9.4.38.v20210224
-jetty-util-9.4.38.v20210224
-jetty-util-ajax-9.4.38.v20210224
-jersey-common-2.31
-jersey-server-2.31
+jetty-client-9.4.43.v20210629
+jetty-continuation-9.4.43.v20210629
+jetty-http-9.4.43.v20210629
+jetty-io-9.4.43.v20210629
+jetty-security-9.4.43.v20210629
+jetty-server-9.4.43.v20210629
+jetty-servlet-9.4.43.v20210629
+jetty-servlets-9.4.43.v20210629
+jetty-util-9.4.43.v20210629
+jetty-util-ajax-9.4.43.v20210629
+jersey-common-2.34
+jersey-server-2.34
 log4j-1.2.17
 lz4-java-1.7.1
-maven-artifact-3.6.3
-metrics-core-2.2.0
-netty-buffer-4.1.59.Final
-netty-codec-4.1.59.Final
-netty-common-4.1.59.Final
-netty-handler-4.1.59.Final
-netty-resolver-4.1.59.Final
-netty-transport-4.1.59.Final
-netty-transport-native-epoll-4.1.59.Final
-netty-transport-native-unix-common-4.1.59.Final
+maven-artifact-3.8.1
+metrics-core-4.1.12.1

Review comment: Ah, I see that now. Makes sense!
[GitHub] [kafka] mjsax opened a new pull request #11233: HOTFIX: Disable spurious left/outer stream-stream join fix
mjsax opened a new pull request #11233: URL: https://github.com/apache/kafka/pull/11233

KAFKA-10847 improves stream-stream left/outer joins to avoid spurious left/outer join results. However, it introduces regression bug KAFKA-13216. This PR disables KAFKA-10847 by partially rolling back the KIP-633 changes.
[jira] [Commented] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so
[ https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401433#comment-17401433 ] A. Sophie Blee-Goldman commented on KAFKA-13217:

This is all the more important given the recent increase of the default session.timeout.ms to 45s, since that's a rather long time to go without noticing that a consumer has indeed permanently left the group.

> Reconsider skipping the LeaveGroup on close() or add an overload that does so
> -
> Key: KAFKA-13217
> URL: https://issues.apache.org/jira/browse/KAFKA-13217
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Priority: Major
>
> In Kafka Streams, when an instance is shut down via the close() API, we intentionally skip sending a LeaveGroup request. This is because often the shutdown is not due to a scaling down event but instead some transient closure, such as during a rolling bounce. In cases where the instance is expected to start up again shortly after, we originally wanted to avoid that member's tasks from being redistributed across the remaining group members since this would disturb the stable assignment and could cause unnecessary state migration and restoration. We also hoped to limit the disruption to just a single rebalance, rather than forcing the group to rebalance once when the member shuts down and then again when it comes back up. So it's really an optimization for the case in which the shutdown is temporary.
>
> That said, many of those optimizations are no longer necessary or at least much less useful given recent features and improvements. For example, rebalances are now lightweight so skipping the 2nd rebalance is not as worth optimizing for, and the new assignor will take into account the actual underlying state for each task/partition assignment, rather than just the previous assignment, so the assignment should be considerably more stable across bounces and rolling restarts.
>
> Given that, it might be time to reconsider this optimization. Alternatively, we could introduce another form of the close() API that forces the member to leave the group, to be used in event of actual scale down rather than a transient bounce.
[jira] [Created] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so
A. Sophie Blee-Goldman created KAFKA-13217:
--
Summary: Reconsider skipping the LeaveGroup on close() or add an overload that does so
Key: KAFKA-13217
URL: https://issues.apache.org/jira/browse/KAFKA-13217
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: A. Sophie Blee-Goldman

In Kafka Streams, when an instance is shut down via the close() API, we intentionally skip sending a LeaveGroup request. This is because often the shutdown is not due to a scaling down event but instead some transient closure, such as during a rolling bounce. In cases where the instance is expected to start up again shortly after, we originally wanted to avoid that member's tasks from being redistributed across the remaining group members since this would disturb the stable assignment and could cause unnecessary state migration and restoration. We also hoped to limit the disruption to just a single rebalance, rather than forcing the group to rebalance once when the member shuts down and then again when it comes back up. So it's really an optimization for the case in which the shutdown is temporary.

That said, many of those optimizations are no longer necessary or at least much less useful given recent features and improvements. For example, rebalances are now lightweight so skipping the 2nd rebalance is not as worth optimizing for, and the new assignor will take into account the actual underlying state for each task/partition assignment, rather than just the previous assignment, so the assignment should be considerably more stable across bounces and rolling restarts.

Given that, it might be time to reconsider this optimization. Alternatively, we could introduce another form of the close() API that forces the member to leave the group, to be used in event of actual scale down rather than a transient bounce.
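The alternative mentioned at the end, a close() variant that does send a LeaveGroup request, could look roughly like the following. This is a hypothetical API sketch to illustrate the proposal; the `GroupMember` class and its methods are not the actual Streams interface.

```java
// Hypothetical sketch of a close() overload that optionally sends a
// LeaveGroup request; class and method names are illustrative only.
public class GroupMember {
    private boolean inGroup = true;
    private boolean running = true;

    // Today's behavior: shut down but stay in the group, so the broker
    // only notices the member is gone after session.timeout.ms expires.
    public void close() {
        close(false);
    }

    // Proposed overload: on a real scale-down, leave the group eagerly,
    // triggering an immediate rebalance instead of waiting for the
    // session timeout.
    public void close(boolean leaveGroup) {
        running = false;
        if (leaveGroup) {
            inGroup = false; // here a real client would send LeaveGroup
        }
    }

    public boolean isInGroup() { return inGroup; }
    public boolean isRunning() { return running; }

    public static void main(String[] args) {
        GroupMember member = new GroupMember();
        member.close(true); // scale-down: leave eagerly
        System.out.println(member.isInGroup()); // prints "false"
    }
}
```

The key design choice is that the caller, not the client library, knows whether a shutdown is a transient bounce or a permanent scale-down, so the decision is surfaced as a parameter.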
[jira] [Updated] (KAFKA-13204) wrong assignor selected if the assignor name is identical
[ https://issues.apache.org/jira/browse/KAFKA-13204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-13204:
---
Fix Version/s: 3.1.0

> wrong assignor selected if the assignor name is identical
> -
> Key: KAFKA-13204
> URL: https://issues.apache.org/jira/browse/KAFKA-13204
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.8.0
> Reporter: Luke Chen
> Assignee: Luke Chen
> Priority: Major
> Fix For: 3.1.0
>
> We used the partition assignor name to identify which assignor to use in the consumer coordinator. But we didn't do any assignor name conflict check, which caused the wrong assignor to get selected when performing assignment.
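The missing conflict check described above amounts to validating that the configured assignors have unique names before building a name-to-assignor lookup. A minimal sketch of such a check (the `AssignorNames` class is illustrative, not the actual ConsumerCoordinator code):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative duplicate-name check for partition assignors; the real fix
// lives in the consumer's assignor-selection logic.
public class AssignorNames {
    // Returns true if any assignor name appears more than once.
    public static boolean hasDuplicate(List<String> assignorNames) {
        Set<String> seen = new HashSet<>();
        for (String name : assignorNames) {
            if (!seen.add(name)) {
                return true;
            }
        }
        return false;
    }

    // Fails fast at configuration time instead of silently picking
    // whichever assignor happens to match the name first.
    public static void validateUniqueNames(List<String> assignorNames) {
        if (hasDuplicate(assignorNames)) {
            throw new IllegalArgumentException(
                "Duplicate partition assignor names configured: " + assignorNames);
        }
    }

    public static void main(String[] args) {
        validateUniqueNames(List.of("range", "cooperative-sticky")); // ok
        System.out.println(hasDuplicate(List.of("range", "range"))); // prints "true"
    }
}
```

Failing at configuration time makes the conflict visible to the operator, whereas the bug as reported only surfaces later as a silently wrong assignment.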
[GitHub] [kafka] kkonstantine edited a comment on pull request #11232: MINOR: Update package versions in LICENSE-binary for 3.0
kkonstantine edited a comment on pull request #11232: URL: https://github.com/apache/kafka/pull/11232#issuecomment-901540065

Hey @rhauch. I replied to the other comments inline. Regarding your question:

> Do you plan to backport this, and if so to what branches?

This was run against the 3.0 produced package (`tgz`), so I'm only planning to port it to the 3.0 branch. [KAFKA-12622](https://issues.apache.org/jira/browse/KAFKA-12622) describes the process as it was followed previously. I created a program to automate license checking, but it's not ready to be shared, so I'm excluding it from this PR.
[GitHub] [kafka] kkonstantine commented on a change in pull request #11232: MINOR: Update package versions in LICENSE-binary for 3.0
kkonstantine commented on a change in pull request #11232: URL: https://github.com/apache/kafka/pull/11232#discussion_r691710928

## File path: LICENSE-binary (quotes the same hunk shown in rhauch's review comment above, anchored at `+metrics-core-4.1.12.1`)

Review comment: The changes were autogenerated. I have a program that I'm hoping to share soon that checks for missing/stale licenses. It would correspond to: https://issues.apache.org/jira/browse/KAFKA-12622
[GitHub] [kafka] kkonstantine commented on a change in pull request #11232: MINOR: Update package versions in LICENSE-binary for 3.0
kkonstantine commented on a change in pull request #11232: URL: https://github.com/apache/kafka/pull/11232#discussion_r691710720

## File path: LICENSE-binary (quotes the same hunk shown in rhauch's review comment above, anchored at `+metrics-core-4.1.12.1`)

Review comment: Similar to: https://github.com/apache/kafka/pull/10474
[GitHub] [kafka] kkonstantine commented on a change in pull request #11232: MINOR: Update package versions in LICENSE-binary for 3.0
kkonstantine commented on a change in pull request #11232: URL: https://github.com/apache/kafka/pull/11232#discussion_r691710309 ## File path: LICENSE-binary ## @@ -208,52 +208,52 @@ License Version 2.0: audience-annotations-0.5.0 commons-cli-1.4 commons-lang3-3.8.1 -jackson-annotations-2.10.5 -jackson-core-2.10.5 -jackson-databind-2.10.5.1 -jackson-dataformat-csv-2.10.5 -jackson-datatype-jdk8-2.10.5 -jackson-jaxrs-base-2.10.5 -jackson-jaxrs-json-provider-2.10.5 -jackson-module-jaxb-annotations-2.10.5 +jackson-annotations-2.12.3 +jackson-core-2.12.3 +jackson-databind-2.12.3 +jackson-dataformat-csv-2.12.3 +jackson-datatype-jdk8-2.12.3 +jackson-jaxrs-base-2.12.3 +jackson-jaxrs-json-provider-2.12.3 +jackson-module-jaxb-annotations-2.12.3 jackson-module-paranamer-2.10.5 -jackson-module-scala_2.13-2.10.5 +jackson-module-scala_2.13-2.12.3 jakarta.validation-api-2.0.2 javassist-3.27.0-GA -jetty-client-9.4.38.v20210224 -jetty-continuation-9.4.38.v20210224 -jetty-http-9.4.38.v20210224 -jetty-io-9.4.38.v20210224 -jetty-security-9.4.38.v20210224 -jetty-server-9.4.38.v20210224 -jetty-servlet-9.4.38.v20210224 -jetty-servlets-9.4.38.v20210224 -jetty-util-9.4.38.v20210224 -jetty-util-ajax-9.4.38.v20210224 -jersey-common-2.31 -jersey-server-2.31 +jetty-client-9.4.43.v20210629 +jetty-continuation-9.4.43.v20210629 +jetty-http-9.4.43.v20210629 +jetty-io-9.4.43.v20210629 +jetty-security-9.4.43.v20210629 +jetty-server-9.4.43.v20210629 +jetty-servlet-9.4.43.v20210629 +jetty-servlets-9.4.43.v20210629 +jetty-util-9.4.43.v20210629 +jetty-util-ajax-9.4.43.v20210629 +jersey-common-2.34 +jersey-server-2.34 log4j-1.2.17 lz4-java-1.7.1 -maven-artifact-3.6.3 -metrics-core-2.2.0 -netty-buffer-4.1.59.Final -netty-codec-4.1.59.Final -netty-common-4.1.59.Final -netty-handler-4.1.59.Final -netty-resolver-4.1.59.Final -netty-transport-4.1.59.Final -netty-transport-native-epoll-4.1.59.Final -netty-transport-native-unix-common-4.1.59.Final +maven-artifact-3.8.1 +metrics-core-4.1.12.1 Review comment: I 
probably should clarify. These updates are in the LICENSE file and reflect what we already ship with 3.0. They are not version upgrades themselves. So this PR brings the LICENSE file in sync with the actual versions we ship. In the process I discovered a single missing license. Apologies for the confusion.
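The check this PR performs by hand — making sure every artifact actually shipped has a matching entry in LICENSE-binary — boils down to a set difference. A minimal sketch under assumed inputs (the artifact names below are examples; this is not how Kafka's build verifies licenses):

```java
import java.util.Set;
import java.util.TreeSet;

class LicenseAudit {
    // Return the shipped artifacts that have no matching entry in LICENSE-binary.
    // Jar file names are assumed to be "<name>-<version>.jar" and LICENSE entries
    // to be "<name>-<version>" (the convention visible in the diff above).
    static Set<String> missingEntries(Set<String> shippedJars, Set<String> licenseEntries) {
        Set<String> missing = new TreeSet<>();
        for (String jar : shippedJars) {
            String name = jar.endsWith(".jar") ? jar.substring(0, jar.length() - 4) : jar;
            if (!licenseEntries.contains(name)) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

Running such a diff against the release tarball is what surfaced the missing jline entry mentioned in the PR description.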
[GitHub] [kafka] rhauch commented on pull request #11232: MINOR: Update package versions in LICENSE-binary for 3.0
rhauch commented on pull request #11232: URL: https://github.com/apache/kafka/pull/11232#issuecomment-901534553 Do you plan to backport this, and if so to what branches?
[GitHub] [kafka] rhauch commented on a change in pull request #11232: MINOR: Update package versions in LICENSE-binary for 3.0
rhauch commented on a change in pull request #11232: URL: https://github.com/apache/kafka/pull/11232#discussion_r691705388 ## File path: LICENSE-binary ## @@ -208,52 +208,52 @@ License Version 2.0: audience-annotations-0.5.0 commons-cli-1.4 commons-lang3-3.8.1 -jackson-annotations-2.10.5 -jackson-core-2.10.5 -jackson-databind-2.10.5.1 -jackson-dataformat-csv-2.10.5 -jackson-datatype-jdk8-2.10.5 -jackson-jaxrs-base-2.10.5 -jackson-jaxrs-json-provider-2.10.5 -jackson-module-jaxb-annotations-2.10.5 +jackson-annotations-2.12.3 +jackson-core-2.12.3 +jackson-databind-2.12.3 +jackson-dataformat-csv-2.12.3 +jackson-datatype-jdk8-2.12.3 +jackson-jaxrs-base-2.12.3 +jackson-jaxrs-json-provider-2.12.3 +jackson-module-jaxb-annotations-2.12.3 jackson-module-paranamer-2.10.5 -jackson-module-scala_2.13-2.10.5 +jackson-module-scala_2.13-2.12.3 jakarta.validation-api-2.0.2 javassist-3.27.0-GA -jetty-client-9.4.38.v20210224 -jetty-continuation-9.4.38.v20210224 -jetty-http-9.4.38.v20210224 -jetty-io-9.4.38.v20210224 -jetty-security-9.4.38.v20210224 -jetty-server-9.4.38.v20210224 -jetty-servlet-9.4.38.v20210224 -jetty-servlets-9.4.38.v20210224 -jetty-util-9.4.38.v20210224 -jetty-util-ajax-9.4.38.v20210224 -jersey-common-2.31 -jersey-server-2.31 +jetty-client-9.4.43.v20210629 +jetty-continuation-9.4.43.v20210629 +jetty-http-9.4.43.v20210629 +jetty-io-9.4.43.v20210629 +jetty-security-9.4.43.v20210629 +jetty-server-9.4.43.v20210629 +jetty-servlet-9.4.43.v20210629 +jetty-servlets-9.4.43.v20210629 +jetty-util-9.4.43.v20210629 +jetty-util-ajax-9.4.43.v20210629 +jersey-common-2.34 +jersey-server-2.34 log4j-1.2.17 lz4-java-1.7.1 -maven-artifact-3.6.3 -metrics-core-2.2.0 -netty-buffer-4.1.59.Final -netty-codec-4.1.59.Final -netty-common-4.1.59.Final -netty-handler-4.1.59.Final -netty-resolver-4.1.59.Final -netty-transport-4.1.59.Final -netty-transport-native-epoll-4.1.59.Final -netty-transport-native-unix-common-4.1.59.Final +maven-artifact-3.8.1 +metrics-core-4.1.12.1 Review comment: This is a 
pretty big jump. How risky is this? ## File path: LICENSE-binary ## @@ -208,52 +208,52 @@ License Version 2.0: audience-annotations-0.5.0 commons-cli-1.4 commons-lang3-3.8.1 -jackson-annotations-2.10.5 -jackson-core-2.10.5 -jackson-databind-2.10.5.1 -jackson-dataformat-csv-2.10.5 -jackson-datatype-jdk8-2.10.5 -jackson-jaxrs-base-2.10.5 -jackson-jaxrs-json-provider-2.10.5 -jackson-module-jaxb-annotations-2.10.5 +jackson-annotations-2.12.3 +jackson-core-2.12.3 +jackson-databind-2.12.3 +jackson-dataformat-csv-2.12.3 +jackson-datatype-jdk8-2.12.3 +jackson-jaxrs-base-2.12.3 +jackson-jaxrs-json-provider-2.12.3 +jackson-module-jaxb-annotations-2.12.3 jackson-module-paranamer-2.10.5 -jackson-module-scala_2.13-2.10.5 +jackson-module-scala_2.13-2.12.3 jakarta.validation-api-2.0.2 javassist-3.27.0-GA -jetty-client-9.4.38.v20210224 -jetty-continuation-9.4.38.v20210224 -jetty-http-9.4.38.v20210224 -jetty-io-9.4.38.v20210224 -jetty-security-9.4.38.v20210224 -jetty-server-9.4.38.v20210224 -jetty-servlet-9.4.38.v20210224 -jetty-servlets-9.4.38.v20210224 -jetty-util-9.4.38.v20210224 -jetty-util-ajax-9.4.38.v20210224 -jersey-common-2.31 -jersey-server-2.31 +jetty-client-9.4.43.v20210629 +jetty-continuation-9.4.43.v20210629 +jetty-http-9.4.43.v20210629 +jetty-io-9.4.43.v20210629 +jetty-security-9.4.43.v20210629 +jetty-server-9.4.43.v20210629 +jetty-servlet-9.4.43.v20210629 +jetty-servlets-9.4.43.v20210629 +jetty-util-9.4.43.v20210629 +jetty-util-ajax-9.4.43.v20210629 +jersey-common-2.34 +jersey-server-2.34 log4j-1.2.17 lz4-java-1.7.1 -maven-artifact-3.6.3 -metrics-core-2.2.0 -netty-buffer-4.1.59.Final -netty-codec-4.1.59.Final -netty-common-4.1.59.Final -netty-handler-4.1.59.Final -netty-resolver-4.1.59.Final -netty-transport-4.1.59.Final -netty-transport-native-epoll-4.1.59.Final -netty-transport-native-unix-common-4.1.59.Final +maven-artifact-3.8.1 +metrics-core-4.1.12.1 +netty-buffer-4.1.62.Final +netty-codec-4.1.62.Final +netty-common-4.1.62.Final +netty-handler-4.1.62.Final 
+netty-resolver-4.1.62.Final +netty-transport-4.1.62.Final +netty-transport-native-epoll-4.1.62.Final +netty-transport-native-unix-common-4.1.62.Final plexus-utils-3.2.1 -rocksdbjni-5.18.4 -scala-collection-compat_2.13-2.3.0 +rocksdbjni-6.19.3 Review comment: This is also a major bump. How would you characterize the risk? Maybe @mjsax could weigh in.
[GitHub] [kafka] hachikuji commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`
hachikuji commented on a change in pull request #11225: URL: https://github.com/apache/kafka/pull/11225#discussion_r691706079 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig, leaderAndIsrRequest.partitionStates.forEach { partitionState => Review comment: This logic is a bit strange. It sounds like we are trying to handle the case where we fail to create the log after we have already created the partition. Would it make more sense to handle this in `getOrCreatePartition` if an exception is raised?
[GitHub] [kafka] hachikuji commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`
hachikuji commented on a change in pull request #11225: URL: https://github.com/apache/kafka/pull/11225#discussion_r691703790 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig, leaderAndIsrRequest.partitionStates.forEach { partitionState => val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex) -/* - * If there is offline log directory, a Partition object may have been created by getOrCreatePartition() - * before getOrCreateReplica() failed to create local replica due to KafkaStorageException. - * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object. - * we need to map this topic-partition to OfflinePartition instead. - */ +// If there is offline log directory, a Partition object may have been created by getOrCreatePartition() Review comment: nit: If there is _an_ offline log directory? ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig, leaderAndIsrRequest.partitionStates.forEach { partitionState => Review comment: This logic is a bit strange. It sounds like we are trying to handle the case where we fail to create the log after we have already created the partition. Would it make more sense to handle this in `getOrCreatePartition` if an exception is raised? ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig, leaderAndIsrRequest.partitionStates.forEach { partitionState => val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex) -/* - * If there is offline log directory, a Partition object may have been created by getOrCreatePartition() - * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
- * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object. - * we need to map this topic-partition to OfflinePartition instead. - */ +// If there is offline log directory, a Partition object may have been created by getOrCreatePartition() +// before getOrCreateReplica() failed to create local replica due to KafkaStorageException. Review comment: Hmm, `getOrCreateReplica` no longer exists. Maybe `createLogIfNotExists` is the replacement?
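The reviewer's suggestion — handle the log-creation failure inside `getOrCreatePartition` rather than patching `allPartitions` up in a later pass — could look roughly like the following. This is an illustrative Java sketch, not the actual `ReplicaManager` code (which is Scala); `createLog` and the `OFFLINE` sentinel are stand-ins for Kafka's real types:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PartitionRegistry {
    static final Object OFFLINE = new Object(); // sentinel for offline partitions
    final Map<String, Object> allPartitions = new ConcurrentHashMap<>();

    // Create the partition and its log atomically: if log creation fails with a
    // storage error, map the partition to OFFLINE right here instead of leaving
    // an empty Partition object for a later cleanup pass.
    Object getOrCreatePartition(String topicPartition, boolean failLogCreation) {
        return allPartitions.compute(topicPartition, (tp, existing) -> {
            if (existing != null) return existing; // already created (or offline)
            try {
                return createLog(tp, failLogCreation); // may throw on storage error
            } catch (RuntimeException storageError) {
                return OFFLINE;
            }
        });
    }

    private Object createLog(String tp, boolean fail) {
        if (fail) throw new RuntimeException("storage error: offline log dir");
        return new Object(); // stand-in for a Partition with a live log
    }
}
```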
[jira] [Updated] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded
[ https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13216: Fix Version/s: 3.0.0 > Streams left/outer joins cause new internal changelog topic to grow unbounded > - > > Key: KAFKA-13216 > URL: https://issues.apache.org/jira/browse/KAFKA-13216 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0 >Reporter: Sergio Peña >Priority: Blocker > Fix For: 3.0.0 > > > This bug is caused by the improvements made in > https://issues.apache.org/jira/browse/KAFKA-10847, which fixes an issue with > stream-stream left/outer joins. The issue is only caused when a stream-stream > left/outer join is used with the new `JoinWindows.ofTimeDifferenceAndGrace()` > API that specifies the window time + grace period. This new API was added in > AK 3.0. No previous users are affected. > The issue causes that the internal changelog topic used by the new > OUTERSHARED window store keeps growing unbounded as new records come. The > topic is never cleaned up nor compacted even if tombstones are written to > delete the joined and/or expired records from the window store. The problem > is caused by a parameter required in the window store to retain duplicates. > This config causes that tombstones records have a new sequence ID as part of > the key ID in the changelog making those keys unique. Thus causing the > cleanup policy not working. > In 3.0, we deprecated {{JoinWindows.of(size)}} in favor of > {{JoinWindows.ofTimeDifferenceAndGrace()}} -- the old API uses the old > semantics and is thus not affected while the new API enable the new > semantics; the problem is that we deprecated the old API and thus tell users > that they should switch to the new broken API. 
> We have two ways forward: > * Fix the bug (non-trivial) > * Un-deprecate the old {{JoinWindows.of(size)}} API (and tell users not to > use the new but broken API)
[jira] [Commented] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401410#comment-17401410 ] A. Sophie Blee-Goldman commented on KAFKA-12994: Hey guys, thanks for picking this up! The way you've approached this so far makes sense to me: separate branches/PRs which each tackle a different test class (to avoid ending up in merge conflict hell ;)). You can each take two and from there it's up to you whether you want to do one class per PR or just knock out both in one go. I can be the reviewer for this, and [~iekpo] if he has time. I just got back from PTO so I should be able to take a look this upcoming week. > Migrate all Tests to New API and Remove Suppression for Deprecation Warnings > related to KIP-633 > --- > > Key: KAFKA-12994 > URL: https://issues.apache.org/jira/browse/KAFKA-12994 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Assignee: Andrew patterson >Priority: Major > Labels: kip-633, newbie, newbie++ > Fix For: 3.1.0 > > > Due to the API changes for KIP-633 a lot of deprecation warnings have been > generated in tests that are using the old deprecated APIs. There are a lot of > tests using the deprecated methods. We should absolutely migrate them all to > the new APIs and then get rid of all the applicable annotations for > suppressing the deprecation warnings. > This applies to all Java and Scala examples and tests using the deprecated > APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows > classes. > > This is based on the feedback from reviewers in this PR > > https://github.com/apache/kafka/pull/10926
[GitHub] [kafka] kkonstantine opened a new pull request #11232: MINOR: Update package versions in LICENSE-binary for 3.0
kkonstantine opened a new pull request #11232: URL: https://github.com/apache/kafka/pull/11232 One new dependency was missing a license entry: [jline](https://github.com/jline/jline3). The rest of the changes correspond to updated package versions. No functional changes in the code. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] hachikuji commented on a change in pull request #11229: KAFKA-12961; Verify group generation in `DelayedJoin`
hachikuji commented on a change in pull request #11229: URL: https://github.com/apache/kafka/pull/11229#discussion_r691694091 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1391,78 +1392,92 @@ class GroupCoordinator(val brokerId: Int, } } - def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean): Boolean = { + def tryCompleteJoin( +group: GroupMetadata, +generationId: Int, +forceComplete: () => Boolean + ): Boolean = { group.inLock { - if (group.hasAllMembersJoined) + if (generationId != group.generationId) { +forceComplete() + } else if (group.hasAllMembersJoined) { forceComplete() - else false + } else false } } - def onCompleteJoin(group: GroupMetadata): Unit = { + def onCompleteJoin( +group: GroupMetadata, +generationId: Int + ): Unit = { group.inLock { - val notYetRejoinedDynamicMembers = group.notYetRejoinedMembers.filterNot(_._2.isStaticMember) - if (notYetRejoinedDynamicMembers.nonEmpty) { -info(s"Group ${group.groupId} removed dynamic members " + - s"who haven't joined: ${notYetRejoinedDynamicMembers.keySet}") - -notYetRejoinedDynamicMembers.values.foreach { failedMember => - group.remove(failedMember.memberId) - removeHeartbeatForLeavingMember(group, failedMember.memberId) -} - } - - if (group.is(Dead)) { -info(s"Group ${group.groupId} is dead, skipping rebalance stage") - } else if (!group.maybeElectNewJoinedLeader() && group.allMembers.nonEmpty) { -// If all members are not rejoining, we will postpone the completion -// of rebalance preparing stage, and send out another delayed operation -// until session timeout removes all the non-responsive members. 
-error(s"Group ${group.groupId} could not complete rebalance because no members rejoined") -rebalancePurgatory.tryCompleteElseWatch( - new DelayedJoin(this, group, group.rebalanceTimeoutMs), - Seq(GroupJoinKey(group.groupId))) + if (generationId != group.generationId) { +error(s"Received unexpected notification of join complete for ${group.groupId} " + + s"with an old generation $generationId while the group has ${group.generationId}.") } else { -group.initNextGeneration() -if (group.is(Empty)) { - info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " + - s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})") +val notYetRejoinedDynamicMembers = group.notYetRejoinedMembers.filterNot(_._2.isStaticMember) Review comment: It looks like this branch is all of the existing logic, but thought I'd check if there are any changes to look at. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
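The generation check added in this diff follows a common pattern for delayed operations: capture the group's generation when the operation is created, and treat a generation mismatch at completion time as a stale operation that should complete immediately rather than keep waiting. A minimal, self-contained sketch of that pattern (the class and field names here are illustrative, not Kafka's actual purgatory API):

```java
class DelayedJoinSketch {
    static class Group {
        int generationId;
        boolean allMembersJoined;
    }

    final Group group;
    final int expectedGeneration; // captured when the delayed operation was created

    DelayedJoinSketch(Group group) {
        this.group = group;
        this.expectedGeneration = group.generationId;
    }

    // Complete immediately if the group has moved on to a new generation (this
    // operation is stale) or if all members have rejoined; otherwise keep waiting.
    boolean tryComplete() {
        synchronized (group) {
            return group.generationId != expectedGeneration || group.allMembersJoined;
        }
    }
}
```

The matching `onCompleteJoin` side then logs and skips the rebalance work when the generation no longer matches, as the diff above shows.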
[GitHub] [kafka] ableegoldman merged pull request #11217: KAFKA-13204: assignor name conflict check
ableegoldman merged pull request #11217: URL: https://github.com/apache/kafka/pull/11217
[GitHub] [kafka] ableegoldman commented on pull request #11217: KAFKA-13204: assignor name conflict check
ableegoldman commented on pull request #11217: URL: https://github.com/apache/kafka/pull/11217#issuecomment-901516021 Merged to trunk
[GitHub] [kafka] ableegoldman commented on pull request #11217: KAFKA-13204: assignor name conflict check
ableegoldman commented on pull request #11217: URL: https://github.com/apache/kafka/pull/11217#issuecomment-901515775 Test failure is unrelated: `StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread`
[jira] [Commented] (KAFKA-12550) Introduce RESTORING state to the KafkaStreams FSM
[ https://issues.apache.org/jira/browse/KAFKA-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401408#comment-17401408 ] A. Sophie Blee-Goldman commented on KAFKA-12550: Yeah, the RESTORING would be an additional KafkaStreams state that exists in parallel to the existing REBALANCING state. imo REBALANCING should still take precedence over RESTORING, ie as long as at least one thread is going through a rebalance then the overall state should be REBALANCING. And if no threads are rebalancing but at least one is still restoring, then the overall state is RESTORING. And so on. I also think we should consider breaking away from the rebalance callbacks since the thread states don't really make sense to couple with these callbacks anymore since cooperative rebalancing was introduced. Before that, PARTITIONS_REVOKED always indicated the beginning of a rebalance, and PARTITIONS_ASSIGNED the end. But now with cooperative rebalancing, you may never invoke #onPartitionsRevoked to begin with, so it's actually possible for Streams to stay in RUNNING for the duration of the actual rebalance and then only when the rebalance ends do the threads transition to PARTITIONS_ASSIGNED and the overall state to REBALANCING. It's also a bit confusing since PARTITIONS_ASSIGNED is supposed to indicate the end of a rebalance, but if a followup rebalance is immediately triggered and the consumer must rejoin, then it may actually still be rebalancing even after entering PARTITIONS_ASSIGNED. The whole thing makes less and less sense. So, I'd propose to also clean up the StreamThread FSM at the same time by removing the PARTITIONS_ASSIGNED/PARTITIONS_REVOKED states and replacing them with the equivalent REBALANCING/RESTORING. As the names suggest, when the thread first rejoins the group (ie sends the Subscription for the rebalance) then it will transition to REBALANCING. At the end of the rebalance, when it receives its Assignment, it then transitions to RESTORING. 
That way it's always clear what the thread is doing, and if a followup rebalance is ever triggered then it will automatically transition back to the appropriate state, ie REBALANCING. Does that make sense? Unfortunately we'll now need to wait for another major release, since changing the FSM is a breaking change. But it would probably be a good idea to at least start the KIP now and get it accepted so that we can be ready when 4.0 comes along > Introduce RESTORING state to the KafkaStreams FSM > - > > Key: KAFKA-12550 > URL: https://issues.apache.org/jira/browse/KAFKA-12550 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > Fix For: 4.0.0 > > > We should consider adding a new state to the KafkaStreams FSM: RESTORING > This would cover the time between the completion of a stable rebalance and > the completion of restoration across the client. Currently, Streams will > report the state during this time as REBALANCING even though it is generally > spending much more time restoring than rebalancing in most cases. > There are a few motivations/benefits behind this idea: > # Observability is a big one: using the umbrella REBALANCING state to cover > all aspects of rebalancing -> task initialization -> restoring has been a > common source of confusion in the past. It’s also proved to be a time sink > for us, during escalations, incidents, mailing list questions, and bug > reports. 
It often adds latency to escalations in particular as we have to go > through GTS and wait for the customer to clarify whether their “Kafka Streams > is stuck rebalancing” ticket means that it’s literally rebalancing, or just > in the REBALANCING state and actually stuck elsewhere in Streams > # Prereq for global thread improvements: for example [KIP-406: > GlobalStreamThread should honor custom reset policy > |https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy] > was ultimately blocked on this as we needed to pause the Streams app while > the global thread restored from the appropriate offset. Since there’s > absolutely no rebalancing involved in this case, piggybacking on the > REBALANCING state would just be shooting ourselves in the foot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
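The precedence rule proposed above — REBALANCING wins over RESTORING, which wins over RUNNING when aggregating thread states into an overall client state — can be sketched as a simple fold over per-thread states. The enum and method names here are hypothetical, not the actual KafkaStreams API:

```java
import java.util.Set;

class StateAggregation {
    // Hypothetical per-thread states, listed in order of precedence.
    enum ThreadState { REBALANCING, RESTORING, RUNNING }

    // Overall client state: if any thread is rebalancing the client is
    // REBALANCING; else if any thread is restoring it is RESTORING; else RUNNING.
    static ThreadState overallState(Set<ThreadState> threadStates) {
        if (threadStates.contains(ThreadState.REBALANCING)) return ThreadState.REBALANCING;
        if (threadStates.contains(ThreadState.RESTORING)) return ThreadState.RESTORING;
        return ThreadState.RUNNING;
    }
}
```

Decoupling the thread FSM from the rebalance callbacks, as suggested, would make these per-thread states meaningful even under cooperative rebalancing, where `#onPartitionsRevoked` may never fire.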
[GitHub] [kafka] ableegoldman merged pull request #11068: KAFKA-13081: detect doubly assigned parition (for v2.8)
ableegoldman merged pull request #11068: URL: https://github.com/apache/kafka/pull/11068
[GitHub] [kafka] ableegoldman commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0
ableegoldman commented on a change in pull request #11124: URL: https://github.com/apache/kafka/pull/11124#discussion_r691646220 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -100,17 +99,78 @@ private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); private final String threadId = Thread.currentThread().getName(); +private final String topic = "topic"; +private final String defaultInOrderName = "InOrder"; +private final String defaultReverseName = "Reverse"; +private final long defaultWindowSize = 10L; +private final long defaultRetentionPeriod = 5000L; + +private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator, + final String inOrderName, + final String reverseName, + final long windowSize) { +return inOrderIterator +? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false) +: Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false); +} + +@SuppressWarnings("unchecked") +@Test +public void testAggregateSmallInputWithZeroTimeDifference() { +final StreamsBuilder builder = new StreamsBuilder(); + +// We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here +final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L); Review comment: Seems ok to me to wait and fix alongside other issues like KIP-300 in a "new and improved" DSL (or whatever we do there). If users start to complain and request a fix sooner then we can re-evaluate, but it's not like this was a user-reported bug to begin with. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Created] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded
Sergio Peña created KAFKA-13216: --- Summary: Streams left/outer joins cause new internal changelog topic to grow unbounded Key: KAFKA-13216 URL: https://issues.apache.org/jira/browse/KAFKA-13216 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.0.0 Reporter: Sergio Peña This bug is caused by the improvements made in https://issues.apache.org/jira/browse/KAFKA-10847, which fixes an issue with stream-stream left/outer joins. The issue arises only when a stream-stream left/outer join is used with the new `JoinWindows.ofTimeDifferenceAndGrace()` API that specifies the window time + grace period. This new API was added in AK 3.0. No previous users are affected. The issue causes the internal changelog topic used by the new OUTERSHARED window store to keep growing unbounded as new records come in. The topic is never cleaned up nor compacted even if tombstones are written to delete the joined and/or expired records from the window store. The problem is caused by a parameter required in the window store to retain duplicates. This config causes tombstone records to carry a new sequence ID as part of the key ID in the changelog, making those keys unique and thus defeating the cleanup policy. In 3.0, we deprecated {{JoinWindows.of(size)}} in favor of {{JoinWindows.ofTimeDifferenceAndGrace()}} -- the old API uses the old semantics and is thus not affected, while the new API enables the new semantics; the problem is that we deprecated the old API and thus tell users that they should switch to the new broken API. We have two ways forward: * Fix the bug (non-trivial) * Un-deprecate the old {{JoinWindows.of(size)}} API (and tell users not to use the new but broken API)
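The root cause described above can be illustrated with a toy model: when a store retains duplicates, each write — including a tombstone — gets a fresh sequence number appended to its changelog key, so a tombstone never shares a key with the record it is meant to delete, and log compaction cannot remove either one. The key encoding here is illustrative, not Kafka's actual changelog format:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DuplicateKeyModel {
    private long seq = 0;
    // Simulated compacted changelog: the latest value per key survives compaction,
    // and a null write (tombstone) deletes its key.
    final Map<String, String> compacted = new LinkedHashMap<>();

    // With retainDuplicates, every write gets a unique sequence-suffixed key,
    // so a later tombstone targets a key that nothing else ever used.
    void write(String key, String value, boolean retainDuplicates) {
        String changelogKey = retainDuplicates ? key + "@" + (seq++) : key;
        if (value == null) {
            compacted.remove(changelogKey); // tombstone: removes nothing if key is unique
        } else {
            compacted.put(changelogKey, value);
        }
    }
}
```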
[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
guozhangwang commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r691613659 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java ## @@ -1121,4 +1125,60 @@ public void shouldResetTransactionInitializedOnResetProducer() { verify(mockedProducer); } +@Test +public void shouldComputeTotalBlockedTime() { +setProducerMetrics(nonEosMockProducer, 1, 2, 3, 4, 5, 6, 7); + +final double expectedTotalBlocked = 1 + 2 + 3 + 4 + 5 + 6 + 7; +assertThat(nonEosStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked)); +} + +@Test +public void shouldComputeTotalBlockedTimeAfterReset() { +setProducerMetrics(eosBetaMockProducer, 1, 2, 3, 4, 5, 6, 7); +eosBetaStreamsProducer.resetProducer(); Review comment: +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401318#comment-17401318 ] Guozhang Wang commented on KAFKA-13152: --- I think it's a good idea to continue our discussion on the KIP. I'm not strongly suggesting that we do one option over the other, and maybe I'm over-thinking in trying to get to a point where the task -> thread mapping is no longer static, which would not happen yet. > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > --- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > > The current config "buffered.records.per.partition" controls how many records > at maximum to bookkeep, and hence when it is exceeded we pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed depends on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider replacing that config with a global one, e.g. "input.buffer.max.bytes", > which controls how many bytes in total are allowed to be buffered. This is > doable since we buffer the raw records in .
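A minimal sketch of the byte-bounded buffering idea under discussion, assuming the "only pause partitions with non-empty buffers" behavior Guozhang proposed; class and method names are hypothetical, not Kafka's actual implementation:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ByteBoundedBuffer {
    private final long maxBytes;
    private final Map<String, Long> bytesPerPartition = new HashMap<>();
    private long totalBytes = 0;

    public ByteBoundedBuffer(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Track buffered bytes globally rather than a per-partition record count,
    // so total memory no longer depends on how many partitions are assigned
    // or how large individual records are.
    public void add(String partition, long recordBytes) {
        bytesPerPartition.merge(partition, recordBytes, Long::sum);
        totalBytes += recordBytes;
    }

    // When the global budget is exceeded, only partitions that actually hold
    // buffered bytes are candidates for pausing; empty buffers keep fetching.
    public Set<String> partitionsToPause() {
        Set<String> paused = new HashSet<>();
        if (totalBytes > maxBytes) {
            for (Map.Entry<String, Long> e : bytesPerPartition.entrySet()) {
                if (e.getValue() > 0) {
                    paused.add(e.getKey());
                }
            }
        }
        return paused;
    }

    public static void main(String[] args) {
        ByteBoundedBuffer buffer = new ByteBoundedBuffer(100);
        buffer.add("p0", 60);
        System.out.println(buffer.partitionsToPause()); // under budget: nothing paused
        buffer.add("p1", 50);
        System.out.println(buffer.partitionsToPause()); // over budget: non-empty partitions paused
    }
}
```

The open question in the thread -- dividing the budget statically across threads versus a single global accounting -- would only change where `maxBytes` lives, not this basic pause logic.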
[GitHub] [kafka] hachikuji commented on a change in pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance
hachikuji commented on a change in pull request #11231: URL: https://github.com/apache/kafka/pull/11231#discussion_r691522435 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -476,7 +476,6 @@ boolean joinGroupIfNeeded(final Timer timer) { else if (!future.isRetriable()) throw exception; -resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); Review comment: My reasoning here is the following. First, there is no reason to reset the memberId/generation if it is a retriable error. For most transient errors, our session timeout will not have expired, so it is better to rejoin with the same memberId. This ensures that we will not have an extra delay while the old memberId gets expired. On the other hand, if the session timeout _did_ expire, then our next `JoinGroup` will fail immediately with `UNKNOWN_MEMBER_ID`, which will cause our state to be reset in the `JoinGroupResponseHandler`. Second, there should be no need to reset the `rejoinNeeded` flag. Once we begin a rebalance, this will remain set to `true` until the `SyncGroup` completes successfully which consequently causes the future to be completed successfully. Hence there's no way that I can see for `rejoinNeeded` to be `false` if the future has failed.
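The reasoning above can be sketched as a tiny state machine (a hypothetical simplification, not the actual `AbstractCoordinator` code): retriable failures leave the member identity intact, only fatal errors (e.g. `UNKNOWN_MEMBER_ID`) reset it, and `rejoinNeeded` stays set regardless until a `SyncGroup` succeeds.

```java
public class RejoinState {
    public String memberId;
    public int generation;
    public boolean rejoinNeeded = true;

    public RejoinState(String memberId, int generation) {
        this.memberId = memberId;
        this.generation = generation;
    }

    // On a retriable error we keep memberId/generation: if the session is
    // still alive, the coordinator recognizes us on rejoin; if it expired,
    // the next JoinGroup fails with UNKNOWN_MEMBER_ID and the reset happens
    // in the response handler instead.
    public void onJoinGroupFailure(boolean retriable) {
        if (!retriable) {
            memberId = "";   // reset only on fatal errors
            generation = -1;
        }
        // rejoinNeeded stays true in both cases: it is only cleared once a
        // SyncGroup completes successfully.
    }

    public static void main(String[] args) {
        RejoinState state = new RejoinState("member-1", 5);
        state.onJoinGroupFailure(true);   // retriable: identity preserved
        System.out.println(state.memberId + " gen=" + state.generation);
        state.onJoinGroupFailure(false);  // fatal: identity reset
        System.out.println("'" + state.memberId + "' gen=" + state.generation);
    }
}
```

This mirrors the patch's effect: the removed `resetStateAndRejoin` call corresponds to dropping the reset from the retriable branch.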
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401302#comment-17401302 ] Matthias J. Sax commented on KAFKA-13152: - [~guozhang] – your proposal to only pause partitions with non-empty buffers SGTM; about fairness, I was not sure if we can/should rely on the consumer, but if you think it's the right way to go, I am ok with it. Just wanted to raise the question to ensure that we make a conscious decision. [~sagarrao] – I think we should take the discussion into the KIP? It seems the scope is clear now, and we have two proposals: dividing the given buffer size across threads (or maybe even tasks, etc.) or following Guozhang's proposal. It seems your concern is similar to my concern about fairness. Guozhang pointed out that we should achieve fairness within a thread (due to consumer round-robin fetching), but I guess your point is a good one, that it's unclear if we achieve fairness across threads? [~guozhang] WDYT about this? In the end the question seems to be whether we can/should try to keep it simple vs. how complex we want to design the algorithm. Personally, I am afraid of premature optimization and think keeping it simple might be the better way to get started. It might be best if you start to work on a KIP, explain pros/cons of both approaches, and put one into the "rejected alternatives" section, and we can discuss on the mailing list? > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > --- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > > The current config "buffered.records.per.partition" controls how many records > at maximum to bookkeep, and hence when it is exceeded we pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed depends on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider replacing that config with a global one, e.g. "input.buffer.max.bytes", > which controls how many bytes in total are allowed to be buffered. This is > doable since we buffer the raw records in .
[GitHub] [kafka] hachikuji opened a new pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance
hachikuji opened a new pull request #11231: URL: https://github.com/apache/kafka/pull/11231 Currently the consumer will reset state after any retriable error during a rebalance. This includes coordinator disconnects as well as coordinator changes. The impact of this is that rebalances get delayed since they will be blocked until the session timeout of the old memberId expires. The patch here fixes the problem by not resetting the member state after a retriable error. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] lbradstreet commented on a change in pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions
lbradstreet commented on a change in pull request #11230: URL: https://github.com/apache/kafka/pull/11230#discussion_r691487441 ## File path: core/src/main/scala/kafka/server/ConfigHandler.scala ## @@ -66,8 +66,13 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC topicConfig.asScala.forKeyValue { (key, value) => if (!configNamesToExclude.contains(key)) props.put(key, value) } - val logConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) - logs.foreach(_.updateConfig(logConfig)) + val newLogConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) Review comment: That does seem better.
[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency
[ https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401270#comment-17401270 ] Kai Huang commented on KAFKA-12713: --- [~ijuma] Thanks for the pointer. I've bumped up the [discussion thread|https://lists.apache.org/thread.html/r261915b64c819129bc6017adaa12e8f9a0feb74c24ba331f4a08f30c%40%3Cdev.kafka.apache.org%3E] again. > Report "REAL" follower/consumer fetch latency > - > > Key: KAFKA-12713 > URL: https://issues.apache.org/jira/browse/KAFKA-12713 > Project: Kafka > Issue Type: Bug >Reporter: Ming Liu >Assignee: Kai Huang >Priority: Major > > The fetch latency is an important metric to monitor for cluster > performance. With ACK=ALL, the produce latency is affected primarily by > broker fetch latency. > However, currently the reported fetch latency doesn't reflect the true fetch > latency because requests sometimes need to stay in purgatory and wait for > replica.fetch.wait.max.ms when data is not available. This greatly affects the > real P50, P99, etc. > I'd like to propose a KIP to be able to track the real fetch latency for both > broker followers and consumers. >
[jira] [Assigned] (KAFKA-13214) Consumer should not reset group state after disconnect
[ https://issues.apache.org/jira/browse/KAFKA-13214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-13214: --- Assignee: Jason Gustafson > Consumer should not reset group state after disconnect > -- > > Key: KAFKA-13214 > URL: https://issues.apache.org/jira/browse/KAFKA-13214 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0, 2.8.0 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Critical > > When the consumer disconnects from the coordinator while a rebalance is in > progress, we currently reset the memberId and generation. The coordinator > then must await the session timeout in order to expire the old memberId. This > was apparently a regression from > https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R478. > It would be better to keep the memberId/generation.
[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r690620725 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -113,7 +113,7 @@ class BrokerServer( var dynamicConfigHandlers: Map[String, ConfigHandler] = null - var replicaManager: ReplicaManager = null + @volatile private[this] var _replicaManager: ReplicaManager = null Review comment: Done.
[GitHub] [kafka] junrao merged pull request #11216: KAFKA-13198: Stop replicas when reassigned
junrao merged pull request #11216: URL: https://github.com/apache/kafka/pull/11216
[GitHub] [kafka] yangdaixai closed pull request #11185: KAFKA-13175; Better indicates for created topic is marked for deleti…
yangdaixai closed pull request #11185: URL: https://github.com/apache/kafka/pull/11185
[GitHub] [kafka] dajac commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests
dajac commented on pull request #11073: URL: https://github.com/apache/kafka/pull/11073#issuecomment-901015825 @jolshan is it something that we must add to 2.8.1?
[GitHub] [kafka] jomach commented on pull request #9493: KAFKA-10640: Add recursive support to Connect Cast and ReplaceField transforms, and support for casting complex types to either a native or JSO
jomach commented on pull request #9493: URL: https://github.com/apache/kafka/pull/9493#issuecomment-901121979 There seem to be some conflicts. @joshuagrisham can you fix them please?
[GitHub] [kafka] rajinisivaram commented on pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher
rajinisivaram commented on pull request #11221: URL: https://github.com/apache/kafka/pull/11221#issuecomment-900433010
[GitHub] [kafka] showuon commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
showuon commented on a change in pull request #11227: URL: https://github.com/apache/kafka/pull/11227#discussion_r691052026 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java ## @@ -124,8 +124,8 @@ final List searchSpace = keySchema.segmentsToSearch(segments, from, to, forward); -final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from); -final Bytes binaryTo = keySchema.upperRange(keyTo, to); +final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from); +final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to); Review comment: We already support `null` `binaryFrom` and `binaryTo`. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java ## @@ -573,12 +573,14 @@ private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRang throw new IllegalStateException("Error iterating over segments: segment interval has changed"); } -if (keyFrom.equals(keyTo)) { +if (keyFrom != null && keyTo != null && keyFrom.equals(keyTo)) { cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime)); cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime)); } else { -cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId); -cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId); +cacheKeyFrom = keyFrom == null ? null : +cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId); +cacheKeyTo = keyTo == null ? null : +cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId); Review comment: A `null` `cacheKeyFrom` or `cacheKeyTo` will use the `range` query, which already supports null bounds since KIP-763. 
## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java ## @@ -74,7 +74,7 @@ public boolean hasNext() { close(); currentSegment = segments.next(); try { -if (from == null || to == null) { +if (from == null && to == null) { if (forward) { currentIterator = currentSegment.all(); Review comment: Side fix: the `all` case should be `from == null && to == null`. Otherwise, call the `range` method, which already supports null range queries since KIP-763.
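The null-bound convention discussed in these review comments can be sketched as follows (a hypothetical simplification with `String` keys, not the actual `KeySchema`/`Bytes` code): a `null` user key means "unbounded", so schema range computation is skipped and `null` is passed through, while a full scan (`all`) applies only when *both* bounds are absent.

```java
public class RangeBounds {
    // Hypothetical stand-ins for keySchema.lowerRange/upperRange: only invoke
    // the schema computation when a bound is actually present.
    public static String lowerRange(String keyFrom) {
        return keyFrom == null ? null : keyFrom + "@lower";
    }

    public static String upperRange(String keyTo) {
        return keyTo == null ? null : keyTo + "@upper";
    }

    // all() is only correct when both bounds are absent; a single null bound
    // must still go through a (half-open) range query.
    public static boolean useAll(String from, String to) {
        return from == null && to == null;
    }

    public static void main(String[] args) {
        System.out.println(useAll(null, null)); // full scan
        System.out.println(useAll("k1", null)); // half-open range, not a full scan
        System.out.println(lowerRange(null));   // null bound passed through
    }
}
```

This is exactly the bug the "side fix" addresses: with `||` instead of `&&`, a half-open query would incorrectly fall back to `all()` and scan unbounded on the side that did have a key.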
[GitHub] [kafka] vvcephei commented on a change in pull request #11213: KAFKA-13201: Convert KTable aggregations to new PAPI
vvcephei commented on a change in pull request #11213: URL: https://github.com/apache/kafka/pull/11213#discussion_r690642563 ## File path: streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.test; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.MockProcessorContext; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.processor.internals.StreamTask; +import org.apache.kafka.streams.processor.internals.Task.TaskType; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.internals.ThreadCache; +import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener; + +import java.io.File; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +public class MockInternalNewProcessorContext extends MockProcessorContext implements InternalProcessorContext { Review comment: It looks like this is copied and modified from https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java . Is that right? We also have https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java , and it's been a longstanding thorn in our side that there's a proliferation of these context implementations. I'm hoping that by the time we're done with all these migrations, we can actually converge on this new class and delete the other two. 
## File path: streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.MockProcessorContext; Review comment: This is the one defined in `test-utils`, right? Should we be using the new PAPI one (https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java) instead? ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java ## @@ -150,16 +153,16 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte } @Override -public void process(final K key, final Change value) { +public void process(final Record> record) { observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp()); -buffer(key, value); +buffer(record);
[GitHub] [kafka] jlprat commented on pull request #11228: KAFKA-12887 Skip some RuntimeExceptions from exception handler
jlprat commented on pull request #11228: URL: https://github.com/apache/kafka/pull/11228#issuecomment-901030641
[GitHub] [kafka] Justinwins commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy
Justinwins commented on a change in pull request #10652: URL: https://github.com/apache/kafka/pull/10652#discussion_r684042834 ## File path: docs/upgrade.html ## @@ -83,7 +83,13 @@ Notable changes in 3 understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass in new ConsumerGroupMetadata(consumerGroupId) to work with older brokers. See https://cwiki.apache.org/confluence/x/zJONCg;>KIP-732 for more details. - + The Connect-based MirrorMaker (MM2) includes changes to support IdentityReplicationPolicy, enabling replication without renaming topics. +The existing DefaultReplicationPolicy is still used by default, but identity replication can be enabled via the +replication.policy configuration property. This is especially useful for users migrating from the older MirrorMaker (MM1), or for Review comment: I think it's clearer to say "replication.policy.class" here, i.e. the form in which it's configured in the mm2.properties file. Friendlier to beginners.
[GitHub] [kafka] dajac commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`
dajac commented on a change in pull request #11225: URL: https://github.com/apache/kafka/pull/11225#discussion_r691041467 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig, leaderAndIsrRequest.partitionStates.forEach { partitionState => Review comment: I wonder if we really have to re-iterate over all the partition states present in the request here. Intuitively, I would have thought that considering only the ones in `partitionStates` would be sufficient.
[GitHub] [kafka] dajac commented on pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher
dajac commented on pull request #11221: URL: https://github.com/apache/kafka/pull/11221#issuecomment-900541736 @rajinisivaram Should we also add it to the 2.8 branch?
[GitHub] [kafka] junrao commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
junrao commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r690492221 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -113,7 +113,7 @@ class BrokerServer( var dynamicConfigHandlers: Map[String, ConfigHandler] = null - var replicaManager: ReplicaManager = null + @volatile private[this] var _replicaManager: ReplicaManager = null Review comment: Could you make the same change for KafkaServer too?
[GitHub] [kafka] guozhangwang merged pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0
guozhangwang merged pull request #11184: URL: https://github.com/apache/kafka/pull/11184
[GitHub] [kafka] lbradstreet commented on a change in pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions
lbradstreet commented on a change in pull request #11230: URL: https://github.com/apache/kafka/pull/11230#discussion_r691368381 ## File path: core/src/main/scala/kafka/server/ConfigHandler.scala ## @@ -66,8 +66,13 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC topicConfig.asScala.forKeyValue { (key, value) => if (!configNamesToExclude.contains(key)) props.put(key, value) } - val logConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) - logs.foreach(_.updateConfig(logConfig)) + val newLogConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) Review comment: I assume we're doing this here because Log doesn't have access to the log manager? Would it be better to have the comparison logic in the LogManager where it'll be more visible, e.g. maybeAbortCleaning(oldConfig, newConfig)?
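The `maybeAbortCleaning(oldConfig, newConfig)` idea suggested in the review can be sketched like this (hypothetical names and a plain `cleanup.policy` string comparison, not the actual `LogConfig`/`LogManager` code): an in-flight compaction should only be aborted when the update removes `compact` from a topic that previously had it.

```java
public class CompactionConfigChange {
    // "compact" may appear alone or alongside "delete" in cleanup.policy,
    // e.g. "compact" or "compact,delete".
    public static boolean isCompact(String cleanupPolicy) {
        return cleanupPolicy.contains("compact");
    }

    // Abort on-going compactions only on a compact -> non-compact transition;
    // any other change leaves the cleaner alone.
    public static boolean shouldAbortCleaning(String oldPolicy, String newPolicy) {
        return isCompact(oldPolicy) && !isCompact(newPolicy);
    }

    public static void main(String[] args) {
        System.out.println(shouldAbortCleaning("compact", "delete"));         // abort
        System.out.println(shouldAbortCleaning("compact,delete", "compact")); // keep cleaning
        System.out.println(shouldAbortCleaning("delete", "compact"));         // nothing to abort
    }
}
```

Housing this comparison in the log manager, as the reviewer suggests, keeps the old-vs-new decision next to the component that owns the cleaner, rather than buried in the config handler.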
[GitHub] [kafka] guozhangwang commented on a change in pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0
guozhangwang commented on a change in pull request #11184: URL: https://github.com/apache/kafka/pull/11184#discussion_r690792216 ## File path: docs/streams/upgrade-guide.html ## @@ -52,6 +52,15 @@ Upgrade Guide and API Changes restart all new ({{fullDotVersion}}) application instances + +Since 3.0.0 release, Kafka Streams uses a newer RocksDB version which bumped its footer version persisted on files. +This means that old versioned RocksDB would not be able to recognize the bytes written by that newer versioned RocksDB, +and hence it is harder to downgrade Kafka Streams with version 3.0.0 or newer to older versions in-flight. +Users need to wipe out the local RocksDB state stores written by the new versioned Kafka Streams before swapping in the +older versioned Kafka Streams bytecode, which will then restore the state stores with the old versioned footer from the Review comment: ack. ## File path: docs/streams/upgrade-guide.html ## @@ -52,6 +52,15 @@ Upgrade Guide and API Changes restart all new ({{fullDotVersion}}) application instances + +Since 3.0.0 release, Kafka Streams uses a newer RocksDB version which bumped its footer version persisted on files. Review comment: Ack. ## File path: docs/streams/upgrade-guide.html ## @@ -52,6 +52,15 @@ Upgrade Guide and API Changes restart all new ({{fullDotVersion}}) application instances + +Since 3.0.0 release, Kafka Streams uses a newer RocksDB version which bumped its footer version persisted on files. +This means that old versioned RocksDB would not be able to recognize the bytes written by that newer versioned RocksDB, Review comment: Ack.
[GitHub] [kafka] rajinisivaram merged pull request #11221: KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher
rajinisivaram merged pull request #11221: URL: https://github.com/apache/kafka/pull/11221
[GitHub] [kafka] jolshan commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests
jolshan commented on pull request #11073: URL: https://github.com/apache/kafka/pull/11073#issuecomment-901220052 Ah yes. I had a feeling I still had a pr open for this. I think I can clean it up a bit and it should be ready for review by tomorrow.
[GitHub] [kafka] guozhangwang commented on pull request #11184: KAFKA-13172: Add downgrade guidance note for 3.0
guozhangwang commented on pull request #11184: URL: https://github.com/apache/kafka/pull/11184#issuecomment-900710127 Cherry-picked to 3.0 to be included in the 3.0 docs cc @kkonstantine
[GitHub] [kafka] cadonna commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
cadonna commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r691098389 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; + +public class StreamsThreadTotalBlockedTime { +final Consumer consumer; +final Consumer restoreConsumer; +final Supplier producerTotalBlockedTime; + +StreamsThreadTotalBlockedTime( +final Consumer consumer, +final Consumer restoreConsumer, +final Supplier producerTotalBlockedTime) { +this.consumer = consumer; +this.restoreConsumer = restoreConsumer; +this.producerTotalBlockedTime = producerTotalBlockedTime; +} + +final double getMetricValue( Review comment: Could you please make this method private? Out of curiosity, why did you define this method as `final`? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java ## @@ -310,6 +317,28 @@ public static Sensor commitRatioSensor(final String threadId, return sensor; } +public static void addThreadStartTimeMetric(final String threadId, Review comment: Could you please add tests in `ThreadMetricsTest`? A similar test is `ClientMetricsTest#shouldAddVersionMetric()`. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; + +public class StreamsThreadTotalBlockedTime { +final Consumer consumer; +final Consumer restoreConsumer; +final Supplier producerTotalBlockedTime; Review comment: Could you please make these member fields private? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java ## @@ -310,6 +317,28 @@ public static Sensor commitRatioSensor(final String threadId, return sensor; } +public static void addThreadStartTimeMetric(final String threadId, +final StreamsMetricsImpl streamsMetrics, +final long startTime) { +streamsMetrics.addThreadLevelImmutableMetric( +THREAD_START_TIME, +THREAD_START_TIME_DESCRIPTION, +threadId, +startTime +); +} + +public static void addThreadBlockedTimeMetric(final String threadId, Review comment: Could you please add tests in `ThreadMetricsTest`? A similar test is `ClientMetricsTest#shouldAddStateMetric()` ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java ## @@ -1121,4 +1125,60 @@ public void
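For context on what the class under review computes: the thread's total blocked time is the producer's blocked time plus blocked-time metrics read off the main and restore consumers. A simplified, hypothetical sketch of that aggregation (the `MetricSource` interface and the metric names stand in for the real Kafka `Consumer`/`Metric` types, which are not shown in full in the quoted diff):

```java
import java.util.Map;
import java.util.function.Supplier;

// Stand-in for a Kafka client exposing metrics as name -> value.
interface MetricSource {
    Map<String, Double> metrics();
}

class TotalBlockedTimeSketch {
    // Sum the producer's blocked time with the blocked-time style
    // metrics of both consumers. The metric names below are
    // illustrative examples of the kind KIP-761 aggregates.
    static double compute(MetricSource consumer, MetricSource restoreConsumer,
                          Supplier<Double> producerTotalBlockedTime) {
        double total = producerTotalBlockedTime.get();
        for (MetricSource source : new MetricSource[] {consumer, restoreConsumer}) {
            total += source.metrics().getOrDefault("io-waittime-total", 0.0);
            total += source.metrics().getOrDefault("iotime-total", 0.0);
        }
        return total;
    }

    public static void main(String[] args) {
        MetricSource consumer = () -> Map.of("io-waittime-total", 1.5, "iotime-total", 0.5);
        MetricSource restore = () -> Map.of("io-waittime-total", 1.0);
        System.out.println(compute(consumer, restore, () -> 2.0)); // prints 5.0
    }
}
```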
[jira] [Updated] (KAFKA-13215) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
[ https://issues.apache.org/jira/browse/KAFKA-13215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-13215: --- Description: Integration test {{test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} sometimes fails with {code:java} java.lang.AssertionError: only one task at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:163) at org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation(TaskMetadataIntegrationTest.java:144) {code} was: Integration test {{test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} sometimes fails with {code:java} java.lang.AssertionError: only one task at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162) at org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117) {code} > Flaky test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation > --- > > Key: KAFKA-13215 > URL: https://issues.apache.org/jira/browse/KAFKA-13215 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Konstantine Karantasis >Priority: Major > Labels: flaky-test > Fix For: 3.1.0 > > > Integration test {{test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} > sometimes fails with > {code:java} > java.lang.AssertionError: only one task > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:163) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation(TaskMetadataIntegrationTest.java:144) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] omkreddy merged pull request #11224: KAFKA-13209: Upgrade jetty-server to fix CVE-2021-34429
omkreddy merged pull request #11224: URL: https://github.com/apache/kafka/pull/11224 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r690993461 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, Review comment: Have we considered relying on `makeFollowers` instead of introducing `updateTopicIdForFollowers`? The two methods are really similar. The main notable difference is that `makeFollowers` shuts down the fetcher thread. We could perhaps optimize this part to not do it if the broker is already a follower for the partition. ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -854,6 +859,10 @@ case class PartitionFetchState(fetchOffset: Long, s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" + s")" } + + def updateTopicId(topicId: Option[Uuid]): PartitionFetchState = { +PartitionFetchState(topicId, fetchOffset, lag, currentLeaderEpoch, delay, state, lastFetchedEpoch) Review comment: nit: We could use `this.copy(topicId = topicId)`. ## File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala ## @@ -262,7 +263,8 @@ class ReplicaAlterLogDirsThread(name: String, private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = { val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] val partitionsWithError = mutable.Set[TopicPartition]() -val topicIds = replicaMgr.metadataCache.topicNamesToIds() +val topicId = fetchState.topicId +val topicIds = Collections.singletonMap(tp.topic(), topicId.getOrElse(Uuid.ZERO_UUID)) Review comment: nit: I would bring this one closer to `requestBuilder` as it is only used by the request builder or we could even inline it.
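The `this.copy(topicId = topicId)` nit above refers to Scala's case-class `copy` method, which avoids re-listing every constructor argument when only one field changes. A rough Java analogue of the same idea, with hypothetical stand-in fields for `PartitionFetchState`:

```java
import java.util.Optional;

// Immutable state object that returns a new instance with one field
// changed, instead of re-listing every constructor argument at each
// call site. Fields here are illustrative, not the real ones.
final class FetchState {
    final Optional<String> topicId;
    final long fetchOffset;
    final int currentLeaderEpoch;

    FetchState(Optional<String> topicId, long fetchOffset, int currentLeaderEpoch) {
        this.topicId = topicId;
        this.fetchOffset = fetchOffset;
        this.currentLeaderEpoch = currentLeaderEpoch;
    }

    // The Java spelling of Scala's this.copy(topicId = topicId)
    FetchState withTopicId(Optional<String> newTopicId) {
        return new FetchState(newTopicId, fetchOffset, currentLeaderEpoch);
    }

    public static void main(String[] args) {
        FetchState before = new FetchState(Optional.empty(), 42L, 7);
        FetchState after = before.withTopicId(Optional.of("topic-id"));
        System.out.println(after.topicId.get() + " " + after.fetchOffset); // prints "topic-id 42"
    }
}
```

The benefit of the `copy`-style method is that adding a new field to the class cannot be silently dropped at a call site that only wants to change the topic id.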
[jira] [Assigned] (KAFKA-13215) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
[ https://issues.apache.org/jira/browse/KAFKA-13215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis reassigned KAFKA-13215: -- Assignee: (was: Walker Carlson) > Flaky test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation > --- > > Key: KAFKA-13215 > URL: https://issues.apache.org/jira/browse/KAFKA-13215 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Konstantine Karantasis >Priority: Major > Labels: flaky-test > Fix For: 3.1.0 > > > Integration test {{test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} > sometimes fails with > {code:java} > java.lang.AssertionError: only one task > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117) > {code}
[jira] [Created] (KAFKA-13215) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
Konstantine Karantasis created KAFKA-13215: -- Summary: Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation Key: KAFKA-13215 URL: https://issues.apache.org/jira/browse/KAFKA-13215 Project: Kafka Issue Type: Bug Components: streams Reporter: Bruno Cadonna Assignee: Walker Carlson Fix For: 3.1.0 Integration test {{test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} sometimes fails with {code:java} java.lang.AssertionError: only one task at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162) at org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117) {code}
[jira] [Updated] (KAFKA-13215) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
[ https://issues.apache.org/jira/browse/KAFKA-13215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-13215: --- Reporter: Konstantine Karantasis (was: Bruno Cadonna) > Flaky test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation > --- > > Key: KAFKA-13215 > URL: https://issues.apache.org/jira/browse/KAFKA-13215 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Konstantine Karantasis >Assignee: Walker Carlson >Priority: Major > Labels: flaky-test > Fix For: 3.1.0 > > > Integration test {{test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} > sometimes fails with > {code:java} > java.lang.AssertionError: only one task > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117) > {code}
[GitHub] [kafka] rondagostino commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails
rondagostino commented on pull request #6233: URL: https://github.com/apache/kafka/pull/6233#issuecomment-900456795
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r691366479 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, Review comment: I talked to Jason about this and there were a few issues with the makeFollowers path. Namely `updateAssignmentAndIsr` in the `makeFollower` method. This can override a pending isr state and we do not want this.
[GitHub] [kafka] venkatesh010 commented on pull request #6233: KAFKA-7902: Replace original loginContext if SASL/OAUTHBEARER refresh login fails
venkatesh010 commented on pull request #6233: URL: https://github.com/apache/kafka/pull/6233#issuecomment-900514731
[GitHub] [kafka] dajac commented on a change in pull request #11173: MINOR: Support max timestamp in GetOffsetShell
dajac commented on a change in pull request #11173: URL: https://github.com/apache/kafka/pull/11173#discussion_r691375264 ## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -224,9 +227,14 @@ object GetOffsetShell { /** * Return the partition infos. Filter them with topicPartitionFilter. */ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = { -consumer.listTopics.asScala.values.flatMap { partitions => - partitions.asScala.filter(topicPartitionFilter) + private def listPartitionInfos(client: Admin, topicPartitionFilter: PartitionInfo => Boolean, listInternal: Boolean): Seq[PartitionInfo] = { +val topics = client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names().get().asScala Review comment: I agree with your suggestion. It would be great if we could filter out the list of topics before describing them. Is it complicated?
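The suggestion in the review is to apply the topic-name part of the filter before describing, so fewer topics go into the describe call. A hedged sketch of just that filtering step (the Admin-client calls themselves are omitted; class and method names are illustrative):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class TopicPrefilter {
    // Keep only the topic names that can match the filter; only these
    // would then be passed on to a describe-topics call.
    static List<String> topicsToDescribe(List<String> listedTopics, Predicate<String> topicFilter) {
        return listedTopics.stream().filter(topicFilter).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> out = topicsToDescribe(
            List.of("orders", "__consumer_offsets", "payments"),
            name -> !name.startsWith("__")); // e.g. exclude internal topics
        System.out.println(out); // prints [orders, payments]
    }
}
```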
[GitHub] [kafka] dajac commented on a change in pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions
dajac commented on a change in pull request #11230: URL: https://github.com/apache/kafka/pull/11230#discussion_r691396770 ## File path: core/src/main/scala/kafka/server/ConfigHandler.scala ## @@ -66,8 +66,13 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC topicConfig.asScala.forKeyValue { (key, value) => if (!configNamesToExclude.contains(key)) props.put(key, value) } - val logConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) - logs.foreach(_.updateConfig(logConfig)) + val newLogConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) Review comment: That is correct. Yeah, that's a good suggestion. I will do this. ## File path: core/src/main/scala/kafka/server/ConfigHandler.scala ## @@ -66,8 +66,13 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC topicConfig.asScala.forKeyValue { (key, value) => if (!configNamesToExclude.contains(key)) props.put(key, value) } - val logConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) - logs.foreach(_.updateConfig(logConfig)) + val newLogConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) Review comment: Another way would be to move `updateLogConfig` in the LogManager. It would make sense to delegate the whole logic to it. What do you think?
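The condition the KAFKA-12840 discussion is about can be stated compactly: abort in-progress compaction exactly when `compact` is present in the old `cleanup.policy` but absent from the new one. A sketch of that check (method and class names are illustrative, not the actual LogManager API):

```java
import java.util.Set;

class CleanupPolicyChange {
    // cleanup.policy is a comma-separated list ("compact", "delete",
    // or both); represented here as a set of policy names.
    static boolean shouldAbortCompaction(Set<String> oldPolicy, Set<String> newPolicy) {
        return oldPolicy.contains("compact") && !newPolicy.contains("compact");
    }

    public static void main(String[] args) {
        System.out.println(shouldAbortCompaction(Set.of("compact", "delete"), Set.of("delete"))); // prints true
        System.out.println(shouldAbortCompaction(Set.of("delete"), Set.of("delete"))); // prints false
    }
}
```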
[jira] [Created] (KAFKA-13214) Consumer should not reset group state after disconnect
Jason Gustafson created KAFKA-13214: --- Summary: Consumer should not reset group state after disconnect Key: KAFKA-13214 URL: https://issues.apache.org/jira/browse/KAFKA-13214 Project: Kafka Issue Type: Bug Affects Versions: 2.8.0, 2.7.0 Reporter: Jason Gustafson When the consumer disconnects from the coordinator while a rebalance is in progress, we currently reset the memberId and generation. The coordinator then must await the session timeout in order to expire the old memberId. This was apparently a regression from https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R478. It would be better to keep the memberId/generation.
[GitHub] [kafka] lbradstreet commented on a change in pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions
lbradstreet commented on a change in pull request #11230: URL: https://github.com/apache/kafka/pull/11230#discussion_r691368381 ## File path: core/src/main/scala/kafka/server/ConfigHandler.scala ## @@ -66,8 +66,13 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC topicConfig.asScala.forKeyValue { (key, value) => if (!configNamesToExclude.contains(key)) props.put(key, value) } - val logConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) - logs.foreach(_.updateConfig(logConfig)) + val newLogConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) Review comment: I assume we're doing this here because Log doesn't have access to the log manager? Would it be better to have the comparison logic in the LogManager where it'll be more visible, e.g. maybeAbortCleaning(oldConfig, newConfig)?
[GitHub] [kafka] dajac opened a new pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions
dajac opened a new pull request #11230: URL: https://github.com/apache/kafka/pull/11230 When `compact` is removed from the `cleanup.policy` of a topic, the compactions of that topic should be aborted. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cadonna commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
cadonna commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r691098389 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; + +public class StreamsThreadTotalBlockedTime { +final Consumer consumer; +final Consumer restoreConsumer; +final Supplier producerTotalBlockedTime; + +StreamsThreadTotalBlockedTime( +final Consumer consumer, +final Consumer restoreConsumer, +final Supplier producerTotalBlockedTime) { +this.consumer = consumer; +this.restoreConsumer = restoreConsumer; +this.producerTotalBlockedTime = producerTotalBlockedTime; +} + +final double getMetricValue( Review comment: Could you please make this method private? Out of curiosity, why did you define this method as `final`? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java ## @@ -310,6 +317,28 @@ public static Sensor commitRatioSensor(final String threadId, return sensor; } +public static void addThreadStartTimeMetric(final String threadId, Review comment: Could you please add tests in `ThreadMetricsTest`? A similar test is `ClientMetricsTest#shouldAddVersionMetric()`. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; + +public class StreamsThreadTotalBlockedTime { +final Consumer consumer; +final Consumer restoreConsumer; +final Supplier producerTotalBlockedTime; Review comment: Could you please make this member fields private? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java

@@ -310,6 +317,28 @@ public static Sensor commitRatioSensor(final String threadId,
        return sensor;
    }

+    public static void addThreadStartTimeMetric(final String threadId,
+                                                final StreamsMetricsImpl streamsMetrics,
+                                                final long startTime) {
+        streamsMetrics.addThreadLevelImmutableMetric(
+            THREAD_START_TIME,
+            THREAD_START_TIME_DESCRIPTION,
+            threadId,
+            startTime
+        );
+    }
+
+    public static void addThreadBlockedTimeMetric(final String threadId,

Review comment: Could you please add tests in `ThreadMetricsTest`? A similar test is `ClientMetricsTest#shouldAddStateMetric()`.

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java

@@ -1121,4 +1125,60 @@ public void
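As background for the review comments above, the "immutable metric" being registered can be modeled as a register-once value (a toy model in plain Java, not the actual `StreamsMetricsImpl` API):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of an immutable thread-level metric: a value such as the thread
// start time is registered once and never updated afterwards.
class ImmutableMetricRegistry {
    private final Map<String, Long> metrics = new HashMap<>();

    // First registration wins; later calls for the same name are ignored,
    // which is what makes the metric "immutable".
    void addImmutableMetric(final String name, final long value) {
        metrics.putIfAbsent(name, value);
    }

    Long value(final String name) {
        return metrics.get(name);
    }
}
```

A test along the lines the reviewer requests would register the metric and assert that the recorded value never changes.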
[GitHub] [kafka] jlprat commented on pull request #11228: KAFKA-12887 Skip some RuntimeExceptions from exception handler
jlprat commented on pull request #11228: URL: https://github.com/apache/kafka/pull/11228#issuecomment-901123276 The test failure was https://issues.apache.org/jira/browse/KAFKA-8529, which had apparently been resolved, so I reopened the issue. I rebased this PR onto trunk from a couple of hours ago. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jomach commented on pull request #9493: KAFKA-10640: Add recursive support to Connect Cast and ReplaceField transforms, and support for casting complex types to either a native or JSO
jomach commented on pull request #9493: URL: https://github.com/apache/kafka/pull/9493#issuecomment-901121979 There seem to be some conflicts. @joshuagrisham, can you fix them, please?
[jira] [Commented] (KAFKA-8529) Flakey test ConsumerBounceTest#testCloseDuringRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401062#comment-17401062 ] Josep Prat commented on KAFKA-8529: --- I'll reopen this issue as it happened again on a fresh PR: https://github.com/apache/kafka/pull/11228 https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11228/2/testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_8_and_Scala_2_12___testCloseDuringRebalance__/

> Flakey test ConsumerBounceTest#testCloseDuringRebalance
> ---
>
> Key: KAFKA-8529
> URL: https://issues.apache.org/jira/browse/KAFKA-8529
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Boyang Chen
> Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5450/consoleFull]
>
> *16:16:10* kafka.api.ConsumerBounceTest > testCloseDuringRebalance > STARTED
> *16:16:22* kafka.api.ConsumerBounceTest.testCloseDuringRebalance failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testCloseDuringRebalance.test.stdout
> *16:16:22*
> *16:16:22* kafka.api.ConsumerBounceTest > testCloseDuringRebalance > FAILED
> *16:16:22* java.lang.AssertionError: Rebalance did not complete in time
> *16:16:22* at org.junit.Assert.fail(Assert.java:89)
> *16:16:22* at org.junit.Assert.assertTrue(Assert.java:42)
> *16:16:22* at kafka.api.ConsumerBounceTest.waitForRebalance$1(ConsumerBounceTest.scala:402)
> *16:16:22* at kafka.api.ConsumerBounceTest.checkCloseDuringRebalance(ConsumerBounceTest.scala:416)
> *16:16:22* at kafka.api.ConsumerBounceTest.testCloseDuringRebalance(ConsumerBounceTest.scala:379)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (KAFKA-8529) Flakey test ConsumerBounceTest#testCloseDuringRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat reopened KAFKA-8529: ---

> Flakey test ConsumerBounceTest#testCloseDuringRebalance
> ---
>
> Key: KAFKA-8529
> URL: https://issues.apache.org/jira/browse/KAFKA-8529
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Boyang Chen
> Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5450/consoleFull]
>
> *16:16:10* kafka.api.ConsumerBounceTest > testCloseDuringRebalance > STARTED
> *16:16:22* kafka.api.ConsumerBounceTest.testCloseDuringRebalance failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testCloseDuringRebalance.test.stdout
> *16:16:22*
> *16:16:22* kafka.api.ConsumerBounceTest > testCloseDuringRebalance > FAILED
> *16:16:22* java.lang.AssertionError: Rebalance did not complete in time
> *16:16:22* at org.junit.Assert.fail(Assert.java:89)
> *16:16:22* at org.junit.Assert.assertTrue(Assert.java:42)
> *16:16:22* at kafka.api.ConsumerBounceTest.waitForRebalance$1(ConsumerBounceTest.scala:402)
> *16:16:22* at kafka.api.ConsumerBounceTest.checkCloseDuringRebalance(ConsumerBounceTest.scala:416)
> *16:16:22* at kafka.api.ConsumerBounceTest.testCloseDuringRebalance(ConsumerBounceTest.scala:379)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8295) Add merge() operator to State Stores.
[ https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401043#comment-17401043 ] Sagar Rao commented on KAFKA-8295: --- I see. Is there a KIP for list append types for windowed joins? I would be happy to assist there :)

> Add merge() operator to State Stores.
> ---
>
> Key: KAFKA-8295
> URL: https://issues.apache.org/jira/browse/KAFKA-8295
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: Sagar Rao
> Priority: Major
>
> In addition to regular put/get/delete RocksDB provides a fourth operation, merge. This essentially provides an optimized read/update/write path in a single operation. One of the built-in (C++) merge operators exposed over the Java API is a counter. We should be able to leverage this for a more efficient implementation of count()
>
> (Note: Unfortunately it seems unlikely we can use this to optimize general aggregations, even if RocksJava allowed for a custom merge operator, unless we provide a way for the user to specify and connect a C++ implemented aggregator – otherwise we incur too much cost crossing the jni for a net performance benefit)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
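The merge idea in the issue above can be modeled without RocksDB (a pure-Java toy, not the RocksJava API): the store combines a new operand with the stored value itself, so a counter needs no read-modify-write round trip on the caller's side.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of RocksDB's merge operation: instead of get + add + put at the
// call site, the store accepts a "merge operand" and folds it into the
// existing value with an associative operator - here a counter, mirroring
// the built-in counting merge operator mentioned in the issue.
class MergeCounterStore {
    private final Map<String, Long> data = new HashMap<>();

    // Read-modify-write collapsed into a single store-side operation.
    void merge(final String key, final long operand) {
        data.merge(key, operand, Long::sum);
    }

    long count(final String key) {
        return data.getOrDefault(key, 0L);
    }
}
```

In actual RocksDB the operator runs inside the storage engine (often deferred to compaction time), which is where the efficiency for count() would come from.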
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401034#comment-17401034 ] Sagar Rao commented on KAFKA-13152: --- [~mjsax], [~guozhang] I had a slightly different thought: we could provision it as a global setting and then distribute it, similar to how `cache.max.bytes.buffering` works at the moment. So, assuming we have T threads and C is the max buffer size, each thread gets C/T bytes. Once the individual StreamThreads have been assigned their respective shares of bytes, we can look at the total bytes consumed across the tasks of each thread, and if it exceeds its share, pause the partitions. We could go one step further and also assign each task its share of bytes (by dividing equally), and finally bytes per partition (similar to the current per-partition records config, but counting bytes, by further equal division), but that is just extra complexity and can be ignored.

Where assigning C/T bytes among threads might be beneficial compared to the option suggested by Guozhang here — *instead we just monitor the aggregated total bytes across all tasks within the instance, when it has been reached* — is this: say there is more than one StreamThread in an instance and only one of them individually exceeds its bound. Because we look at the overall byte count across all tasks, the other threads would also pay the penalty and be paused. If users provision the config properly, they should be able to pause only the relevant tasks and not all of them. What do you think?

Regarding pausing the partitions, I think it makes sense to pause only those partitions that have some data, as you both discussed, for simplicity. Maybe we can look at heuristics, such as whether a single partition accounts for, say, X% of the bytes, or pick the one with the most bytes and pause only that.
That might make it more complicated, but it leads towards pausing only the relevant partitions, which is what `buffered.records.per.partition` is able to achieve.

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Sagar Rao
> Priority: Major
> Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls how many records at maximum to bookkeep, and hence once it is exceeded we would pause fetching from this partition. However this config has two issues:
> * It's a per-partition config, so the total memory consumed is dependent on the dynamic number of partitions assigned.
> * Record size could vary from case to case.
> And hence it's hard to bound the memory usage for this buffering. We should consider deprecating that config with a global one, e.g. "input.buffer.max.bytes", which controls how many bytes in total are allowed to be buffered. This is doable since we buffer the raw records in .

-- This message was sent by Atlassian Jira (v8.3.4#803005)
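The C/T split sketched in the comment above can be modeled in a few lines (the class and method names are illustrative, not an actual Streams config API): a global budget C is divided evenly across T stream threads, and a thread pauses its partitions only when the bytes buffered across its own tasks exceed its share.

```java
// Hypothetical sketch of per-thread budgeting for the proposed
// "input.buffer.max.bytes" config; not code from Kafka Streams.
class InputBufferBudget {
    private final long maxBytesTotal; // C: the proposed global config value
    private final int numThreads;     // T: stream threads in the instance

    InputBufferBudget(final long maxBytesTotal, final int numThreads) {
        this.maxBytesTotal = maxBytesTotal;
        this.numThreads = numThreads;
    }

    // Each StreamThread's share of the global budget.
    long perThreadShare() {
        return maxBytesTotal / numThreads;
    }

    // Only the thread that exceeds its own share pauses its partitions;
    // other threads are unaffected, unlike monitoring a single
    // instance-wide aggregate.
    boolean shouldPause(final long bufferedBytesForThread) {
        return bufferedBytesForThread > perThreadShare();
    }
}
```

The trade-off discussed in the thread is exactly this isolation: with a per-thread share, one heavy thread cannot cause the whole instance to pause.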
[jira] [Created] (KAFKA-13213) StreamThread: State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
Patrik Kleindl created KAFKA-13213: -- Summary: StreamThread: State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED Key: KAFKA-13213 URL: https://issues.apache.org/jira/browse/KAFKA-13213 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.6.0 Reporter: Patrik Kleindl

One of our streaming apps sometimes enters a strange looping behaviour. It is deployed on 2 pods in a kubernetes cluster, but only one of them shows this:

{code:java}
2021-08-18 11:27:20,402 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
2021-08-18 11:27:20,402 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
2021-08-18 11:37:23,516 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] Attempt to heartbeat failed since group is rebalancing
2021-08-18 11:37:23,518 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] (Re-)joining group
2021-08-18 11:37:23,676 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] Successfully joined group with generation 135
...
2021-08-18 11:37:23,678 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.s.p.i.TaskManager - stream-thread [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] Handle new assignment with: New active tasks: [0_0, 0_1, 0_2, 0_3, 0_4, 3_1, 0_5, 0_6, 3_3, 0_7, 0_8, 3_5, 0_9, 0_10, 3_7, 0_11, 3_9, 3_11] New standby tasks: [1_3, 1_1] Existing active tasks: [0_0, 0_1, 0_2, 0_3, 0_4, 3_1, 0_5, 0_6, 3_3, 0_7, 0_8, 3_5, 0_9, 0_10, 3_7, 0_11, 3_9, 3_11] Existing standby tasks: [1_1, 1_3]
2021-08-18 11:37:23,678 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] Adding newly assigned partitions:
2021-08-18 11:37:23,679 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
2021-08-18 11:47:26,768 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] Attempt to heartbeat failed since group is rebalancing
{code}

The other one shows:

{code:java}
2021-08-18 11:37:23,710 [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] INFO o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1-consumer] Requested to schedule probing rebalance for 1629280043706 ms.
2021-08-18 11:37:23,710 [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] INFO o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1-consumer] Requested to schedule probing rebalance for 1629280043706 ms.
2021-08-18 11:37:23,710 [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] INFO o.a.k.s.p.i.TaskManager - stream-thread [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] Handle new assignment with: New active tasks: [1_0, 2_0, 1_1, 2_1, 3_0, 1_2, 2_2, 1_3, 2_3, 3_2, 1_4, 2_4, 1_5, 2_5, 3_4, 1_6, 2_6, 1_7, 2_7, 3_6, 1_8, 2_8, 1_9, 2_9, 3_8, 1_10, 2_10, 1_11, 2_11, 3_10] New standby tasks: [] Existing active tasks: [1_0, 1_1, 2_0, 1_2, 2_1, 3_0, 1_3, 2_2, 1_4, 2_3, 3_2, 1_5, 2_4, 1_6, 2_5, 3_4, 1_7, 2_6, 1_8, 2_7, 3_6, 1_9, 2_8, 1_10, 2_9, 3_8, 1_11, 2_10, 2_11, 3_10] Existing standby tasks: []
2021-08-18 11:37:23,711 [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1-consumer, groupId=app] Adding newly assigned partitions:
2021-08-18 11:37:23,711
[GitHub] [kafka] dajac opened a new pull request #11229: KAFKA-12961; Verify group generation in `DelayedJoin`
dajac opened a new pull request #11229: URL: https://github.com/apache/kafka/pull/11229 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jlprat commented on pull request #11228: KAFKA-12887 Skip some RuntimeExceptions from exception handler
jlprat commented on pull request #11228: URL: https://github.com/apache/kafka/pull/11228#issuecomment-901030641 cc. @guozhangwang Feel free to review this PR and let me know if this is what you had in mind with https://issues.apache.org/jira/browse/KAFKA-12887
[GitHub] [kafka] jlprat opened a new pull request #11228: KAFKA-12887 Skip some RuntimeExceptions from exception handler
jlprat opened a new pull request #11228: URL: https://github.com/apache/kafka/pull/11228 Instead of letting all `RuntimeException`s go through and be processed by the uncaught exception handler, `IllegalStateException` and `IllegalArgumentException` are not passed through and fail fast. Added a test for this new case: it uses an existing test (which verified that the uncaught exception handler was called) as a baseline and checks that the handler is not called in this particular case. A possible extension would be to add more types of `RuntimeException`s to the new `catch` clause. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
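A minimal sketch of the fail-fast dispatch the PR describes (the class and method here are illustrative stand-ins, not the actual StreamThread code): the two fatal exception types are rethrown, while any other `RuntimeException` is routed to the uncaught exception handler.

```java
// Hypothetical model of the exception routing in PR #11228.
class ExceptionDispatch {
    // IllegalStateException and IllegalArgumentException indicate a
    // programming error, so they are rethrown to fail fast; everything
    // else reaches the handler (a string stand-in here).
    static String handle(final RuntimeException e) {
        if (e instanceof IllegalStateException || e instanceof IllegalArgumentException) {
            throw e;
        }
        return "uncaught-exception-handler";
    }
}
```

The design choice is that a user-supplied handler should not be asked to "handle" what is really a bug in the application or the framework.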
[GitHub] [kafka] dajac commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests
dajac commented on pull request #11073: URL: https://github.com/apache/kafka/pull/11073#issuecomment-901015825 @jolshan is it something that we must add to 2.8.1?
[GitHub] [kafka] showuon commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
showuon commented on a change in pull request #11227: URL: https://github.com/apache/kafka/pull/11227#discussion_r691055719

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java

@@ -74,7 +74,7 @@ public boolean hasNext() {
        close();
        currentSegment = segments.next();
        try {
-            if (from == null || to == null) {
+            if (from == null && to == null) {
                if (forward) {
                    currentIterator = currentSegment.all();

Review comment: Side fix: the `all` case should apply only when `from == null && to == null`. Otherwise, we call the `range` method, which already supports null range queries as of KIP-763.
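The corrected condition can be illustrated in isolation (a hypothetical helper, not the actual `SegmentIterator` code): `all()` is chosen only when both bounds are null, and a single null bound now falls through to `range()`, which accepts open endpoints.

```java
// Toy dispatch mirroring the one-character fix (|| -> &&) in the diff above.
class SegmentQueryDispatch {
    // Returns which segment query would be issued for the given bounds.
    static String choose(final byte[] from, final byte[] to) {
        // Before the fix, "from == null || to == null" sent half-open
        // ranges to all(), scanning the entire segment unnecessarily.
        return (from == null && to == null) ? "all" : "range";
    }
}
```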
[GitHub] [kafka] showuon commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
showuon commented on a change in pull request #11227: URL: https://github.com/apache/kafka/pull/11227#discussion_r691053240

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java

@@ -573,12 +573,14 @@ private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRang
            throw new IllegalStateException("Error iterating over segments: segment interval has changed");
        }

-        if (keyFrom.equals(keyTo)) {
+        if (keyFrom != null && keyTo != null && keyFrom.equals(keyTo)) {
            cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime));
            cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime));
        } else {
-            cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId);
-            cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId);
+            cacheKeyFrom = keyFrom == null ? null :
+                cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId);
+            cacheKeyTo = keyTo == null ? null :
+                cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId);

Review comment: A `null` `cacheKeyFrom` or `cacheKeyTo` falls back to a `range` query, which already supports null bounds as of KIP-763.
[GitHub] [kafka] showuon commented on a change in pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
showuon commented on a change in pull request #11227: URL: https://github.com/apache/kafka/pull/11227#discussion_r691052026

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java

@@ -124,8 +124,8 @@
        final List searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);

-        final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
-        final Bytes binaryTo = keySchema.upperRange(keyTo, to);
+        final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from);
+        final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to);

Review comment: We already support `null` `binaryFrom` and `binaryTo`.
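The null-guard pattern used in these diffs can be factored out as a small helper (a hypothetical stand-in, not code from the PR): a null key means an open endpoint, so the schema's bound computation is skipped and null propagates through to the underlying range scan.

```java
import java.util.function.Function;

// Toy version of the "keyFrom == null ? null : keySchema.lowerRange(...)"
// pattern: the bound function (an illustrative lambda below) is applied only
// to non-null keys, and null flows through to mean "unbounded".
class RangeBounds {
    static <K, B> B boundOrNull(final K key, final Function<K, B> toBound) {
        return key == null ? null : toBound.apply(key);
    }
}
```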
[GitHub] [kafka] showuon opened a new pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
showuon opened a new pull request #11227: URL: https://github.com/apache/kafka/pull/11227 Add support for infinite range query for WindowStore. Story JIRA: https://issues.apache.org/jira/browse/KAFKA-13210 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`
dajac commented on a change in pull request #11225: URL: https://github.com/apache/kafka/pull/11225#discussion_r691041467

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala

@@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig,
        leaderAndIsrRequest.partitionStates.forEach { partitionState =>

Review comment: I wonder if we really have to re-iterate over all the partition states present in the request here. Intuitively, I would have thought that considering only the ones in `partitionStates` would be sufficient.